package wow.doge.mygame.events // import akka.event.ActorEventBus // import akka.event.ManagedActorClassification // import akka.event.ActorClassifier import akka.actor.typed.ActorRef // import akka.actor.ActorSystem // import akka.event.EventBus // import akka.util.Subclassification // import java.util.concurrent.atomic.AtomicReference import akka.actor.typed.Behavior import akka.actor.typed.scaladsl.Behaviors import scala.reflect.ClassTag import akka.event.EventStream // private[events] final case class ClassificationMessage(ref: ActorRef, id: Int) // class ActorBusImpl(val system: ActorSystem, val busSize: Int) // extends ActorEventBus // with ActorClassifier // with ManagedActorClassification { // type Event = ClassificationMessage // // is used for extracting the classifier from the incoming events // override protected def classify(event: Event): ActorRef = event.ref // // determines the initial size of the index data structure // // used internally (i.e. the expected number of different classifiers) // override protected def mapSize: Int = busSize // } // class StartsWithSubclassification extends Subclassification[String] { // override def isEqual(x: String, y: String): Boolean = // x == y // override def isSubclass(x: String, y: String): Boolean = // x.startsWith(y) // } // import akka.event.SubchannelClassification // final case class MsgEnvelope(topic: String, payload: Any) // import akka.actor.typed.scaladsl.adapter._ // /** // * Publishes the payload of the MsgEnvelope when the topic of the // * MsgEnvelope starts with the String specified when subscribing. // */ // class SubchannelBusImpl extends EventBus with SubchannelClassification { // type Event = Any // type Classifier = Class[_] // type Subscriber = ActorRef // // Subclassification is an object providing `isEqual` and `isSubclass` // // to be consumed by the other methods of this classifier // // override protected val subclassification: Subclassification[Classifier] = // // new StartsWithSubclassification // private val initiallySubscribedOrUnsubscriber = // new AtomicReference[Either[Set[ActorRef], ActorRef]](Left(Set.empty)) // override protected implicit val subclassification // : Subclassification[Classifier] = new Subclassification[Class[_]] { // def isEqual(x: Class[_], y: Class[_]) = x == y // def isSubclass(x: Class[_], y: Class[_]) = y.isAssignableFrom(x) // } // // is used for extracting the classifier from the incoming events // override protected def classify(event: Event): Classifier = event.getClass() // // will be invoked for each event for all subscribers which registered // // themselves for the event’s classifier // override protected def publish(event: Event, subscriber: Subscriber): Unit = { // // subscriber ! event.payload // subscriber ! event // } // } object EventBus { sealed trait Command[A] final case class Publish[A, E <: A](event: E, publisher: ActorRef[_]) extends Command[A] /** * Subscribe a typed actor to listen for types or subtypes of E * by sending this command to the [[akka.actor.typed.ActorSystem.eventStream]]. * * ==Simple example== * {{{ * sealed trait A * case object A1 extends A * //listen for all As * def subscribe(actorSystem: ActorSystem[_], actorRef: ActorRef[A]) = * actorSystem.eventStream ! EventStream.Subscribe(actorRef) * //listen for A1s only * def subscribe(actorSystem: ActorSystem[_], actorRef: ActorRef[A]) = * actorSystem.eventStream ! EventStream.Subscribe[A1](actorRef) * }}} */ final case class Subscribe[A, E <: A](subscriber: ActorRef[E])(implicit classTag: ClassTag[E] ) extends Command[A] { def topic: Class[_] = classTag.runtimeClass } /** * Unsubscribe an actor ref from the event stream * by sending this command to the [[akka.actor.typed.ActorSystem.eventStream]]. */ final case class Unsubscribe[A, E <: A](subscriber: ActorRef[E]) extends Command[A] def apply[A](): Behavior[EventBus.Command[A]] = Behaviors.setup { ctx => val eventStream = new EventStream(ctx.system.classicSystem) new EventBus().eventStreamBehavior(eventStream) } } class EventBus[B] { import akka.actor.typed.scaladsl.adapter._ private def eventStreamBehavior( eventStream: akka.event.EventStream ): Behavior[EventBus.Command[B]] = Behaviors.receiveMessage { case EventBus.Publish(event, publisher) => eventStream.publish(event) Behaviors.same case s @ EventBus.Subscribe(subscriber) => eventStream.subscribe(subscriber.toClassic, s.topic) Behaviors.same case EventBus.Unsubscribe(subscriber) => eventStream.unsubscribe(subscriber.toClassic) Behaviors.same } }