From 937a1046151b7acda32cef289aa4e260c186d185 Mon Sep 17 00:00:00 2001 From: Rohan Sircar Date: Wed, 30 Dec 2020 13:51:45 +0530 Subject: [PATCH] Added WebSocket observable class --- .../outwatchapp/util/reactive/WebSocket.scala | 84 +++++++++++++++++++ 1 file changed, 84 insertions(+) create mode 100644 src/main/scala/outwatchapp/util/reactive/WebSocket.scala diff --git a/src/main/scala/outwatchapp/util/reactive/WebSocket.scala b/src/main/scala/outwatchapp/util/reactive/WebSocket.scala new file mode 100644 index 0000000..75a999b --- /dev/null +++ b/src/main/scala/outwatchapp/util/reactive/WebSocket.scala @@ -0,0 +1,84 @@ +package outwatchapp.util.reactive + +import org.scalajs.dom.{raw => sjsdr} +import io.circe.Decoder +import io.circe.Encoder +import io.circe.Printer +import io.circe.generic.JsonCodec +import io.circe.parser._ +import io.circe.syntax._ +import monix.bio.Task +import monix.execution.Ack +import monix.execution.Cancelable +import monix.reactive.Observable +import monix.reactive.Observer +import monix.reactive.OverflowStrategy +import outwatchapp.util.reactive.MonixProSubject +import org.scalajs.dom.raw.Event +import org.scalajs.dom.raw.MessageEvent +import scala.concurrent.Future + +class WebSocketImpl[T: Encoder: Decoder]( + ws: sjsdr.WebSocket, + overflowStrategy: OverflowStrategy.Synchronous[T] +) { + val printer = Printer.noSpaces + + lazy val source: Task[Observable[T]] = + Task.deferAction(implicit s => + for { + obs <- Task( + Observable + .create[T](overflowStrategy) { sub => + ws.onmessage = (e: MessageEvent) => + e.data match { + case s: String => + decode[T](s).map(sub.onNext).left.foreach(println) + case other => println(other) + } + ws.onerror = (e: Event) => + sub.onError(new Exception(s"Error in WebSocket: $e")) + Cancelable(() => ws.close()) + } + .publish + .refCount + ) + _ <- Task(obs.subscribe(Observer.empty)) + } yield obs + ) + + lazy val sink: Task[Observer[T]] = + Task( + new Observer[T] { + override def onNext(elem: T): Future[Ack] = { + val msg = printer.print(elem.asJson) + ws.send(msg) + Future.successful(Ack.Continue) + } + override def onError(ex: Throwable): Unit = println(ex) + override def onComplete(): Unit = () + } + ) + +} +object WebSocket { + sealed trait Error + final case object Error extends Error + // val w = new sjsdr.Worker("/worker.js").asInstanceOf[Worker] + + // type MonixWebWorker[T] = MonixProSubject[T, T] + + def apply[T <: Product: Encoder: Decoder]( + path: String, + overflowStrategy: OverflowStrategy.Synchronous[T] = + OverflowStrategy.DropOld(50) + ): Task[WebSocket[T]] = for { + websocket <- Task(new sjsdr.WebSocket(path)) + impl = new WebSocketImpl[T](websocket, overflowStrategy) + source <- impl.source + sink <- impl.sink + } yield MonixProSubject.from(sink, source) +} + +@JsonCodec +case class WebsocketData(data: Long)