From 918b893fe8c42f6e32c8aeee7fc81f147d257551 Mon Sep 17 00:00:00 2001 From: Rohan Sircar Date: Mon, 21 Dec 2020 13:17:15 +0530 Subject: [PATCH] Updated reactiv store code to suspend side effects --- .../util/reactive/store/Store.scala | 27 ++++++++++++------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/src/main/scala/nova/monadic_sfx/util/reactive/store/Store.scala b/src/main/scala/nova/monadic_sfx/util/reactive/store/Store.scala index 2beeed7..ba592ed 100644 --- a/src/main/scala/nova/monadic_sfx/util/reactive/store/Store.scala +++ b/src/main/scala/nova/monadic_sfx/util/reactive/store/Store.scala @@ -7,6 +7,8 @@ import io.odin.Logger import monix.bio.Task import monix.reactive.OverflowStrategy import monix.reactive.subjects.ConcurrentSubject +import monix.eval.Coeval +import monix.reactive.Observable object Store { def createL[A, M]( @@ -21,20 +23,25 @@ object Store { Task { val subject = ConcurrentSubject.publish[A](overflowStrategy) - val fold: ((A, M), A) => (A, M) = { - case ((_, state), action) => { - val (newState, effects) = reducer(state, action) + val fold: ((A, M), A) => Coeval[(A, M)] = { + case ((_, state), action) => + Coeval { + val (newState, effects) = reducer(state, action) - effects.subscribe(subject.onNext _) + effects.subscribe(subject.onNext _) - action -> newState - } + action -> newState + } } - val obs = subject - .scan[(A, M)](initialAction -> initialState)(fold) - .behavior(initialAction -> initialState) - .refCount + val obs = Observable.suspend( + subject + .scanEval0F[Coeval, (A, M)]( + Coeval.pure(initialAction -> initialState) + )(fold) + .behavior(initialAction -> initialState) + .refCount + ) val res = middlewares.foldLeft(obs) { case (obs, middleware) => middleware(obs)