You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

83 lines
2.7 KiB

3 years ago
  1. import org.scalatest.funsuite.AnyFunSuite
  2. import monix.eval.Task
  3. import monix.reactive.subjects.ConcurrentSubject
  4. import monix.eval.Coeval
  5. import com.typesafe.scalalogging.LazyLogging
  6. import monix.execution.Scheduler.global
  7. import nova.monadic_sfx.util.reactive.store.MonixProSubject
  8. import scala.concurrent.duration._
  9. import scala.concurrent.Await
  10. import monix.reactive.Observable
  /** Runs two Monix tasks to completion on the global scheduler.
    *
    * `task1` wires a `ConcurrentSubject` and an observable derived from it into a
    * `MonixProSubject`, subscribes to it twice and pushes a few elements through it.
    * `task2` subscribes twice, on forked fibers, to a multicast `Observable`.
    *
    * Both tasks print the elements they observe to stdout; only `task1` has a
    * result that is asserted on.
    */
  class ProSubjectTest extends AnyFunSuite with LazyLogging {

    test("task1") {
      // Implicit values get an explicit type so implicit resolution never depends on inference.
      implicit val s: monix.execution.Scheduler = global
      val result = Await.result(task1.runToFuture, 10.seconds)
      assert(result == 1)
    }

    test("task2") {
      implicit val s: monix.execution.Scheduler = global
      // Smoke test: there is no value to assert on, so the result is not bound.
      // `Await.result` still fails the test if the task fails or exceeds the timeout.
      Await.result(task2.runToFuture, 10.seconds)
    }

    /** Builds a `MonixProSubject` over a publish subject, attaches two printing
      * subscribers, feeds it the integers 0 to 5 and then yields `1`.
      *
      * The subscriptions are not cancelled and the acknowledgements returned by
      * `onNext` are discarded, so the task does not wait for the elements to be
      * delivered before completing.
      *
      * @return a task that completes with `1` once all elements have been pushed
      */
    def task1: Task[Int] =
      Task.deferAction { implicit s =>
        val createSubject = Task {
          val sub = ConcurrentSubject.publish[Int]
          // Synchronous alternative kept for reference:
          // val obs = sub.scan0(0)(_ + _).behavior(2).refCount
          val obs =
            sub
              .scanEval0F(Task.pure(0))((a, b) => Task(a + b))
              .behavior(2)
              .refCount
              .doOnNextF(i => Coeval(println(s"Emitted1: $i")))
          // Reference sketch of the helper used below (NOTE(review): confirm it
          // still matches the real implementation):
          //
          // type MonixProSubject[-I, +O] = Observable[O] with Observer[I]
          // def from[I, O](
          //     observer: Observer[I],
          //     observable: Observable[O]
          // ): MonixProSubject[I, O] =
          //   new Observable[O] with Observer[I] {
          //     override def onNext(elem: I): Future[Ack] = observer.onNext(elem)
          //     override def onError(ex: Throwable): Unit = observer.onError(ex)
          //     override def onComplete(): Unit = observer.onComplete()
          //     override def unsafeSubscribeFn(
          //         subscriber: Subscriber[O]
          //     ): Cancelable =
          //       observable.unsafeSubscribeFn(subscriber)
          //   }
          MonixProSubject.from(sub, obs)
        }

        for {
          ps <- createSubject
          _ <- Task {
            ps
              .doOnNextF(i => Coeval(println(s"Emitted item 1: $i")))
              .subscribe()
          }
          _ <- Task {
            ps
              .doOnNextF(i => Coeval(println(s"Emitted item 2: $i")))
              .subscribe()
          }
          // Both subscribers are attached before any element is pushed.
          _ <- Task((0 to 5).foreach(i => ps.onNext(i)))
        } yield 1
      }

    /** Builds a multicast observable over the running sums of 1 to 5 and
      * subscribes to it twice, each subscription started on its own fiber.
      *
      * The fibers are never joined, so the task completes once the second
      * subscription has been started rather than when the stream finishes.
      */
    val task2 = Task.deferAction { implicit s =>
      val shared = Task(
        Observable(1, 2, 3, 4, 5)
          .scan0(0)(_ + _)
          .behavior(2)
          .refCount
          // .doOnNextF(i => Coeval(println(s"Emitted2: $i")))
          .delayExecution(10.millis)
      )

      shared.flatMap { res =>
        val first = Task(res.doOnNextF(i => Coeval(println(s"1: $i"))).subscribe()).start
        val second = Task(res.doOnNextF(i => Coeval(println(s"2: $i"))).subscribe()).start
        first >> second
      }
    }
  }