From e2f5dc15c37cbe5a04c71b43b5f9e50ba7e3496e Mon Sep 17 00:00:00 2001 From: Rohan Sircar Date: Thu, 3 Sep 2020 22:07:09 +0530 Subject: [PATCH] Made changes to actors Change actor system from untyped to typed Made fx actor initialization in FxApp to enable proper shutdown --- src/main/resources/application.conf | 3 +- src/main/scala/nova/monadic_sfx/Main.scala | 7 +- .../nova/monadic_sfx/actors/ActorModule.scala | 87 ++++++++++++------- .../nova/monadic_sfx/actors/TestActor.scala | 16 ++++ .../nova/monadic_sfx/http/HttpModule.scala | 5 +- .../scala/nova/monadic_sfx/ui/MyFxApp.scala | 83 +++++++++++++----- .../scala/nova/monadic_sfx/ui/UiModule.scala | 14 +-- .../monadic_sfx/ui/screens/HomeScreen.scala | 5 +- .../monadic_sfx/ui/screens/LoginScreen.scala | 6 +- 9 files changed, 156 insertions(+), 70 deletions(-) diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf index 929257c..863bcdd 100644 --- a/src/main/resources/application.conf +++ b/src/main/resources/application.conf @@ -2,4 +2,5 @@ javafx-dispatcher { type = "Dispatcher" executor = "nova.monadic_sfx.executors.JavaFXEventThreadExecutorServiceConfigurator" throughput = 1 -} \ No newline at end of file +} +akka.jvm-exit-on-fatal-error = on \ No newline at end of file diff --git a/src/main/scala/nova/monadic_sfx/Main.scala b/src/main/scala/nova/monadic_sfx/Main.scala index 1ec02ea..a81c0e4 100644 --- a/src/main/scala/nova/monadic_sfx/Main.scala +++ b/src/main/scala/nova/monadic_sfx/Main.scala @@ -13,6 +13,7 @@ import io.odin.monix._ import monix.eval.TaskApp import cats.effect.ExitCode import cats.implicits._ +import com.softwaremill.macwire._ object Main extends MainModule with TaskApp { @@ -24,10 +25,10 @@ object Main extends MainModule with TaskApp { // clock <- Resource.liftF(Task(Task.clock)) logger <- consoleLogger().withAsync() backend <- AsyncHttpClientMonixBackend.resource() - actorSystem <- actorResource(logger) - reqs <- Resource.liftF(Task(requesters(backend, actorSystem))) + actorSystem <- actorSystemResource(logger) + reqs <- Resource.liftF(Task(wireWith(requesters _))) schedulers <- Resource.liftF(Task(new Schedulers())) - fxApp <- fxAppResource(logger, backend, actorSystem, reqs, schedulers) + fxApp <- wireWith(fxAppResource _) } yield (fxApp) appResource .use(fxApp => Task(fxApp.main(args.toArray))) diff --git a/src/main/scala/nova/monadic_sfx/actors/ActorModule.scala b/src/main/scala/nova/monadic_sfx/actors/ActorModule.scala index 091b036..9e535ca 100644 --- a/src/main/scala/nova/monadic_sfx/actors/ActorModule.scala +++ b/src/main/scala/nova/monadic_sfx/actors/ActorModule.scala @@ -3,47 +3,72 @@ package nova.monadic_sfx.actors import io.odin.Logger import monix.eval.Task import cats.effect.Resource -import akka.actor._ -import akka.actor.typed.scaladsl.adapter._ import akka.actor.typed.scaladsl.Behaviors import com.softwaremill.macwire._ -import akka.actor.typed.Behavior -import akka.actor.typed.DispatcherSelector +import akka.util.Timeout +import scala.concurrent.duration._ +import scala.concurrent.Future +import akka.actor.typed._ +import akka.actor.typed.scaladsl.AskPattern._ +import scala.concurrent.Await +import nova.monadic_sfx.executors.Schedulers trait ActorModule { - def actorResource(logger: Logger[Task]): Resource[Task, ActorSystem] = + import scala.concurrent.ExecutionContext + + implicit val timeout: Timeout = Timeout(3.seconds) + + def actorSystemResource( + logger: Logger[Task] + ): Resource[Task, ActorSystem[SpawnProtocol.Command]] = Resource.make(logger.info("Creating Actor System") >> Task { - ActorSystem( - name = "FXActorSystem" - ) + ActorSystem(HelloWorldMain(), name = "FXActorSystem") })(sys => - logger.info("Shutting down actor system") >> Task.fromFuture( + logger.info("Shutting down actor system") >> Task( sys.terminate() ) >> logger.info("Actor System terminated") ) - def testActor( - system: ActorSystem - ): akka.actor.typed.ActorRef[Counter.Command] = { - val behaviour: Behavior[Counter.Command] = - Behaviors.setup(context => wire[Counter]) - system.spawn( - behaviour, - "CounterActor", - DispatcherSelector.fromConfig("javafx-dispatcher") - ) - } + // def actorsResource( + // system: ActorSystem[SpawnProtocol.Command], + // logger: Logger[Task], + // schedulers: Schedulers + // ): Resource[Task, Task[ActorRef[Counter.Command]]] = { + // implicit val ec: ExecutionContext = system.executionContext + // implicit val scheduler = system.scheduler + // Resource.make( + // Task { + // val actor = Task.deferFuture { + // system.ask[ActorRef[Counter.Command]]( + // SpawnProtocol.Spawn( + // behavior = Counter(), + // name = "counterActor", + // // DispatcherSelector.fromConfig("javafx-dispatcher"), + // // Props.empty, + // _ + // ) + // ) + // } + // // system. + // actor + // } + // )(actorTask => + // for { + // actor <- actorTask + // _ <- logger.info("Stopping actor counter") + // t <- Task(actor ! Counter.Stop) + // _ <- logger.info("Counter actor stopped") + // } yield () + // ) + // } + +} +object HelloWorldMain { + def apply(): Behavior[SpawnProtocol.Command] = + Behaviors.setup { context => + // Start initial tasks + // context.spawn(...) - def testActorL( - system: ActorSystem - ): Task[akka.actor.typed.ActorRef[Counter.Command]] = - Task { - val behaviour: Behavior[Counter.Command] = - Behaviors.setup(context => wire[Counter]) - system.spawn( - behaviour, - "CounterActor", - DispatcherSelector.fromConfig("javafx-dispatcher") - ) + SpawnProtocol() } } diff --git a/src/main/scala/nova/monadic_sfx/actors/TestActor.scala b/src/main/scala/nova/monadic_sfx/actors/TestActor.scala index e28ad2a..15ec3ca 100644 --- a/src/main/scala/nova/monadic_sfx/actors/TestActor.scala +++ b/src/main/scala/nova/monadic_sfx/actors/TestActor.scala @@ -8,6 +8,7 @@ object Counter { case object Increment extends Command final case class GetValue(replyTo: ActorRef[Value]) extends Command final case class Value(n: Int) + final case object Stop extends Command def apply(): Behavior[Command] = { Behaviors.setup(context => new Counter(context)) @@ -28,7 +29,22 @@ class Counter(context: ActorContext[Counter.Command]) this case GetValue(replyTo) => replyTo ! Value(n) + this + case Stop => + context.log.info("Recieved shutdown counter actor") Behaviors.stopped } } + + override def onSignal: PartialFunction[Signal, Behavior[Counter.Command]] = + PartialFunction.fromFunction((signal: Signal) => { + signal match { + case _: Terminated => + context.log.info("Recieved shutdown counter actor terminated") + this + case PostStop => + context.log.info("Recieved shutdown counter actor poststop") + this + } + }) } diff --git a/src/main/scala/nova/monadic_sfx/http/HttpModule.scala b/src/main/scala/nova/monadic_sfx/http/HttpModule.scala index 850d2aa..553f1ca 100644 --- a/src/main/scala/nova/monadic_sfx/http/HttpModule.scala +++ b/src/main/scala/nova/monadic_sfx/http/HttpModule.scala @@ -2,11 +2,12 @@ package nova.monadic_sfx.http import nova.monadic_sfx.http.requests.DummyRequest import nova.monadic_sfx.AppTypes +import akka.actor.typed._ trait HttpModule { def requesters( backend: AppTypes.HttpBackend, - system: akka.actor.ActorSystem + system: ActorSystem[SpawnProtocol.Command] ): Requesters = { import com.softwaremill.macwire._ val dummyRequester = wire[DummyRequest] @@ -19,7 +20,7 @@ class Requesters(val dummyRequester: DummyRequest) // object Requesters { // def apply( // backend: AppTypes.HttpBackend, -// system: akka.actor.ActorSystem +// system: akka.actor.typed.ActorSystem[SpawnProtocol.Command] // ): Requesters = { // import com.softwaremill.macwire._ // val dummyRequester = wire[DummyRequest] diff --git a/src/main/scala/nova/monadic_sfx/ui/MyFxApp.scala b/src/main/scala/nova/monadic_sfx/ui/MyFxApp.scala index a5ce284..a178a56 100644 --- a/src/main/scala/nova/monadic_sfx/ui/MyFxApp.scala +++ b/src/main/scala/nova/monadic_sfx/ui/MyFxApp.scala @@ -6,39 +6,46 @@ import monix.execution.Scheduler import monix.eval.Task import nova.monadic_sfx.screens.LoginScreen import nova.monadic_sfx.AppTypes -import scalafx.application.Platform import scala.concurrent.duration._ import io.odin.Logger import monix.execution.Callback import com.softwaremill.macwire._ import nova.monadic_sfx.http.Requesters -import akka.actor._ -import akka.actor.typed.Behavior -import akka.actor.typed.scaladsl.adapter._ -import akka.actor.typed.scaladsl.Behaviors +import akka.actor.typed._ import nova.monadic_sfx.actors.Counter -import akka.actor.typed.DispatcherSelector +import akka.util.Timeout class MyFxApp( logger: Logger[Task], backend: AppTypes.HttpBackend, - actorSystem: akka.actor.ActorSystem, + actorSystem: ActorSystem[SpawnProtocol.Command], requesters: Requesters, schedulers: Schedulers ) extends JFXApp { implicit lazy val defaultScheduler: Scheduler = schedulers.fx + lazy val fxActor: Task[ActorRef[Counter.Command]] = wireWith( + MyFxApp.makeCounterActor _ + ) + lazy val application = for { - appStage <- Task( - UiModule.makePrimaryStage(backend, actorSystem) - ) + appStage <- Task(wireWith(UiModule.makePrimaryStage _)) + // _ <- Task { // val counterActor = testActor(actorSystem) // counterActor ! (Counter.Increment) // } + // ta <- testActor2(actorSystem) + // actor <- + // actorTask.bracket(actor => Task(actor ! (Counter.Increment)))(actor => + // Task(actor ! (Counter.Stop)) + // ) + // actor <- actorTask + actor <- fxActor + _ <- Task(actor ! (Counter.Increment)) _ <- Task { stage = appStage } _ <- Task.sleep(2.seconds) loginScene <- wire[LoginScreen].render @@ -54,17 +61,17 @@ class MyFxApp( } } yield () - def testActor( - system: ActorSystem - ): akka.actor.typed.ActorRef[Counter.Command] = { - val behaviour: Behavior[Counter.Command] = - Behaviors.setup(context => wire[Counter]) - system.spawn( - behaviour, - "CounterActor", - DispatcherSelector.fromConfig("javafx-dispatcher") - ) - } +// def testActor( +// system: ActorSystem +// ): akka.actor.typed.ActorRef[Counter.Command] = { +// val behaviour: Behavior[Counter.Command] = +// Behaviors.setup(context => wire[Counter]) +// system.spawn( +// behaviour, +// "CounterActor", +// DispatcherSelector.fromConfig("javafx-dispatcher") +// ) +// } application.timed.runAsync( new Callback[Throwable, (FiniteDuration, Unit)] { @@ -85,6 +92,38 @@ class MyFxApp( ) override def stopApp() = { - Platform.exit() + val stop = for { + actor <- fxActor + _ <- logger.info("Stopping actor counter") + // _ <- Task.fromFuture { actor.ask[Counter.Value](Counter.GetValue) } + t <- Task(actor ! Counter.Stop) + // _ <- Task.sleep(1.second) + _ <- logger.info("Counter actor stopped") + } yield () + stop.runAsyncAndForget + // Platform.exit() + } +} + +object MyFxApp { + def makeCounterActor( + system: ActorSystem[SpawnProtocol.Command] + ): Task[ActorRef[Counter.Command]] = { + import akka.actor.typed.scaladsl.AskPattern._ + import scala.concurrent.ExecutionContext + + implicit val timeout: Timeout = Timeout(3.seconds) + implicit val ec: ExecutionContext = system.executionContext + implicit val scheduler = system.scheduler + Task.fromFuture { + system.ask( + SpawnProtocol.Spawn( + behavior = wireWith(Counter.apply _), + name = "counterActor", + DispatcherSelector.fromConfig("javafx-dispatcher"), + _ + ) + ) + } } } diff --git a/src/main/scala/nova/monadic_sfx/ui/UiModule.scala b/src/main/scala/nova/monadic_sfx/ui/UiModule.scala index 486e7f2..d032b32 100644 --- a/src/main/scala/nova/monadic_sfx/ui/UiModule.scala +++ b/src/main/scala/nova/monadic_sfx/ui/UiModule.scala @@ -9,26 +9,26 @@ import cats.effect.Resource import com.softwaremill.macwire._ import nova.monadic_sfx.http.Requesters import nova.monadic_sfx.executors.Schedulers +import akka.actor.typed._ trait UiModule { def fxAppResource( logger: Logger[Task], backend: AppTypes.HttpBackend, - actorSystem: akka.actor.ActorSystem, + actorSystem: ActorSystem[SpawnProtocol.Command], requesters: Requesters, schedulers: Schedulers ): Resource[Task, JFXApp] = - Resource.make(logger.info("Creating FX Application") >> Task { - val app: JFXApp = wire[MyFxApp] - app - })(app => logger.info("Stopping FX Application") >> Task(app.stopApp())) - + Resource.make(for { + _ <- logger.info("Creating FX Application") + app <- Task { wire[MyFxApp] } + } yield (app))(app => logger.info("Stopping FX Application")) } object UiModule { def makePrimaryStage( backend: AppTypes.HttpBackend, - actorSystem: akka.actor.ActorSystem + actorSystem: ActorSystem[SpawnProtocol.Command] ) = { new PrimaryStage { scene = new DefaultUI().scene diff --git a/src/main/scala/nova/monadic_sfx/ui/screens/HomeScreen.scala b/src/main/scala/nova/monadic_sfx/ui/screens/HomeScreen.scala index 72862b7..ec27705 100644 --- a/src/main/scala/nova/monadic_sfx/ui/screens/HomeScreen.scala +++ b/src/main/scala/nova/monadic_sfx/ui/screens/HomeScreen.scala @@ -11,10 +11,11 @@ import scalafx.scene.Parent import scalafx.application.JFXApp.PrimaryStage import monix.eval.Task import nova.monadic_sfx.util.Action +import akka.actor.typed._ class HomeScreen( backend: AppTypes.HttpBackend, - system: akka.actor.ActorSystem, + system: ActorSystem[SpawnProtocol.Command], onLogout: () => Task[Unit] ) { private lazy val root = Task.deferAction { implicit s => @@ -38,7 +39,7 @@ class HomeScreen( object HomeScreen { def apply( backend: AppTypes.HttpBackend, - system: akka.actor.ActorSystem, + system: ActorSystem[SpawnProtocol.Command], onLogout: () => Task[Unit] ): Task[Parent] = new HomeScreen(backend, system, onLogout).render diff --git a/src/main/scala/nova/monadic_sfx/ui/screens/LoginScreen.scala b/src/main/scala/nova/monadic_sfx/ui/screens/LoginScreen.scala index d505ad0..04fc5f8 100644 --- a/src/main/scala/nova/monadic_sfx/ui/screens/LoginScreen.scala +++ b/src/main/scala/nova/monadic_sfx/ui/screens/LoginScreen.scala @@ -20,6 +20,7 @@ import nova.monadic_sfx.models.HttpBinResponse import sttp.client.ResponseError import nova.monadic_sfx.executors.Schedulers import nova.monadic_sfx.ui.screens.Screen +import akka.actor.typed._ // import io.odin.syntax._ // import _root_.monix.eval.Task // import io.odin.monix._ @@ -29,8 +30,9 @@ class LoginScreen( override protected val appStage: PrimaryStage, logger: Logger[Task], backend: AppTypes.HttpBackend, - system: akka.actor.ActorSystem, + system: ActorSystem[SpawnProtocol.Command], requesters: Requesters, + // dm: DummyRequest, schedulers: Schedulers ) extends Screen { val dummyRequester: DummyRequest = requesters.dummyRequester @@ -103,6 +105,6 @@ class LoginScreen( // def apply( // appStage: PrimaryStage, // backend: AppTypes.HttpBackend, -// system: akka.actor.ActorSystem +// system: ActorSystem[SpawnProtocol.Command] // ) = new LoginScreen(appStage, backend, system).render // }