package wow.doge.mygame.subsystems.events

import scala.reflect.ClassTag
import scala.util.Random

import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.Scheduler
import akka.actor.typed.SpawnProtocol
import akka.actor.typed.scaladsl.AskPattern._
import akka.actor.typed.scaladsl.Behaviors
import akka.event.EventStream
import akka.util.Timeout
import monix.bio.UIO
import monix.execution.Ack
import monix.execution.Cancelable
import monix.execution.cancelables.SingleAssignCancelable
import monix.reactive.Observable
import monix.reactive.OverflowStrategy
import wow.doge.mygame.implicits._
import wow.doge.mygame.subsystems.events.EventBus.ObservableSubscription
import wow.doge.mygame.utils.AkkaUtils

/**
  * A (typed) event bus.
  *
  * Copied (and repurposed) from Akka's EventStream.
  */
object EventBus {

  /**
    * Protocol understood by the event bus actor.
    *
    * @tparam A the base type of the events carried by the bus
    */
  sealed trait Command[A]

  /**
    * Publishes `event` to the underlying event stream.
    *
    * @param event         the event to publish
    * @param publisherName a name identifying the publisher; the bus behavior
    *                      in this file does not read it
    */
  final case class Publish[A, E <: A](
      event: E,
      publisherName: String
  ) extends Command[A]

  /**
    * Subscribes `subscriber` to events whose runtime class is that of `E`.
    *
    * @param subscriber the actor that should receive matching events
    */
  final case class Subscribe[A, E <: A](subscriber: ActorRef[E])(implicit
      classTag: ClassTag[E]
  ) extends Command[A] {

    /** The runtime class used as the subscription channel. */
    def topic: Class[_] = classTag.runtimeClass
  }

  /**
    * Removes `subscriber` from the underlying event stream.
    *
    * @param subscriber the actor to unsubscribe
    */
  final case class Unsubscribe[A, E <: A](subscriber: ActorRef[E])
      extends Command[A]

  /**
    * Requests an [[monix.reactive.Observable]] of events of type `E`; the bus
    * answers by sending the observable to `replyTo`.
    *
    * @param replyTo the actor that receives the created observable
    */
  final case class ObservableSubscription[A, E <: A](
      replyTo: ActorRef[Observable[E]]
  )(implicit
      classTag: ClassTag[E]
  ) extends Command[A] {

    /** The class tag captured for `E` when the command was constructed. */
    def ct: ClassTag[E] = classTag
  }

  /**
    * Creates the behavior of an event bus for events of base type `A`.
    *
    * A fresh [[akka.event.EventStream]] is created on the classic actor system
    * backing the typed system this behavior is started in.
    *
    * @param timeout       forwarded to the helpers that spawn subscriber actors
    * @param spawnProtocol the actor used to spawn subscriber actors
    * @return the event bus behavior
    */
  def apply[A: ClassTag]()(implicit
      timeout: Timeout,
      spawnProtocol: ActorRef[SpawnProtocol.Command]
  ): Behavior[EventBus.Command[A]] =
    Behaviors.setup { ctx =>
      val eventStream = new EventStream(ctx.system.classicSystem)
      implicit val scheduler: Scheduler = ctx.system.scheduler
      // The type argument is spelled out: left to inference, the qualifier
      // `new EventBus()` would be instantiated before the expected result type
      // is taken into account.
      new EventBus[A]().eventStreamBehavior(eventStream)
    }

  /**
    * Creates an observable of every event of type `E` published on `eventBus`.
    *
    * This is [[observable2]] with the base type and the subscribed type being
    * the same.
    *
    * @param eventBus      the event bus to subscribe to
    * @param timeout       timeout used when spawning the subscriber actor
    * @param scheduler     Akka scheduler used when spawning the subscriber actor
    * @param spawnProtocol the actor used to spawn the subscriber actor
    * @param ct            class tag of the event type
    * @return an observable emitting the events received from the bus
    */
  def observable[E](eventBus: ActorRef[EventBus.Command[E]])(implicit
      timeout: Timeout,
      scheduler: Scheduler,
      spawnProtocol: ActorRef[SpawnProtocol.Command],
      ct: ClassTag[E]
  ): Observable[E] =
    observable2[E, E](eventBus)(timeout, scheduler, spawnProtocol, ct, ct)

