You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
104 lines
3.0 KiB
104 lines
3.0 KiB
import org.scalatest.funsuite.AnyFunSuite
|
|
import monix.catnap.ConcurrentQueue
|
|
import monix.eval.Task
|
|
import monix.reactive.Observable
|
|
import scala.concurrent.duration._
|
|
|
|
class ObservableTest extends AnyFunSuite {
|
|
import monix.execution.Scheduler.Implicits.global
|
|
test("observable state machine") {
|
|
(for {
|
|
_ <- Task.unit
|
|
sm <- MonixStateMachine()
|
|
_ <-
|
|
Task
|
|
.parSequence(
|
|
List(
|
|
sm.source
|
|
.doOnNext(item => Task(println(s"Task 1: Got $item")))
|
|
.completedL,
|
|
sm.source
|
|
.doOnNext(item => Task(println(s"Task 2: Got $item")))
|
|
.completedL,
|
|
sm.tell(MonixStateMachine.Start) >>
|
|
Observable
|
|
// .interval(1.second)
|
|
.interval(500.millis)
|
|
.doOnNext(_ => sm.tell(MonixStateMachine.Incr))
|
|
.takeUntil(Observable.unit.delayExecution(5.seconds))
|
|
.completedL >>
|
|
sm.tell(MonixStateMachine.Stop) >>
|
|
sm.tell(MonixStateMachine.Incr)
|
|
)
|
|
)
|
|
.void
|
|
.start
|
|
.bracket(_ => Task.sleep(8.seconds))(_.cancel)
|
|
} yield ()).runSyncUnsafe(10.seconds)
|
|
}
|
|
}
|
|
|
|
class MonixStateMachine(
|
|
queue: ConcurrentQueue[Task, MonixStateMachine.Command],
|
|
val source: Observable[(MonixStateMachine.State, MonixStateMachine.Data)]
|
|
) {
|
|
import MonixStateMachine._
|
|
|
|
def tell(item: Command) = queue.offer(item)
|
|
|
|
}
|
|
object MonixStateMachine {
|
|
|
|
sealed trait State
|
|
case object Idle extends State
|
|
case object Active extends State
|
|
|
|
sealed trait Command
|
|
case object Incr extends Command
|
|
case object Start extends Command
|
|
case object Stop extends Command
|
|
|
|
case class Data(num: Int)
|
|
|
|
private def source(queue: ConcurrentQueue[Task, Command]) =
|
|
Task.deferAction(implicit s =>
|
|
Task(
|
|
Observable
|
|
.repeatEvalF(queue.poll)
|
|
.scan((Idle: State, Data(0))) {
|
|
case ((state, data), command) =>
|
|
state match {
|
|
case Idle =>
|
|
println("Entered idle")
|
|
command match {
|
|
case Incr =>
|
|
println("Not active ")
|
|
(Idle, data)
|
|
case Start => (Active, data)
|
|
case Stop =>
|
|
println("Already stopped")
|
|
(Idle, data)
|
|
|
|
}
|
|
case Active =>
|
|
println("Entered Active")
|
|
command match {
|
|
case Incr => (Active, data.copy(num = data.num + 1))
|
|
case Start =>
|
|
println("Already started")
|
|
(Active, data)
|
|
case Stop => (Idle, data)
|
|
}
|
|
}
|
|
}
|
|
.publish
|
|
.refCount
|
|
)
|
|
)
|
|
|
|
def apply() =
|
|
for {
|
|
queue <- ConcurrentQueue.bounded[Task, Command](10)
|
|
source <- source(queue)
|
|
} yield new MonixStateMachine(queue, source)
|
|
}
|