Browse Source

Added WebSocket observable class

development
Rohan Sircar 3 months ago
parent
commit
937a104615
1 changed files with 84 additions and 0 deletions
  1. +84
    -0
      src/main/scala/outwatchapp/util/reactive/WebSocket.scala

+ 84
- 0
src/main/scala/outwatchapp/util/reactive/WebSocket.scala View File

@ -0,0 +1,84 @@
package outwatchapp.util.reactive
import org.scalajs.dom.{raw => sjsdr}
import io.circe.Decoder
import io.circe.Encoder
import io.circe.Printer
import io.circe.generic.JsonCodec
import io.circe.parser._
import io.circe.syntax._
import monix.bio.Task
import monix.execution.Ack
import monix.execution.Cancelable
import monix.reactive.Observable
import monix.reactive.Observer
import monix.reactive.OverflowStrategy
import outwatchapp.util.reactive.MonixProSubject
import org.scalajs.dom.raw.Event
import org.scalajs.dom.raw.MessageEvent
import scala.concurrent.Future
class WebSocketImpl[T: Encoder: Decoder](
ws: sjsdr.WebSocket,
overflowStrategy: OverflowStrategy.Synchronous[T]
) {
val printer = Printer.noSpaces
lazy val source: Task[Observable[T]] =
Task.deferAction(implicit s =>
for {
obs <- Task(
Observable
.create[T](overflowStrategy) { sub =>
ws.onmessage = (e: MessageEvent) =>
e.data match {
case s: String =>
decode[T](s).map(sub.onNext).left.foreach(println)
case other => println(other)
}
ws.onerror = (e: Event) =>
sub.onError(new Exception(s"Error in WebSocket: $e"))
Cancelable(() => ws.close())
}
.publish
.refCount
)
_ <- Task(obs.subscribe(Observer.empty))
} yield obs
)
lazy val sink: Task[Observer[T]] =
Task(
new Observer[T] {
override def onNext(elem: T): Future[Ack] = {
val msg = printer.print(elem.asJson)
ws.send(msg)
Future.successful(Ack.Continue)
}
override def onError(ex: Throwable): Unit = println(ex)
override def onComplete(): Unit = ()
}
)
}
object WebSocket {
sealed trait Error
final case object Error extends Error
// val w = new sjsdr.Worker("/worker.js").asInstanceOf[Worker]
// type MonixWebWorker[T] = MonixProSubject[T, T]
def apply[T <: Product: Encoder: Decoder](
path: String,
overflowStrategy: OverflowStrategy.Synchronous[T] =
OverflowStrategy.DropOld(50)
): Task[WebSocket[T]] = for {
websocket <- Task(new sjsdr.WebSocket(path))
impl = new WebSocketImpl[T](websocket, overflowStrategy)
source <- impl.source
sink <- impl.sink
} yield MonixProSubject.from(sink, source)
}
@JsonCodec
case class WebsocketData(data: Long)

Loading…
Cancel
Save