package outwatch.util

import outwatch.helpers._
// import colibri._
import org.scalajs.dom.{CloseEvent, Event, MessageEvent, WebSocket}
import cats.effect.Sync
import monix.reactive.Observable
import monix.reactive.Observer
import monix.execution.Cancelable
import monix.reactive.OverflowStrategy
import scala.concurrent.Future
import monix.execution.Ack
import cats.Show

object MonixWS {
  // implicit def toObserver[F[_]: Sync, S: Show](
  //   socket: MonixWS[F, S]
  // ): F[Observer[S]] = socket.sink

  // implicit def toObservable[F[_]: Sync, S: Show](
  //   socket: MonixWS[F, S]
  // ): Observable[MessageEvent] = socket.source
}

/** Wraps a browser `WebSocket` as a Monix [[monix.reactive.Observable]] of
  * incoming messages and an [[monix.reactive.Observer]] for outgoing ones.
  *
  * The underlying socket is created as soon as an instance is constructed.
  *
  * @param url address handed to the `WebSocket` constructor
  * @param F   effect type class used to suspend creation of the sink
  * @param S   renders outgoing elements to the text that is sent
  * @tparam F effect type in which the sink is returned
  * @tparam S type of the elements accepted by the sink
  */
class MonixWS[F[_], S](val url: String)(implicit F: Sync[F], S: Show[S]) {

  /** The underlying socket, shared by [[source]] and [[sink]]. */
  val ws: WebSocket = new WebSocket(url)

  /** Incoming messages of the socket.
    *
    * Emits every `MessageEvent`, fails when the socket reports an error and
    * completes when the socket is closed. Cancelling the subscription closes
    * the socket.
    *
    * The handlers are assigned to the socket's `on...` properties, so a later
    * subscription replaces the handlers installed by an earlier one.
    */
  lazy val source: Observable[MessageEvent] =
    Observable.create[MessageEvent](OverflowStrategy.Unbounded) { sub =>
      // Set once the stream has been failed, so that the close event which
      // follows an error does not emit a second terminal signal.
      var failed = false

      ws.onmessage = (e: MessageEvent) => sub.onNext(e)
      ws.onerror = (e: Event) => {
        failed = true
        sub.onError(new Exception(s"Error in WebSocket: $e"))
      }
      // Without this handler subscribers would never learn that the
      // connection has ended.
      ws.onclose = (_: CloseEvent) => if (!failed) sub.onComplete()

      Cancelable(() => ws.close())
    }

  /** Observer that sends every element over the socket.
    *
    * Each element is rendered with the `Show[S]` instance and passed to
    * `ws.send`; the observer then always signals `Ack.Continue`. Upstream
    * errors are forwarded to `OutwatchTracing.errorSubject`; upstream
    * completion is ignored, so the socket stays open.
    *
    * NOTE(review): `send` is called right away, without checking whether the
    * connection has been established yet.
    */
  lazy val sink: F[Observer[S]] =
    F.delay {
      new Observer[S] {
        override def onNext(elem: S): Future[Ack] = {
          ws.send(S.show(elem))
          Future.successful(Ack.Continue)
        }

        override def onError(ex: Throwable): Unit =
          OutwatchTracing.errorSubject.onNext(ex)

        override def onComplete(): Unit = ()
      }
    }
}