package wow.doge.mygame.scriptsystem

import scala.util.Failure
import scala.util.Success

import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.LogOptions
import akka.actor.typed.SupervisorStrategy
import akka.actor.typed.scaladsl.ActorContext
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.PoolRouter
import akka.actor.typed.scaladsl.Routers
import akka.util.Timeout
import com.typesafe.scalalogging.Logger
import org.slf4j.event.Level
import wow.doge.mygame.state.ScriptActor
import scala.concurrent.duration._
import ScriptActor.ScriptObject

/**
  * Protocol and factory for the script caching actor.
  *
  * The actor keeps a map from script path to [[ScriptObject]] and answers
  * [[ScriptCachingActor.Get]] requests from that map when possible. On a cache
  * miss (or when `force` is set) it asks a pool of [[ScriptActor]] workers to
  * compile the script and stores successful results via
  * [[ScriptCachingActor.Put]].
  */
object ScriptCachingActor {

  type ScriptsMap = Map[os.Path, ScriptObject]
  type ScriptResult = Either[ScriptActor.Error, ScriptObject]

  /** Number of [[ScriptActor]] workers in the pool spawned by [[apply]]. */
  private val WorkerPoolSize: Int = 4

  /** Timeout applied to every compilation request sent to the worker pool. */
  private val CompilationTimeout: Timeout = Timeout(15.seconds)

  sealed trait Command

  /**
    * @param scriptPath path of the script to compile
    * @param requester return address of the asking actor
    * @param force if true, forces script compilation even if a previous cached version exists
    */
  final case class Get(
      scriptPath: os.Path,
      requester: ActorRef[ScriptResult],
      force: Boolean = false
  ) extends Command

  // final case class GetAll(
  //     scriptPaths: Seq[os.Path],
  //     requester: ActorRef[Map[os.Path, ScriptResult]],
  //     force: Boolean = false
  // ) extends Command

  /** Requests the current contents of the cache. */
  final case class GetMap(requester: ActorRef[ScriptsMap]) extends Command

  /** Stores `script` in the cache under `scriptPath`, replacing any previous entry. */
  final case class Put(scriptPath: os.Path, script: ScriptObject)
      extends Command

  /** Internal message that leaves the actor's state and behavior unchanged. */
  private[scriptsystem] final case object NoOp extends Command

  /** Internal message asking `scriptActor` to compile `scriptPath` on behalf of `requester`. */
  private[scriptsystem] final case class DelegateToChild(
      scriptActor: ActorRef[ScriptActor.Command],
      scriptPath: os.Path,
      requester: ActorRef[ScriptResult]
  ) extends Command

  // private[scriptsystem] final case class DelegateAllToChild(
  //     scriptPaths: Seq[os.Path],
  //     requester: ActorRef[Map[os.Path, ScriptResult]]
  // ) extends Command

  // private[scriptsystem] final case class ReplyWithPrecompiled(
  //     precompiled: Map[os.Path, ScriptResult],
  //     requester: ActorRef[Map[os.Path, ScriptResult]]
  // ) extends Command

  // final case class Props(
  //     ctx: ActorContext[Command],
  //     scriptActor: ActorRef[ScriptActor.Command]
  // ) {
  //   def create(state: State = State(Map.empty)): Behavior[Command] =
  //     Behaviors.logMessages {
  //       Behaviors.setup { ctx =>
  //         val pool = ScriptActorPool(4)
  //         val scriptsRouter = ctx.spawn(pool, "script-actors-pool")
  //         new ScriptCachingActor(this)
  //           .receiveMessage(state)
  //       }
  //     }
  // }

  /** Immutable actor state: the cache of scripts keyed by path. */
  final case class State(scriptsMap: ScriptsMap)

