You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

108 lines
2.8 KiB

  1. package nova.monadic_sfx.util.reactive.store
  2. import java.time.LocalDateTime
  3. import io.circe.Encoder
  4. import io.odin.Logger
  5. import monix.bio.Task
  6. import monix.eval.Coeval
  7. import monix.reactive.Observable
  8. import monix.reactive.OverflowStrategy
  9. import monix.reactive.subjects.ConcurrentSubject
  10. object Store {
  11. def createL[A, M](
  12. initialAction: A,
  13. initialState: M,
  14. reducer: Reducer[A, M],
  15. middlewares: Seq[Middleware[A, M]] = Seq.empty,
  16. overflowStrategy: OverflowStrategy.Synchronous[A] =
  17. OverflowStrategy.DropOld(50)
  18. ): Task[Store[A, M]] =
  19. Task.deferAction { implicit s =>
  20. Task {
  21. val subject = ConcurrentSubject.publish[A](overflowStrategy)
  22. val fold: ((A, M), A) => Coeval[(A, M)] = {
  23. case ((_, state), action) =>
  24. Coeval {
  25. val (newState, effects) = reducer(state, action)
  26. effects.subscribe(subject.onNext _)
  27. action -> newState
  28. }
  29. }
  30. val obs = Observable.suspend(
  31. subject
  32. .scanEval0F[Coeval, (A, M)](
  33. Coeval.pure(initialAction -> initialState)
  34. )(fold)
  35. .behavior(initialAction -> initialState)
  36. .refCount
  37. )
  38. val res = middlewares.foldLeft(obs) {
  39. case (obs, middleware) => middleware(obs)
  40. }
  41. MonixProSubject.from(
  42. subject,
  43. res
  44. )
  45. }
  46. }
  47. def createJsonL[A: Encoder, M](
  48. initialAction: A,
  49. initialState: M,
  50. reducer: Reducer[A, M],
  51. storeName: String,
  52. logger: Logger[Task],
  53. middlewares: Seq[Middleware[A, M]] = Seq.empty,
  54. overflowStrategy: OverflowStrategy.Synchronous[A] =
  55. OverflowStrategy.DropOld(50)
  56. ): Task[Store[A, M]] =
  57. Task.deferAction { implicit s =>
  58. Task {
  59. val subject = ConcurrentSubject.publish[A](overflowStrategy)
  60. val fold: ((A, M), A) => Task[(A, M)] = {
  61. case ((_, state), action) =>
  62. Task {
  63. val (newState, effects) = reducer(state, action)
  64. effects.subscribe(subject.onNext _)
  65. action -> newState
  66. }
  67. }
  68. val obs = subject
  69. .doOnNextF(action =>
  70. Task(LocalDateTime.now()).flatMap(curTime =>
  71. logger.debug(
  72. StoreInfo(storeName, action, curTime)
  73. )
  74. )
  75. )
  76. // .doOnNextF(action => Coeval(println(action)))
  77. .scanEvalF[Task, (A, M)](Task.pure(initialAction -> initialState))(
  78. fold
  79. )
  80. .behavior(initialAction -> initialState)
  81. .refCount
  82. // val res = middlewares.foldLeft(obs) {
  83. // case (obs, middleware) => middleware(obs)
  84. // }
  85. MonixProSubject.from(
  86. subject,
  87. obs
  88. )
  89. }
  90. }
  91. }