Made changes to actors

Change actor system from untyped to typed
Made fx actor initialization in FxApp to enable proper shutdown
This commit is contained in:
Rohan Sircar 2020-09-03 22:07:09 +05:30
parent b6823aebfa
commit e2f5dc15c3
9 changed files with 156 additions and 70 deletions

View File

@ -2,4 +2,5 @@ javafx-dispatcher {
type = "Dispatcher" type = "Dispatcher"
executor = "nova.monadic_sfx.executors.JavaFXEventThreadExecutorServiceConfigurator" executor = "nova.monadic_sfx.executors.JavaFXEventThreadExecutorServiceConfigurator"
throughput = 1 throughput = 1
} }
akka.jvm-exit-on-fatal-error = on

View File

@ -13,6 +13,7 @@ import io.odin.monix._
import monix.eval.TaskApp import monix.eval.TaskApp
import cats.effect.ExitCode import cats.effect.ExitCode
import cats.implicits._ import cats.implicits._
import com.softwaremill.macwire._
object Main extends MainModule with TaskApp { object Main extends MainModule with TaskApp {
@ -24,10 +25,10 @@ object Main extends MainModule with TaskApp {
// clock <- Resource.liftF(Task(Task.clock)) // clock <- Resource.liftF(Task(Task.clock))
logger <- consoleLogger().withAsync() logger <- consoleLogger().withAsync()
backend <- AsyncHttpClientMonixBackend.resource() backend <- AsyncHttpClientMonixBackend.resource()
actorSystem <- actorResource(logger) actorSystem <- actorSystemResource(logger)
reqs <- Resource.liftF(Task(requesters(backend, actorSystem))) reqs <- Resource.liftF(Task(wireWith(requesters _)))
schedulers <- Resource.liftF(Task(new Schedulers())) schedulers <- Resource.liftF(Task(new Schedulers()))
fxApp <- fxAppResource(logger, backend, actorSystem, reqs, schedulers) fxApp <- wireWith(fxAppResource _)
} yield (fxApp) } yield (fxApp)
appResource appResource
.use(fxApp => Task(fxApp.main(args.toArray))) .use(fxApp => Task(fxApp.main(args.toArray)))

View File

@ -3,47 +3,72 @@ package nova.monadic_sfx.actors
import io.odin.Logger import io.odin.Logger
import monix.eval.Task import monix.eval.Task
import cats.effect.Resource import cats.effect.Resource
import akka.actor._
import akka.actor.typed.scaladsl.adapter._
import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.Behaviors
import com.softwaremill.macwire._ import com.softwaremill.macwire._
import akka.actor.typed.Behavior import akka.util.Timeout
import akka.actor.typed.DispatcherSelector import scala.concurrent.duration._
import scala.concurrent.Future
import akka.actor.typed._
import akka.actor.typed.scaladsl.AskPattern._
import scala.concurrent.Await
import nova.monadic_sfx.executors.Schedulers
trait ActorModule { trait ActorModule {
def actorResource(logger: Logger[Task]): Resource[Task, ActorSystem] = import scala.concurrent.ExecutionContext
implicit val timeout: Timeout = Timeout(3.seconds)
def actorSystemResource(
logger: Logger[Task]
): Resource[Task, ActorSystem[SpawnProtocol.Command]] =
Resource.make(logger.info("Creating Actor System") >> Task { Resource.make(logger.info("Creating Actor System") >> Task {
ActorSystem( ActorSystem(HelloWorldMain(), name = "FXActorSystem")
name = "FXActorSystem"
)
})(sys => })(sys =>
logger.info("Shutting down actor system") >> Task.fromFuture( logger.info("Shutting down actor system") >> Task(
sys.terminate() sys.terminate()
) >> logger.info("Actor System terminated") ) >> logger.info("Actor System terminated")
) )
def testActor( // def actorsResource(
system: ActorSystem // system: ActorSystem[SpawnProtocol.Command],
): akka.actor.typed.ActorRef[Counter.Command] = { // logger: Logger[Task],
val behaviour: Behavior[Counter.Command] = // schedulers: Schedulers
Behaviors.setup(context => wire[Counter]) // ): Resource[Task, Task[ActorRef[Counter.Command]]] = {
system.spawn( // implicit val ec: ExecutionContext = system.executionContext
behaviour, // implicit val scheduler = system.scheduler
"CounterActor", // Resource.make(
DispatcherSelector.fromConfig("javafx-dispatcher") // Task {
) // val actor = Task.deferFuture {
} // system.ask[ActorRef[Counter.Command]](
// SpawnProtocol.Spawn(
// behavior = Counter(),
// name = "counterActor",
// // DispatcherSelector.fromConfig("javafx-dispatcher"),
// // Props.empty,
// _
// )
// )
// }
// // system.
// actor
// }
// )(actorTask =>
// for {
// actor <- actorTask
// _ <- logger.info("Stopping actor counter")
// t <- Task(actor ! Counter.Stop)
// _ <- logger.info("Counter actor stopped")
// } yield ()
// )
// }
def testActorL( }
system: ActorSystem object HelloWorldMain {
): Task[akka.actor.typed.ActorRef[Counter.Command]] = def apply(): Behavior[SpawnProtocol.Command] =
Task { Behaviors.setup { context =>
val behaviour: Behavior[Counter.Command] = // Start initial tasks
Behaviors.setup(context => wire[Counter]) // context.spawn(...)
system.spawn(
behaviour, SpawnProtocol()
"CounterActor",
DispatcherSelector.fromConfig("javafx-dispatcher")
)
} }
} }

View File

@ -8,6 +8,7 @@ object Counter {
case object Increment extends Command case object Increment extends Command
final case class GetValue(replyTo: ActorRef[Value]) extends Command final case class GetValue(replyTo: ActorRef[Value]) extends Command
final case class Value(n: Int) final case class Value(n: Int)
final case object Stop extends Command
def apply(): Behavior[Command] = { def apply(): Behavior[Command] = {
Behaviors.setup(context => new Counter(context)) Behaviors.setup(context => new Counter(context))
@ -28,7 +29,22 @@ class Counter(context: ActorContext[Counter.Command])
this this
case GetValue(replyTo) => case GetValue(replyTo) =>
replyTo ! Value(n) replyTo ! Value(n)
this
case Stop =>
context.log.info("Recieved shutdown counter actor")
Behaviors.stopped Behaviors.stopped
} }
} }
override def onSignal: PartialFunction[Signal, Behavior[Counter.Command]] =
PartialFunction.fromFunction((signal: Signal) => {
signal match {
case _: Terminated =>
context.log.info("Recieved shutdown counter actor terminated")
this
case PostStop =>
context.log.info("Recieved shutdown counter actor poststop")
this
}
})
} }