  /**
    * Creates the caching actor's behavior.
    *
    * Spawns a pool of [[ScriptActor]] workers named "script-actors-pool" as a
    * child and logs every received message at TRACE level.
    *
    * @param state initial cache contents; empty by default
    */
  def apply(state: State = State(Map.empty)): Behavior[Command] =
    Behaviors.logMessages(
      LogOptions()
        .withLevel(Level.TRACE)
        .withLogger(Logger[ScriptCachingActor].underlying),
      Behaviors.setup { ctx =>
        val scriptsRouter =
          ctx.spawn(ScriptActorPool(WorkerPoolSize), "script-actors-pool")
        new ScriptCachingActor(ctx, scriptsRouter).receiveMessage(state)
      }
    )
}

/**
  * Message handling for the script caching actor.
  *
  * @param ctx context of the running actor
  * @param scriptActor ref that compilation requests are sent to (the worker
  *                    pool router when created through the companion `apply`)
  */
class ScriptCachingActor(
    ctx: ActorContext[ScriptCachingActor.Command],
    scriptActor: ActorRef[ScriptActor.Command]
) {
  import com.softwaremill.quicklens._
  import ScriptCachingActor._
  import Methods._

  /**
    * Behavior for the given cache `state`. A new behavior is returned only
    * when the cache changes (on [[ScriptCachingActor.Put]]); every other
    * message keeps the current one.
    */
  def receiveMessage(state: State): Behavior[Command] =
    Behaviors.receiveMessage {
      case Get(scriptPath, requester, force) =>
        if (force)
          // Skip the cache lookup entirely and recompile.
          ctx.self ! DelegateToChild(scriptActor, scriptPath, requester)
        else
          getOrCompileScript(
            ctx,
            scriptPath,
            state.scriptsMap,
            scriptActor,
            requester
          )
        Behaviors.same

      // case GetAll(scriptPaths, requester, force) =>
      //   import scala.concurrent.duration._
      //   implicit val timeout = Timeout(15.seconds)
      //   /**
      //     * Holy complexity batman this is getting too complex
      //     */
      //   if (force) {
      //     scriptPaths
      //       .sliding(
      //         if (scriptPaths.length > 3 && scriptPaths.length % 2 == 0) 2
      //         else 3
      //       )
      //       .foreach(lst =>
      //         ctx.self ! DelegateAllToChild(scriptPaths, requester)
      //       )
      //   } else {
      //     val (failures, successes) = scriptPaths
      //       .partitionMap(path =>
      //         state.scriptsMap.get(path) match {
      //           case Some(value) => Right(path -> value)
      //           case None        => Left(path)
      //         }
      //       ) match {
      //       case (failures, successes) =>
      //         import cats.syntax.either._
      //         failures -> Map.from(successes.map {
      //           case (p, obj) => p -> obj.asRight[ScriptActor.Error]
      //         })
      //     }
      //     ctx.ask(scriptActor, ScriptActor.CompileAll(failures, _)) {
      //       case Success(value) =>
      //         val total = successes ++ value
      //         requester ! total
      //         value.foreach {
      //           case (path, res) => res.foreach(r => ctx.self ! Put(path, r))
      //         }
      //         NoOp
      //       case Failure(exception) => NoOp
      //     }
      //   }
      //   // scriptPaths.foreach(p => ctx.self ! Get(p))
      //   Behaviors.same

      // `child` (rather than `scriptActor`) so the pattern variable does not
      // shadow the constructor parameter.
      case DelegateToChild(child, scriptPath, requester) =>
        implicit val timeout: Timeout = ScriptCachingActor.CompilationTimeout
        // child ! ScriptActor.CompileAny(scriptPath, requester)
        askChildForScriptCompilation(ctx, child, scriptPath, requester)
        Behaviors.same

      // case DelegateAllToChild(scriptPaths, requester) =>
      //   import scala.concurrent.duration._
      //   implicit val timeout = Timeout(15.seconds)
      //   ctx.ask(scriptActor, ScriptActor.CompileAll(scriptPaths, _)) {
      //     case Success(value) =>
      //       value.foreach {
      //         case (path, res) => res.foreach(r => ctx.self ! Put(path, r))
      //       }
      //       NoOp
      //     case Failure(exception) => NoOp
      //   }
      //   Behaviors.same

      case GetMap(requester) =>
        requester ! state.scriptsMap
        Behaviors.same

      case Put(scriptPath, script) =>
        // Level checks avoid rendering the script / the whole cache to a
        // string when the corresponding log level is disabled.
        if (ctx.log.isDebugEnabled())
          ctx.log.debug(s"Putting $script at path $scriptPath")
        val newState =
          state.modify(_.scriptsMap).using(_ + (scriptPath -> script))
        if (ctx.log.isTraceEnabled())
          ctx.log.trace(newState.toString())
        receiveMessage(state = newState)

      case NoOp =>
        Behaviors.same
    }
}

