forked from nova/jmonkey-test
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
139 lines
4.8 KiB
139 lines
4.8 KiB
package wow.doge.mygame.events
|
|
|
|
// import akka.event.ActorEventBus
|
|
// import akka.event.ManagedActorClassification
|
|
// import akka.event.ActorClassifier
|
|
import akka.actor.typed.ActorRef
|
|
// import akka.actor.ActorSystem
|
|
// import akka.event.EventBus
|
|
// import akka.util.Subclassification
|
|
// import java.util.concurrent.atomic.AtomicReference
|
|
import akka.actor.typed.Behavior
|
|
import akka.actor.typed.scaladsl.Behaviors
|
|
import scala.reflect.ClassTag
|
|
import akka.event.EventStream
|
|
|
|
// private[events] final case class ClassificationMessage(ref: ActorRef, id: Int)
|
|
|
|
// class ActorBusImpl(val system: ActorSystem, val busSize: Int)
|
|
// extends ActorEventBus
|
|
// with ActorClassifier
|
|
// with ManagedActorClassification {
|
|
// type Event = ClassificationMessage
|
|
|
|
// // is used for extracting the classifier from the incoming events
|
|
// override protected def classify(event: Event): ActorRef = event.ref
|
|
|
|
// // determines the initial size of the index data structure
|
|
// // used internally (i.e. the expected number of different classifiers)
|
|
// override protected def mapSize: Int = busSize
|
|
// }
|
|
|
|
// class StartsWithSubclassification extends Subclassification[String] {
|
|
// override def isEqual(x: String, y: String): Boolean =
|
|
// x == y
|
|
|
|
// override def isSubclass(x: String, y: String): Boolean =
|
|
// x.startsWith(y)
|
|
// }
|
|
|
|
// import akka.event.SubchannelClassification
|
|
|
|
// final case class MsgEnvelope(topic: String, payload: Any)
|
|
// import akka.actor.typed.scaladsl.adapter._
|
|
|
|
// /**
|
|
// * Publishes the payload of the MsgEnvelope when the topic of the
|
|
// * MsgEnvelope starts with the String specified when subscribing.
|
|
// */
|
|
// class SubchannelBusImpl extends EventBus with SubchannelClassification {
|
|
// type Event = Any
|
|
// type Classifier = Class[_]
|
|
// type Subscriber = ActorRef
|
|
|
|
// // Subclassification is an object providing `isEqual` and `isSubclass`
|
|
// // to be consumed by the other methods of this classifier
|
|
// // override protected val subclassification: Subclassification[Classifier] =
|
|
// // new StartsWithSubclassification
|
|
|
|
// private val initiallySubscribedOrUnsubscriber =
|
|
// new AtomicReference[Either[Set[ActorRef], ActorRef]](Left(Set.empty))
|
|
|
|
// override protected implicit val subclassification
|
|
// : Subclassification[Classifier] = new Subclassification[Class[_]] {
|
|
// def isEqual(x: Class[_], y: Class[_]) = x == y
|
|
// def isSubclass(x: Class[_], y: Class[_]) = y.isAssignableFrom(x)
|
|
// }
|
|
|
|
// // is used for extracting the classifier from the incoming events
|
|
// override protected def classify(event: Event): Classifier = event.getClass()
|
|
|
|
// // will be invoked for each event for all subscribers which registered
|
|
// // themselves for the event’s classifier
|
|
// override protected def publish(event: Event, subscriber: Subscriber): Unit = {
|
|
// // subscriber ! event.payload
|
|
// subscriber ! event
|
|
// }
|
|
// }
|
|
|
|
object EventBus {
|
|
sealed trait Command[A]
|
|
final case class Publish[A, E <: A](event: E, publisher: ActorRef[_])
|
|
extends Command[A]
|
|
|
|
/**
|
|
* Subscribe a typed actor to listen for types or subtypes of E
|
|
* by sending this command to the [[akka.actor.typed.ActorSystem.eventStream]].
|
|
*
|
|
* ==Simple example==
|
|
* {{{
|
|
* sealed trait A
|
|
* case object A1 extends A
|
|
* //listen for all As
|
|
* def subscribe(actorSystem: ActorSystem[_], actorRef: ActorRef[A]) =
|
|
* actorSystem.eventStream ! EventStream.Subscribe(actorRef)
|
|
* //listen for A1s only
|
|
* def subscribe(actorSystem: ActorSystem[_], actorRef: ActorRef[A]) =
|
|
* actorSystem.eventStream ! EventStream.Subscribe[A1](actorRef)
|
|
* }}}
|
|
*/
|
|
final case class Subscribe[A, E <: A](subscriber: ActorRef[E])(implicit
|
|
classTag: ClassTag[E]
|
|
) extends Command[A] {
|
|
|
|
def topic: Class[_] = classTag.runtimeClass
|
|
}
|
|
|
|
/**
|
|
* Unsubscribe an actor ref from the event stream
|
|
* by sending this command to the [[akka.actor.typed.ActorSystem.eventStream]].
|
|
*/
|
|
final case class Unsubscribe[A, E <: A](subscriber: ActorRef[E])
|
|
extends Command[A]
|
|
|
|
def apply[A](): Behavior[EventBus.Command[A]] =
|
|
Behaviors.setup { ctx =>
|
|
val eventStream = new EventStream(ctx.system.classicSystem)
|
|
new EventBus().eventStreamBehavior(eventStream)
|
|
}
|
|
|
|
}
|
|
|
|
class EventBus[B] {
|
|
import akka.actor.typed.scaladsl.adapter._
|
|
|
|
private def eventStreamBehavior(
|
|
eventStream: akka.event.EventStream
|
|
): Behavior[EventBus.Command[B]] =
|
|
Behaviors.receiveMessage {
|
|
case EventBus.Publish(event, publisher) =>
|
|
eventStream.publish(event)
|
|
Behaviors.same
|
|
case s @ EventBus.Subscribe(subscriber) =>
|
|
eventStream.subscribe(subscriber.toClassic, s.topic)
|
|
Behaviors.same
|
|
case EventBus.Unsubscribe(subscriber) =>
|
|
eventStream.unsubscribe(subscriber.toClassic)
|
|
Behaviors.same
|
|
}
|
|
}
|