View File

@ -2,11 +2,12 @@ package nova.monadic_sfx.http
import nova.monadic_sfx.http.requests.DummyRequest import nova.monadic_sfx.http.requests.DummyRequest
import nova.monadic_sfx.AppTypes import nova.monadic_sfx.AppTypes
import akka.actor.typed._
trait HttpModule { trait HttpModule {
def requesters( def requesters(
backend: AppTypes.HttpBackend, backend: AppTypes.HttpBackend,
system: akka.actor.ActorSystem system: ActorSystem[SpawnProtocol.Command]
): Requesters = { ): Requesters = {
import com.softwaremill.macwire._ import com.softwaremill.macwire._
val dummyRequester = wire[DummyRequest] val dummyRequester = wire[DummyRequest]
@ -19,7 +20,7 @@ class Requesters(val dummyRequester: DummyRequest)
// object Requesters { // object Requesters {
// def apply( // def apply(
// backend: AppTypes.HttpBackend, // backend: AppTypes.HttpBackend,
// system: akka.actor.ActorSystem // system: akka.actor.typed.ActorSystem[SpawnProtocol.Command]
// ): Requesters = { // ): Requesters = {
// import com.softwaremill.macwire._ // import com.softwaremill.macwire._
// val dummyRequester = wire[DummyRequest] // val dummyRequester = wire[DummyRequest]

View File

@ -6,39 +6,46 @@ import monix.execution.Scheduler
import monix.eval.Task import monix.eval.Task
import nova.monadic_sfx.screens.LoginScreen import nova.monadic_sfx.screens.LoginScreen
import nova.monadic_sfx.AppTypes import nova.monadic_sfx.AppTypes
import scalafx.application.Platform
import scala.concurrent.duration._ import scala.concurrent.duration._
import io.odin.Logger import io.odin.Logger
import monix.execution.Callback import monix.execution.Callback
import com.softwaremill.macwire._ import com.softwaremill.macwire._
import nova.monadic_sfx.http.Requesters import nova.monadic_sfx.http.Requesters
import akka.actor._ import akka.actor.typed._
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.adapter._
import akka.actor.typed.scaladsl.Behaviors
import nova.monadic_sfx.actors.Counter import nova.monadic_sfx.actors.Counter
import akka.actor.typed.DispatcherSelector import akka.util.Timeout
class MyFxApp( class MyFxApp(
logger: Logger[Task], logger: Logger[Task],
backend: AppTypes.HttpBackend, backend: AppTypes.HttpBackend,
actorSystem: akka.actor.ActorSystem, actorSystem: ActorSystem[SpawnProtocol.Command],
requesters: Requesters, requesters: Requesters,
schedulers: Schedulers schedulers: Schedulers
) extends JFXApp { ) extends JFXApp {
implicit lazy val defaultScheduler: Scheduler = schedulers.fx implicit lazy val defaultScheduler: Scheduler = schedulers.fx
lazy val fxActor: Task[ActorRef[Counter.Command]] = wireWith(
MyFxApp.makeCounterActor _
)
lazy val application = lazy val application =
for { for {
appStage <- Task( appStage <- Task(wireWith(UiModule.makePrimaryStage _))
UiModule.makePrimaryStage(backend, actorSystem)
)
// _ <- Task { // _ <- Task {
// val counterActor = testActor(actorSystem) // val counterActor = testActor(actorSystem)
// counterActor ! (Counter.Increment) // counterActor ! (Counter.Increment)
// } // }
// ta <- testActor2(actorSystem)
// actor <-
// actorTask.bracket(actor => Task(actor ! (Counter.Increment)))(actor =>
// Task(actor ! (Counter.Stop))
// )
// actor <- actorTask
actor <- fxActor
_ <- Task(actor ! (Counter.Increment))
_ <- Task { stage = appStage } _ <- Task { stage = appStage }
_ <- Task.sleep(2.seconds) _ <- Task.sleep(2.seconds)
loginScene <- wire[LoginScreen].render loginScene <- wire[LoginScreen].render
@ -54,17 +61,17 @@ class MyFxApp(
} }
} yield () } yield ()
def testActor( // def testActor(
system: ActorSystem // system: ActorSystem
): akka.actor.typed.ActorRef[Counter.Command] = { // ): akka.actor.typed.ActorRef[Counter.Command] = {
val behaviour: Behavior[Counter.Command] = // val behaviour: Behavior[Counter.Command] =
Behaviors.setup(context => wire[Counter]) // Behaviors.setup(context => wire[Counter])
system.spawn( // system.spawn(
behaviour, // behaviour,
"CounterActor", // "CounterActor",
DispatcherSelector.fromConfig("javafx-dispatcher") // DispatcherSelector.fromConfig("javafx-dispatcher")
) // )
} // }
application.timed.runAsync( application.timed.runAsync(
new Callback[Throwable, (FiniteDuration, Unit)] { new Callback[Throwable, (FiniteDuration, Unit)] {
@ -85,6 +92,38 @@ class MyFxApp(
) )
override def stopApp() = { override def stopApp() = {
Platform.exit() val stop = for {
actor <- fxActor
_ <- logger.info("Stopping actor counter")
// _ <- Task.fromFuture { actor.ask[Counter.Value](Counter.GetValue) }
t <- Task(actor ! Counter.Stop)
// _ <- Task.sleep(1.second)
_ <- logger.info("Counter actor stopped")
} yield ()
stop.runAsyncAndForget
// Platform.exit()
}
}
object MyFxApp {
def makeCounterActor(
system: ActorSystem[SpawnProtocol.Command]
): Task[ActorRef[Counter.Command]] = {
import akka.actor.typed.scaladsl.AskPattern._
import scala.concurrent.ExecutionContext
implicit val timeout: Timeout = Timeout(3.seconds)
implicit val ec: ExecutionContext = system.executionContext
implicit val scheduler = system.scheduler
Task.fromFuture {
system.ask(
SpawnProtocol.Spawn(
behavior = wireWith(Counter.apply _),
name = "counterActor",
DispatcherSelector.fromConfig("javafx-dispatcher"),
_
)
)
}
} }
} }

View File

@ -9,26 +9,26 @@ import cats.effect.Resource
import com.softwaremill.macwire._ import com.softwaremill.macwire._
import nova.monadic_sfx.http.Requesters import nova.monadic_sfx.http.Requesters
import nova.monadic_sfx.executors.Schedulers import nova.monadic_sfx.executors.Schedulers
import akka.actor.typed._
trait UiModule { trait UiModule {
def fxAppResource( def fxAppResource(
logger: Logger[Task], logger: Logger[Task],
backend: AppTypes.HttpBackend, backend: AppTypes.HttpBackend,
actorSystem: akka.actor.ActorSystem, actorSystem: ActorSystem[SpawnProtocol.Command],
requesters: Requesters, requesters: Requesters,
schedulers: Schedulers schedulers: Schedulers
): Resource[Task, JFXApp] = ): Resource[Task, JFXApp] =
Resource.make(logger.info("Creating FX Application") >> Task { Resource.make(for {
val app: JFXApp = wire[MyFxApp] _ <- logger.info("Creating FX Application")
app app <- Task { wire[MyFxApp] }
})(app => logger.info("Stopping FX Application") >> Task(app.stopApp())) } yield (app))(app => logger.info("Stopping FX Application"))
} }
object UiModule { object UiModule {
def makePrimaryStage( def makePrimaryStage(
backend: AppTypes.HttpBackend, backend: AppTypes.HttpBackend,
actorSystem: akka.actor.ActorSystem actorSystem: ActorSystem[SpawnProtocol.Command]
) = { ) = {
new PrimaryStage { new PrimaryStage {
scene = new DefaultUI().scene scene = new DefaultUI().scene

View File

@ -11,10 +11,11 @@ import scalafx.scene.Parent
import scalafx.application.JFXApp.PrimaryStage import scalafx.application.JFXApp.PrimaryStage
import monix.eval.Task import monix.eval.Task
import nova.monadic_sfx.util.Action import nova.monadic_sfx.util.Action
import akka.actor.typed._
class HomeScreen( class HomeScreen(
backend: AppTypes.HttpBackend, backend: AppTypes.HttpBackend,
system: akka.actor.ActorSystem, system: ActorSystem[SpawnProtocol.Command],
onLogout: () => Task[Unit] onLogout: () => Task[Unit]
) { ) {
private lazy val root = Task.deferAction { implicit s => private lazy val root = Task.deferAction { implicit s =>
@ -38,7 +39,7 @@ class HomeScreen(
object HomeScreen { object HomeScreen {
def apply( def apply(
backend: AppTypes.HttpBackend, backend: AppTypes.HttpBackend,
system: akka.actor.ActorSystem, system: ActorSystem[SpawnProtocol.Command],
onLogout: () => Task[Unit] onLogout: () => Task[Unit]
): Task[Parent] = ): Task[Parent] =
new HomeScreen(backend, system, onLogout).render new HomeScreen(backend, system, onLogout).render

View File

@ -20,6 +20,7 @@ import nova.monadic_sfx.models.HttpBinResponse
import sttp.client.ResponseError import sttp.client.ResponseError
import nova.monadic_sfx.executors.Schedulers import nova.monadic_sfx.executors.Schedulers
import nova.monadic_sfx.ui.screens.Screen import nova.monadic_sfx.ui.screens.Screen
import akka.actor.typed._
// import io.odin.syntax._ // import io.odin.syntax._
// import _root_.monix.eval.Task // import _root_.monix.eval.Task
// import io.odin.monix._ // import io.odin.monix._
@ -29,8 +30,9 @@ class LoginScreen(
override protected val appStage: PrimaryStage, override protected val appStage: PrimaryStage,
logger: Logger[Task], logger: Logger[Task],
backend: AppTypes.HttpBackend, backend: AppTypes.HttpBackend,
system: akka.actor.ActorSystem, system: ActorSystem[SpawnProtocol.Command],
requesters: Requesters, requesters: Requesters,
// dm: DummyRequest,
schedulers: Schedulers schedulers: Schedulers
) extends Screen { ) extends Screen {
val dummyRequester: DummyRequest = requesters.dummyRequester val dummyRequester: DummyRequest = requesters.dummyRequester
@ -103,6 +105,6 @@ class LoginScreen(
// def apply( // def apply(
// appStage: PrimaryStage, // appStage: PrimaryStage,
// backend: AppTypes.HttpBackend, // backend: AppTypes.HttpBackend,
// system: akka.actor.ActorSystem // system: ActorSystem[SpawnProtocol.Command]
// ) = new LoginScreen(appStage, backend, system).render // ) = new LoginScreen(appStage, backend, system).render
// } // }