You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

98 lines
2.6 KiB

  1. package nova.monadic_sfx.implicits
  2. import monix.execution.Cancelable
  3. import monix.execution.Scheduler
  4. import monix.execution.cancelables.CompositeCancelable
  5. import monix.reactive.Observable
  6. import monix.reactive.Observer
  7. import monix.{eval => me}
  8. class ActionObservableExecutor[T](
  9. private val delegate: Observable[T]
  10. ) extends AnyVal {
  11. def -->(sub: Observer[T])(implicit s: Scheduler) =
  12. delegate
  13. .doOnNext(el => me.Task(sub.onNext(el)))
  14. .subscribe()
  15. }
  16. class ActionObservableBuilder[A](
  17. private val observableAction: Observable[A]
  18. ) extends AnyVal {
  19. def useLazyEval[T](v: => me.Task[T]) =
  20. new ActionObservableExecutor[T](observableAction.mapEval(_ => v))
  21. def useEval[T](cb: A => me.Task[T]) =
  22. new ActionObservableExecutor[T](
  23. observableAction.mapEval(cb)
  24. )
  25. def useIterableEval[T](cb: A => collection.immutable.Iterable[T]) =
  26. new ActionObservableExecutor[T](
  27. observableAction.flatMap(a =>
  28. Observable.suspend(Observable.fromIterable(cb(a)))
  29. )
  30. )
  31. def doOnNext(cb: A => me.Task[Unit]): ActionObservableBuilder[A] =
  32. new ActionObservableBuilder(observableAction.doOnNext(cb))
  33. def mapEval[B](cb: A => me.Task[B]) =
  34. new ActionObservableBuilder(observableAction.mapEval(cb))
  35. def underlying = observableAction
  36. // Caution: Experimental stuff below..
  37. def useEval2[B, C](f: A => me.Task[B], g: A => me.Task[C]) =
  38. new ActionObservableExecutor[(B, C)](
  39. observableAction.publishSelector(conn =>
  40. conn
  41. .mapEval(f)
  42. .switchMap(b =>
  43. conn.mapEval(a =>
  44. for {
  45. c <- g(a)
  46. } yield (b, c)
  47. )
  48. )
  49. )
  50. )
  51. def bifurcate[B, C](
  52. f: ActionObservableBuilder[A] => B,
  53. g: ActionObservableBuilder[A] => C
  54. )(implicit s: Scheduler) =
  55. observableAction
  56. .publishSelector(conn =>
  57. Observable(
  58. Observable.unit.doOnNext(_ =>
  59. me.Task(f(new ActionObservableBuilder[A](conn))) >> me.Task.unit
  60. ),
  61. Observable.unit.doOnNext(_ =>
  62. me.Task(g(new ActionObservableBuilder[A](conn))) >> me.Task.unit
  63. )
  64. ).merge
  65. )
  66. .subscribe()
  67. def split(
  68. lst: (ActionObservableBuilder[A] => Cancelable)*
  69. )(implicit s: Scheduler): Cancelable = {
  70. val comp = CompositeCancelable()
  71. comp += observableAction
  72. .publishSelector(conn =>
  73. Observable(
  74. lst.map(f =>
  75. Observable.unit.doOnNext(_ =>
  76. me.Task(
  77. comp += f(new ActionObservableBuilder[A](conn))
  78. ) >> me.Task.unit
  79. )
  80. ): _*
  81. ).merge
  82. )
  83. .subscribe()
  84. }
  85. }