package outwatchapp.util.reactive import scala.concurrent.Future import io.circe.Decoder import io.circe.Encoder import io.circe.Printer import io.circe.generic.JsonCodec import io.circe.parser._ import io.circe.syntax._ import monix.bio.Task import monix.execution.Ack import monix.execution.Cancelable import monix.reactive.Observable import monix.reactive.Observer import monix.reactive.OverflowStrategy import outwatchapp.util.reactive.MonixProSubject import org.scalajs.dom.raw.Event import org.scalajs.dom.raw.MessageEvent import org.scalajs.dom.{raw => sjsdr} class WebWorkerImpl[T <: Product: Encoder: Decoder]( worker: sjsdr.Worker, overflowStrategy: OverflowStrategy.Synchronous[T] ) { // private def parseFn(data: T) = { // data match { // case _: AnyRef => parseRef(data) // case s: String => // case other => // println(other) // Left(WebWorker.Error) // } // } // private def parseRef(data: T): Either[WebWorker.Error, T] = { // data match { // case s: String => decode[T](s).leftMap(_ => WebWorker.Error) // case a: Int => // println(a) // Left(WebWorker.Error) // case other => // println(other) // Left(WebWorker.Error) // } // } // private def parsePrimitive(data: T): Either[WebWorker.Error, T] = { // data match { // case s: String => decode[T](s).leftMap(_ => WebWorker.Error) // case a: Int => // println(a) // Left(WebWorker.Error) // case other => // println(other) // Left(WebWorker.Error) // } // } // println(s"Got data $a") // .map(sub.onNext).left.foreach(println) val printer = Printer.noSpaces lazy val source: Task[Observable[T]] = Task.deferAction(implicit s => for { obs <- Task( Observable .create[T](overflowStrategy) { sub => worker.onmessage = (e: MessageEvent) => e.data match { case s: String => decode[T](s).map(sub.onNext).left.foreach(println) case other => println(other) } worker.onerror = (e: Event) => sub.onError(new Exception(s"Error in WebSocket: $e")) Cancelable(() => worker.terminate()) } .publish .refCount ) _ <- Task(obs.subscribe(Observer.empty)) } yield obs ) lazy val sink: Task[Observer[T]] = Task( new Observer[T] { override def onNext(elem: T): Future[Ack] = { val msg = printer.print(elem.asJson) worker.postMessage(msg) Future.successful(Ack.Continue) } override def onError(ex: Throwable): Unit = println(ex) override def onComplete(): Unit = () } ) } object WebWorker { sealed trait Error final case object Error extends Error // val w = new sjsdr.Worker("/worker.js").asInstanceOf[Worker] // type MonixWebWorker[T] = MonixProSubject[T, T] def apply[T <: Product: Encoder: Decoder]( path: String, overflowStrategy: OverflowStrategy.Synchronous[T] = OverflowStrategy.DropOld(50) ): Task[WebWorker[T]] = for { worker <- Task(new sjsdr.Worker(path)) impl = new WebWorkerImpl[T](worker, overflowStrategy) source <- impl.source sink <- impl.sink } yield MonixProSubject.from(sink, source) } @JsonCodec case class WorkerData(data: Long)