From 83d7f5f009a68a911a73a70d8087394da3daf478 Mon Sep 17 00:00:00 2001 From: Sarah Gerweck Date: Mon, 18 Apr 2016 21:15:14 -0700 Subject: [PATCH] Add `observableSource` converter --- .../org/gerweck/scalafx/akka/AkkaFX.scala | 46 ++++++++++++++++++- 1 file changed, 45 insertions(+), 1 deletion(-) diff --git a/src/main/scala/org/gerweck/scalafx/akka/AkkaFX.scala b/src/main/scala/org/gerweck/scalafx/akka/AkkaFX.scala index 8d48979..4a0023d 100644 --- a/src/main/scala/org/gerweck/scalafx/akka/AkkaFX.scala +++ b/src/main/scala/org/gerweck/scalafx/akka/AkkaFX.scala @@ -1,12 +1,15 @@ package org.gerweck.scalafx.akka -import scala.concurrent.Future +import scala.collection.mutable +import scala.concurrent._ import akka.Done +import akka.stream.OverflowStrategy import akka.stream.scaladsl._ import scalafx.application.Platform.runLater import scalafx.beans.property.Property +import scalafx.beans.value.ObservableValue /** A master object that exposes all the Akka-ScalaFX bridges. * @@ -28,4 +31,45 @@ trait AkkaStreamFX { } } } + + /** A [[akka.stream.scaladsl.Source]] that generates an event for each + * change of an [[scalafx.beans.value.ObservableValue]]. + * + * This source adds an `onChange` handler to the given `ObservableValue`. + * Each time it observes a change, the new value is pushed from the + * `Source`. The change handler is registered as soon as the source is + * materialized into a graph. It should be safe to use a single source + * in several graphs, as each will register its own change listener upon + * materialization. + * + * @param prop The value to observe. + * + * @param queueSize The maximum number of values to queue while waiting for + * the downstream flow to consume more data. + * + * @param overflow What to do when the queue is full because the downstream + * flow cannot keep up. The default behavior is to block, slowing the UI's + * main thread until some events are consumed, freeing space in the queue. + */ + def observableSource[A](prop: ObservableValue[_, A], + queueSize: Int = 10, + overflow: OverflowStrategy = OverflowStrategy.backpressure + )(implicit ec: ExecutionContext) = { + val src = Source + .queue[A](queueSize, overflow) + .mapMaterializedValue { m => + val sub = { + prop.onChange { (dta, oldV, newV) => + m.offer(newV) + } + } + m.watchCompletion foreach { c => + runLater { + sub.cancel() + } + } + m + } + src + } }