Testing out jMonkeyEngine to make a game in Scala with Akka Actors within a pure FP layer
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

163 lines
4.6 KiB

4 years ago
4 years ago
4 years ago
3 years ago
4 years ago
4 years ago
4 years ago
4 years ago
3 years ago
4 years ago
3 years ago
4 years ago
3 years ago
4 years ago
3 years ago
4 years ago
3 years ago
4 years ago
  1. package wow.doge.mygame.subsystems.events
  2. import scala.reflect.ClassTag
  3. import akka.actor.typed.ActorRef
  4. import akka.actor.typed.Behavior
  5. import akka.actor.typed.scaladsl.Behaviors
  6. import akka.event.EventStream
  7. import monix.reactive.Observable
  8. import monix.reactive.OverflowStrategy
  9. import monix.execution.cancelables.SingleAssignCancelable
  10. import monix.execution.Ack
  11. import akka.util.Timeout
  12. import akka.actor.typed.Scheduler
  13. import akka.actor.typed.SpawnProtocol
  14. import scala.util.Random
  15. import akka.actor.typed.scaladsl.AskPattern._
  16. import monix.execution.Cancelable
  17. import wow.doge.mygame.utils.AkkaUtils
  18. import wow.doge.mygame.implicits._
  19. import monix.bio.UIO
  20. import wow.doge.mygame.subsystems.events.EventBus.ObservableSubscription
  /**
    * A (typed) event bus.
    * Copied (and repurposed) from Akka's EventStream.
    *
    * This companion object holds the message protocol ([[EventBus.Command]]),
    * the behavior factory ([[EventBus.apply]]) and helpers that expose events
    * flowing through a bus actor as a monix `Observable`.
    */
  object EventBus {

    /** Number of events buffered per observable before the oldest ones are dropped. */
    private val ObservableBufferSize = 50

    /**
      * Messages understood by the event bus actor.
      *
      * @tparam A base type of the events carried by the bus
      */
    sealed trait Command[A]

    /**
      * Publishes `event` on the bus.
      *
      * @param event         the event to publish
      * @param publisherName name supplied by the publisher; the bus behavior
      *                      in this file does not read it
      */
    final case class Publish[A, E <: A](
        event: E,
        publisherName: String
    ) extends Command[A]

    /**
      * Subscribes `subscriber` to events whose runtime class is that of `E`.
      *
      * @param subscriber actor that will receive the events
      */
    final case class Subscribe[A, E <: A](subscriber: ActorRef[E])(implicit
        classTag: ClassTag[E]
    ) extends Command[A] {

      /** Runtime class of `E`, used as the subscription channel. */
      def topic: Class[_] = classTag.runtimeClass
    }

    /** Removes `subscriber` from the bus. */
    final case class Unsubscribe[A, E <: A](subscriber: ActorRef[E])
        extends Command[A]

    /**
      * Asks the bus for an `Observable` of events of type `E`.
      *
      * @param replyTo actor that receives the created observable
      */
    final case class ObservableSubscription[A, E <: A](
        replyTo: ActorRef[Observable[E]]
    )(implicit
        classTag: ClassTag[E]
    ) extends Command[A] {

      /** The class tag captured for `E` when the message was built. */
      def ct: ClassTag[E] = classTag
    }

    /**
      * Creates the behavior of an event bus actor for base event type `A`.
      *
      * A fresh `EventStream` is created on the classic actor system backing
      * the typed one, and the typed system's scheduler is used for the
      * behavior's implicit scheduler.
      */
    def apply[A: ClassTag]()(implicit
        timeout: Timeout,
        spawnProtocol: ActorRef[SpawnProtocol.Command]
    ): Behavior[EventBus.Command[A]] =
      Behaviors.setup { ctx =>
        val eventStream = new EventStream(ctx.system.classicSystem)
        implicit val scheduler: Scheduler = ctx.system.scheduler
        // The type argument is spelled out so that it does not depend on
        // inference through the method selection.
        new EventBus[A]().eventStreamBehavior(eventStream)
      }

    /**
      * Exposes all events of type `E` published on `eventBus` as an
      * `Observable`. Each subscription spawns its own bridge actor and
      * subscribes it to the bus; cancelling the subscription unsubscribes it.
      *
      * @param eventBus the event bus actor to listen to
      * @return an observable buffering up to 50 events, dropping the oldest
      */
    def observable[E](eventBus: ActorRef[EventBus.Command[E]])(implicit
        timeout: Timeout,
        scheduler: Scheduler,
        spawnProtocol: ActorRef[SpawnProtocol.Command],
        ct: ClassTag[E]
    ): Observable[E] =
      bridgedObservable[E, E](eventBus, ct)(timeout, scheduler, spawnProtocol, ct)

    /**
      * Like [[observable]], but restricted to the event subtype `B` of the
      * bus's base event type `A`.
      *
      * @param eventBus the event bus actor to listen to
      * @param ct       class tag of the base type; only used to name the bridge actor
      * @param ct2      class tag of the subscribed subtype
      * @return an observable buffering up to 50 events, dropping the oldest
      */
    def observable2[A, B <: A](eventBus: ActorRef[EventBus.Command[A]])(implicit
        timeout: Timeout,
        scheduler: Scheduler,
        spawnProtocol: ActorRef[SpawnProtocol.Command],
        ct: ClassTag[A],
        ct2: ClassTag[B]
    ): Observable[B] =
      bridgedObservable[A, B](eventBus, ct)(timeout, scheduler, spawnProtocol, ct2)

    /**
      * Shared implementation of [[observable]] and [[observable2]]: bridges
      * events of type `B` from `eventBus` into an `Observable` through a
      * dedicated subscriber actor.
      *
      * @param eventBus the event bus actor to subscribe to
      * @param nameTag  class tag whose string form becomes part of the bridge actor's name
      */
    private def bridgedObservable[A, B <: A](
        eventBus: ActorRef[EventBus.Command[A]],
        nameTag: ClassTag[_]
    )(implicit
        timeout: Timeout,
        scheduler: Scheduler,
        spawnProtocol: ActorRef[SpawnProtocol.Command],
        eventTag: ClassTag[B]
    ): Observable[B] =
      Observable.create[B](OverflowStrategy.DropOld(ObservableBufferSize)) { sub =>
        // `Scheduler` refers to Akka's scheduler in this file, so the monix
        // one is named by its full path.
        implicit val s: monix.execution.Scheduler = sub.scheduler
        val c = SingleAssignCancelable()

        // Forwards every received event downstream; once the subscriber
        // signals Stop, unsubscribes from the bus and stops this actor.
        val behavior = Behaviors.receiveMessage[B] { msg =>
          if (sub.onNext(msg) == Ack.Stop) {
            c.cancel()
            Behaviors.stopped
          } else Behaviors.same
        }

        // Clearing the sign bit always yields a non-negative suffix
        // (math.abs would still return a negative value for Long.MinValue).
        // It is drawn per subscription so repeated subscriptions get
        // distinct actor names.
        val suffix = Random.nextLong() & Long.MaxValue
        val actor =
          AkkaUtils
            .spawnActorL(
              behavior,
              s"eventBusObservable-${nameTag.toString}-$suffix"
            )
            .tapError {
              case ex => UIO(sub.onError(ex))
            }

        // NOTE(review): cancelling only unsubscribes the bridge actor from the
        // bus; nothing visible here stops the actor itself on that path.
        (for {
          a <- actor
          _ <- eventBus !! Subscribe[A, B](a)
          _ <- UIO(c := Cancelable(() => eventBus ! Unsubscribe[A, B](a)))
        } yield ()).runToFuture
        c
      }
  }
  /**
    * Actor-side implementation of the event bus for base event type `A`.
    * Instances are only meant to be turned into a behavior through the
    * companion object.
    */
  class EventBus[A] {
    import akka.actor.typed.scaladsl.adapter._

    /**
      * Builds the behavior that maps each [[EventBus.Command]] onto the given
      * classic `EventStream`:
      *
      *  - `Publish` publishes the event on the stream,
      *  - `Subscribe` registers the subscriber's classic ref for the topic class,
      *  - `Unsubscribe` removes the subscriber's classic ref,
      *  - `ObservableSubscription` replies with an observable created by
      *    `EventBus.observable2` for this actor.
      *
      * The behavior never changes: every message is answered with `Behaviors.same`.
      *
      * @param eventStream the stream that backs this bus
      */
    private def eventStreamBehavior(
        eventStream: EventStream
    )(implicit
        timeout: Timeout,
        scheduler: Scheduler,
        spawnProtocol: ActorRef[SpawnProtocol.Command],
        ct: ClassTag[A]
    ): Behavior[EventBus.Command[A]] =
      Behaviors.setup { context =>
        Behaviors.receiveMessage { command =>
          command match {
            // The publisher name is not used here.
            case EventBus.Publish(event, _) =>
              eventStream.publish(event)

            case subscription @ EventBus.Subscribe(subscriber) =>
              eventStream.subscribe(subscriber.toClassic, subscription.topic)

            case EventBus.Unsubscribe(subscriber) =>
              eventStream.unsubscribe(subscriber.toClassic)

            case request @ EventBus.ObservableSubscription(replyTo) =>
              val events = EventBus.observable2(
                context.self
              )(timeout, scheduler, spawnProtocol, ct, request.ct)
              replyTo ! events
          }
          // Handling a command never swaps the behavior.
          Behaviors.same
        }
      }
  }