You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

83 lines
2.4 KiB

  1. package nova.monadic_sfx.implicits
  2. import monix.execution.Scheduler
  3. import monix.reactive.Observable
  4. import monix.reactive.Observer
  5. import monix.{eval => me}
  6. class ActionObservableExecutor[T](
  7. private val delegate: Observable[T]
  8. ) extends AnyVal {
  9. def -->(sub: Observer[T])(implicit s: Scheduler) =
  10. delegate
  11. .doOnNext(el => me.Task(sub.onNext(el)))
  12. .subscribe()
  13. }
  14. class ActionObservableBuilder[A](
  15. private val observableAction: Observable[A]
  16. ) extends AnyVal {
  17. def useLazyEval[T](v: => me.Task[T]) =
  18. new ActionObservableExecutor[T](observableAction.mapEval(_ => v))
  19. def useEval[T](cb: A => me.Task[T]) =
  20. new ActionObservableExecutor[T](
  21. observableAction.mapEval(cb)
  22. )
  23. def useIterableEval[T](cb: A => collection.immutable.Iterable[T]) =
  24. new ActionObservableExecutor[T](
  25. observableAction.flatMap(a =>
  26. Observable.suspend(Observable.fromIterable(cb(a)))
  27. )
  28. )
  29. def doOnNext(cb: A => me.Task[Unit]): ActionObservableBuilder[A] =
  30. new ActionObservableBuilder(observableAction.doOnNext(cb))
  31. def mapEval[B](cb: A => me.Task[B]) =
  32. new ActionObservableBuilder(observableAction.mapEval(cb))
  33. def underlying = observableAction
  34. // def publish[B](f: Observable[A] => Observable[B]) =
  35. // new ActionObservableBuilder(observableAction.publishSelector(f))
  36. def useEval2[B, C](f: A => me.Task[B], g: A => me.Task[C]) =
  37. new ActionObservableExecutor[(B, C)](
  38. observableAction.publishSelector(conn =>
  39. conn
  40. .mapEval(f)
  41. .switchMap(b =>
  42. conn.mapEval(a =>
  43. for {
  44. c <- g(a)
  45. } yield (b, c)
  46. )
  47. )
  48. )
  49. )
  50. def bifurcate[B, C](
  51. f: ActionObservableBuilder[A] => B,
  52. g: ActionObservableBuilder[A] => C
  53. )(implicit s: Scheduler) =
  54. observableAction
  55. .publishSelector(conn =>
  56. Observable(
  57. Observable.unit.doOnNext(_ =>
  58. me.Task(f(new ActionObservableBuilder[A](conn))) >> me.Task.unit
  59. ),
  60. Observable.unit.doOnNext(_ =>
  61. me.Task(g(new ActionObservableBuilder[A](conn))) >> me.Task.unit
  62. )
  63. ).merge
  64. )
  65. .subscribe()
  66. // def useEval2[B,C](a1: ActionObservableBuilder[A] => B, a2: ActionObservableBuilder[A] => C) = observableAction.publishSelector(conn =>
  67. // new ActionObservableBuilder[]
  68. // )
  69. }
  70. // class MappedActionExecutor(actionMap: Map[])