Testing out jMonkeyEngine to make a game in Scala with Akka Actors within a pure FP layer
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

136 lines
4.0 KiB

4 years ago
4 years ago
4 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
4 years ago
4 years ago
4 years ago
4 years ago
3 years ago
3 years ago
3 years ago
4 years ago
3 years ago
4 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
4 years ago
3 years ago
4 years ago
3 years ago
3 years ago
3 years ago
4 years ago
  1. package wow.doge.mygame.subsystems.events
  2. import scala.reflect.ClassTag
  3. import akka.actor.typed.ActorRef
  4. import akka.actor.typed.Behavior
  5. import akka.actor.typed.PostStop
  6. import akka.actor.typed.Scheduler
  7. import akka.actor.typed.SpawnProtocol
  8. import akka.actor.typed.scaladsl.AskPattern._
  9. import akka.actor.typed.scaladsl.Behaviors
  10. import akka.event.EventStream
  11. import akka.util.Timeout
  12. import monix.bio.UIO
  13. import monix.execution.Ack
  14. import monix.execution.Cancelable
  15. import monix.execution.cancelables.SingleAssignCancelable
  16. import monix.reactive.Observable
  17. import monix.reactive.OverflowStrategy
  18. import wow.doge.mygame.implicits._
  19. import wow.doge.mygame.subsystems.events.EventBus.ObservableSubscription
  20. import wow.doge.mygame.utils.AkkaUtils
  21. /**
  22. * A (typed) event bus
  23. * Copied (and repurposed) from Akka's EventStream
  24. */
  25. object EventBus {
  26. sealed trait Command[A]
  27. final case class Publish[A, E <: A](
  28. event: E,
  29. publisherName: String
  30. ) extends Command[A]
  31. final case class Subscribe[A, E <: A](subscriber: ActorRef[E])(implicit
  32. classTag: ClassTag[E]
  33. ) extends Command[A] {
  34. def topic: Class[_] = classTag.runtimeClass
  35. }
  36. final case class Unsubscribe[A, E <: A](subscriber: ActorRef[E])
  37. extends Command[A]
  38. final case class ObservableSubscription[A, E <: A](
  39. replyTo: ActorRef[Observable[E]]
  40. )(implicit classTag: ClassTag[E])
  41. extends Command[A] {
  42. def ct = classTag
  43. }
  44. def apply[A: ClassTag]()(implicit
  45. timeout: Timeout,
  46. spawnProtocol: ActorRef[SpawnProtocol.Command]
  47. ): Behavior[EventBus.Command[A]] =
  48. Behaviors.setup { ctx =>
  49. val eventStream = new EventStream(ctx.system.classicSystem)
  50. implicit val scheduler = ctx.system.scheduler
  51. new EventBus().eventStreamBehavior(eventStream)
  52. }
  53. def observable[A, B <: A](eventBus: ActorRef[EventBus.Command[A]])(implicit
  54. timeout: Timeout,
  55. scheduler: Scheduler,
  56. spawnProtocol: ActorRef[SpawnProtocol.Command],
  57. ct: ClassTag[A],
  58. ct2: ClassTag[B]
  59. ) =
  60. Observable.create[B](OverflowStrategy.DropOld(50)) { sub =>
  61. implicit val s = sub.scheduler
  62. val c = SingleAssignCancelable()
  63. val behavior = Behaviors
  64. .receive[B] { (ctx, msg) =>
  65. ctx.log.traceP(s"Emitted $msg")
  66. if (sub.onNext(msg) == Ack.Stop) {
  67. c.cancel()
  68. Behaviors.stopped
  69. } else Behaviors.same
  70. }
  71. .receiveSignal {
  72. case (_, PostStop) =>
  73. sub.onComplete()
  74. Behaviors.same
  75. }
  76. val actor =
  77. AkkaUtils
  78. .spawnActorL(
  79. behavior,
  80. actorName =
  81. Some(s"eventBusObservable-${ct.toString.split("""\.""").last}")
  82. )
  83. .mapError(err => new Throwable(err.toString))
  84. .tapError {
  85. case ex => UIO(sub.onError(ex))
  86. }
  87. (for {
  88. a <- actor
  89. _ <- eventBus !! Subscribe(a)
  90. _ <- UIO(c := Cancelable(() => eventBus ! Unsubscribe(a)))
  91. } yield ()).runToFuture
  92. c
  93. }
  94. }
  95. class EventBus[A] {
  96. import akka.actor.typed.scaladsl.adapter._
  97. private def eventStreamBehavior(
  98. eventStream: EventStream
  99. )(implicit
  100. timeout: Timeout,
  101. scheduler: Scheduler,
  102. spawnProtocol: ActorRef[SpawnProtocol.Command],
  103. ct: ClassTag[A]
  104. ): Behavior[EventBus.Command[A]] =
  105. Behaviors.setup { ctx =>
  106. Behaviors.receiveMessage {
  107. case EventBus.Publish(event, name) =>
  108. eventStream.publish(event)
  109. Behaviors.same
  110. case s @ EventBus.Subscribe(subscriber) =>
  111. eventStream.subscribe(subscriber.toClassic, s.topic)
  112. Behaviors.same
  113. case EventBus.Unsubscribe(subscriber) =>
  114. eventStream.unsubscribe(subscriber.toClassic)
  115. Behaviors.same
  116. case s @ ObservableSubscription(replyTo) =>
  117. val obs = EventBus.observable(
  118. ctx.self
  119. )(timeout, scheduler, spawnProtocol, ct, s.ct)
  120. replyTo ! obs
  121. Behaviors.same
  122. }
  123. }
  124. }