You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

104 lines
3.0 KiB

import org.scalatest.funsuite.AnyFunSuite
import monix.catnap.ConcurrentQueue
import monix.eval.Task
import monix.reactive.Observable
import scala.concurrent.duration._
/** Smoke test for `MonixStateMachine`: two subscribers print what the machine
  * emits while a third task feeds it commands. Nothing is asserted; the test
  * only exercises the wiring and prints to stdout.
  */
class ObservableTest extends AnyFunSuite {
  import monix.execution.Scheduler.Implicits.global

  test("observable state machine") {
    // First subscriber: prints each item emitted by the machine's source.
    def firstListener(sm: MonixStateMachine): Task[Unit] =
      sm.source
        .doOnNext(item => Task(println(s"Task 1: Got $item")))
        .completedL

    // Second subscriber: same as the first, with its own label.
    def secondListener(sm: MonixStateMachine): Task[Unit] =
      sm.source
        .doOnNext(item => Task(println(s"Task 2: Got $item")))
        .completedL

    // Command producer: Start, then an Incr per 500 ms tick until the
    // 5-second trigger fires, then Stop followed by one more Incr.
    def driver(sm: MonixStateMachine): Task[Unit] = {
      val ticks: Task[Unit] =
        Observable
          .interval(500.millis)
          .doOnNext(_ => sm.tell(MonixStateMachine.Incr))
          .takeUntil(Observable.unit.delayExecution(5.seconds))
          .completedL

      for {
        _ <- sm.tell(MonixStateMachine.Start)
        _ <- ticks
        _ <- sm.tell(MonixStateMachine.Stop)
        _ <- sm.tell(MonixStateMachine.Incr)
      } yield ()
    }

    val scenario: Task[Unit] =
      MonixStateMachine().flatMap { sm =>
        val participants = List(firstListener(sm), secondListener(sm), driver(sm))

        // Run all participants in the background, wait 8 seconds, then
        // cancel the background fiber.
        Task
          .parSequence(participants)
          .void
          .start
          .bracket(_ => Task.sleep(8.seconds))(_.cancel)
      }

    // Block the test thread for at most 10 seconds.
    scenario.runSyncUnsafe(10.seconds)
  }
}
/** Pairs a command queue with the stream of `(State, Data)` values exposed to
  * subscribers.
  *
  * @param queue  queue that `tell` offers commands to
  * @param source stream of `(State, Data)` pairs, publicly readable
  */
class MonixStateMachine(
    queue: ConcurrentQueue[Task, MonixStateMachine.Command],
    val source: Observable[(MonixStateMachine.State, MonixStateMachine.Data)]
) {
  import MonixStateMachine._

  /** Offers `item` to the command queue.
    *
    * The result type is spelled out so the public API does not silently
    * follow whatever `queue.offer` happens to infer.
    *
    * @param item command to enqueue
    * @return a task that performs the offer when run
    */
  def tell(item: Command): Task[Unit] = queue.offer(item)
}
/** Companion of `MonixStateMachine`: declares the state and command types and
  * provides the factory that wires a command queue to the state stream.
  */
object MonixStateMachine {

  /** Mode the machine is in. */
  sealed trait State
  case object Idle extends State
  case object Active extends State

  /** Messages the machine reacts to. */
  sealed trait Command
  case object Incr extends Command
  case object Start extends Command
  case object Stop extends Command

  /** Payload carried next to the state; `num` is incremented by `Incr` while `Active`. */
  case class Data(num: Int)

  /** Capacity of the bounded command queue created by `apply`. */
  private val CommandQueueCapacity = 10

  /** Computes the next `(State, Data)` pair for one incoming command.
    *
    * Prints a diagnostic line for the current state on every command (not only
    * when the state changes), plus a line for commands that have no effect.
    *
    * @param state   state before the command
    * @param data    data before the command
    * @param command command being applied
    * @return the state and data after the command
    */
  private def transition(state: State, data: Data, command: Command): (State, Data) =
    state match {
      case Idle =>
        println("Entered idle")
        command match {
          case Incr =>
            println("Not active ")
            (Idle, data)
          case Start => (Active, data)
          case Stop =>
            println("Already stopped")
            (Idle, data)
        }
      case Active =>
        println("Entered Active")
        command match {
          case Incr => (Active, data.copy(num = data.num + 1))
          case Start =>
            println("Already started")
            (Active, data)
          case Stop => (Idle, data)
        }
    }

  /** Builds the state stream fed by `queue`.
    *
    * Commands are polled repeatedly from the queue and folded with
    * `transition`, starting from `(Idle, Data(0))`.
    *
    * NOTE(review): because of `publish.refCount`, a subscriber attaching after
    * the first one presumably misses earlier transitions — confirm this is
    * acceptable for callers.
    *
    * @param queue queue the commands are polled from
    * @return a task producing the shared observable of `(State, Data)` pairs
    */
  private def source(queue: ConcurrentQueue[Task, Command]): Task[Observable[(State, Data)]] =
    // deferAction makes the running Scheduler implicitly available to the
    // operators below (`publish` takes one).
    Task.deferAction { implicit s =>
      Task(
        Observable
          .repeatEvalF(queue.poll)
          .scan((Idle: State, Data(0))) {
            case ((state, data), command) => transition(state, data, command)
          }
          .publish
          .refCount
      )
    }

  /** Creates a state machine backed by a fresh bounded command queue.
    *
    * @return a task that, when run, yields a new `MonixStateMachine`
    */
  def apply(): Task[MonixStateMachine] =
    for {
      queue   <- ConcurrentQueue.bounded[Task, Command](CommandQueueCapacity)
      updates <- source(queue)
    } yield new MonixStateMachine(queue, updates)
}