From 5b0cfcae8c1f201ed8f4933959d0128eba1b2c89 Mon Sep 17 00:00:00 2001 From: Rohan Sircar Date: Tue, 7 Dec 2021 16:30:13 +0530 Subject: [PATCH] cleanup/refactorings --- build.sbt | 4 +- src/main/resources/application.conf | 2 +- src/main/scala/nova/monadic_sfx/App.scala | 350 +++++++++++++++++ .../{MainModule.scala => Loggers.scala} | 7 +- src/main/scala/nova/monadic_sfx/Main.scala | 61 +-- src/main/scala/nova/monadic_sfx/MainApp.scala | 361 ------------------ .../GUIExecutor.scala | 2 +- .../monadic_sfx/concurrent/Schedulers.scala | 45 +++ .../executors/ExecutorsModule.scala | 5 - .../monadic_sfx/executors/Schedulers.scala | 28 -- .../implicits/JavaFxMonixObservables.scala | 175 +++++---- .../nova/monadic_sfx/implicits/package.scala | 13 + .../{DefaultUI.scala => DefaultScene.scala} | 9 +- src/main/scala/nova/monadic_sfx/ui/FX.scala | 84 ++++ .../nova/monadic_sfx/ui/FXComponent.scala | 51 ++- .../scala/nova/monadic_sfx/ui/MyFxApp.scala | 60 --- .../nova/monadic_sfx/ui/MyFxAppOld.scala | 129 ------- .../scala/nova/monadic_sfx/ui/UiModule.scala | 37 -- .../ui/components/router/FXRouter.scala | 54 ++- .../ui/components/todo/TodoListStore.scala | 6 +- .../ui/components/todo/TodoListView.scala | 239 ++++++------ .../monadic_sfx/ui/screens/LoginScreen.scala | 2 +- .../scala/nova/monadic_sfx/util/History.scala | 2 +- .../nova/monadic_sfx/util/SynchedObject.scala | 44 --- .../util/controls/ActionObservable.scala | 173 +++++---- .../monadic_sfx/util/controls/JFXButton.scala | 2 +- .../monadic_sfx/util/controls/MenuItem.scala | 2 +- .../util/reactive/store/Sink.scala | 49 +++ .../util/reactive/store/Store.scala | 67 ++++ .../util/reactive/store/package.scala | 1 + src/test/scala/BackpressuredStoreTest.scala | 46 +++ src/test/scala/ObservableTest.scala | 104 +++++ src/test/scala/WebSocketTest.scala | 15 +- 33 files changed, 1215 insertions(+), 1014 deletions(-) create mode 100644 src/main/scala/nova/monadic_sfx/App.scala rename src/main/scala/nova/monadic_sfx/{MainModule.scala => Loggers.scala} (84%) delete mode 100644 src/main/scala/nova/monadic_sfx/MainApp.scala rename src/main/scala/nova/monadic_sfx/{executors => concurrent}/GUIExecutor.scala (98%) create mode 100644 src/main/scala/nova/monadic_sfx/concurrent/Schedulers.scala delete mode 100644 src/main/scala/nova/monadic_sfx/executors/ExecutorsModule.scala delete mode 100644 src/main/scala/nova/monadic_sfx/executors/Schedulers.scala rename src/main/scala/nova/monadic_sfx/ui/{DefaultUI.scala => DefaultScene.scala} (93%) create mode 100644 src/main/scala/nova/monadic_sfx/ui/FX.scala delete mode 100644 src/main/scala/nova/monadic_sfx/ui/MyFxApp.scala delete mode 100644 src/main/scala/nova/monadic_sfx/ui/MyFxAppOld.scala delete mode 100644 src/main/scala/nova/monadic_sfx/ui/UiModule.scala delete mode 100644 src/main/scala/nova/monadic_sfx/util/SynchedObject.scala create mode 100644 src/main/scala/nova/monadic_sfx/util/reactive/store/Sink.scala create mode 100755 src/test/scala/BackpressuredStoreTest.scala create mode 100755 src/test/scala/ObservableTest.scala diff --git a/build.sbt b/build.sbt index 5c5eeab..24e33c8 100644 --- a/build.sbt +++ b/build.sbt @@ -15,9 +15,7 @@ resolvers += "jitpack" at "https://jitpack.io" enablePlugins(JavaFxPlugin) libraryDependencies ++= Seq( - "org.typelevel" %% "cats-core" % "2.2.0", - "org.typelevel" %% "cats-effect" % "2.2.0", - "io.monix" %% "monix" % "3.3.0", + "io.monix" %% "monix" % "3.4.0", "io.monix" %% "monix-bio" % "1.1.0", "io.circe" %% "circe-core" % "0.13.0", "io.circe" %% "circe-generic" % "0.13.0", diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf index 863bcdd..0b904e8 100644 --- a/src/main/resources/application.conf +++ b/src/main/resources/application.conf @@ -1,6 +1,6 @@ javafx-dispatcher { type = "Dispatcher" - executor = "nova.monadic_sfx.executors.JavaFXEventThreadExecutorServiceConfigurator" + executor = "nova.monadic_sfx.concurrent.JavaFXEventThreadExecutorServiceConfigurator" throughput = 1 } akka.jvm-exit-on-fatal-error = on \ No newline at end of file diff --git a/src/main/scala/nova/monadic_sfx/App.scala b/src/main/scala/nova/monadic_sfx/App.scala new file mode 100644 index 0000000..23296b7 --- /dev/null +++ b/src/main/scala/nova/monadic_sfx/App.scala @@ -0,0 +1,350 @@ +package nova.monadic_sfx + +import cats.syntax.eq._ +import io.odin.Logger +import monix.bio.IO +import monix.bio.Task +import monix.eval.Coeval +import monix.execution.cancelables.CompositeCancelable +import monix.{eval => me} +import nova.monadic_sfx.concurrent.Schedulers +import nova.monadic_sfx.implicits._ +import nova.monadic_sfx.ui.FX +import nova.monadic_sfx.ui.components.router.FXRouter +import nova.monadic_sfx.ui.components.router.Page +import nova.monadic_sfx.ui.components.todo.TodoListStore +import nova.monadic_sfx.ui.components.todo.TodoListView +import nova.monadic_sfx.util.MediaPlayerResource +import nova.monadic_sfx.util.controls.JFXButton +import nova.monadic_sfx.util.controls.JFXDialog +import nova.monadic_sfx.util.controls.JFXTextField +import nova.monadic_sfx.util.controls.VideoView +import org.gerweck.scalafx.util._ +import org.kordamp.bootstrapfx.BootstrapFX +import scalafx.Includes._ +import scalafx.application.JFXApp3.PrimaryStage +import scalafx.beans.property.ObjectProperty +import scalafx.beans.property.StringProperty +import scalafx.collections.ObservableBuffer +import scalafx.geometry.Insets +import scalafx.geometry.Pos +import scalafx.scene.Node +import scalafx.scene.Parent +import scalafx.scene.Scene +import scalafx.scene.control.Label +import scalafx.scene.control.TableColumn +import scalafx.scene.control.TableView +import scalafx.scene.control.Tooltip +import scalafx.scene.layout.BorderPane +import scalafx.scene.layout.HBox +import scalafx.scene.layout.Priority +import scalafx.scene.layout.StackPane +import scalafx.util.Duration +import nova.monadic_sfx.util.controls.ActionObservableDsl + +import java.util.concurrent.TimeUnit +import scala.util.Random +import cats.effect.Resource +import nova.monadic_sfx.ui.FXComponent + +object App { + + val _scene = Coeval(new Scene { + root = new HBox { + padding = Insets(20) + // style = """| -fx-background-color: rgb(38, 38, 38); + // | -fx-text-fill: white;""".stripMargin + stylesheets ++= Seq( + BootstrapFX.bootstrapFXStylesheet, + os.rel / "static" / "css" / "main.css" + ) + } + }) + + val stage = Coeval(new PrimaryStage { + title = "Simple ScalaFX App" + scene = _scene.value() + minWidth = 700 + minHeight = 520 + width = 1280 + height = 720 + // resizable = false + }) +} +final class App( + // spawnProtocol: ActorSystem[SpawnProtocol.Command], + fx: FX, + schedulers: Schedulers, + startTime: Long, + mediaPlayer: MediaPlayerResource +)(implicit logger: Logger[Task]) { + + // val program = MyFxApp + // .resource(schedulers, stage) + // .evalMap { fxApp => + // new MainAppDelegate(schedulers, mediaPlayer, _scene).init + // .flatMap(mainSceneNode => Task(_scene.getChildren += mainSceneNode)) + // .executeOn(schedulers.fx) >> Task.pure(fxApp) + // } + // .use { fxApp => + // for { + // // _ <- Task(stage.resizable = false).executeOn(schedulers.fx) + // currentTime <- IO.clock.realTime(TimeUnit.MILLISECONDS) + // _ <- logger.info( + // s"Application started in ${(currentTime - startTime) / 1000f} seconds" + // ) + // // _ <- Task(CSSFX.start(stage)) + + val buttonStyle = """| -fx-padding: 0.7em 0.57em; + | -fx-font-size: 14px; + | -jfx-button-type: RAISED; + | -fx-background-color: rgb(77,102,204); + | -fx-pref-width: 200; + | -fx-text-fill: WHITE; """.stripMargin + val router = new FXRouter[Page] + + // val players = mediaPlayerFactory.mediaPlayers + // val videoPlayerController = players.newEmbeddedMediaPlayer() + // mediaPlayer.controller.videoSurface.set( + // videoSurfaceForImageView(videoView) + // ) + + // val videoPlayerControllerCleanup = + // Task(videoPlayerController.controls().stop()) >> + // Task(videoPlayerController.release()) + + val videoPage = Coeval { + new BorderPane { pane => + // val mp = new MediaPlayer( + // new Media( + // "https://download.oracle.com/otndocs/products/javafx/oow2010-2.flv" + // // "https://sample-videos.com/video123/mp4/720/big_buck_bunny_720p_5mb.mp4" + // ) + // ) { + // autoPlay = true + // } + // obs(pane).subscribe()(schedulers.async) + hgrow = Priority.Always + center = new VideoView(mediaPlayer.controller) { + alignmentInParent = Pos.Center + preserveRatio = true + // hgrow = Priority.Always + // this.prefWidth <== pane.prefWidth + // fitWidth = _scene.width.value - 40 + // _scene.width + // .map(_ - 40) + // .onChange((_, _, value) => fitWidth.value = value) + // (new DoubleProperty).<==(_scene.width.map(_ - 10)) + // fitWidth.<==(_scene.width.map(_ - 10)) + } + // videoPlayerController.video().videoDimension().setSize() + padding = Insets(0, 0, 5, 0) + bottom = new HBox { + val mrl = new StringProperty + spacing = 5 + children ++= Seq( + new JFXTextField { + text = "https://www.youtube.com/watch?v=0QKQlf8r7ls" + text ==> mrl + prefWidth = 100 + minWidth = 80 + }, + new JFXButton { + text = "Play Video" + style = buttonStyle + onAction = _ => { + if (mediaPlayer.controller.media().isValid()) + mediaPlayer.controller.controls().stop() + + mediaPlayer.controller + .media() + // .play( + // "https://download.oracle.com/otndocs/products/javafx/oow2010-2.flv" + // ) + // .play("https://www.youtube.com/watch?v=yZIummTz9mM") + .play(mrl.value) + } + }, + new JFXButton { + text = "Resume" + style = buttonStyle + onAction = _ => mediaPlayer.controller.controls().play() + }, + new JFXButton { + text = "Pause" + style = buttonStyle + onAction = _ => mediaPlayer.controller.controls().pause() + }, + new JFXButton { + text = "Stop" + style = buttonStyle + tooltip = new Tooltip { + text = "Stop" + showDelay = Duration(200) + } + onAction = _ => mediaPlayer.controller.controls().stop() + }, + new JFXButton { + text = "Get Status" + style = buttonStyle + onAction = _ => { + println(mediaPlayer.controller.status().state()) + } + } + ) + + } + } + } + + val init = for { + routerStore <- Resource.eval(router.store2(Page.Home, logger)) + todoStore <- Resource.eval(TodoListStore(logger)) + todoComponent <- TodoListView(todoStore) + videoPage <- Resource.eval(videoPage.to[Task]) + resolver: PartialFunction[Page, Parent] = { + case Page.Home => videoPage + // engine.load("https://www.youtube.com/embed/qmlegXdlnqI") + // engine.load("https://youtube.com/embed/aqz-KE-bpKQ") + // engine.load("http://www.youtube.com/embed/IyaFEBI_L24") + case Page.UserHome => + new Label { + styleClass ++= Seq("text-white") + text = s"User Home, Id = ${Random.nextInt()}" + } + case Page.Todo => todoComponent.node + } + routerNode <- FXComponent(implicit cc => + Task + .deferAction(implicit s => + Task(new HBox { box => + // implicit val cc = CompositeCancelable() + alignment = Pos.Center + //TODO find a better way to do this + // videoView.fitWidth <== box.prefWidth + children <-- router + .render2(resolver)(routerStore) + // call cancel on the old component to cancel all subscriptions + .scan[Parent](new Label("empty")) { case (a, b) => b } + .doOnNextF(s => logger.debug(s"Actual receive: $s")) + .map(_.delegate) + }) + ) + ) + + mainSceneNode <- FXComponent(implicit cc => + Task.deferAction(implicit s => + Task(new StackPane { root => + import ActionObservableDsl._ + alignment = Pos.Center + hgrow = Priority.Always + vgrow = Priority.Always + children = new BorderPane { + hgrow = Priority.Always + vgrow = Priority.Always + center = routerNode.node + bottom = new HBox { + // implicit val cc = CompositeCancelable() + alignment = Pos.Center + spacing = 20 + children = Seq( + new JFXButton { + text = "Forward" + style = buttonStyle + obsAction.map(_ => FXRouter.Forward) --> routerStore.sink + disable <-- routerStore.source.map { + case (_, FXRouter.State(_, h)) => + h.state.sp == h.state.values.size - 1 + } + }, + new JFXButton { + text = "Backward" + style = buttonStyle + + obsAction.mapEval(_ => + me.Task(println("Fired")) >> me.Task.pure(FXRouter.Backward) + ) --> routerStore.sink + disable <-- routerStore.source + .doOnNextF(b => Coeval(println(s"Received1: $b"))) + .map { + case (_, FXRouter.State(_, h)) => h.state.sp == 0 + } + }, + new JFXButton { + text = "Home" + style = buttonStyle + disable <-- routerStore.source + .map { case (_, FXRouter.State(p, _)) => p === Page.Home } + obsAction.map(_ => + FXRouter.Replace(Page.Home) + ) --> routerStore.sink + }, + new JFXButton { + text = "Todo" + style = buttonStyle + disable <-- routerStore.source + .map { case (_, FXRouter.State(p, _)) => p === Page.Todo } + obsAction.map(_ => + FXRouter.Replace(Page.Todo) + ) --> routerStore.sink + }, + new JFXButton { + text = "UserHome" + style = buttonStyle + disable <-- routerStore.source + .map { + case (_, FXRouter.State(p, _)) => p === Page.UserHome + } + obsAction.map(_ => + FXRouter.Replace(Page.UserHome) + ) --> routerStore.sink + }, + new JFXButton { + text = "Dialog" + style = buttonStyle + val d = new JFXDialog { + content = new HBox { + style = "-fx-background-color: black" + children = Seq(new Label { + styleClass ++= Seq("text-white") + text = "Sample Dialog" + }) + padding = Insets(20) + } + } + onAction = () => d.show(root) + } + ) + } + } + }) + ) + ) + } yield mainSceneNode + +} + +final class TestModel(_name: String, _age: Int) { + val name = StringProperty(_name).readOnly + val age = ObjectProperty(_age).readOnly +} + +object Test { + val items = ObservableBuffer( + new TestModel("hmm", 1), + new TestModel("hmm2", 2) + ) + + val ttv = new TableView[TestModel](items) { + columns ++= Seq( + new TableColumn[TestModel, String] { + text = "Name" + cellValueFactory = { _.value.name } + }, + new TableColumn[TestModel, Int] { + text = "Age" + cellValueFactory = { _.value.age } + } + ) + } +} diff --git a/src/main/scala/nova/monadic_sfx/MainModule.scala b/src/main/scala/nova/monadic_sfx/Loggers.scala similarity index 84% rename from src/main/scala/nova/monadic_sfx/MainModule.scala rename to src/main/scala/nova/monadic_sfx/Loggers.scala index b693d92..f1145fc 100644 --- a/src/main/scala/nova/monadic_sfx/MainModule.scala +++ b/src/main/scala/nova/monadic_sfx/Loggers.scala @@ -7,12 +7,9 @@ import cats.implicits._ import io.odin._ import io.odin.config._ import io.odin.syntax._ -import nova.monadic_sfx.actors.ActorModule -import nova.monadic_sfx.http.HttpModule -import nova.monadic_sfx.ui.UiModule import nova.monadic_sfx.util.reactive.store.Middlewares -trait MainModule extends ActorModule with UiModule with HttpModule { +object Loggers { def routerLogger(defaultLogger: Logger[Task], storeLogger: Logger[Task]) = enclosureRouting[Task]( "nova.monadic_sfx.util.reactive.store.Middlewares" -> storeLogger, @@ -37,5 +34,5 @@ trait MainModule extends ActorModule with UiModule with HttpModule { formatter = Middlewares.format ).withAsync(timeWindow = 10.millis) routerLogger <- routerLogger(defaultLogger, middlewareLogger) - } yield (routerLogger) + } yield routerLogger } diff --git a/src/main/scala/nova/monadic_sfx/Main.scala b/src/main/scala/nova/monadic_sfx/Main.scala index de643a6..5b46fcc 100644 --- a/src/main/scala/nova/monadic_sfx/Main.scala +++ b/src/main/scala/nova/monadic_sfx/Main.scala @@ -1,37 +1,52 @@ package nova.monadic_sfx -import _root_.monix.bio.BIOApp -import _root_.monix.bio.Task -import _root_.monix.bio.UIO -import _root_.monix.execution.Scheduler import cats.effect.ExitCode import cats.effect.Resource -import com.softwaremill.macwire._ -import io.odin._ -import nova.monadic_sfx.executors._ +import monix.bio.BIOApp +import monix.bio.Task +import monix.bio.UIO +import monix.execution.Scheduler +import nova.monadic_sfx.concurrent._ import nova.monadic_sfx.util.MediaPlayerResource +import nova.monadic_sfx.ui.FX // import nova.monadic_sfx.util.IOUtils._ // import sttp.client.httpclient.monix.HttpClientMonixBackend -object Main extends MainModule with BIOApp { - val schedulers = new Schedulers() +object Main extends BIOApp { + val schedulers = Schedulers.default - override def scheduler: Scheduler = schedulers.async + override def scheduler: Scheduler = schedulers.async.value - def appResource(startTime: Long) = - for { - implicit0(logger: Logger[Task]) <- makeLogger - - // backend and actorsystem are for future use - // backend <- Resource.make( - // toIO(HttpClientMonixBackend()(schedulers.async)) - // )(c => toIO(c.close())) - // actorSystem <- actorSystemResource(logger) - MediaPlayerResource <- MediaPlayerResource() - _ <- Resource.liftF(wire[MainApp].program) - } yield () + val appResource = for { + startTime <- Resource.eval(Task(System.currentTimeMillis())) + logger <- Loggers.makeLogger + // backend and actorsystem are for future use + // backend <- Resource.make( + // toIO(HttpClientMonixBackend()(schedulers.async)) + // )(c => toIO(c.close())) + // actorSystem <- actorSystemResource(logger) + fx <- FX.resource(schedulers, App.stage)(logger) + mediaPlayerResource <- MediaPlayerResource() + rootComponent = new App(fx, schedulers, startTime, mediaPlayerResource)( + logger + ).init + _ <- Resource.eval( + rootComponent + .evalMap(c => + for { + _ <- Task.unit + _ <- fx.addToScene(UIO.pure(c.node)) + _ <- fx.await + } yield () + ) + .use(_ => Task.unit) + .executeOn(schedulers.fx.value) + ) + // _ <- Resource.eval(f) + // _ <- Resource.eval(fx.await) + } yield () override def run(args: List[String]): UIO[ExitCode] = - appResource(System.currentTimeMillis()) + appResource .use(_ => Task.unit) .onErrorHandleWith(ex => UIO(ex.printStackTrace())) .as(ExitCode.Success) diff --git a/src/main/scala/nova/monadic_sfx/MainApp.scala b/src/main/scala/nova/monadic_sfx/MainApp.scala deleted file mode 100644 index b1813e2..0000000 --- a/src/main/scala/nova/monadic_sfx/MainApp.scala +++ /dev/null @@ -1,361 +0,0 @@ -package nova.monadic_sfx - -import java.util.concurrent.TimeUnit - -import scala.util.Random - -import cats.effect.Resource -import cats.syntax.eq._ -import cats.syntax.option._ -import com.softwaremill.macwire._ -import io.odin.Logger -import monix.bio.IO -import monix.bio.Task -import monix.eval.Coeval -import monix.execution.cancelables.CompositeCancelable -import monix.{eval => me} -import nova.monadic_sfx.executors.Schedulers -import nova.monadic_sfx.implicits._ -import nova.monadic_sfx.ui.MyFxApp -import nova.monadic_sfx.ui.components.router.FXRouter -import nova.monadic_sfx.ui.components.router.Page -import nova.monadic_sfx.ui.components.todo.TodoListStore -import nova.monadic_sfx.ui.components.todo.TodoListView -import nova.monadic_sfx.util.MediaPlayerResource -import nova.monadic_sfx.util.controls.JFXButton -import nova.monadic_sfx.util.controls.JFXDialog -import nova.monadic_sfx.util.controls.JFXTextField -import nova.monadic_sfx.util.controls.VideoView -import org.gerweck.scalafx.util._ -import org.kordamp.bootstrapfx.BootstrapFX -import scalafx.Includes._ -import scalafx.application.JFXApp3.PrimaryStage -import scalafx.beans.property.ObjectProperty -import scalafx.beans.property.StringProperty -import scalafx.collections.ObservableBuffer -import scalafx.geometry.Insets -import scalafx.geometry.Pos -import scalafx.scene.Node -import scalafx.scene.Parent -import scalafx.scene.Scene -import scalafx.scene.control.Button -import scalafx.scene.control.Label -import scalafx.scene.control.TableColumn -import scalafx.scene.control.TableView -import scalafx.scene.control.Tooltip -import scalafx.scene.layout.BorderPane -import scalafx.scene.layout.HBox -import scalafx.scene.layout.Priority -import scalafx.scene.layout.StackPane -import scalafx.util.Duration -class MainApp( - // spawnProtocol: ActorSystem[SpawnProtocol.Command], - schedulers: Schedulers, - startTime: Long, - mediaPlayer: MediaPlayerResource -)(implicit logger: Logger[Task]) { - - private lazy val _scene = new Scene { - root = new HBox { - padding = Insets(20) - // style = """| -fx-background-color: rgb(38, 38, 38); - // | -fx-text-fill: white;""".stripMargin - stylesheets ++= Seq( - BootstrapFX.bootstrapFXStylesheet, - os.rel / "static" / "css" / "main.css" - ) - } - } - - private lazy val stage = new PrimaryStage { - title = "Simple ScalaFX App" - scene = _scene - minWidth = 700 - minHeight = 520 - width = 1280 - height = 720 - // resizable = false - } - - (for { - (stopSignal, fxAppFib) <- MyFxApp.resource(schedulers, stage) - i <- Resource.make(Task(1))(_ => Task.unit) - } yield (stopSignal, fxAppFib, i)).use { - case (a, b, c) => Task.unit - } - - val program = MyFxApp - .resource(schedulers, stage) - .evalMap { - case (stopSignal, fxAppFib) => - wire[MainAppDelegate].init - .flatMap(mainSceneNode => Task(_scene.getChildren += mainSceneNode)) - .executeOn(schedulers.fx) >> Task.pure(stopSignal -> fxAppFib) - } - .use { - case (stopSignal, fxAppFib) => - for { - // _ <- Task(stage.resizable = false).executeOn(schedulers.fx) - currentTime <- IO.clock.realTime(TimeUnit.MILLISECONDS) - _ <- logger.info( - s"Application started in ${(currentTime - startTime) / 1000f} seconds" - ) - // _ <- Task(CSSFX.start(stage)) - _ <- fxAppFib.join - } yield () - } - -} - -class MainAppDelegate( - schedulers: Schedulers, - mediaPlayer: MediaPlayerResource, - _scene: Scene -)(implicit logger: Logger[Task]) { - val buttonStyle = """| -fx-padding: 0.7em 0.57em; - | -fx-font-size: 14px; - | -jfx-button-type: RAISED; - | -fx-background-color: rgb(77,102,204); - | -fx-pref-width: 200; - | -fx-text-fill: WHITE; """.stripMargin - val router = new FXRouter[Page] - - // val players = mediaPlayerFactory.mediaPlayers - // val videoPlayerController = players.newEmbeddedMediaPlayer() - // mediaPlayer.controller.videoSurface.set( - // videoSurfaceForImageView(videoView) - // ) - - // val videoPlayerControllerCleanup = - // Task(videoPlayerController.controls().stop()) >> - // Task(videoPlayerController.release()) - - val videoPage = new BorderPane { pane => - // val mp = new MediaPlayer( - // new Media( - // "https://download.oracle.com/otndocs/products/javafx/oow2010-2.flv" - // // "https://sample-videos.com/video123/mp4/720/big_buck_bunny_720p_5mb.mp4" - // ) - // ) { - // autoPlay = true - // } - // obs(pane).subscribe()(schedulers.async) - hgrow = Priority.Always - center = new VideoView(mediaPlayer.controller) { - alignmentInParent = Pos.Center - preserveRatio = true - // hgrow = Priority.Always - // this.prefWidth <== pane.prefWidth - fitWidth = _scene.width.value - 40 - _scene.width - .map(_ - 40) - .onChange((_, _, value) => fitWidth.value = value) - // (new DoubleProperty).<==(_scene.width.map(_ - 10)) - // fitWidth.<==(_scene.width.map(_ - 10)) - } - // videoPlayerController.video().videoDimension().setSize() - padding = Insets(0, 0, 5, 0) - bottom = new HBox { - val mrl = new StringProperty - spacing = 5 - children ++= Seq( - new JFXTextField { - text = "https://www.youtube.com/watch?v=0QKQlf8r7ls" - text ==> mrl - prefWidth = 100 - minWidth = 80 - }, - new JFXButton { - text = "Play Video" - style = buttonStyle - onAction = _ => { - if (mediaPlayer.controller.media().isValid()) - mediaPlayer.controller.controls().stop() - - mediaPlayer.controller - .media() - // .play( - // "https://download.oracle.com/otndocs/products/javafx/oow2010-2.flv" - // ) - // .play("https://www.youtube.com/watch?v=yZIummTz9mM") - .play(mrl.value) - } - }, - new JFXButton { - text = "Resume" - style = buttonStyle - onAction = _ => mediaPlayer.controller.controls().play() - }, - new JFXButton { - text = "Pause" - style = buttonStyle - onAction = _ => mediaPlayer.controller.controls().pause() - }, - new JFXButton { - text = "Stop" - style = buttonStyle - tooltip = new Tooltip { - text = "Stop" - showDelay = Duration(200) - } - onAction = _ => mediaPlayer.controller.controls().stop() - }, - new JFXButton { - text = "Get Status" - style = buttonStyle - onAction = _ => { - println(mediaPlayer.controller.status().state()) - } - } - ) - - } - } - - val init: Task[Node] = for { - routerStore <- router.store(Page.Home, logger) - todoStore <- TodoListStore(logger) - todoComponent <- TodoListView(todoStore) - resolver: PartialFunction[Page, Parent] = { - case Page.Home => videoPage - // engine.load("https://www.youtube.com/embed/qmlegXdlnqI") - // engine.load("https://youtube.com/embed/aqz-KE-bpKQ") - // engine.load("http://www.youtube.com/embed/IyaFEBI_L24") - case Page.UserHome => - new Label { - styleClass ++= Seq("text-white") - text = s"User Home, Id = ${Random.nextInt()}" - } - case Page.Todo => todoComponent - } - routerNode: Node <- - Task - .deferAction(implicit s => - Task(new HBox { box => - alignment = Pos.Center - //TODO find a better way to do this - // videoView.fitWidth <== box.prefWidth - children <-- router - .render(resolver)(routerStore) - // call cancel on the old component to cancel all subscriptions - .scan[Parent](new Label("empty")) { case (a, b) => b } - .doOnNextF(s => logger.debug(s"Actual receive: $s")) - .map(_.delegate) - }) - ) - - mainSceneNode <- Task.deferAction(implicit s => - Task(new StackPane { root => - alignment = Pos.Center - hgrow = Priority.Always - vgrow = Priority.Always - children = new BorderPane { - hgrow = Priority.Always - vgrow = Priority.Always - center = routerNode - bottom = new HBox { - implicit val cc = CompositeCancelable() - alignment = Pos.Center - spacing = 20 - children = Seq( - new JFXButton { - text = "Forward" - style = buttonStyle - obsAction.useLazyEval( - me.Task.pure(FXRouter.Forward) - ) --> routerStore - disable <-- routerStore.map { - case (_, FXRouter.State(_, h)) => - h.state.sp == h.state.values.size - 1 - } - }, - new JFXButton { - text = "Backward" - style = buttonStyle - - obsAction.useLazyEval( - me.Task(println("Fired")) >> me.Task.pure(FXRouter.Backward) - ) --> routerStore - disable <-- routerStore - .doOnNextF(b => Coeval(println(s"Received1: $b"))) - .map { - case (_, FXRouter.State(_, h)) => h.state.sp == 0 - } - }, - new JFXButton { - text = "Home" - style = buttonStyle - disable <-- routerStore - .map { case (_, FXRouter.State(p, _)) => p === Page.Home } - obsAction - .useLazyEval( - me.Task.pure(FXRouter.Replace(Page.Home)) - ) --> routerStore - }, - new JFXButton { - text = "Todo" - style = buttonStyle - disable <-- routerStore - .map { case (_, FXRouter.State(p, _)) => p === Page.Todo } - obsAction - .useLazyEval( - me.Task.pure(FXRouter.Replace(Page.Todo)) - ) --> routerStore - }, - new JFXButton { - text = "UserHome" - style = buttonStyle - disable <-- routerStore - .map { case (_, FXRouter.State(p, _)) => p == Page.UserHome } - obsAction - .useLazyEval( - me.Task.pure(FXRouter.Replace(Page.UserHome)) - ) --> routerStore - }, - new JFXButton { - text = "Dialog" - style = buttonStyle - val d = new JFXDialog { - content = new HBox { - style = "-fx-background-color: black" - children = Seq(new Label { - styleClass ++= Seq("text-white") - text = "Sample Dialog" - }) - padding = Insets(20) - } - } - onAction = () => d.show(root) - } - ) - } - } - }) - ) - } yield mainSceneNode -} - -class TestModel(_name: String, _age: Int) { - val name = StringProperty(_name).readOnly - val age = ObjectProperty(_age).readOnly -} - -object Test { - val items = ObservableBuffer( - new TestModel("hmm", 1), - new TestModel("hmm2", 2) - ) - - val ttv = new TableView[TestModel](items) { - columns ++= Seq( - new TableColumn[TestModel, String] { - text = "Name" - cellValueFactory = { _.value.name } - }, - new TableColumn[TestModel, Int] { - text = "Age" - cellValueFactory = { _.value.age } - } - ) - } -} diff --git a/src/main/scala/nova/monadic_sfx/executors/GUIExecutor.scala b/src/main/scala/nova/monadic_sfx/concurrent/GUIExecutor.scala similarity index 98% rename from src/main/scala/nova/monadic_sfx/executors/GUIExecutor.scala rename to src/main/scala/nova/monadic_sfx/concurrent/GUIExecutor.scala index 3affbb2..eb53283 100644 --- a/src/main/scala/nova/monadic_sfx/executors/GUIExecutor.scala +++ b/src/main/scala/nova/monadic_sfx/concurrent/GUIExecutor.scala @@ -1,4 +1,4 @@ -package nova.monadic_sfx.executors +package nova.monadic_sfx.concurrent import java.util.Collections import java.util.concurrent.AbstractExecutorService diff --git a/src/main/scala/nova/monadic_sfx/concurrent/Schedulers.scala b/src/main/scala/nova/monadic_sfx/concurrent/Schedulers.scala new file mode 100644 index 0000000..c12930f --- /dev/null +++ b/src/main/scala/nova/monadic_sfx/concurrent/Schedulers.scala @@ -0,0 +1,45 @@ +package nova.monadic_sfx.concurrent + +import com.typesafe.scalalogging.Logger +import monix.execution.Scheduler +import monix.execution.UncaughtExceptionReporter +import monix.execution.schedulers.TracingScheduler +import cats.effect.Blocker + +final case class Schedulers( + io: Schedulers.IoScheduler, + async: Schedulers.AsyncScheduler, + fx: Schedulers.FxScheduler +) + +object Schedulers { + val reporter = UncaughtExceptionReporter { ex => + val logger = Logger[Schedulers] + logger.error("Uncaught exception", ex) + } + + val default = Schedulers( + IoScheduler( + Scheduler + .io() + .withUncaughtExceptionReporter(Schedulers.reporter) + ), + AsyncScheduler( + Scheduler + .computation() + .withUncaughtExceptionReporter(Schedulers.reporter) + ), + FxScheduler( + TracingScheduler( + JFXExecutionContexts.fxScheduler + .withUncaughtExceptionReporter(Schedulers.reporter) + ) + ) + ) + + final case class AsyncScheduler(value: Scheduler) + final case class IoScheduler(value: Scheduler) { + val blocker = Blocker.liftExecutionContext(value) + } + final case class FxScheduler(value: Scheduler) +} diff --git a/src/main/scala/nova/monadic_sfx/executors/ExecutorsModule.scala b/src/main/scala/nova/monadic_sfx/executors/ExecutorsModule.scala deleted file mode 100644 index e672377..0000000 --- a/src/main/scala/nova/monadic_sfx/executors/ExecutorsModule.scala +++ /dev/null @@ -1,5 +0,0 @@ -package nova.monadic_sfx.executors - -trait ExecutorsModule { - lazy val schedulers = new Schedulers() -} diff --git a/src/main/scala/nova/monadic_sfx/executors/Schedulers.scala b/src/main/scala/nova/monadic_sfx/executors/Schedulers.scala deleted file mode 100644 index e19359a..0000000 --- a/src/main/scala/nova/monadic_sfx/executors/Schedulers.scala +++ /dev/null @@ -1,28 +0,0 @@ -package nova.monadic_sfx.executors - -import com.typesafe.scalalogging.Logger -import monix.execution.Scheduler -import monix.execution.UncaughtExceptionReporter -import monix.execution.schedulers.TracingScheduler - -class Schedulers( - val blocking: Scheduler = TracingScheduler( - Scheduler - .io() - .withUncaughtExceptionReporter(Schedulers.reporter) - ), - val async: Scheduler = Scheduler.traced - .withUncaughtExceptionReporter(Schedulers.reporter), - val fx: Scheduler = TracingScheduler( - JFXExecutionContexts.fxScheduler - .withUncaughtExceptionReporter(Schedulers.reporter) - ) -) - -object Schedulers { - val reporter = UncaughtExceptionReporter { ex => - val logger = Logger[Schedulers] - logger.error("Uncaught exception", ex) - } - -} diff --git a/src/main/scala/nova/monadic_sfx/implicits/JavaFxMonixObservables.scala b/src/main/scala/nova/monadic_sfx/implicits/JavaFxMonixObservables.scala index efc3896..96ba77c 100644 --- a/src/main/scala/nova/monadic_sfx/implicits/JavaFxMonixObservables.scala +++ b/src/main/scala/nova/monadic_sfx/implicits/JavaFxMonixObservables.scala @@ -27,6 +27,7 @@ import scalafx.event.subscriptions.Subscription import scalafx.scene.Scene import scalafx.scene.control.ButtonBase import scalafx.scene.control.MenuItem +import nova.monadic_sfx.util.reactive.store.Sink2 trait JavaFXMonixObservables { import JavaFXMonixObservables._ @@ -106,9 +107,9 @@ object JavaFXMonixObservables { final class PropertyExt[T, J](private val prop: Property[T, J]) extends AnyVal { - def -->[J1 >: J](sub: Observer[J1]) = { - prop.onChange((a, b, c) => if (c != null) sub.onNext(c)) - } + // def -->[J1 >: J](sub: Observer[J1]) = { + // prop.onChange((a, b, c) => if (c != null) sub.onNext(c)) + // } def ==>(op: Property[T, J]) = { op <== prop @@ -141,18 +142,24 @@ object JavaFXMonixObservables { final class ObjectPropertyExt[A](private val prop: ObjectProperty[A]) extends AnyVal { - def -->(sub: Observer[A]) = - prop.onChange((a, b, c) => - if (c != null) - if (sub.onNext(c) == Ack.Stop) throw new Exception("boom") - ) + // def -->(sub: Observer[A]) = + // prop.onChange((a, b, c) => + // if (c != null) + // if (sub.onNext(c) == Ack.Stop) throw new Exception("boom") + // ) + + def -->(sink: Sink2[A])(implicit s: Scheduler, c: CompositeCancelable) = + c += observableChange.doOnNextF(sink.offer).subscribe() - def ==>(op: Property[A, A]) = { - prop.onChange((a, b, c) => if (c != null) op() = c) + def ==>(op: Property[A, A])(implicit c: CompositeCancelable) = { + val canc = prop.onChange((a, b, c) => if (c != null) op() = c) + c += Cancelable(() => canc.cancel()) } - def <--(obs: Observable[A])(implicit s: Scheduler) = { - obs.doOnNextF(v => Coeval(prop() = v)).subscribe() + def <--( + obs: Observable[A] + )(implicit s: Scheduler, c: CompositeCancelable) = { + c += obs.doOnNextF(v => Coeval(prop() = v)).subscribe() } def observableChange[J1 >: A]: Observable[J1] = { @@ -170,9 +177,8 @@ object JavaFXMonixObservables { } } - final class ObservableListExt[A]( - private val buffer: ObservableBuffer[A] - ) extends AnyVal { + final class ObservableListExt[A](private val buffer: ObservableBuffer[A]) + extends AnyVal { // def -->(sub: Observer[A]) = // buffer.onChange((a, b, c) => if (c != null) sub.onNext(c)) @@ -181,8 +187,10 @@ object JavaFXMonixObservables { // buffer.onChange((a, b, c) => if (c != null) op() = c) // } - def <--(obs: Observable[A])(implicit s: Scheduler) = { - obs + def <--( + obs: Observable[A] + )(implicit s: Scheduler, c: CompositeCancelable) = { + c += obs .doOnNextF(v => for { _ <- Coeval(buffer.clear()) @@ -192,35 +200,35 @@ object JavaFXMonixObservables { .subscribe() } - def observableChange[J1 >: A]: Observable[J1] = { - import monix.execution.cancelables.SingleAssignCancelable - Observable.create(OverflowStrategy.Unbounded) { sub => - val c = SingleAssignCancelable() + // def observableChange[J1 >: A]: Observable[J1] = { + // import monix.execution.cancelables.SingleAssignCancelable + // Observable.create(OverflowStrategy.Unbounded) { sub => + // val c = SingleAssignCancelable() - implicit val s = sub.scheduler + // implicit val s = sub.scheduler - val canc = - buffer.onChange((buf, _) => - loop(sub, buf.toIterable.iterator, c).runToFuture - ) + // val canc = + // buffer.onChange((buf, _) => + // loop(sub, buf.toIterable.iterator, c).runToFuture + // ) - c := Cancelable(() => canc.cancel()) - c - } - } + // c := Cancelable(() => canc.cancel()) + // c + // } + // } - private def loop( - sub: Observer[A], - it: Iterator[A], - c: Cancelable - ): Task[Unit] = - if (it.hasNext) { - val next = it.next() - Task.deferFuture(sub.onNext(next)).flatMap { - case Ack.Continue => loop(sub, it, c) - case Ack.Stop => Task(c.cancel()) - } - } else Task.unit + // private def loop( + // sub: Observer[A], + // it: Iterator[A], + // c: Cancelable + // ): Task[Unit] = + // if (it.hasNext) { + // val next = it.next() + // Task.deferFuture(sub.onNext(next)).flatMap { + // case Ack.Continue => loop(sub, it, c) + // case Ack.Stop => Task(c.cancel()) + // } + // } else Task.unit } final class StringObservableListExt( @@ -235,9 +243,9 @@ object JavaFXMonixObservables { final class ReadOnlyPropertyExt[T, J]( private val prop: ReadOnlyProperty[T, J] ) extends AnyVal { - def -->[J1 >: J](sub: Observer[J1]) = { - prop.onChange((a, b, c) => if (c != null) sub.onNext(c)) - } + // def -->[J1 >: J](sub: Observer[J1]) = { + // prop.onChange((a, b, c) => if (c != null) sub.onNext(c)) + // } def ==>(op: Property[T, J]) = { op <== prop @@ -261,45 +269,50 @@ object JavaFXMonixObservables { final class ObjectPropertyObservableListExt[A]( private val prop: ObjectProperty[ObservableList[A]] ) extends AnyVal { - def <--(obs: Observable[Seq[A]])(implicit s: Scheduler) = { - obs.doOnNext(v => me.Task(prop() = ObservableBuffer.from(v))).subscribe() - } - def -->(sub: Observer[A])(implicit s: Scheduler) = { - val c = SingleAssignCancelable() - val subs: Subscription = prop.onChange((a, b, c1) => - if (c1 != null) - Iterant[Task] - .fromIterable(c1.toIterable) - .consume - .use(consume(sub, c, _)) - .runToFuture - ) - c := Cancelable(() => subs.cancel()) + def <--( + obs: Observable[Seq[A]] + )(implicit s: Scheduler, c: CompositeCancelable) = { + c += obs + .doOnNext(v => me.Task(prop() = ObservableBuffer.from(v))) + .subscribe() } - private def loop(sub: Observer[A], it: Iterator[A]): Task[Unit] = - if (it.hasNext) { - val next = it.next() - Task.deferFuture(sub.onNext(next)).flatMap { - case Ack.Continue => loop(sub, it) - case Ack.Stop => Task.unit - } - } else Task.unit - - private def consume( - sub: Observer[A], - c: Cancelable, - consumer: Iterant.Consumer[Task, A] - ): Task[Unit] = - consumer.pull.flatMap { - case Left(value) => Task.unit - case Right(value) => - Task.deferFuture(sub.onNext(value)).flatMap { - case Ack.Continue => consume(sub, c, consumer) - case Ack.Stop => Task(c.cancel()) - } - } + // def -->(sub: Observer[A])(implicit s: Scheduler) = { + // val c = SingleAssignCancelable() + // val subs: Subscription = prop.onChange((a, b, c1) => + // if (c1 != null) + // Iterant[Task] + // .fromIterable(c1.toIterable) + // .consume + // .use(consume(sub, c, _)) + // .runToFuture + // ) + // c := Cancelable(() => subs.cancel()) + // } + + // private def loop(sub: Observer[A], it: Iterator[A]): Task[Unit] = + // if (it.hasNext) { + // val next = it.next() + // Task.deferFuture(sub.onNext(next)).flatMap { + // case Ack.Continue => loop(sub, it) + // case Ack.Stop => Task.unit + // } + // } else Task.unit + + // private def consume( + // sub: Observer[A], + // c: Cancelable, + // consumer: Iterant.Consumer[Task, A] + // ): Task[Unit] = + // consumer.pull.flatMap { + // case Left(value) => Task.unit + // case Right(value) => + // Task.deferFuture(sub.onNext(value)).flatMap { + // case Ack.Continue => consume(sub, c, consumer) + // case Ack.Stop => Task(c.cancel()) + // } + // } } diff --git a/src/main/scala/nova/monadic_sfx/implicits/package.scala b/src/main/scala/nova/monadic_sfx/implicits/package.scala index 2cda7d2..1e423a2 100644 --- a/src/main/scala/nova/monadic_sfx/implicits/package.scala +++ b/src/main/scala/nova/monadic_sfx/implicits/package.scala @@ -2,10 +2,23 @@ package nova.monadic_sfx import monix.eval.TaskLike import monix.reactive.Observable +import monix.bio.IO package object implicits extends MySfxObservableImplicits with JavaFXMonixObservables { + + implicit final class MonixEvalTaskExt[T](private val task: monix.eval.Task[T]) + extends AnyVal { + def toIO = IO.deferAction(implicit s => IO.from(task)) + } + + implicit final class MonixBioTaskExt[T](private val task: monix.bio.Task[T]) + extends AnyVal { + def toTask = + monix.eval.Task.deferAction(implicit s => monix.eval.Task.from(task)) + } + implicit class SttpWsOps[F[_]](private val ws: sttp.client.ws.WebSocket[F]) extends AnyVal { def observableSource(implicit F: TaskLike[F]) = diff --git a/src/main/scala/nova/monadic_sfx/ui/DefaultUI.scala b/src/main/scala/nova/monadic_sfx/ui/DefaultScene.scala similarity index 93% rename from src/main/scala/nova/monadic_sfx/ui/DefaultUI.scala rename to src/main/scala/nova/monadic_sfx/ui/DefaultScene.scala index edd2f4e..70fd4f3 100644 --- a/src/main/scala/nova/monadic_sfx/ui/DefaultUI.scala +++ b/src/main/scala/nova/monadic_sfx/ui/DefaultScene.scala @@ -10,10 +10,11 @@ import scalafx.scene.layout.VBox import scalafx.scene.paint.Color._ import scalafx.scene.paint._ import scalafx.scene.text.Text +import monix.eval.Coeval -object DefaultUI { - val scene = - new Scene { +object DefaultScene { + def apply() = + Coeval(new Scene { fill = Color.rgb(38, 38, 38) content = new VBox { alignment = Pos.Center @@ -48,5 +49,5 @@ object DefaultUI { } ) } - } + }) } diff --git a/src/main/scala/nova/monadic_sfx/ui/FX.scala b/src/main/scala/nova/monadic_sfx/ui/FX.scala new file mode 100644 index 0000000..8234fa7 --- /dev/null +++ b/src/main/scala/nova/monadic_sfx/ui/FX.scala @@ -0,0 +1,84 @@ +package nova.monadic_sfx.ui + +import scala.concurrent.duration._ + +import cats.effect.Resource +import io.odin.Logger +import monix.bio.Task +import monix.execution.CancelablePromise +import nova.monadic_sfx.concurrent.Schedulers +import nova.monadic_sfx.ui.DefaultScene +import scalafx.application.JFXApp3 +import scalafx.application.JFXApp3.PrimaryStage +import monix.bio.UIO +import monix.eval.Coeval +import scalafx.scene.Scene +import scalafx.scene.Parent +import scalafx.Includes._ +import monix.bio.IO +import scalafx.scene.Node + +final class FX private ( + schedulers: Schedulers, + delegate: JFXApp3, + val await: Task[Unit], + val awaitStop: Task[Unit] +)(implicit logger: Logger[Task]) { + logger.debug("whoopie") + + def runOnFx[E, A](io: IO[E, A]) = io.executeOn(schedulers.fx.value) + + def addToScene(node: UIO[Node]) = + runOnFx { + for { + p <- node + _ <- UIO(delegate.stage.scene().getChildren += p) + } yield () + } +} + +object FX { + def resource( + schedulers: Schedulers, + stage: Coeval[PrimaryStage], + initialScene: Coeval[Scene] = DefaultScene(), + transitionDelay: FiniteDuration = 500.millis + )(implicit + logger: Logger[Task] + ): Resource[Task, FX] = + Resource + .make(for { + _ <- logger.info("Starting FX App") + makePromise = UIO(CancelablePromise[Unit]()) + startSignal <- makePromise + stopSignal <- makePromise + delegate <- Task(new JFXApp3 { + def start(): Unit = { + stage = new PrimaryStage { + scene = initialScene.value() + } + startSignal.success(()) + } + + override def stopApp(): Unit = { + stopSignal.success(()) + } + }) + fib <- + Task(delegate.main(Array.empty)).start.executeOn(schedulers.io.value) + fxApp <- Task( + new FX( + schedulers, + delegate, + fib.join, + Task.fromCancelablePromise(stopSignal) + ) + ) + _ <- Task.fromCancelablePromise(startSignal) + _ <- Task.sleep(transitionDelay) + _ <- Task(delegate.stage = stage.value()) + .executeOn(schedulers.fx.value) + .delayExecution(transitionDelay) + } yield fxApp -> fib) { case _ -> fib => fib.cancel } + .map { case a -> _ => a } +} diff --git a/src/main/scala/nova/monadic_sfx/ui/FXComponent.scala b/src/main/scala/nova/monadic_sfx/ui/FXComponent.scala index 825969c..860d662 100644 --- a/src/main/scala/nova/monadic_sfx/ui/FXComponent.scala +++ b/src/main/scala/nova/monadic_sfx/ui/FXComponent.scala @@ -14,43 +14,42 @@ import scalafx.scene.text.Font import cats.effect.Resource final class FXComponent private ( - val rootNode: Parent, - val cancelable: Cancelable -) + val node: Parent, + private val cancelable: Cancelable +) { + +// def toNode(implicit c: CompositeCancelable): Node = { +// c += cancelable +// rootNode +// } +} object FXComponent { - def acquire(f: CompositeCancelable => Task[Parent]) = + private def acquire(f: CompositeCancelable => Task[Parent]) = for { c <- Task(CompositeCancelable()) p <- f(c) } yield new FXComponent(p, c) - def fxComponent2Node( - component: FXComponent - )(implicit c: CompositeCancelable): Node = { - c += component.cancelable - component.rootNode - } - - def resource(f: CompositeCancelable => Task[Parent]) = + def apply(f: CompositeCancelable => Task[Parent]) = Resource.make(acquire(f))(comp => Task(comp.cancelable.cancel())) } object TestFXComp { - val testComp = - FXComponent.resource { implicit c => - Task.deferAction { implicit s => - Task { - val sub = ConcurrentSubject.publish[jfxc.text.Font] - val f = ObjectProperty(Font("hmm")) - sub.onNext(f()) - new TextField { - font <-- sub - font <== f - } - } - } - } + // val testComp = + // FXComponent.resource { implicit c => + // Task.deferAction { implicit s => + // Task { + // val sub = ConcurrentSubject.publish[jfxc.text.Font] + // val f = ObjectProperty(Font("hmm")) + // sub.onNext(f()) + // new TextField { + // font <-- sub + // font <== f + // } + // } + // } + // } // val x = for { // comp <- testComp diff --git a/src/main/scala/nova/monadic_sfx/ui/MyFxApp.scala b/src/main/scala/nova/monadic_sfx/ui/MyFxApp.scala deleted file mode 100644 index 24a58cb..0000000 --- a/src/main/scala/nova/monadic_sfx/ui/MyFxApp.scala +++ /dev/null @@ -1,60 +0,0 @@ -package nova.monadic_sfx.ui - -import scala.concurrent.duration._ - -import cats.effect.Resource -import io.odin.Logger -import monix.bio.Fiber -import monix.bio.Task -import monix.execution.CancelablePromise -import nova.monadic_sfx.executors.Schedulers -import nova.monadic_sfx.ui.DefaultUI -import scalafx.application.JFXApp3 -import scalafx.application.JFXApp3.PrimaryStage - -class MyFxApp(val schedulers: Schedulers)(implicit logger: Logger[Task]) { - - private def internal( - startSignal: CancelablePromise[Unit], - stopSignal: CancelablePromise[Unit] - ) = - new JFXApp3 { - def start(): Unit = { - stage = new PrimaryStage { - scene = DefaultUI.scene - } - startSignal.success(()) - } - - override def stopApp(): Unit = { - stopSignal.success(()) - } - } - -} - -object MyFxApp { - def resource( - schedulers: Schedulers, - stage: => PrimaryStage, - transitionDelay: FiniteDuration = 500.millis - )(implicit - logger: Logger[Task] - ): Resource[Task, (Task[Unit], Fiber[Throwable, Unit])] = - Resource.make(for { - _ <- logger.info("Starting FX App") - fxApp <- Task(new MyFxApp(schedulers)) - startSignal <- Task(CancelablePromise[Unit]()) - stopSignal <- Task(CancelablePromise[Unit]()) - delegate <- Task(fxApp.internal(startSignal, stopSignal)) - fib <- - Task(delegate.main(Array.empty)).start.executeOn(schedulers.blocking) - _ <- Task.fromCancelablePromise(startSignal) - _ <- Task.sleep(transitionDelay) - _ <- Task(delegate.stage = stage) - .executeOn(schedulers.fx) - .delayExecution(transitionDelay) - } yield Task.fromCancelablePromise(stopSignal) -> fib) { - case _ -> fib => fib.cancel - } -} diff --git a/src/main/scala/nova/monadic_sfx/ui/MyFxAppOld.scala b/src/main/scala/nova/monadic_sfx/ui/MyFxAppOld.scala deleted file mode 100644 index 66556a9..0000000 --- a/src/main/scala/nova/monadic_sfx/ui/MyFxAppOld.scala +++ /dev/null @@ -1,129 +0,0 @@ -package nova.monadic_sfx.ui - -import scala.concurrent.duration._ - -import akka.actor.typed._ -import akka.util.Timeout -import com.softwaremill.macwire._ -import io.odin.Logger -import monix.eval.Task -import monix.execution.Callback -import monix.execution.Scheduler -import nova.monadic_sfx.AppTypes -import nova.monadic_sfx.actors.Counter -import nova.monadic_sfx.executors.Schedulers -import nova.monadic_sfx.http.Requesters -import scalafx.application.JFXApp - -class MyFxAppOld( - logger: Logger[Task], - backend: AppTypes.HttpBackend, - actorSystem: ActorSystem[SpawnProtocol.Command], - requesters: Requesters, - schedulers: Schedulers -) extends JFXApp { - - implicit lazy val defaultScheduler: Scheduler = schedulers.fx - - // lazy val fxActor: Task[ActorRef[Counter.Command]] = wireWith( - // MyFxApp.makeCounterActor _ - // ) - - lazy val application = - for { - appStage <- Task(wireWith(UiModule.makePrimaryStage _)) - - // _ <- Task { - // val counterActor = testActor(actorSystem) - // counterActor ! (Counter.Increment) - // } - // ta <- testActor2(actorSystem) - // actor <- - // actorTask.bracket(actor => Task(actor ! (Counter.Increment)))(actor => - // Task(actor ! (Counter.Stop)) - // ) - // actor <- actorTask - // actor <- fxActor - // _ <- Task(actor ! Counter.Increment) - _ <- Task { stage = appStage } - _ <- Task.sleep(2.seconds) - // loginScene <- wire[LoginScreen].render - // _ <- Task { - // // appStage.maximized = true - // appStage.height = 800 - // appStage.width = 800 - // appStage - // .scene() - // .setRoot( - // loginScene - // ) - // } - } yield () - -// def testActor( -// system: ActorSystem -// ): akka.actor.typed.ActorRef[Counter.Command] = { -// val behaviour: Behavior[Counter.Command] = -// Behaviors.setup(context => wire[Counter]) -// system.spawn( -// behaviour, -// "CounterActor", -// DispatcherSelector.fromConfig("javafx-dispatcher") -// ) -// } - - application.timed.runAsync( - new Callback[Throwable, (FiniteDuration, Unit)] { - - override def onSuccess(value: (FiniteDuration, Unit)): Unit = { - val (duration, _) = value - println( - s"Application started successfully in ${duration.toSeconds} seconds" - ) - } - - override def onError(e: Throwable): Unit = { - println("Application start failed. Reason -") - e.printStackTrace() - } - - } - ) - - override def stopApp() = { - // val stop = for { - // actor <- fxActor - // _ <- logger.info("Stopping actor counter") - // // _ <- Task.fromFuture { actor.ask[Counter.Value](Counter.GetValue) } - // t <- Task(actor ! Counter.Stop) - // // _ <- Task.sleep(1.second) - // _ <- logger.info("Counter actor stopped") - // } yield () - // stop.runAsyncAndForget - // // Platform.exit() - } -} - -object MyFxAppOld { - def makeCounterActor( - system: ActorSystem[SpawnProtocol.Command], - logger: Logger[Task] - ): Task[ActorRef[Counter.Command]] = { - import akka.actor.typed.scaladsl.AskPattern._ - import scala.concurrent.ExecutionContext - - implicit val timeout: Timeout = Timeout(3.seconds) - implicit val ec: ExecutionContext = system.executionContext - implicit val scheduler = system.scheduler - Task.fromFuture { - system.ask( - SpawnProtocol.Spawn( - behavior = wireWith(Counter.apply _), - name = "counterActor", - DispatcherSelector.fromConfig("javafx-dispatcher"), - _ - ) - ) - } - } -} diff --git a/src/main/scala/nova/monadic_sfx/ui/UiModule.scala b/src/main/scala/nova/monadic_sfx/ui/UiModule.scala deleted file mode 100644 index b1f6f0d..0000000 --- a/src/main/scala/nova/monadic_sfx/ui/UiModule.scala +++ /dev/null @@ -1,37 +0,0 @@ -package nova.monadic_sfx.ui - -import akka.actor.typed._ -import cats.effect.Resource -import com.softwaremill.macwire._ -import io.odin.Logger -import monix.eval.Task -import nova.monadic_sfx.AppTypes -import nova.monadic_sfx.executors.Schedulers -import nova.monadic_sfx.http.Requesters -import scalafx.application.JFXApp -import scalafx.application.JFXApp.PrimaryStage - -trait UiModule { - def fxAppResource( - logger: Logger[Task], - backend: AppTypes.HttpBackend, - actorSystem: ActorSystem[SpawnProtocol.Command], - requesters: Requesters, - schedulers: Schedulers - ): Resource[Task, JFXApp] = - Resource.make(for { - _ <- logger.info("Creating FX Application") - app <- Task { wire[MyFxAppOld] } - } yield (app))(app => logger.info("Stopping FX Application")) -} - -object UiModule { - def makePrimaryStage( - backend: AppTypes.HttpBackend, - actorSystem: ActorSystem[SpawnProtocol.Command] - ) = { - new PrimaryStage { - scene = DefaultUI.scene - } - } -} diff --git a/src/main/scala/nova/monadic_sfx/ui/components/router/FXRouter.scala b/src/main/scala/nova/monadic_sfx/ui/components/router/FXRouter.scala index 82a395a..affb040 100644 --- a/src/main/scala/nova/monadic_sfx/ui/components/router/FXRouter.scala +++ b/src/main/scala/nova/monadic_sfx/ui/components/router/FXRouter.scala @@ -21,6 +21,7 @@ import nova.monadic_sfx.util.reactive.store.Middlewares import nova.monadic_sfx.util.reactive.store.Reducer import nova.monadic_sfx.util.reactive.store.Store import scalafx.scene.Parent +import nova.monadic_sfx.util.reactive.store.MyStore object FXRouter { @@ -42,9 +43,11 @@ object FXRouter { type FXStore[P] = Store[Action[P], State[P]] + type FXStore2[P] = MyStore[Action[P], State[P]] + } -class FXRouter[P]()(implicit E: Encoder[P], D: Decoder[P]) { +final class FXRouter[P]()(implicit E: Encoder[P], D: Decoder[P]) { import FXRouter._ def store(initialPage: P, logger: Logger[Task]): Task[FXStore[P]] = @@ -64,6 +67,24 @@ class FXRouter[P]()(implicit E: Encoder[P], D: Decoder[P]) { } yield store ) + def store2(initialPage: P, logger: Logger[Task]) = + Task.deferAction(implicit s => + for { + mw <- Middlewares.actionLoggerMiddleware[Action[P], State[P]]( + logger, + "RouterStore" + ) + store <- Store.backpressured[Action[P], State[P]]( + Replace(initialPage), + State(initialPage, History(initialPage)), + Reducer.withOptionalEffects[Task, Action[P], State[P]](reducer _), + logger, + Seq(mw) + // Seq(classOf[HistoryEvent[P]]) + ) + } yield store + ) + def reducer( state: State[P], action: Action[P] @@ -113,6 +134,35 @@ class FXRouter[P]()(implicit E: Encoder[P], D: Decoder[P]) { ) } + def render2( + resolver: P => Parent, + transitionDelay: FiniteDuration = 500.millis + )(implicit store: FXStore2[P]) = + store.source + .filter { + case (a, _) => a =!= FXRouter.Forward + } + .filter { + case (a, _) => a =!= FXRouter.Backward + } + .distinctUntilChanged + .flatMap { + case (_, FXRouter.State(p, _)) => + Observable.from(Coeval(new JFXSpinner)) ++ Observable.from( + IOUtils.toTask( + Task + .racePair( + Task.sleep(transitionDelay), + Task(resolver(p)) + ) + .flatMap { + case Left(_ -> fib) => fib.join + case Right(fib -> res) => fib.join >> Task.pure(res) + } + ) + ) + } + def link( page: P, store: FXStore[P] @@ -128,7 +178,7 @@ object Page { final case object UserHome extends Page final case object Todo extends Page - implicit val eqForPage = Eq.fromUniversalEquals[Page] + implicit val eq = Eq.fromUniversalEquals[Page] } // case class State() diff --git a/src/main/scala/nova/monadic_sfx/ui/components/todo/TodoListStore.scala b/src/main/scala/nova/monadic_sfx/ui/components/todo/TodoListStore.scala index b7ed727..8498a23 100644 --- a/src/main/scala/nova/monadic_sfx/ui/components/todo/TodoListStore.scala +++ b/src/main/scala/nova/monadic_sfx/ui/components/todo/TodoListStore.scala @@ -8,6 +8,7 @@ import monix.bio.Task import nova.monadic_sfx.util.reactive.store.Middlewares.actionLoggerMiddleware import nova.monadic_sfx.util.reactive.store.Reducer import nova.monadic_sfx.util.reactive.store.Store +import nova.monadic_sfx.util.reactive.store.MyStore case class Todo(id: Int, content: String) object Todo { @@ -68,16 +69,17 @@ object TodoListStore { case End => (state, None) } - def apply(logger: Logger[Task]): Task[Store[Action, State]] = + def apply(logger: Logger[Task]): Task[MyStore[Action, State]] = Task.deferAction(implicit s => for { logMware <- actionLoggerMiddleware[Action, State](logger, "TodoStore") store <- Store - .createL[Action, State]( + .backpressured[Action, State]( Init, State(Vector.empty[Todo], 0), Reducer.withOptionalEffects(reducer(logger) _), + logger, Seq(logMware) ) } yield store diff --git a/src/main/scala/nova/monadic_sfx/ui/components/todo/TodoListView.scala b/src/main/scala/nova/monadic_sfx/ui/components/todo/TodoListView.scala index 476ab00..61825d6 100644 --- a/src/main/scala/nova/monadic_sfx/ui/components/todo/TodoListView.scala +++ b/src/main/scala/nova/monadic_sfx/ui/components/todo/TodoListView.scala @@ -28,136 +28,147 @@ import scalafx.scene.layout.BorderPane import scalafx.scene.layout.HBox import scalafx.scene.layout.Priority import scalafx.scene.paint.Color +import nova.monadic_sfx.util.controls.ActionObservableDsl +import nova.monadic_sfx.ui.FXComponent object TodoListView { def apply( - store: Store[TodoListStore.Action, TodoListStore.State] - ): Task[Parent] = - Task.deferAction(implicit s => - Task { - implicit val cc = CompositeCancelable() - val todos = store - .map { case (_, state) => state.todos } - .distinctUntilChanged - .map(ObservableBuffer.from) - .doOnNextF(item => Coeval(println(s"Received item: $item"))) - val _selectedItems = ObjectProperty(Seq.empty[Todo]) + store: MyStore[TodoListStore.Action, TodoListStore.State] + ) = + FXComponent(implicit cc => + Task.deferAction(implicit s => + Task { + import ActionObservableDsl._ + // implicit val cc = CompositeCancelable() + val todos = store.source + .map { case (_, state) => state.todos } + .distinctUntilChanged + .map(ObservableBuffer.from) + .doOnNextF(item => Coeval(println(s"Received item: $item"))) + val _selectedItems = ObjectProperty(Seq.empty[Todo]) - new BorderPane { - padding = Insets(5) - hgrow = Priority.Always - val _content = StringProperty("") - center = new HBox { + new BorderPane { padding = Insets(5) - children ++= Seq(new JFXListView[Todo] { - id = "todoList" - hgrow = Priority.Always - def selectedItems = selectionModel().selectedItems.view - styleClass ++= Seq("text-white", "clear-list-view") - selectionModel().selectionMode = SelectionMode.Multiple - selectionModel().selectedItems.observableSeqValue ==> _selectedItems + hgrow = Priority.Always + val _content = StringProperty("") + center = new HBox { + padding = Insets(5) + children ++= Seq(new JFXListView[Todo] { + id = "todoList" + hgrow = Priority.Always + def selectedItems = selectionModel().selectedItems.view + styleClass ++= Seq("text-white", "clear-list-view") + selectionModel().selectionMode = SelectionMode.Multiple + selectionModel().selectedItems.observableSeqValue ==> _selectedItems - items <-- todos.map(_.delegate) + items <-- todos.map(_.delegate) - val emptyCell = ObjectProperty(new HBox) - cellFactory = _ => - new ListCell[Todo] { - val _text = StringProperty("") - val _graphic = ObjectProperty( - new HBox { - styleClass ++= Seq("text-white", "strong", "todo-cell") - children = Seq( - new FontIcon { - iconSize = 20 - iconLiteral = IconLiteral.Gmi10k - fill = Color.White - }, - new Label { - style = "-fx-text-fill: white " - styleClass ++= Seq("text-white", "strong") - text <== _text - } - ) - } - ) + val emptyCell = ObjectProperty(new HBox) + cellFactory = _ => + new ListCell[Todo] { + val _text = StringProperty("") + val _graphic = ObjectProperty( + new HBox { + styleClass ++= Seq("text-white", "strong", "todo-cell") + children = Seq( + new FontIcon { + iconSize = 20 + iconLiteral = IconLiteral.Gmi10k + fill = Color.White + }, + new Label { + style = "-fx-text-fill: white " + styleClass ++= Seq("text-white", "strong") + text <== _text + } + ) + } + ) - item.asOption.map( - _.fold("")(todo => s"${todo.id} - ${todo.content}") - ) ==> _text + item.asOption.map( + _.fold("")(todo => s"${todo.id} - ${todo.content}") + ) ==> _text - graphic <== item.asOption.flatMap( - _.fold(emptyCell)(_ => _graphic) - ) + graphic <== item.asOption.flatMap( + _.fold(emptyCell)(_ => _graphic) + ) - } + } - contextMenu = new ContextMenu { - items ++= Seq( - new MenuItem { - text = "Delete" - obsAction.useIterableEval(_ => - selectedItems - .map(todo => TodoListStore.Delete(todo.id)) - .toList - ) --> store + contextMenu = new ContextMenu { + items ++= Seq( + new MenuItem { + text = "Delete" + // obsAction.(_ => + // selectedItems + // .map(todo => TodoListStore.Delete(todo.id)) + // .toList + // ) --> store.sink - // obsAction.split( - // _.useLazyEval(me.Task(TodoListStore.Delete(0))) --> store, - // _.useLazyEval(me.Task(TodoListStore.Delete(0))) --> store, - // _.useLazyEval(me.Task(TodoListStore.Delete(0))) --> store - // ) - }, - new MenuItem { - text = "Edit" - } - ) - } - }) + obsAction.flatMapIterable(_ => + selectedItems + .map(todo => TodoListStore.Delete(todo.id)) + .toList + ) --> store.sink - } + // obsAction.split( + // _.useLazyEval(me.Task(TodoListStore.Delete(0))) --> store, + // _.useLazyEval(me.Task(TodoListStore.Delete(0))) --> store, + // _.useLazyEval(me.Task(TodoListStore.Delete(0))) --> store + // ) + }, + new MenuItem { + text = "Edit" + } + ) + } + }) - bottom = new HBox { - spacing = 5 - padding = Insets(5) - children = Seq( - new JFXTextField { - id = "todoInputField" - style = "-fx-background-color: rgb(38,38,38);" - styleClass += "text-white" - text ==> _content - vgrow = Priority.Always - }, - new JFXButton { - id = "todoAddButton" - text = "Add" - alignment = Pos.Center - // disable <== _selectedItems.map(_.length > 0) - styleClass = Seq("btn", "btn-primary") - obsAction - .useLazyEval( - me.Task(TodoListStore.Add(_content())) - ) --> store - }, - new JFXButton { - id = "todoEditButton" - text = "Edit" - alignment = Pos.Center - disable <== _selectedItems.map(_.length > 1) - styleClass = Seq("btn", "btn-info") - obsAction.useLazyEval( - me.Task( - TodoListStore.Edit( - _selectedItems - .map(_.headOption.map(_.id).getOrElse(-1)) - .value, - _content() + } + + bottom = new HBox { + spacing = 5 + padding = Insets(5) + children = Seq( + new JFXTextField { + id = "todoInputField" + style = "-fx-background-color: rgb(38,38,38);" + styleClass += "text-white" + text ==> _content + vgrow = Priority.Always + }, + new JFXButton { + id = "todoAddButton" + text = "Add" + alignment = Pos.Center + // disable <== _selectedItems.map(_.length > 0) + styleClass = Seq("btn", "btn-primary") + obsAction + .mapEval(_ => + me.Task(TodoListStore.Add(_content())) + ) --> store.sink + }, + new JFXButton { + id = "todoEditButton" + text = "Edit" + alignment = Pos.Center + disable <== _selectedItems.map(_.length > 1) + styleClass = Seq("btn", "btn-info") + obsAction.mapEval(_ => + me.Task( + TodoListStore.Edit( + _selectedItems + .map(_.headOption.map(_.id).getOrElse(-1)) + .value, + _content() + ) ) - ) - ) --> store - } - ) + ) --> store.sink + } + ) + } } } - } + ) ) } diff --git a/src/main/scala/nova/monadic_sfx/ui/screens/LoginScreen.scala b/src/main/scala/nova/monadic_sfx/ui/screens/LoginScreen.scala index aa7f6c7..df925b3 100644 --- a/src/main/scala/nova/monadic_sfx/ui/screens/LoginScreen.scala +++ b/src/main/scala/nova/monadic_sfx/ui/screens/LoginScreen.scala @@ -3,7 +3,7 @@ import akka.actor.typed._ import io.odin.Logger import monix.eval.Task import nova.monadic_sfx.AppTypes -import nova.monadic_sfx.executors.Schedulers +import nova.monadic_sfx.concurrent.Schedulers import nova.monadic_sfx.http.Requesters import nova.monadic_sfx.http.requests.DummyRequest import nova.monadic_sfx.ui.screens.Screen diff --git a/src/main/scala/nova/monadic_sfx/util/History.scala b/src/main/scala/nova/monadic_sfx/util/History.scala index 270f9f6..2d983b4 100644 --- a/src/main/scala/nova/monadic_sfx/util/History.scala +++ b/src/main/scala/nova/monadic_sfx/util/History.scala @@ -31,7 +31,7 @@ final class MutHistory[T](initValue: T) { } } -final class History[T] private (val state: History.State[T]) { +final case class History[T] private (val state: History.State[T]) { def current = state.values(state.sp) def push(v: T) = { val nextState = diff --git a/src/main/scala/nova/monadic_sfx/util/SynchedObject.scala b/src/main/scala/nova/monadic_sfx/util/SynchedObject.scala deleted file mode 100644 index 856d96d..0000000 --- a/src/main/scala/nova/monadic_sfx/util/SynchedObject.scala +++ /dev/null @@ -1,44 +0,0 @@ -package nova.monadic_sfx.util - -import monix.bio.Task -import monix.bio.UIO -import monix.catnap.MVar - -/** - * Synchronization wrapper for a mutable object - * - * @param obj the mutable object - * @param lock lock for synchronization - */ -class SynchedObject[A](obj: A, lock: MLock) { - - def modify(f: A => Task[Unit]): Task[Unit] = - lock.greenLight(f(obj)) - - def get: Task[A] = lock.greenLight(Task(obj)) -} - -object SynchedObject { - def apply[A](obj: A) = - MVar[Task] - .of(()) - .map(m => new MLock(m)) - .flatMap(lock => Task(new SynchedObject(obj, lock))) -} - -final class MLock(mvar: MVar[Task, Unit]) { - def acquire: Task[Unit] = - mvar.take - - def release: Task[Unit] = - mvar.put(()) - - def greenLight[A](fa: Task[A]): Task[A] = - for { - _ <- acquire - a <- fa.doOnCancel( - release.onErrorHandleWith(ex => UIO(println(ex.getMessage()))) - ) - _ <- release - } yield a -} diff --git a/src/main/scala/nova/monadic_sfx/util/controls/ActionObservable.scala b/src/main/scala/nova/monadic_sfx/util/controls/ActionObservable.scala index 3b8ebfe..8fd0143 100644 --- a/src/main/scala/nova/monadic_sfx/util/controls/ActionObservable.scala +++ b/src/main/scala/nova/monadic_sfx/util/controls/ActionObservable.scala @@ -6,93 +6,112 @@ import monix.execution.cancelables.CompositeCancelable import monix.reactive.Observable import monix.reactive.Observer import monix.{eval => me} +import nova.monadic_sfx.util.reactive.store.Sink2 +import nova.monadic_sfx.implicits._ -class ActionObservableExecutor[T]( - private val delegate: Observable[T] -) extends AnyVal { - def -->(sub: Observer[T])(implicit s: Scheduler) = - delegate - .doOnNext(el => me.Task.deferFuture(sub.onNext(el)) >> me.Task.unit) - .subscribe() +object ActionObservableDsl { + implicit final class ActionObservableExecutor[T]( + private val delegate: Observable[T] + ) { + // def -->[G](sub: G)(implicit s: Scheduler, G: Sink[G, T]) = + // // delegate + // // .doOnNext(el => me.Task.deferFuture(sub.onNext(el)) >> me.Task.unit) + // // .subscribe() + // delegate + // .doOnNext(el => G.offer(sub, el)) + // .subscribe() + def -->( + sink: Sink2[T] + )(implicit s: Scheduler, c: CompositeCancelable) = + // delegate + // .doOnNext(el => me.Task.deferFuture(sub.onNext(el)) >> me.Task.unit) + // .subscribe() + c += delegate + .doOnNext(el => sink.offer(el).toTask) + .subscribe() + + } } -class ActionObservableBuilder[A]( - private val observableAction: Observable[A] -) extends AnyVal { - def useLazyEval[T](v: => me.Task[T]) = - new ActionObservableExecutor[T](observableAction.mapEval(_ => v)) +// final class ActionObservableBuilder[A]( +// private val observableAction: Observable[A] +// ) extends AnyVal { +// def useLazyEval[T](v: me.Task[T]) = +// new ActionObservableExecutor[T](observableAction.mapEval(_ => v)) - def useEval[T](cb: A => me.Task[T]) = - new ActionObservableExecutor[T]( - observableAction.mapEval(cb) - ) +// def useEval[T](cb: A => me.Task[T]) = +// new ActionObservableExecutor[T]( +// observableAction.mapEval(cb) +// ) - def useIterableEval[T](cb: A => collection.immutable.Iterable[T]) = - new ActionObservableExecutor[T]( - observableAction.flatMap(a => - Observable.suspend(Observable.fromIterable(cb(a))) - ) - ) +// def useIterableEval[T](cb: A => collection.immutable.Iterable[T]) = +// new ActionObservableExecutor[T]( +// observableAction.flatMap(a => +// Observable.suspend(Observable.fromIterable(cb(a))) +// ) +// ) - def doOnNext(cb: A => me.Task[Unit]): ActionObservableBuilder[A] = - new ActionObservableBuilder(observableAction.doOnNext(cb)) +// def use = new ActionObservableExecutor(observableAction) - def mapEval[B](cb: A => me.Task[B]) = - new ActionObservableBuilder(observableAction.mapEval(cb)) +// def doOnNext(cb: A => me.Task[Unit]): ActionObservableBuilder[A] = +// new ActionObservableBuilder(observableAction.doOnNext(cb)) - def underlying = observableAction +// def mapEval[B](cb: A => me.Task[B]) = +// new ActionObservableBuilder(observableAction.mapEval(cb)) - // Caution: Experimental stuff below.. +// def underlying = observableAction - def useEval2[B, C](f: A => me.Task[B], g: A => me.Task[C]) = - new ActionObservableExecutor[(B, C)]( - observableAction.publishSelector(conn => - conn - .mapEval(f) - .switchMap(b => - conn.mapEval(a => - for { - c <- g(a) - } yield (b, c) - ) - ) - ) - ) +// // Caution: Experimental stuff below.. - def bifurcate[B, C]( - f: ActionObservableBuilder[A] => B, - g: ActionObservableBuilder[A] => C - )(implicit s: Scheduler) = - observableAction - .publishSelector(conn => - Observable( - Observable.unit.doOnNext(_ => - me.Task(f(new ActionObservableBuilder[A](conn))) >> me.Task.unit - ), - Observable.unit.doOnNext(_ => - me.Task(g(new ActionObservableBuilder[A](conn))) >> me.Task.unit - ) - ).merge - ) - .subscribe() +// def useEval2[B, C](f: A => me.Task[B], g: A => me.Task[C]) = +// new ActionObservableExecutor[(B, C)]( +// observableAction.publishSelector(conn => +// conn +// .mapEval(f) +// .switchMap(b => +// conn.mapEval(a => +// for { +// c <- g(a) +// } yield (b, c) +// ) +// ) +// ) +// ) - def split( - lst: (ActionObservableBuilder[A] => Cancelable)* - )(implicit s: Scheduler): Cancelable = { - val comp = CompositeCancelable() - comp += observableAction - .publishSelector(conn => - Observable( - lst.map(f => - Observable.unit.doOnNext(_ => - me.Task( - comp += f(new ActionObservableBuilder[A](conn)) - ) >> me.Task.unit - ) - ): _* - ).merge - ) - .subscribe() - } +// // def bifurcate[B, C]( +// // f: ActionObservableBuilder[A] => B, +// // g: ActionObservableBuilder[A] => C +// // )(implicit s: Scheduler) = +// // observableAction +// // .publishSelector(conn => +// // Observable( +// // Observable.unit.doOnNext(_ => +// // me.Task(f(new ActionObservableBuilder[A](conn))) >> me.Task.unit +// // ), +// // Observable.unit.doOnNext(_ => +// // me.Task(g(new ActionObservableBuilder[A](conn))) >> me.Task.unit +// // ) +// // ).merge +// // ) +// // .subscribe() -} +// def split( +// lst: (ActionObservableBuilder[A] => Cancelable)* +// )(implicit s: Scheduler): Cancelable = { +// val cc = CompositeCancelable() +// cc += observableAction +// .publishSelector(conn => +// Observable( +// lst.map(f => +// Observable.unit.doOnNext(_ => +// me.Task( +// cc += f(new ActionObservableBuilder[A](conn)) +// ) >> me.Task.unit +// ) +// ): _* +// ).merge +// ) +// .subscribe() +// } + +// } diff --git a/src/main/scala/nova/monadic_sfx/util/controls/JFXButton.scala b/src/main/scala/nova/monadic_sfx/util/controls/JFXButton.scala index 952c6e2..26ef0cc 100644 --- a/src/main/scala/nova/monadic_sfx/util/controls/JFXButton.scala +++ b/src/main/scala/nova/monadic_sfx/util/controls/JFXButton.scala @@ -34,6 +34,6 @@ class JFXButton( def ripplerFill_=(b: jfxsp.Paint): Unit = ripplerFill() = b - def obsAction = new ActionObservableBuilder(this.observableAction) + def obsAction = this.observableAction } diff --git a/src/main/scala/nova/monadic_sfx/util/controls/MenuItem.scala b/src/main/scala/nova/monadic_sfx/util/controls/MenuItem.scala index cb5fefa..81568ed 100644 --- a/src/main/scala/nova/monadic_sfx/util/controls/MenuItem.scala +++ b/src/main/scala/nova/monadic_sfx/util/controls/MenuItem.scala @@ -3,5 +3,5 @@ package nova.monadic_sfx.util.controls import nova.monadic_sfx.implicits._ import scalafx.scene.{control => sfxc} class MenuItem extends sfxc.MenuItem { - def obsAction = new ActionObservableBuilder(this.observableAction) + def obsAction = this.observableAction } diff --git a/src/main/scala/nova/monadic_sfx/util/reactive/store/Sink.scala b/src/main/scala/nova/monadic_sfx/util/reactive/store/Sink.scala new file mode 100644 index 0000000..d3556d6 --- /dev/null +++ b/src/main/scala/nova/monadic_sfx/util/reactive/store/Sink.scala @@ -0,0 +1,49 @@ +package nova.monadic_sfx.util.reactive.store + +import monix.eval.Task +import monix.catnap.ConcurrentQueue +import monix.reactive.Observer + +trait Sink[A, -B] { + def offer(inst: A, b: B): Task[Unit] +} + +object Sink { + implicit def sinkForCq[B] = + new Sink[ConcurrentQueue[Task, B], B] { + + override def offer(queue: ConcurrentQueue[Task, B], b: B): Task[Unit] = + queue.offer(b) + + } + + implicit def sinkForObserver[B] = + new Sink[Observer[B], B] { + + override def offer(inst: Observer[B], b: B): Task[Unit] = + Task.deferFuture(inst.onNext(b)).void + + } + + implicit def sinkForStore[B, C] = + new Sink[Store[B, C], B] { + + override def offer(inst: Store[B, C], b: B): Task[Unit] = + Task.deferFuture(inst.onNext(b)).void + + } + +// implicitly[Sink[Store[Int, Int], Int]] + +} + +trait Sink2[-B] { + def offer(b: B): monix.bio.Task[Unit] +} + +object Sink2 { + def concurrentQueue[B](queue: ConcurrentQueue[monix.bio.Task, B]): Sink2[B] = + new Sink2[B] { + def offer(b: B): monix.bio.Task[Unit] = queue.offer(b) + } +} diff --git a/src/main/scala/nova/monadic_sfx/util/reactive/store/Store.scala b/src/main/scala/nova/monadic_sfx/util/reactive/store/Store.scala index f27cc06..ba506fc 100644 --- a/src/main/scala/nova/monadic_sfx/util/reactive/store/Store.scala +++ b/src/main/scala/nova/monadic_sfx/util/reactive/store/Store.scala @@ -5,6 +5,13 @@ import monix.eval.Coeval import monix.reactive.Observer import monix.reactive.OverflowStrategy import monix.reactive.subjects.ConcurrentSubject +import cats.effect.Resource +import monix.{eval => me} +import monix.catnap.ConcurrentQueue +import nova.monadic_sfx.util.IOUtils +import monix.reactive.Observable +import io.odin.Logger +import nova.monadic_sfx.implicits._ object Store { def createL[A, M]( @@ -54,4 +61,64 @@ object Store { } } + // : Resource[Task, Store[A, M]] + def backpressured[A, M]( + initialAction: A, + initialState: M, + reducer: Reducer[A, M], + logger: Logger[Task], + middlewares: Seq[Middleware[A, M]] = Seq.empty + ) = { + + for { + queue <- ConcurrentQueue[Task].bounded[A](10) + source <- Task.deferAction(implicit s => + Task { + val fold: ((A, M), A) => Task[(A, M)] = { + case ((_, state), action) => + for { + _ <- Task.unit + (newState, effects) = reducer(state, action) + _ <- + effects + .doOnNextF(queue.offer) + .completedL + .toIO + // .start + + } yield action -> newState + } + + val obs = Observable + .repeatEvalF(queue.poll) + .scanEval0F[Task, (A, M)]( + Task.pure(initialAction -> initialState) + )(fold) + + val res = + // middlewares + // .foldLeft(obs) { + // case (obs, middleware) => middleware(obs) + // } + obs + .doOnNextF(i => logger.debug(s"Emitted item 1: $i")) + .behavior(initialAction -> initialState) + .refCount + + // res.subscribe(Observer.empty) + + // .doOnNextF(i => Coeval(println(s"Emitted item 2: $i"))) + + res + } + ) + } yield new MyStore(Sink2.concurrentQueue(queue), source) + + } + } + +final class MyStore[A, M]( + val sink: Sink2[A], + val source: Observable[(A, M)] +) diff --git a/src/main/scala/nova/monadic_sfx/util/reactive/store/package.scala b/src/main/scala/nova/monadic_sfx/util/reactive/store/package.scala index d0fe3f0..50c05d2 100644 --- a/src/main/scala/nova/monadic_sfx/util/reactive/store/package.scala +++ b/src/main/scala/nova/monadic_sfx/util/reactive/store/package.scala @@ -15,4 +15,5 @@ package object store { * @tparam M The Model Type */ type Reducer[A, M] = (M, A) => (M, Observable[A]) + } diff --git a/src/test/scala/BackpressuredStoreTest.scala b/src/test/scala/BackpressuredStoreTest.scala new file mode 100755 index 0000000..1b6cd4d --- /dev/null +++ b/src/test/scala/BackpressuredStoreTest.scala @@ -0,0 +1,46 @@ +import org.scalatest.funsuite.AnyFunSuite +import scala.concurrent.duration._ +import monix.bio.Task +import nova.monadic_sfx.util.reactive.store.Store +import nova.monadic_sfx.util.History +import nova.monadic_sfx.util.reactive.store.Reducer +import nova.monadic_sfx.ui.components.router.Page +import nova.monadic_sfx.util.IOUtils.toIO +import monix.reactive.Observable +import cats.effect.Resource +import nova.monadic_sfx.ui.components.router.FXRouter +import io.odin.consoleLogger +class BackpressuredStoreTest extends AnyFunSuite { + import monix.execution.Scheduler.Implicits.global + val logger = consoleLogger[Task]() + test("backpressed store test") { + (for { + _ <- + Resource + .eval(new FXRouter[Page].store2(Page.Home, logger)) + .use { myStore => + for { + _ <- toIO( + myStore.source + .doOnNextF(_ => Task.sleep(1.seconds)) + .doOnNextF(item => logger.debug(s"Task1: Got Item $item")) + .completedL + ).startAndForget + _ <- toIO( + myStore.source + .doOnNextF(_ => Task.sleep(3.seconds)) + .doOnNextF(item => logger.debug(s"Task2: Got Item $item")) + .completedL + ).startAndForget + _ <- myStore.sink.offer(FXRouter.Replace(Page.Home)) + _ <- myStore.sink.offer(FXRouter.Replace(Page.Todo)) + _ <- myStore.sink.offer(FXRouter.Replace(Page.UserHome)) + _ <- myStore.sink.offer(FXRouter.Backward) + _ <- Task.sleep(25.seconds) + } yield () + + } + // _ <- Task.sleep(2.seconds) + } yield ()).runSyncUnsafe(26.seconds) + } +} diff --git a/src/test/scala/ObservableTest.scala b/src/test/scala/ObservableTest.scala new file mode 100755 index 0000000..0bec256 --- /dev/null +++ b/src/test/scala/ObservableTest.scala @@ -0,0 +1,104 @@ +import org.scalatest.funsuite.AnyFunSuite +import monix.catnap.ConcurrentQueue +import monix.eval.Task +import monix.reactive.Observable +import scala.concurrent.duration._ + +class ObservableTest extends AnyFunSuite { + import monix.execution.Scheduler.Implicits.global + test("observable state machine") { + (for { + _ <- Task.unit + sm <- MonixStateMachine() + _ <- + Task + .parSequence( + List( + sm.source + .doOnNext(item => Task(println(s"Task 1: Got $item"))) + .completedL, + sm.source + .doOnNext(item => Task(println(s"Task 2: Got $item"))) + .completedL, + sm.tell(MonixStateMachine.Start) >> + Observable + // .interval(1.second) + .interval(500.millis) + .doOnNext(_ => sm.tell(MonixStateMachine.Incr)) + .takeUntil(Observable.unit.delayExecution(5.seconds)) + .completedL >> + sm.tell(MonixStateMachine.Stop) >> + sm.tell(MonixStateMachine.Incr) + ) + ) + .void + .start + .bracket(_ => Task.sleep(8.seconds))(_.cancel) + } yield ()).runSyncUnsafe(10.seconds) + } +} + +class MonixStateMachine( + queue: ConcurrentQueue[Task, MonixStateMachine.Command], + val source: Observable[(MonixStateMachine.State, MonixStateMachine.Data)] +) { + import MonixStateMachine._ + + def tell(item: Command) = queue.offer(item) + +} +object MonixStateMachine { + + sealed trait State + case object Idle extends State + case object Active extends State + + sealed trait Command + case object Incr extends Command + case object Start extends Command + case object Stop extends Command + + case class Data(num: Int) + + private def source(queue: ConcurrentQueue[Task, Command]) = + Task.deferAction(implicit s => + Task( + Observable + .repeatEvalF(queue.poll) + .scan((Idle: State, Data(0))) { + case ((state, data), command) => + state match { + case Idle => + println("Entered idle") + command match { + case Incr => + println("Not active ") + (Idle, data) + case Start => (Active, data) + case Stop => + println("Already stopped") + (Idle, data) + + } + case Active => + println("Entered Active") + command match { + case Incr => (Active, data.copy(num = data.num + 1)) + case Start => + println("Already started") + (Active, data) + case Stop => (Idle, data) + } + } + } + .publish + .refCount + ) + ) + + def apply() = + for { + queue <- ConcurrentQueue.bounded[Task, Command](10) + source <- source(queue) + } yield new MonixStateMachine(queue, source) +} diff --git a/src/test/scala/WebSocketTest.scala b/src/test/scala/WebSocketTest.scala index cb41ab0..cf06785 100644 --- a/src/test/scala/WebSocketTest.scala +++ b/src/test/scala/WebSocketTest.scala @@ -9,6 +9,7 @@ import sttp.model.ws.WebSocketFrame import scala.concurrent.duration._ import nova.monadic_sfx.implicits._ import monix.catnap.ConcurrentQueue +import cats.syntax.all._ class WebSocketTest extends AnyFunSuite with BeforeAndAfterAll { implicit val sched = Scheduler.global implicit val backend = HttpClientMonixBackend().runSyncUnsafe() @@ -18,6 +19,11 @@ class WebSocketTest extends AnyFunSuite with BeforeAndAfterAll { .map(_.result) .runSyncUnsafe() + override def afterAll() = { + ws.close.runSyncUnsafe() + backend.close() + } + test("open websocket") { (for { isOpen <- ws.isOpen @@ -39,8 +45,8 @@ class WebSocketTest extends AnyFunSuite with BeforeAndAfterAll { queue <- ConcurrentQueue.bounded[Task, Int](10) _ <- ws.observableSource - .filter(_.isRight) - .map(_.right.get) + .map(_.toOption) + .mapFilter(identity) .doOnNext(s => Task(println(s"Received item: $s")) >> s.toIntOption.fold(Task.unit)(queue.offer) @@ -53,9 +59,4 @@ class WebSocketTest extends AnyFunSuite with BeforeAndAfterAll { _ <- Task(assert(items != Seq(1, 2, 3, 4, 5, 6))) } yield ()).runSyncUnsafe(10.seconds) } - - override def afterAll() = { - ws.close.runSyncUnsafe() - backend.close() - } }