Updated reactiv store code to suspend side effects
This commit is contained in:
parent
338f7b16f9
commit
918b893fe8
@ -7,6 +7,8 @@ import io.odin.Logger
|
|||||||
import monix.bio.Task
|
import monix.bio.Task
|
||||||
import monix.reactive.OverflowStrategy
|
import monix.reactive.OverflowStrategy
|
||||||
import monix.reactive.subjects.ConcurrentSubject
|
import monix.reactive.subjects.ConcurrentSubject
|
||||||
|
import monix.eval.Coeval
|
||||||
|
import monix.reactive.Observable
|
||||||
|
|
||||||
object Store {
|
object Store {
|
||||||
def createL[A, M](
|
def createL[A, M](
|
||||||
@ -21,8 +23,9 @@ object Store {
|
|||||||
Task {
|
Task {
|
||||||
val subject = ConcurrentSubject.publish[A](overflowStrategy)
|
val subject = ConcurrentSubject.publish[A](overflowStrategy)
|
||||||
|
|
||||||
val fold: ((A, M), A) => (A, M) = {
|
val fold: ((A, M), A) => Coeval[(A, M)] = {
|
||||||
case ((_, state), action) => {
|
case ((_, state), action) =>
|
||||||
|
Coeval {
|
||||||
val (newState, effects) = reducer(state, action)
|
val (newState, effects) = reducer(state, action)
|
||||||
|
|
||||||
effects.subscribe(subject.onNext _)
|
effects.subscribe(subject.onNext _)
|
||||||
@ -31,10 +34,14 @@ object Store {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
val obs = subject
|
val obs = Observable.suspend(
|
||||||
.scan[(A, M)](initialAction -> initialState)(fold)
|
subject
|
||||||
|
.scanEval0F[Coeval, (A, M)](
|
||||||
|
Coeval.pure(initialAction -> initialState)
|
||||||
|
)(fold)
|
||||||
.behavior(initialAction -> initialState)
|
.behavior(initialAction -> initialState)
|
||||||
.refCount
|
.refCount
|
||||||
|
)
|
||||||
|
|
||||||
val res = middlewares.foldLeft(obs) {
|
val res = middlewares.foldLeft(obs) {
|
||||||
case (obs, middleware) => middleware(obs)
|
case (obs, middleware) => middleware(obs)
|
||||||
|
Loading…
Reference in New Issue
Block a user