You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

165 lines
5.3 KiB

3 years ago
  1. package outwatchapp.util.reactive
  2. import scala.concurrent.Future
  3. import io.circe.Decoder
  4. import io.circe.Encoder
  5. import io.circe.Printer
  6. import io.circe.parser._
  7. import io.circe.syntax._
  8. import monix.bio.Task
  9. import monix.execution.Ack
  10. import monix.execution.Cancelable
  11. import monix.execution.CancelablePromise
  12. import monix.execution.cancelables.SingleAssignCancelable
  13. import monix.reactive.Observable
  14. import monix.reactive.Observer
  15. import monix.reactive.OverflowStrategy
  16. import typings.reconnectingWebsocket.eventsMod.CloseEvent
  17. import typings.reconnectingWebsocket.eventsMod.ErrorEvent
  18. import typings.reconnectingWebsocket.eventsMod.Event
  19. import typings.reconnectingWebsocket.mod.{default => RW}
  20. import scalajs.js
  21. import typings.std.MessageEvent
  22. import typings.reconnectingWebsocket.mod.Options
  23. import outwatchapp.util.reactive.Exceptions.DecodeException
  24. import outwatchapp.util.reactive.Exceptions.WrongTypeException
  25. import monix.reactive.observers.Subscriber
  26. import monix.reactive.observers.BufferedSubscriber
  27. import outwatchapp.util.reactive.Exceptions.UseAfterClose
  28. import monix.bio.IO
  /** Bridges a reconnecting WebSocket and Monix streams, using circe for JSON.
    *
    * Incoming string frames are decoded into `T` and published through
    * [[source]]; values pushed into [[sink]] are printed as compact JSON and
    * sent over the socket.
    *
    * @param ws the socket; its `onmessage`, `onclose` and `onerror` handlers
    *   are overwritten while [[source]] is subscribed
    * @param overflowStrategy buffering policy applied to incoming messages
    * @tparam T message type; circe `Encoder` and `Decoder` instances are required
    */
  class ReconnectingWebSocketImpl[T: Encoder: Decoder](
      ws: RW,
      overflowStrategy: OverflowStrategy.Synchronous[T]
  ) {

    /** Compact (no whitespace) printer used for outgoing JSON. */
    val printer = Printer.noSpaces

    /** Builds the shared stream of decoded incoming messages.
      *
      * The stream is multicast via `publish.refCount`, and an empty observer is
      * subscribed right away, before the observable is returned.
      *
      * The stream fails with a `DecodeException` when a string frame cannot be
      * decoded into `T`, with a `WrongTypeException` when a frame is not a
      * string, and with a plain `Exception` on a socket error event. It
      * completes on the socket's close event. Once the stream has terminated or
      * been cancelled, all three socket handlers are detached from the
      * subscriber.
      *
      * @throws ReactiveException
      */
    val source: Task[Observable[T]] =
      Task.deferAction(implicit s =>
        for {
          obs <- Task(
            Observable
              .create[T](overflowStrategy) { sub =>
                val c = SingleAssignCancelable()

                // Handlers installed once the subscriber must no longer be signalled.
                val detachedMessage: js.Function1[MessageEvent[_], Unit] =
                  _ => println("message fn not initialized")
                val detachedClose: js.Function1[CloseEvent, Unit] = _ => ()
                val detachedError: js.Function1[ErrorEvent, Unit] = _ => ()

                val onmessage: js.Function1[MessageEvent[_], Unit] =
                  _.data match {
                    case text: String =>
                      decode[T](text) match {
                        case Right(res) =>
                          // The subscriber asked to stop: release the socket handlers.
                          if (sub.onNext(res) == Ack.Stop) c.cancel()
                        case Left(err) =>
                          // Terminal signal: detach first so nothing follows the error.
                          c.cancel()
                          sub.onError(
                            DecodeException(
                              s"Failed to decode $text. Error was $err"
                            )
                          )
                      }
                    case other =>
                      c.cancel()
                      sub.onError(
                        WrongTypeException(s"Received wrong type: $other")
                      )
                  }

                val onclose: js.Function1[CloseEvent, Unit] = a => {
                  println(
                    s"Websocket closing - ${a.code} ${a.wasClean} ${a.reason}"
                  )
                  c.cancel()
                  sub.onComplete()
                }

                val onerror: js.Function1[ErrorEvent, Unit] = (e: Event) => {
                  c.cancel()
                  sub.onError(new Exception(s"Error in WebSocket: $e"))
                }

                ws.onmessage = onmessage
                ws.onclose = onclose
                ws.onerror = onerror

                // Cancelling releases every handler, not only `onmessage`, so a
                // cancelled subscriber never receives a late close or error.
                c := Cancelable { () =>
                  ws.onmessage = detachedMessage
                  ws.onclose = detachedClose
                  ws.onerror = detachedError
                }
              }
              .publish
              .refCount
          )
          // Presumably keeps the shared connection subscribed regardless of
          // downstream subscribers - TODO confirm.
          _ <- Task(obs.subscribe(Observer.empty))
        } yield obs
      )

    /** Builds a buffered observer that sends every element over the socket.
      *
      * `onNext` answers `Ack.Stop` without sending when the socket's
      * `readyState` is 2 or 3 (CLOSING / CLOSED in the WebSocket API);
      * otherwise the element is encoded to JSON and the send is scheduled.
      * Errors are reported to the scheduler; completion only logs and does not
      * close the socket.
      */
    val sink: Task[Observer[T]] =
      Task.deferAction(implicit s =>
        Task(
          BufferedSubscriber(
            new Subscriber[T] {
              import monix.execution.FutureUtils
              import scala.concurrent.duration._

              override def scheduler = s

              override def onNext(elem: T): Future[Ack] =
                if (ws.readyState == 2 || ws.readyState == 3)
                  Ack.Stop
                else {
                  // Encode only when the message will actually be sent.
                  val msg = printer.print(elem.asJson)
                  // NOTE(review): every send is scheduled after a fixed 2 second
                  // delay - confirm this is intended and not a debugging leftover.
                  FutureUtils
                    .delayedResult(2.second)(ws.send(msg))
                    .flatMap(_ => Ack.Continue)
                }

              override def onError(ex: Throwable): Unit = s.reportFailure(ex)

              override def onComplete(): Unit = println("CLOSING WEBSOCKET 2 ")
            },
            OverflowStrategy.Default
          )
        )
      )
  }
  /** Factory and wrapper types for a JSON-speaking reconnecting WebSocket. */
  object ReconnectingWebSocket {
    sealed trait Error
    final case object Error extends Error

    /** Wrapper around the observable of incoming messages.
      *
      * @throws ReactiveException
      */
    final case class Source[T](value: Observable[T])

    /** A buffered Observer
      *
      * @param value the observer that elements are pushed into
      */
    final case class Sink[T](value: Observer[T]) {

      /** Pushes `t` into the underlying observer.
        *
        * Succeeds when the observer acknowledges with `Ack.Continue`; fails with
        * `UseAfterClose` when it answers `Ack.Stop`.
        */
      def send(t: T) =
        IO.deferFuture(value.onNext(t)).flatMap {
          case Ack.Continue => Task.unit
          case Ack.Stop     => IO.raiseError(UseAfterClose("Websocket was closed"))
        }
    }

    // Defaults used by `apply`; the delay is presumably in milliseconds - TODO confirm.
    private val defaultOptions = Options()
      .setMinReconnectionDelay(5000)
      .setMaxRetries(2)
      .setMaxEnqueuedMessages(50)

    /** Wraps `op` as an `open` event handler; the event itself is ignored and
      * `op` is re-evaluated on every invocation.
      */
    def onopen[A](op: => A): js.Function1[Event, Unit] = _ => op

    /** Opens a socket to `path`, waits for its first `open` event and then builds
      * the message source and sink on top of it.
      *
      * NOTE(review): only the `open` event completes the wait, so the returned
      * task does not finish if the socket never opens.
      *
      * @param path address handed to the websocket constructor
      * @param options reconnection options for the socket
      * @param overflowStrategy buffering policy for incoming messages
      * @return a task yielding the `Source` / `Sink` pair
      */
    def apply[T <: Product: Encoder: Decoder](
        path: String,
        options: Options = defaultOptions,
        overflowStrategy: OverflowStrategy.Synchronous[T] =
          OverflowStrategy.DropOld(50)
    ) = for {
      websocket <- Task(new RW(path, js.undefined, options))
      p = CancelablePromise[Unit]()
      // `trySuccess` instead of `success`: this handler stays installed until it
      // is swapped below, and `success` throws on an already completed promise
      // if a second `open` event arrives in between.
      _ <- Task(websocket.onopen = onopen(p.trySuccess(())))
      _ <- Task.deferFuture(p.future)
      // The socket is open: replace the one-shot handler with a no-op.
      _ <- Task(websocket.onopen = onopen(()))
      impl = new ReconnectingWebSocketImpl[T](websocket, overflowStrategy)
      source <- impl.source
      sink <- impl.sink
      // _ <- Task.deferAction(implicit s => Task(source.subscribe(sink)))
    } yield Source(source) -> Sink(sink)
  }