From 537e1abc2912d6a3f6bf91f556b3dc805c694415 Mon Sep 17 00:00:00 2001 From: Rohan Sircar Date: Wed, 30 Dec 2020 13:51:33 +0530 Subject: [PATCH] Added webworker observable class and demo --- assets/worker.js | 18 +++ .../outwatchapp/util/reactive/WebWorker.scala | 122 ++++++++++++++++++ yarn.lock | 61 ++++----- 3 files changed, 168 insertions(+), 33 deletions(-) create mode 100644 assets/worker.js create mode 100644 src/main/scala/outwatchapp/util/reactive/WebWorker.scala diff --git a/assets/worker.js b/assets/worker.js new file mode 100644 index 0000000..a5bd5ce --- /dev/null +++ b/assets/worker.js @@ -0,0 +1,18 @@ +var i = 0; + +console.log("Starting worker") +onmessage = (ev) => { + console.log(`Worker received data ${ev.data}`) + const data = JSON.parse(ev.data) + postMessage(JSON.stringify({ data: data.data * 2 })) +} + +// function timedCount() { +// i = i + 1; +// postMessage(JSON.stringify({ data: i })); +// setTimeout("timedCount()", 2000); + + +// } + +// timedCount(); \ No newline at end of file diff --git a/src/main/scala/outwatchapp/util/reactive/WebWorker.scala b/src/main/scala/outwatchapp/util/reactive/WebWorker.scala new file mode 100644 index 0000000..2d9c2ff --- /dev/null +++ b/src/main/scala/outwatchapp/util/reactive/WebWorker.scala @@ -0,0 +1,122 @@ +package outwatchapp.util.reactive + +import scala.concurrent.Future + +import io.circe.Decoder +import io.circe.Encoder +import io.circe.Printer +import io.circe.generic.JsonCodec +import io.circe.parser._ +import io.circe.syntax._ +import monix.bio.Task +import monix.execution.Ack +import monix.execution.Cancelable +import monix.reactive.Observable +import monix.reactive.Observer +import monix.reactive.OverflowStrategy +import outwatchapp.util.reactive.MonixProSubject +import org.scalajs.dom.raw.Event +import org.scalajs.dom.raw.MessageEvent +import org.scalajs.dom.{raw => sjsdr} + +class WebWorkerImpl[T <: Product: Encoder: Decoder]( + worker: sjsdr.Worker, + overflowStrategy: OverflowStrategy.Synchronous[T] +) { + +// private def parseFn(data: T) = { +// data match { +// case _: AnyRef => parseRef(data) +// case s: String => +// case other => +// println(other) +// Left(WebWorker.Error) +// } +// } + +// private def parseRef(data: T): Either[WebWorker.Error, T] = { +// data match { +// case s: String => decode[T](s).leftMap(_ => WebWorker.Error) +// case a: Int => +// println(a) +// Left(WebWorker.Error) +// case other => +// println(other) +// Left(WebWorker.Error) +// } +// } +// private def parsePrimitive(data: T): Either[WebWorker.Error, T] = { +// data match { +// case s: String => decode[T](s).leftMap(_ => WebWorker.Error) +// case a: Int => +// println(a) +// Left(WebWorker.Error) +// case other => +// println(other) +// Left(WebWorker.Error) +// } +// } +// println(s"Got data $a") +// .map(sub.onNext).left.foreach(println) + + val printer = Printer.noSpaces + + lazy val source: Task[Observable[T]] = + Task.deferAction(implicit s => + for { + obs <- Task( + Observable + .create[T](overflowStrategy) { sub => + worker.onmessage = (e: MessageEvent) => + e.data match { + case s: String => + decode[T](s).map(sub.onNext).left.foreach(println) + case other => println(other) + } + worker.onerror = (e: Event) => + sub.onError(new Exception(s"Error in WebSocket: $e")) + Cancelable(() => worker.terminate()) + } + .publish + .refCount + ) + _ <- Task(obs.subscribe(Observer.empty)) + } yield obs + ) + + lazy val sink: Task[Observer[T]] = + Task( + new Observer[T] { + override def onNext(elem: T): Future[Ack] = { + val msg = printer.print(elem.asJson) + worker.postMessage(msg) + Future.successful(Ack.Continue) + } + override def onError(ex: Throwable): Unit = println(ex) + override def onComplete(): Unit = () + } + ) + +} +object WebWorker { + sealed trait Error + final case object Error extends Error + // val w = new sjsdr.Worker("/worker.js").asInstanceOf[Worker] + + // type MonixWebWorker[T] = MonixProSubject[T, T] + + def apply[T <: Product: Encoder: Decoder]( + path: String, + overflowStrategy: OverflowStrategy.Synchronous[T] = + OverflowStrategy.DropOld(50) + ): Task[WebWorker[T]] = for { + worker <- Task(new sjsdr.Worker(path)) + impl = new WebWorkerImpl[T](worker, overflowStrategy) + source <- impl.source + sink <- impl.sink + } yield MonixProSubject.from(sink, source) + +} + +@JsonCodec +case class WorkerData(data: Long) diff --git a/yarn.lock b/yarn.lock index 384fb34..464614e 100644 --- a/yarn.lock +++ b/yarn.lock @@ -28,9 +28,9 @@ integrity sha512-tHq6qdbT9U1IRSGf14CL0pUlULksvY9OZ+5eEgl1N7t+OA3tGvNpxJCzuKQlsNgCVwbAs670L1vcVQi8j9HjnA== "@types/node@*": - version "14.14.10" - resolved "https://registry.yarnpkg.com/@types/node/-/node-14.14.10.tgz#5958a82e41863cfc71f2307b3748e3491ba03785" - integrity sha512-J32dgx2hw8vXrSbu4ZlVhn1Nm3GbeCFNw2FWL8S5QKucHGY0cyNwjdQdO+KMBZ4wpmC7KhLCiNsdk1RFRIYUQQ== + version "14.14.16" + resolved "https://registry.yarnpkg.com/@types/node/-/node-14.14.16.tgz#3cc351f8d48101deadfed4c9e4f116048d437b4b" + integrity sha512-naXYePhweTi+BMv11TgioE2/FXU4fSl29HAH1ffxVciNsH3rYXjNP2yM8wqmSm7jS20gM8TIklKiTen+1iVncw== "@webassemblyjs/ast@1.9.0": version "1.9.0" @@ -1230,9 +1230,9 @@ enhanced-resolve@^4.1.0: tapable "^1.0.0" errno@^0.1.3, errno@~0.1.7: - version "0.1.7" - resolved "https://registry.yarnpkg.com/errno/-/errno-0.1.7.tgz#4684d71779ad39af177e3f007996f7c67c852618" - integrity sha512-MfrRBDWzIWifgq6tJj60gkAwtLNb6sQPlcFrSOflcP1aFmmruKQ2wRnze/8V6kgyz7H3FF8Npzv78mZ7XLLflg== + version "0.1.8" + resolved "https://registry.yarnpkg.com/errno/-/errno-0.1.8.tgz#8bb3e9c7d463be4976ff888f76b4809ebc2e811f" + integrity sha512-dJ6oBr5SQ1VSd9qkk7ByRgb/1SH4JZjCHSW/mr63/QcXO9zLVxvJ6Oy13nio03rxpSnVDDjFor75SjVeZWPW/A== dependencies: prr "~1.0.1" @@ -1532,9 +1532,9 @@ flush-write-stream@^1.0.0: readable-stream "^2.3.6" follow-redirects@^1.0.0: - version "1.13.0" - resolved "https://registry.yarnpkg.com/follow-redirects/-/follow-redirects-1.13.0.tgz#b42e8d93a2a7eea5ed88633676d6597bc8e384db" - integrity sha512-aq6gF1BEKje4a9i9+5jimNFIpq4Q1WiwBToeRK5NvZBd/TRsmW8BsJfOEGkr76TbOyPVD3OVDN910EcUNtRYEA== + version "1.13.1" + resolved "https://registry.yarnpkg.com/follow-redirects/-/follow-redirects-1.13.1.tgz#5f69b813376cee4fd0474a3aba835df04ab763b7" + integrity sha512-SSG5xmZh1mkPGyKzjZP8zLjltIfpW32Y5QpdNJyjcfGxK3qo3NDDkZOZSFiGn1A6SclQxY9GzEwAHQ3dmYRWpg== for-in@^1.0.2: version "1.0.2" @@ -1615,9 +1615,9 @@ get-caller-file@^2.0.1: integrity sha512-DyFP3BM/3YHTQOCUL/w0OZHR0lpKeGrxotcHWcqNEdnltqFwXVfhEBQ94eIo34AfQpo0rGki4cyIiftY06h2Fg== get-intrinsic@^1.0.0: - version "1.0.1" - resolved "https://registry.yarnpkg.com/get-intrinsic/-/get-intrinsic-1.0.1.tgz#94a9768fcbdd0595a1c9273aacf4c89d075631be" - integrity sha512-ZnWP+AmS1VUaLgTRy47+zKtjTxz+0xMpx3I52i+aalBK1QP19ggLF3Db89KJX7kjfOfP2eoa01qc++GwPgufPg== + version "1.0.2" + resolved "https://registry.yarnpkg.com/get-intrinsic/-/get-intrinsic-1.0.2.tgz#6820da226e50b24894e08859469dc68361545d49" + integrity sha512-aeX0vrFm21ILl3+JpFFRNe9aUvp6VFZb2/CTbgLb8j75kOhvoNYjt9d8KA/tJG4gSo8nzEDedRl0h7vDmBYRVg== dependencies: function-bind "^1.1.1" has "^1.0.3" @@ -1795,9 +1795,9 @@ hpack.js@^2.1.6: wbuf "^1.1.0" html-entities@^1.3.1: - version "1.3.1" - resolved "https://registry.yarnpkg.com/html-entities/-/html-entities-1.3.1.tgz#fb9a1a4b5b14c5daba82d3e34c6ae4fe701a0e44" - integrity sha512-rhE/4Z3hIhzHAUKbW8jVcCyuT5oJCXXqhN/6mXXVCpzTmvJnoH2HL/bt3EZ6p55jbFJBeAe1ZNpL5BugLujxNA== + version "1.4.0" + resolved "https://registry.yarnpkg.com/html-entities/-/html-entities-1.4.0.tgz#cfbd1b01d2afaf9adca1b10ae7dffab98c71d2dc" + integrity sha512-8nxjcBcd8wovbeKx7h3wTji4e6+rhaVuPNpMqwWgnHh+N9ToqsCs6XztWRBPQ+UtzsoMAdKZtUENoVzU/EMtZA== http-deceiver@^1.2.7: version "1.2.7" @@ -1934,9 +1934,9 @@ inherits@2.0.3: integrity sha1-Yzwsg+PaQqUC9SRmAiSA9CCCYd4= ini@^1.3.4: - version "1.3.5" - resolved "https://registry.yarnpkg.com/ini/-/ini-1.3.5.tgz#eee25f56db1c9ec6085e0c22778083f596abf927" - integrity sha512-RZY5huIKCMRWDUqZlEi72f/lmXKMvuszcMBduliQ3nnWbx9X/ZBQO7DijMEYS9EhHBb2qacRUMtC7svLwe0lcw== + version "1.3.8" + resolved "https://registry.yarnpkg.com/ini/-/ini-1.3.8.tgz#a29da425b48806f34767a4efce397269af28432c" + integrity sha512-JV/yugV2uzW5iMRSiZAyDtQd+nxtUnjeLt0acNdw98kKLrvuRVyB80tsREOE7yvGVgalhZ6RNXCmEHkUKBKxew== internal-ip@^4.3.0: version "4.3.0" @@ -2455,16 +2455,11 @@ mime@1.6.0: resolved "https://registry.yarnpkg.com/mime/-/mime-1.6.0.tgz#32cd9e5c64553bd58d19a568af452acff04981b1" integrity sha512-x0Vn8spI+wuJ1O6S7gnbaQg8Pxh4NNHb7KSINmEWKiPE4RKOplvijn+NkmYmmRgP68mc70j2EbeTFRsrswaQeg== -mime@^2.0.3: +mime@^2.0.3, mime@^2.4.4: version "2.4.7" resolved "https://registry.yarnpkg.com/mime/-/mime-2.4.7.tgz#962aed9be0ed19c91fd7dc2ece5d7f4e89a90d74" integrity sha512-dhNd1uA2u397uQk3Nv5LM4lm93WYDUXFn3Fu291FJerns4jyTudqhIWe4W04YLy7Uk1tm1Ore04NpjRvQp/NPA== -mime@^2.4.4: - version "2.4.6" - resolved "https://registry.yarnpkg.com/mime/-/mime-2.4.6.tgz#e5b407c90db442f2beb5b162373d07b69affa4d1" - integrity sha512-RZKhC3EmpBchfTGBVb8fb+RL2cWyw/32lshnsETttkBAyAUXSGHxbEJWWRXc751DrIxG1q04b8QwMbAwkRPpUA== - mimic-fn@^2.0.0: version "2.1.0" resolved "https://registry.yarnpkg.com/mimic-fn/-/mimic-fn-2.1.0.tgz#7ed2c2ccccaf84d3ffcb7a69b57711fc2083401b" @@ -2550,11 +2545,16 @@ ms@2.1.1: resolved "https://registry.yarnpkg.com/ms/-/ms-2.1.1.tgz#30a5864eb3ebb0a66f2ebe6d727af06a09d86e0a" integrity sha512-tgp+dl5cGk28utYktBsrFqA7HKgrhgPsg6Z/EfhWI4gl1Hwq8B/GmY/0oXZ6nF8hDVesS/FpnYaD/kOWhYQvyg== -ms@2.1.2, ms@^2.1.1: +ms@2.1.2: version "2.1.2" resolved "https://registry.yarnpkg.com/ms/-/ms-2.1.2.tgz#d09d1f357b443f493382a8eb3ccd183872ae6009" integrity sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w== +ms@^2.1.1: + version "2.1.3" + resolved "https://registry.yarnpkg.com/ms/-/ms-2.1.3.tgz#574c8138ce1d2b5861f0b44579dbadd60c6615b2" + integrity sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA== + multicast-dns-service-types@^1.1.0: version "1.1.0" resolved "https://registry.yarnpkg.com/multicast-dns-service-types/-/multicast-dns-service-types-1.1.0.tgz#899f11d9686e5e05cb91b35d5f0e63b773cfc901" @@ -3870,11 +3870,6 @@ typedarray@^0.0.6: resolved "https://registry.yarnpkg.com/typedarray/-/typedarray-0.0.6.tgz#867ac74e3864187b1d3d47d996a78ec5c8830777" integrity sha1-hnrHTjhkGHsdPUfZlqeOxciDB3c= -typescript@4: - version "4.1.3" - resolved "https://registry.yarnpkg.com/typescript/-/typescript-4.1.3.tgz#519d582bd94cba0cf8934c7d8e8467e473f53bb7" - integrity sha512-B3ZIOf1IKeH2ixgHhj6la6xdwR9QrLC5d1VKeCSY4tvkqhF2eqd9O7txNlS0PO3GrBAFIdr3L1ndNwteUbZLYg== - union-value@^1.0.0: version "1.0.1" resolved "https://registry.yarnpkg.com/union-value/-/union-value-1.0.1.tgz#0b6fe7b835aecda61c6ea4d4f02c14221e109847" @@ -4051,9 +4046,9 @@ webpack-cli@3.3.2: yargs "^12.0.5" webpack-dev-middleware@^3.7.2: - version "3.7.2" - resolved "https://registry.yarnpkg.com/webpack-dev-middleware/-/webpack-dev-middleware-3.7.2.tgz#0019c3db716e3fa5cecbf64f2ab88a74bab331f3" - integrity sha512-1xC42LxbYoqLNAhV6YzTYacicgMZQTqRd27Sim9wn5hJrX3I5nxYy1SxSd4+gjUFsz1dQFj+yEe6zEVmSkeJjw== + version "3.7.3" + resolved "https://registry.yarnpkg.com/webpack-dev-middleware/-/webpack-dev-middleware-3.7.3.tgz#0639372b143262e2b84ab95d3b91a7597061c2c5" + integrity sha512-djelc/zGiz9nZj/U7PTBi2ViorGJXEWo/3ltkPbDyxCXhhEXkW0ce99falaok4TPj+AsxLiXJR0EBOb0zh9fKQ== dependencies: memory-fs "^0.4.1" mime "^2.4.4"