You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

124 lines
3.2 KiB

3 years ago
3 years ago
3 years ago
3 years ago
  1. package nova.monadic_sfx.util.reactive.store
  2. import monix.bio.Task
  3. import monix.eval.Coeval
  4. import monix.reactive.Observer
  5. import monix.reactive.OverflowStrategy
  6. import monix.reactive.subjects.ConcurrentSubject
  7. import cats.effect.Resource
  8. import monix.{eval => me}
  9. import monix.catnap.ConcurrentQueue
  10. import nova.monadic_sfx.util.IOUtils
  11. import monix.reactive.Observable
  12. import io.odin.Logger
  13. import nova.monadic_sfx.implicits._
  /** Factory methods that wire a reducer into a stream of `(action, state)` pairs.
    *
    * Both factories fold incoming actions over the current state with the
    * supplied reducer. The reducer returns the next state together with a
    * stream of follow-up actions ("effects"), which are fed back into the
    * store's input.
    */
  object Store {

    /** Builds a store whose input is a `ConcurrentSubject`.
      *
      * @param initialAction    action paired with `initialState` in the seed element
      * @param initialState     state the fold starts from
      * @param reducer          called as `reducer(state, action)`; yields the new state
      *                         and an observable of follow-up actions
      * @param middlewares      applied left to right on top of the scanned stream
      * @param overflowStrategy buffering policy for the input subject
      *                         (defaults to dropping the oldest of 50 buffered items)
      * @return a task that, when run, creates the subject, subscribes to the
      *         resulting stream and wraps both via `MonixProSubject.from`
      */
    def createL[A, M](
        initialAction: A,
        initialState: M,
        reducer: Reducer[A, M],
        middlewares: Seq[Middleware[A, M]] = Seq.empty,
        overflowStrategy: OverflowStrategy.Synchronous[A] =
          OverflowStrategy.DropOld(50)
    ): Task[Store[A, M]] =
      Task.deferAction { implicit s =>
        Task {
          val subject = ConcurrentSubject.publish[A](overflowStrategy)

          // One reduction step: the previous action is discarded, the reducer
          // produces the next state, and every action emitted by `effects`
          // is pushed back into the subject.
          // NOTE(review): the Cancelable returned by `subscribe` is dropped,
          // so effect subscriptions are never cancelled explicitly.
          val fold: ((A, M), A) => Coeval[(A, M)] = {
            case ((_, state), action) =>
              Coeval {
                val (newState, effects) = reducer(state, action)
                effects.subscribe(subject.onNext _)
                action -> newState
              }
          }

          val obs = subject
            .scanEval0F[Coeval, (A, M)](
              Coeval.pure(initialAction -> initialState)
            )(fold)

          val res = middlewares
            .foldLeft(obs) {
              case (obs, middleware) => middleware(obs)
            }
            // Debug trace of every emitted (action, state) pair on stdout.
            .doOnNextF(i => Coeval(println(s"Emitted item 1: $i")))
            .behavior(initialAction -> initialState)
            .refCount

          // Subscribe right away with a no-op observer so the shared stream
          // has a subscriber before any caller attaches.
          res.subscribe(Observer.empty)

          MonixProSubject.from(
            subject,
            res
          )
        }
      }

    /** Builds a store whose input is a bounded `ConcurrentQueue` (capacity 10).
      *
      * Actions are polled from the queue one at a time and folded with the
      * reducer. The follow-up actions of each step are offered back to the
      * same queue, and the step only completes once that effect stream has
      * completed.
      *
      * NOTE(review): `middlewares` is accepted but currently NOT applied to
      * the stream; confirm whether that is intentional.
      *
      * NOTE(review): the fold waits for `effects` to finish offering while
      * nothing polls the queue, so an effect stream that emits more items
      * than the queue has free slots looks like it could stall; verify.
      *
      * @param initialAction action paired with `initialState` in the seed element
      * @param initialState  state the fold starts from
      * @param reducer       called as `reducer(state, action)`; yields the new state
      *                      and an observable of follow-up actions
      * @param logger        receives a debug message for every emitted pair
      * @param middlewares   currently unused (see note above)
      * @return a task producing a [[MyStore]] whose sink writes to the queue
      *         and whose source is the shared `(action, state)` stream
      */
    def backpressured[A, M](
        initialAction: A,
        initialState: M,
        reducer: Reducer[A, M],
        logger: Logger[Task],
        middlewares: Seq[Middleware[A, M]] = Seq.empty
    ): Task[MyStore[A, M]] =
      for {
        queue <- ConcurrentQueue[Task].bounded[A](10)
        source <- Task.deferAction(implicit s =>
          Task {
            val fold: ((A, M), A) => Task[(A, M)] = {
              case ((_, state), action) =>
                for {
                  // A for-comprehension must start with a generator; this
                  // also keeps the reducer call inside the Task.
                  _ <- Task.unit
                  (newState, effects) = reducer(state, action)
                  // Feed follow-up actions back into the queue and wait
                  // until the effect stream has completed.
                  _ <-
                    effects
                      .doOnNextF(queue.offer)
                      .completedL
                      .toIO
                } yield action -> newState
            }

            val obs = Observable
              .repeatEvalF(queue.poll)
              .scanEval0F[Task, (A, M)](
                Task.pure(initialAction -> initialState)
              )(fold)

            obs
              .doOnNextF(i => logger.debug(s"Emitted item 1: $i"))
              .behavior(initialAction -> initialState)
              .refCount
          }
        )
      } yield new MyStore(Sink2.concurrentQueue(queue), source)
  }
  /** Pairs the write side and the read side of a store.
    *
    * @param sink   where actions of type `A` are submitted
    * @param source stream of `(action, state)` pairs
    * @tparam A action type
    * @tparam M state (model) type
    */
  final class MyStore[A, M](val sink: Sink2[A], val source: Observable[(A, M)])