/** Factory for the router that distributes work over [[ScriptActor]] workers. */
object ScriptActorPool {

  /**
    * @param poolSize number of [[ScriptActor]] workers in the pool
    * @return a pool router behavior whose workers are restarted on `Exception`
    */
  def apply(
      poolSize: Int
  ): PoolRouter[ScriptActor.Command] =
    Routers.pool(poolSize = poolSize)(
      // make sure the workers are restarted if they fail
      Behaviors
        .supervise(ScriptActor())
        .onFailure[Exception](SupervisorStrategy.restart)
    )
}

/** Helper operations used by [[ScriptCachingActor]]'s message handler. */
private[scriptsystem] object Methods {
  import ScriptCachingActor._

  /**
    * Replies to `requester` with the cached script for `scriptPath` if there
    * is one; otherwise sends the caching actor a
    * [[ScriptCachingActor.DelegateToChild]] message so the script gets
    * compiled.
    *
    * @param ctx context of the caching actor
    * @param scriptPath path of the requested script
    * @param scriptsMap current cache contents
    * @param scriptActor ref that will receive the compilation request on a miss
    * @param requester return address of the asking actor
    */
  def getOrCompileScript(
      ctx: ActorContext[Command],
      scriptPath: os.Path,
      scriptsMap: ScriptsMap,
      scriptActor: ActorRef[ScriptActor.Command],
      requester: ActorRef[ScriptResult]
  ): Unit =
    scriptsMap.get(scriptPath) match {
      case Some(cached) =>
        ctx.log.debug("Getting script from cache")
        requester ! Right(cached)
      case None =>
        ctx.log.debug("Delegating to child")
        ctx.self ! DelegateToChild(scriptActor, scriptPath, requester)
    }

  /**
    * Asks `scriptActor` to compile `scriptPath`, forwards the result to
    * `requester`, and turns a successful result into a
    * [[ScriptCachingActor.Put]] message for the caching actor.
    *
    * @param ctx context of the caching actor
    * @param scriptActor ref that receives the `CompileAny` request
    * @param scriptPath path of the script to compile
    * @param requester return address of the asking actor
    * @param timeout how long to wait for the response before the ask fails
    */
  def askChildForScriptCompilation(
      ctx: ActorContext[Command],
      scriptActor: ActorRef[ScriptActor.Command],
      scriptPath: os.Path,
      requester: ActorRef[ScriptResult]
  )(implicit timeout: Timeout): Unit =
    ctx.ask(scriptActor, ScriptActor.CompileAny(scriptPath, _)) {
      case Success(value) =>
        // The requester gets the result whether compilation succeeded or
        // not; only successful compilations are cached.
        requester ! value
        value.fold(
          err => {
            ctx.log.error(err.reason)
            NoOp
          },
          res => Put(scriptPath, res)
        )
      case Failure(exception) =>
        // Pass the Throwable itself so its type and stack trace reach the
        // log; getMessage alone may be null.
        // NOTE(review): `requester` receives no reply on this path, so a
        // caller waiting on it only finds out via its own timeout. Replying
        // here needs a ScriptActor.Error value - confirm how to build one.
        ctx.log.error(
          s"Compilation request for script $scriptPath failed",
          exception
        )
        NoOp
    }
}