Testing out jMonkeyEngine to make a game in Scala with Akka Actors within a pure FP layer
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

137 lines
4.0 KiB

4 years ago
4 years ago
4 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
4 years ago
4 years ago
4 years ago
4 years ago
3 years ago
4 years ago
3 years ago
4 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
4 years ago
3 years ago
4 years ago
3 years ago
3 years ago
3 years ago
4 years ago
  1. package wow.doge.mygame.subsystems.events
  2. import scala.reflect.ClassTag
  3. import akka.actor.typed.ActorRef
  4. import akka.actor.typed.Behavior
  5. import akka.actor.typed.PostStop
  6. import akka.actor.typed.Scheduler
  7. import akka.actor.typed.SpawnProtocol
  8. import akka.actor.typed.scaladsl.AskPattern._
  9. import akka.actor.typed.scaladsl.Behaviors
  10. import akka.event.EventStream
  11. import akka.util.Timeout
  12. import monix.bio.UIO
  13. import monix.execution.Ack
  14. import monix.execution.Cancelable
  15. import monix.execution.cancelables.SingleAssignCancelable
  16. import monix.reactive.Observable
  17. import monix.reactive.OverflowStrategy
  18. import wow.doge.mygame.implicits._
  19. import wow.doge.mygame.subsystems.events.EventBus.ObservableSubscription
  20. import wow.doge.mygame.utils.AkkaUtils
  21. /**
  22. * A (typed) event bus
  23. * Copied (and repurposed) from Akka's EventStream
  24. */
  25. object EventBus {
  26. sealed trait Command[A]
  27. final case class Publish[A, E <: A](
  28. event: E,
  29. publisherName: String
  30. ) extends Command[A]
  31. final case class Subscribe[A, E <: A](subscriber: ActorRef[E])(implicit
  32. classTag: ClassTag[E]
  33. ) extends Command[A] {
  34. def topic: Class[_] = classTag.runtimeClass
  35. }
  36. final case class Unsubscribe[A, E <: A](subscriber: ActorRef[E])
  37. extends Command[A]
  38. final case class ObservableSubscription[A, E <: A](
  39. replyTo: ActorRef[Observable[E]]
  40. )(implicit
  41. classTag: ClassTag[E]
  42. ) extends Command[A] {
  43. def ct = classTag
  44. }
  45. def apply[A: ClassTag]()(implicit
  46. timeout: Timeout,
  47. spawnProtocol: ActorRef[SpawnProtocol.Command]
  48. ): Behavior[EventBus.Command[A]] =
  49. Behaviors.setup { ctx =>
  50. val eventStream = new EventStream(ctx.system.classicSystem)
  51. implicit val scheduler = ctx.system.scheduler
  52. new EventBus().eventStreamBehavior(eventStream)
  53. }
  54. def observable[A, B <: A](eventBus: ActorRef[EventBus.Command[A]])(implicit
  55. timeout: Timeout,
  56. scheduler: Scheduler,
  57. spawnProtocol: ActorRef[SpawnProtocol.Command],
  58. ct: ClassTag[A],
  59. ct2: ClassTag[B]
  60. ) =
  61. Observable.create[B](OverflowStrategy.DropOld(50)) { sub =>
  62. implicit val s = sub.scheduler
  63. val c = SingleAssignCancelable()
  64. val behavior = Behaviors
  65. .receive[B] { (ctx, msg) =>
  66. ctx.log.traceP(s"Emitted $msg")
  67. if (sub.onNext(msg) == Ack.Stop) {
  68. c.cancel()
  69. Behaviors.stopped
  70. } else Behaviors.same
  71. }
  72. .receiveSignal {
  73. case (_, PostStop) =>
  74. sub.onComplete()
  75. Behaviors.same
  76. }
  77. val actor =
  78. AkkaUtils
  79. .spawnActorL(
  80. behavior,
  81. actorName =
  82. Some(s"eventBusObservable-${ct.toString.split("""\.""").last}")
  83. )
  84. .mapError(err => new Throwable(err.toString))
  85. .tapError {
  86. case ex => UIO(sub.onError(ex))
  87. }
  88. (for {
  89. a <- actor
  90. _ <- eventBus !! Subscribe(a)
  91. _ <- UIO(c := Cancelable(() => eventBus ! Unsubscribe(a)))
  92. } yield ()).runToFuture
  93. c
  94. }
  95. }
  96. class EventBus[A] {
  97. import akka.actor.typed.scaladsl.adapter._
  98. private def eventStreamBehavior(
  99. eventStream: EventStream
  100. )(implicit
  101. timeout: Timeout,
  102. scheduler: Scheduler,
  103. spawnProtocol: ActorRef[SpawnProtocol.Command],
  104. ct: ClassTag[A]
  105. ): Behavior[EventBus.Command[A]] =
  106. Behaviors.setup { ctx =>
  107. Behaviors.receiveMessage {
  108. case EventBus.Publish(event, name) =>
  109. eventStream.publish(event)
  110. Behaviors.same
  111. case s @ EventBus.Subscribe(subscriber) =>
  112. eventStream.subscribe(subscriber.toClassic, s.topic)
  113. Behaviors.same
  114. case EventBus.Unsubscribe(subscriber) =>
  115. eventStream.unsubscribe(subscriber.toClassic)
  116. Behaviors.same
  117. case s @ ObservableSubscription(replyTo) =>
  118. val obs = EventBus.observable(
  119. ctx.self
  120. )(timeout, scheduler, spawnProtocol, ct, s.ct)
  121. replyTo ! obs
  122. Behaviors.same
  123. }
  124. }
  125. }