Many changes

Updated store code - wrapped fold function in Task
Added JSON logging functionality to store middleware
Initial attempt at creating filter combinator for fx observables
Made ListStore code use Effects
Made a router using the store pattern
Misc updates to fx monix implicits
This commit is contained in:
Rohan Sircar 2020-12-19 19:14:22 +05:30
parent 857fd03bf1
commit 935ca358e6
15 changed files with 519 additions and 116 deletions

View File

@ -23,16 +23,16 @@ libraryDependencies ++= Seq(
"com.softwaremill.sttp.client" %% "core" % "2.2.9",
"com.softwaremill.sttp.client" %% "monix" % "2.2.9",
"com.softwaremill.sttp.client" %% "circe" % "2.2.9",
"com.softwaremill.sttp.client" %% "async-http-client-backend-monix" % "2.2.9",
"com.softwaremill.sttp.client" %% "httpclient-backend-monix" % "2.2.9",
"com.softwaremill.quicklens" %% "quicklens" % "1.6.1",
"com.github.valskalla" %% "odin-monix" % "0.8.1",
"com.typesafe.akka" %% "akka-actor-typed" % "2.6.8",
"com.softwaremill.macwire" %% "util" % "2.3.7",
"com.softwaremill.macwire" %% "macros" % "2.3.6" % "provided",
"com.softwaremill.macwire" %% "macrosakka" % "2.3.6" % "provided",
"com.github.valskalla" %% "odin-slf4j" % "0.8.1",
"com.github.valskalla" %% "odin-monix" % "0.9.1",
"com.github.valskalla" %% "odin-slf4j" % "0.9.1",
"com.github.valskalla" %% "odin-json" % "0.9.1",
"com.github.valskalla" %% "odin-extras" % "0.9.1",
"com.typesafe.scala-logging" %% "scala-logging" % "3.9.2",
"com.jfoenix" % "jfoenix" % "9.0.10",
"org.kordamp.ikonli" % "ikonli-core" % "12.0.0",

View File

@ -1,16 +1,12 @@
package nova.monadic_sfx
import scala.concurrent.duration._
import _root_.monix.bio.BIOApp
import _root_.monix.bio.Task
import _root_.monix.bio.UIO
import cats.effect.ExitCode
import cats.effect.Resource
import cats.implicits._
import com.softwaremill.macwire._
import io.odin._
import io.odin.syntax._
import nova.monadic_sfx.executors._
// import nova.monadic_sfx.util.IOUtils._
// import sttp.client.httpclient.monix.HttpClientMonixBackend
@ -18,10 +14,7 @@ object Main extends MainModule with BIOApp {
def appResource(startTime: Long) =
for {
implicit0(logger: Logger[Task]) <-
consoleLogger().withAsync(timeWindow = 1.millis) |+| fileLogger(
"application.log"
).withAsync()
implicit0(logger: Logger[Task]) <- makeLogger
schedulers = new Schedulers()
// backend <- Resource.make(
// toIO(HttpClientMonixBackend()(schedulers.async))

View File

@ -7,6 +7,8 @@ import nova.monadic_sfx.executors.Schedulers
import nova.monadic_sfx.implicits.JFXButton
import nova.monadic_sfx.implicits.JavaFXMonixObservables._
import nova.monadic_sfx.ui.MyFxApp
import nova.monadic_sfx.ui.components.router.BrainNotWorking
import nova.monadic_sfx.ui.components.router.FXRouter
import nova.monadic_sfx.ui.components.todo.TodoListStore
import nova.monadic_sfx.ui.components.todo.TodoListView
import org.gerweck.scalafx.util._
@ -33,7 +35,7 @@ class MainApp(
text = "Add"
}
lazy val addTodoObs = addTodoButton.observableAction()
lazy val addTodoObs = addTodoButton.observableAction
// lazy val todoListView = TodoListView.defaultListView
@ -46,14 +48,14 @@ class MainApp(
fill = Color.DeepSkyBlue
}
children ++= Seq(
new JFXButton {
text = "DummyButton"
},
new JFXButton {
text = "DummyButton2"
},
addTodoButton,
Test.ttv
// new JFXButton {
// text = "DummyButton"
// },
// new JFXButton {
// text = "DummyButton2"
// },
// addTodoButton,
// Test.ttv
// todoListView
)
}
@ -62,7 +64,7 @@ class MainApp(
private lazy val stage = new PrimaryStage {
title = "Simple ScalaFX App"
scene = _scene
width = 800
width = 1000
height = 400
}
@ -85,6 +87,30 @@ class MainApp(
// .startAndForget
// )
_ <- createTodoComponent.executeOn(schedulers.fx)
router <- Task(BrainNotWorking.router)
routerStore <- router.store(BrainNotWorking.Page.Home, logger)
routerNode <- for {
node <-
Task
.deferAction(implicit s =>
Task(new HBox {
children <-- router
.render(BrainNotWorking.resolver)(routerStore)
.map(_.delegate)
})
)
.executeOn(schedulers.fx)
_ <- Task.deferFuture(
routerStore.onNext(FXRouter.Replace(BrainNotWorking.Page.UserHome(1)))
)
} yield node
// _ <-
// BrainNotWorking
// .routerTask(logger)
// .flatMap(node => Task(_scene.getChildren += node))
// .executeOn(schedulers.fx)
_ <- Task(_scene.getChildren += routerNode).executeOn(schedulers.fx)
_ <- logger.info(
s"Application started in ${(System.currentTimeMillis() - startTime) / 1000f} seconds"
)

View File

@ -1,7 +1,41 @@
package nova.monadic_sfx
import scala.concurrent.duration._
import _root_.monix.bio.Task
import cats.implicits._
import io.odin._
import io.odin.config._
import io.odin.syntax._
import nova.monadic_sfx.actors.ActorModule
import nova.monadic_sfx.http.HttpModule
import nova.monadic_sfx.ui.UiModule
import nova.monadic_sfx.util.reactive.Middlewares
trait MainModule extends ActorModule with UiModule with HttpModule
trait MainModule extends ActorModule with UiModule with HttpModule {
def routerLogger(defaultLogger: Logger[Task], storeLogger: Logger[Task]) =
enclosureRouting[Task](
"nova.monadic_sfx.util.reactive.Middlewares" -> storeLogger,
"nova.monadic_sfx.util.reactive.Store" -> storeLogger
)
.withFallback(defaultLogger)
.withAsync()
def makeLogger =
for {
defaultLogger <- consoleLogger[Task]()
.withAsync(timeWindow = 1.millis) |+| fileLogger[Task](
"application.log"
).withAsync()
middlewareLogger <-
consoleLogger[
Task
](formatter = Middlewares.format)
.withMinimalLevel(Level.Trace)
.withAsync() |+| fileLogger[Task](
"stores.log",
formatter = Middlewares.format
).withAsync()
routerLogger <- routerLogger(defaultLogger, middlewareLogger)
} yield (routerLogger)
}

View File

@ -5,7 +5,7 @@ import java.nio.ByteBuffer
import monix.eval.Task
import monix.reactive.Observable
import sttp.client.SttpBackend
import sttp.client.asynchttpclient.WebSocketHandler
import sttp.client.httpclient.WebSocketHandler
trait AppTypes {
type HttpBackend =

View File

@ -7,9 +7,11 @@ import javafx.event.EventHandler
import javafx.scene.{input => jfxsi}
import javafx.{event => jfxe}
import monix.bio.Task
import monix.eval.Coeval
import monix.execution.Ack
import monix.execution.Cancelable
import monix.execution.Scheduler
import monix.execution.cancelables.SingleAssignCancelable
import monix.reactive.Observable
import monix.reactive.Observer
import monix.reactive.OverflowStrategy
@ -19,8 +21,8 @@ import org.gerweck.scalafx.util._
import scalafx.Includes._
import scalafx.beans.property.Property
import scalafx.beans.property.ReadOnlyProperty
import scalafx.beans.value.ObservableValue
import scalafx.collections.ObservableBuffer
import scalafx.event.subscriptions.Subscription
import scalafx.scene.Scene
import scalafx.scene.control.ButtonBase
import scalafx.scene.control.MenuItem
@ -84,17 +86,20 @@ object JavaFXMonixObservables {
}
def <--(obs: Observable[T])(implicit s: Scheduler) = {
obs.doOnNext(v => me.Task(prop.value = v)).subscribe()
obs.doOnNextF(v => Coeval(prop.value = v)).subscribe()
}
def asOption = prop.map(Option(_))
def observableChange[J1 >: J](): Observable[J1] = {
def observableChange[J1 >: J]: Observable[J1] = {
import monix.execution.cancelables.SingleAssignCancelable
Observable.create(OverflowStrategy.Unbounded) { sub =>
val c = SingleAssignCancelable()
val canc = prop.onChange((a, b, c) => sub.onNext(c))
val canc =
prop.onChange((a, b, c1) =>
if (c1 != null && sub.onNext(c1) == Ack.Stop) c.cancel()
)
c := Cancelable(() => canc.cancel())
c
@ -113,16 +118,17 @@ object JavaFXMonixObservables {
}
def <--(obs: Observable[A])(implicit s: Scheduler) = {
obs.doOnNext(v => me.Task(prop() = v)).subscribe()
obs.doOnNextF(v => Coeval(prop() = v)).subscribe()
}
def observableChange[J1 >: A]()
: Observable[(ObservableValue[A, A], J1, J1)] = {
def observableChange[J1 >: A]: Observable[J1] = {
import monix.execution.cancelables.SingleAssignCancelable
Observable.create(OverflowStrategy.Unbounded) { sub =>
val c = SingleAssignCancelable()
val canc = prop.onChange((a, b, c) => sub.onNext((a, b, c)))
val canc = prop.onChange((_, _, c1) =>
if (c1 != null && sub.onNext(c1) == Ack.Stop) c.cancel()
)
c := Cancelable(() => canc.cancel())
c
@ -130,6 +136,59 @@ object JavaFXMonixObservables {
}
}
implicit final class ObservableListExt[A](
private val buffer: ObservableList[A]
) extends AnyVal {
// def -->(sub: Observer[A]) =
// buffer.onChange((a, b, c) => if (c != null) sub.onNext(c))
// def -->(op: Property[A, A]) = {
// buffer.onChange((a, b, c) => if (c != null) op() = c)
// }
def <--(obs: Observable[A])(implicit s: Scheduler) = {
obs
.doOnNextF(v =>
for {
_ <- Coeval(buffer.clear())
_ <- Coeval(buffer += v)
} yield ()
)
.subscribe()
}
def observableChange[J1 >: A]: Observable[J1] = {
import monix.execution.cancelables.SingleAssignCancelable
Observable.create(OverflowStrategy.Unbounded) { sub =>
val c = SingleAssignCancelable()
implicit val s = sub.scheduler
val canc =
buffer.onChange((buf, _) =>
loop(sub, buf.toIterable.iterator, c).runToFuture
)
c := Cancelable(() => canc.cancel())
c
}
}
private def loop(
sub: Observer[A],
it: Iterator[A],
c: Cancelable
): Task[Unit] =
if (it.hasNext) {
val next = it.next()
Task.deferFuture(sub.onNext(next)).flatMap {
case Ack.Continue => loop(sub, it, c)
case Ack.Stop => Task(c.cancel())
}
} else Task.unit
}
implicit final class BindObs3[T, J](private val prop: ReadOnlyProperty[T, J])
extends AnyVal {
def -->(op: Observer[T]) = {
@ -140,12 +199,14 @@ object JavaFXMonixObservables {
op <== prop
}
def observableChange[J1 >: J](): Observable[J1] = {
def observableChange[J1 >: J]: Observable[J1] = {
import monix.execution.cancelables.SingleAssignCancelable
Observable.create(OverflowStrategy.Unbounded) { sub =>
val c = SingleAssignCancelable()
val canc = prop.onChange((a, b, c) => sub.onNext(c))
val canc = prop.onChange((a, b, c1) =>
if (c1 != null && sub.onNext(c1) == Ack.Stop) c.cancel()
)
c := Cancelable(() => canc.cancel())
c
@ -160,15 +221,18 @@ object JavaFXMonixObservables {
obs.doOnNext(v => me.Task(prop() = ObservableBuffer.from(v))).subscribe()
}
def -->(sub: Observer[A])(implicit s: Scheduler) =
prop.onChange((a, b, c) =>
if (c != null)
def -->(sub: Observer[A])(implicit s: Scheduler) = {
val c = SingleAssignCancelable()
val subs: Subscription = prop.onChange((a, b, c1) =>
if (c1 != null)
Iterant[Task]
.fromIterable(c.toIterable)
.fromIterable(c1.toIterable)
.consume
.use(consume(sub, _))
.use(consume(sub, c, _))
.runToFuture
)
c := Cancelable(() => subs.cancel())
}
private def loop(sub: Observer[A], it: Iterator[A]): Task[Unit] =
if (it.hasNext) {
@ -181,14 +245,15 @@ object JavaFXMonixObservables {
private def consume(
sub: Observer[A],
c: Cancelable,
consumer: Iterant.Consumer[Task, A]
): Task[Unit] =
consumer.pull.flatMap {
case Left(value) => Task.unit
case Right(value) =>
Task.deferFuture(sub.onNext(value)).flatMap {
case Ack.Continue => consume(sub, consumer)
case Ack.Stop => Task.unit
case Ack.Continue => consume(sub, c, consumer)
case Ack.Stop => Task(c.cancel())
}
}
@ -212,7 +277,7 @@ object JavaFXMonixObservables {
private val button: ButtonBase
) extends AnyVal {
def observableAction(): Observable[jfxe.ActionEvent] = {
def observableAction: Observable[jfxe.ActionEvent] = {
import monix.execution.cancelables.SingleAssignCancelable
Observable.create(OverflowStrategy.Unbounded) { sub =>
val c = SingleAssignCancelable()
@ -238,7 +303,7 @@ object JavaFXMonixObservables {
private val item: MenuItem
) extends AnyVal {
def observableAction(): Observable[jfxe.ActionEvent] = {
def observableAction: Observable[jfxe.ActionEvent] = {
import monix.execution.cancelables.SingleAssignCancelable
Observable.create(OverflowStrategy.Unbounded) { sub =>
val c = SingleAssignCancelable()

View File

@ -5,6 +5,5 @@ import scalafx.scene.{control => sfxc}
import JavaFXMonixObservables._
class MenuItem extends sfxc.MenuItem {
def obsAction =
new ActionObservableBuilder(this.observableAction())
def obsAction = new ActionObservableBuilder(this.observableAction)
}

View File

@ -21,14 +21,14 @@ class MyFxApp(val schedulers: Schedulers)(implicit logger: Logger[Task]) {
// def stage_=(stage: PrimaryStage) = Task(internal.stage = stage)
def useInternal[T](f: JFXApp => Task[T]): Task[T] =
for {
_ <- logger.debug("Request for using internal value")
res <- f(internal).executeOn(schedulers.fx)
_ <- logger.debug(s"Result was ${res.toString()}")
} yield (res)
// def useInternal[T](f: JFXApp => Task[T]): Task[T] =
// for {
// _ <- logger.debug("Request for using internal value")
// res <- f(internal).executeOn(schedulers.fx)
// _ <- logger.debug(s"Result was ${res.toString()}")
// } yield (res)
def init(stage: => PrimaryStage, delay: FiniteDuration = 2000.millis) =
def init(stage: => PrimaryStage, delay: FiniteDuration = 200.millis) =
for {
_ <- logger.info("Starting FX App")
fib <- Task(internal.main(Array.empty)).start

View File

@ -0,0 +1,107 @@
package nova.monadic_sfx.ui.components.router
import enumeratum._
import io.circe.Encoder
import io.circe.generic.JsonCodec
import io.odin.Logger
import monix.bio.Task
import nova.monadic_sfx.util.IOUtils
import nova.monadic_sfx.util.reactive.Reducer
import nova.monadic_sfx.util.reactive.Store
import scalafx.scene.Parent
import scalafx.scene.control.Label
object FXRouter {
final case class State[P](page: P)
@JsonCodec
sealed abstract class Action[T]
// final case object Init extends Action
final case class Replace[T](p: T) extends Action[T]
type FXStore[P] = Store[Action[P], State[P]]
// def resolver2 = resolver.lift.andThen(_.getOrElse(notFound))
// def resolver: PartialFunction[P <: Enum[P]][P, Parent] = {
// case Home => new TextField
// }
}
class FXRouter[P <: EnumEntry](
)(implicit E: Encoder[P]) {
import FXRouter._
def store(initialPage: P, logger: Logger[Task]) =
Task.deferAction(implicit s =>
Store.createL[Action[P], State[P]](
Replace(initialPage),
State(initialPage),
Reducer.withOptionalEffects[Task, Action[P], State[P]](reducer _)
// Seq(actionLoggerMiddleware(logger, "RouterStore"))
)
)
def reducer(
state: State[P],
action: Action[P]
): (State[P], Option[Task[Action[P]]]) =
action match {
// case Init => (state, None)
case Replace(p) =>
(state.copy(page = p), None)
}
def render(
resolver: P => Task[Parent]
)(implicit store: FXStore[P]) =
store.mapEval { case (_, FXRouter.State(p)) => IOUtils.toTask(resolver(p)) }
def link(
page: P,
store: FXStore[P]
) = {
store.onNext(Replace(page))
}
}
object BrainNotWorking {
@JsonCodec
sealed trait Page extends EnumEntry
object Page extends Enum[Page] {
val values = findValues
final case object Home extends Page
final case class UserHome(id: Int) extends Page
}
def resolver: PartialFunction[Page, Task[Parent]] = {
case Page.Home =>
Task(new Label {
text = "HomePage"
})
case Page.UserHome(id0) =>
Task(new Label {
text = s"User Home, Id = $id0"
})
}
val router = new FXRouter[Page]
}
// case class State()
// object RouterStore {
// sealed trait Action
// case object Init extends Action
// def reducer(state: State, action: Action) =
// action match {
// case Init => state
// }
// def apply() =
// Store.createL[Action, State](Init, State(), Reducer(reducer _), Seq.empty)
// }

View File

@ -13,40 +13,77 @@ case class Todo(id: Int, content: String)
object TodoListStore {
@JsonCodec
sealed trait Command
case object Init extends Command
case class Add(content: String) extends Command
case class Edit(id: Int, content: String) extends Command
case class Delete(id: Int) extends Command
sealed trait Action
case object Init extends Action
case class Add(content: String) extends Action
case class Edit(id: Int, content: String) extends Action
case class Delete(id: Int) extends Action
private case class InternalAdd(content: String) extends Action
private case object End extends Action
case class State(todos: Vector[Todo], counter: Int)
def reducer(
def reducer(logger: Logger[Task])(
state: State,
action: Command
) =
action: Action
): (State, Option[Task[Action]]) =
action match {
case Init => state
case Init => (state, None)
case Add(content) =>
state.copy(
val nextAction = Some(for {
// _ <- logger.debug(s"Received $content")
res <- Task.pure(InternalAdd(content))
} yield res)
(state, nextAction)
case Edit(_id, content) =>
val condition: Todo => Boolean = _.id == _id
val nextState = state
.modify(_.todos.eachWhere(condition))
.using(_.copy(content = content))
(nextState, None)
case Delete(id) =>
(state.copy(state.todos.filterNot(_.id == id)), None)
case InternalAdd(content) =>
val nextState = state.copy(
todos = state.todos :+ Todo(state.counter, content),
counter = state.counter + 1
)
case Edit(_id, content) =>
val condition: Todo => Boolean = _.id == _id
state
.modify(_.todos.eachWhere(condition))
.using(_.copy(content = content))
case Delete(id) =>
state.copy(state.todos.filterNot(_.id == id))
(nextState, Some(logger.debug(s"Received $content") >> Task.pure(End)))
case End => (state, None)
}
def apply(logger: Logger[Task]) =
Task.deferAction(implicit s =>
for {
logMware <- actionLoggerMiddleware[Action, State](logger, "TodoStore")
store <-
Store
.createL[Command, State](
.createL[Action, State](
Init,
State(Vector.empty[Todo], 0),
Reducer(reducer _),
Seq(actionLoggerMiddleware(logger))
Reducer.withOptionalEffects(reducer(logger) _),
Seq(
// actionLoggerMiddleware(logger, "TodoStore2")
logMware
)
)
} yield (store)
)
}
// Task.deferAction(implicit s =>
// Store
// .createJsonL[Action, State](
// Init,
// State(Vector.empty[Todo], 0),
// Reducer.withOptionalEffects(reducer(logger) _),
// "TodoStore",
// logger
// // Seq(
// // actionLoggerMiddleware(logger, "TodoStore")
// // // actionLoggerMiddleware(logger, "TodoStore2")
// // )
// )
// )

View File

@ -2,6 +2,7 @@ package nova.monadic_sfx.ui.components.todo
import monix.bio.Task
import monix.execution.cancelables.CompositeCancelable
import monix.{eval => me}
import nova.monadic_sfx.implicits.FontIcon
import nova.monadic_sfx.implicits.IconLiteral
import nova.monadic_sfx.implicits.JFXButton
@ -21,13 +22,12 @@ import scalafx.scene.control.ListCell
import scalafx.scene.control.SelectionMode
import scalafx.scene.layout.HBox
import scalafx.scene.text.Text
import monix.{eval => me}
object TodoListView {
def apply(
store: MonixProSubject[
TodoListStore.Command,
(TodoListStore.Command, TodoListStore.State)
TodoListStore.Action,
(TodoListStore.Action, TodoListStore.State)
]
): Task[Node] =
Task.deferAction(implicit s =>
@ -48,6 +48,9 @@ object TodoListView {
new JFXListView[Todo] {
def selectedItems = selectionModel().selectedItems.view
selectionModel().selectionMode = SelectionMode.Multiple
selectionModel().selectedItems.observableSeqValue ==> _selectedItems
cc += items <-- todos
val emptyCell = ObjectProperty(new HBox)
@ -78,9 +81,6 @@ object TodoListView {
}
selectionModel().selectionMode = SelectionMode.Multiple
selectionModel().selectedItems.observableSeqValue ==> _selectedItems
contextMenu = new ContextMenu {
items ++= Seq(
new MenuItem {
@ -94,6 +94,12 @@ object TodoListView {
.map(todo => TodoListStore.Delete(todo.id))
.toList
) --> store
// obsAction.split(
// _.useLazyEval(me.Task(TodoListStore.Delete(0))) --> store,
// _.useLazyEval(me.Task(TodoListStore.Delete(0))) --> store,
// _.useLazyEval(me.Task(TodoListStore.Delete(0))) --> store
// )
},
new MenuItem {
text = "Edit"
@ -103,7 +109,7 @@ object TodoListView {
},
new JFXButton {
text = "Add"
disable <== _selectedItems.map(_.length > 0)
// disable <== _selectedItems.map(_.length > 0)
obsAction
.useLazyEval(me.Task(TodoListStore.Add(_content()))) --> store
},

View File

@ -0,0 +1,63 @@
package nova.monadic_sfx.util
import scalafx.beans.property.ObjectProperty
import scalafx.beans.property.ReadOnlyObjectProperty
import scalafx.beans.value.ObservableValue
object Misc {
implicit final class MyRichObservable[A, C](val self: ObservableValue[A, C])
extends AnyVal {
def filter(f: A => Boolean): ReadOnlyObjectProperty[A] =
Method.filter(self)(f)
def filterNull: ReadOnlyObjectProperty[A] = Method.filterNull(self)
}
}
object Method {
type Observable[A] = ObservableValue[A, _]
def filter[B](
a: Observable[B]
)(f: B => Boolean): ReadOnlyObjectProperty[B] = {
val prop = ObjectProperty[B](a.value)
def changeHandler() =
prop.synchronized {
if (f(a.value)) {
prop.value = a.value
}
}
a onChange changeHandler()
prop
}
/**
* Simply creates a new observable that mirrors the source observable but
* doesn't emit null values. JavaFX likes to work with null values in scene
* nodes/properties (shrugs) and observables by default emit null values
* that can cause crashes. ScalaFX does not offer any *fixes* for this
*
* @param a
* @return
*/
def filterNull[B](
a: Observable[B]
): ReadOnlyObjectProperty[B] = {
val prop = ObjectProperty[B](a.value)
def changeHandler() =
prop.synchronized {
if (a.value != null) {
prop.value = a.value
}
}
a onChange changeHandler()
prop
}
}

View File

@ -1,27 +1,75 @@
package nova.monadic_sfx.util.reactive
import java.time.LocalDateTime
import io.circe.Encoder
import io.circe.Printer
import io.circe.generic.JsonCodec
import io.circe.syntax._
import io.odin.Logger
import io.odin.LoggerMessage
import io.odin.formatter.options.PositionFormat
import io.odin.formatter.options.ThrowableFormat
import io.odin.meta.Render
import monix.bio.Task
import monix.reactive.Observable
import nova.monadic_sfx.util.IOUtils._
// object Middleware {
// def apply[A,M,T](ob: Observable[(A,M)], cb: (A,M) => T): Observable[(A,M)] = ob
// }
object Middlewares {
def actionStateLoggerMiddleware[A, M](
logger: Logger[Task]
): Middleware[A, M] =
(obs: Observable[(A, M)]) =>
obs.doOnNext {
case (a, m) => toTask(logger.debug(s"Received action $a with state $m"))
@JsonCodec
final case class StoreInfo[A](
name: String,
action: A,
time: LocalDateTime = LocalDateTime.now()
)
object StoreInfo {
val printer = Printer.noSpaces
implicit def render[T: Encoder]: Render[StoreInfo[T]] =
new Render[StoreInfo[T]] {
override def render(m: StoreInfo[T]): String = printer.print(m.asJson)
}
}
def actionLoggerMiddleware[A, M](
object Middlewares {
// val encoder: Encoder[LoggerMessage] =
// Encoder.forProduct1("message")(m => m.message.value)
val format = create(ThrowableFormat.Default, PositionFormat.Full)
def create(
throwableFormat: ThrowableFormat,
positionFormat: PositionFormat
): io.odin.formatter.Formatter = {
// val encoder: Encoder[LoggerMessage] =
// Encoder.forProduct1("message")(m => m.message.value)
(msg: LoggerMessage) => msg.message.value
}
def actionStateLoggerMiddleware[A, M](
logger: Logger[Task]
): Middleware[A, M] =
(obs: Observable[(A, M)]) =>
obs.doOnNext {
case (a, _) => toTask(logger.debug(s"Received action $a "))
): Task[Middleware[A, M]] =
Task.deferAction(implicit s =>
Task((obs: Observable[(A, M)]) =>
obs.doOnNextF {
case (a, m) =>
logger.debug(s"Received action $a with state $m")
}
)
)
def actionLoggerMiddleware[A: Encoder, M](
logger: Logger[Task],
name: String
): Task[Middleware[A, M]] =
Task.deferAction(implicit s =>
Task((obs: Observable[(A, M)]) =>
obs.doOnNextF {
case (a, _) =>
logger.debug(StoreInfo(name, a))
}
)
)
}

View File

@ -1,9 +1,8 @@
package nova.monadic_sfx.util.reactive
import cats.effect.Sync
import io.circe.Encoder
import io.odin.Logger
import monix.bio.Task
import monix.execution.Scheduler
import monix.reactive.Observable
import monix.reactive.OverflowStrategy
import monix.reactive.subjects.ConcurrentSubject
@ -15,7 +14,7 @@ object Store {
middlewares: Seq[Middleware[A, M]] = Seq.empty,
overflowStrategy: OverflowStrategy.Synchronous[A] =
OverflowStrategy.DropOld(50)
) =
): Task[Store[A, M]] =
Task.deferAction { implicit s =>
Task {
val subject = ConcurrentSubject.publish[A](overflowStrategy)
@ -35,7 +34,7 @@ object Store {
.behavior(initialAction -> initialState)
.refCount
val res = middlewares.view.reverse.foldLeft(obs) {
val res = middlewares.foldLeft(obs) {
case (obs, middleware) => middleware(obs)
}
@ -46,16 +45,23 @@ object Store {
}
}
def create[F[_], A, M](
def createJsonL[A: Encoder, M](
initialAction: A,
initialState: M,
reducer: Reducer[A, M]
)(implicit s: Scheduler, F: Sync[F]): F[Observable[(A, M)]] =
F.delay {
val subject = ConcurrentSubject.publish[A]
reducer: Reducer[A, M],
storeName: String,
logger: Logger[Task],
middlewares: Seq[Middleware[A, M]] = Seq.empty,
overflowStrategy: OverflowStrategy.Synchronous[A] =
OverflowStrategy.DropOld(50)
): Task[Store[A, M]] =
Task.deferAction { implicit s =>
Task {
val subject = ConcurrentSubject.publish[A](overflowStrategy)
val fold: ((A, M), A) => (A, M) = {
case ((_, state), action) => {
val fold: ((A, M), A) => Task[(A, M)] = {
case ((_, state), action) =>
Task {
val (newState, effects) = reducer(state, action)
effects.subscribe(subject.onNext _)
@ -64,10 +70,28 @@ object Store {
}
}
subject
.scan[(A, M)](initialAction -> initialState)(fold)
val obs = subject
.doOnNextF(action =>
logger.debug(
StoreInfo(storeName, action)
) // .executeOn(Scheduler.global)
)
// .doOnNextF(action => Coeval(println(action)))
.scanEvalF[Task, (A, M)](Task.pure(initialAction -> initialState))(
fold
)
.behavior(initialAction -> initialState)
.refCount
// val res = middlewares.foldLeft(obs) {
// case (obs, middleware) => middleware(obs)
// }
MonixProSubject.from(
subject,
obs
)
}
}
}

View File

@ -6,6 +6,7 @@ import monix.reactive.Observer
package object reactive {
type MonixProSubject[-I, +O] = Observable[O] with Observer[I]
type Middleware[A, M] = Observable[(A, M)] => Observable[(A, M)]
type Store[A, M] = MonixProSubject[A, (A, M)]
/**
* A Function that applies an Action onto the Stores current state.