  /**
    * Creates an observable of the events of type `B` published on an event bus
    * whose base event type is `A`.
    *
    * On subscription a helper actor is spawned through `spawnProtocol` and
    * subscribed to the bus; every message it receives is pushed to the
    * observable's subscriber. The observable is created with
    * `OverflowStrategy.DropOld(50)`. If spawning the helper actor fails, the
    * failure is signalled to the subscriber via `onError`.
    *
    * Cancelling the returned subscription sends [[Unsubscribe]] to the bus.
    *
    * @param eventBus      the event bus to subscribe to
    * @param timeout       timeout used when spawning the subscriber actor
    * @param scheduler     Akka scheduler used when spawning the subscriber actor
    * @param spawnProtocol the actor used to spawn the subscriber actor
    * @param ct            class tag of the base type; only used to build the
    *                      helper actor's name
    * @param ct2           class tag of the subscribed type; determines the
    *                      subscription topic
    * @return an observable emitting the events received from the bus
    */
  def observable2[A, B <: A](eventBus: ActorRef[EventBus.Command[A]])(implicit
      timeout: Timeout,
      scheduler: Scheduler,
      spawnProtocol: ActorRef[SpawnProtocol.Command],
      ct: ClassTag[A],
      ct2: ClassTag[B]
  ): Observable[B] =
    Observable.create[B](OverflowStrategy.DropOld(50)) { sub =>
      // Monix scheduler of the subscriber, needed by `runToFuture` below. It is
      // a different type from the Akka `Scheduler` taken as a parameter.
      implicit val s: monix.execution.Scheduler = sub.scheduler

      val c = SingleAssignCancelable()

      // Forwards each event to the subscriber; once the subscriber answers
      // with `Ack.Stop` the subscription is cancelled and the actor stops.
      // NOTE(review): when the downstream cancels `c` directly, only
      // `Unsubscribe` is sent; nothing visible here stops this actor, so it
      // presumably stays alive - confirm and stop it explicitly if so.
      val behavior = Behaviors.receiveMessage[B] { msg =>
        if (sub.onNext(msg) == Ack.Stop) {
          c.cancel()
          Behaviors.stopped
        } else Behaviors.same
      }

      // Random non-negative suffix to keep helper actor names distinct.
      // Masking with Long.MaxValue clears the sign bit, which also covers
      // Long.MinValue (for which math.abs would still return a negative value).
      val suffix = Random.nextLong() & Long.MaxValue

      val actor = AkkaUtils
        .spawnActorL(
          behavior,
          s"eventBusObservable-${ct.toString}-$suffix"
        )
        .mapError(err => new Exception(err.toString))
        .tapError { case ex => UIO(sub.onError(ex)) }

      (for {
        a <- actor
        _ <- eventBus !! Subscribe(a)
        _ <- UIO(c := Cancelable(() => eventBus ! Unsubscribe(a)))
      } yield ()).runToFuture

      c
    }
}

/**
  * Implementation of the event bus behavior for events of base type `A`.
  *
  * Instances are created by [[EventBus.apply]].
  */
class EventBus[A] {
  import akka.actor.typed.scaladsl.adapter._

  /**
    * Builds the message handler that maps [[EventBus.Command]]s onto
    * operations of the given classic `eventStream`.
    *
    * @param eventStream   the classic event stream backing this bus
    * @param timeout       forwarded to [[EventBus.observable2]]
    * @param scheduler     forwarded to [[EventBus.observable2]]
    * @param spawnProtocol forwarded to [[EventBus.observable2]]
    * @param ct            class tag of the base event type
    * @return the event bus behavior
    */
  private def eventStreamBehavior(
      eventStream: EventStream
  )(implicit
      timeout: Timeout,
      scheduler: Scheduler,
      spawnProtocol: ActorRef[SpawnProtocol.Command],
      ct: ClassTag[A]
  ): Behavior[EventBus.Command[A]] =
    Behaviors.setup { ctx =>
      Behaviors.receiveMessage {
        // The publisher name is not used by the bus itself.
        case EventBus.Publish(event, _) =>
          eventStream.publish(event)
          Behaviors.same
        case s @ EventBus.Subscribe(subscriber) =>
          eventStream.subscribe(subscriber.toClassic, s.topic)
          Behaviors.same
        case EventBus.Unsubscribe(subscriber) =>
          eventStream.unsubscribe(subscriber.toClassic)
          Behaviors.same
        case s @ ObservableSubscription(replyTo) =>
          // Build an observable subscribed to this very bus and hand it back.
          val obs = EventBus.observable2(
            ctx.self
          )(timeout, scheduler, spawnProtocol, ct, s.ct)
          replyTo ! obs
          Behaviors.same
      }
    }
}