From e50159b82e3ac0c488f642360feba0cca5ead6c9 Mon Sep 17 00:00:00 2001 From: Sarah Gerweck Date: Sat, 30 Jul 2016 15:05:45 -0700 Subject: [PATCH] Add new mechanism for streaming to a collection --- .../org/gerweck/scalafx/akka/AkkaFX.scala | 4 +- .../scalafx/akka/AkkaFXCollections.scala | 75 +++++++++++++++++++ 2 files changed, 77 insertions(+), 2 deletions(-) create mode 100644 src/main/scala/org/gerweck/scalafx/akka/AkkaFXCollections.scala diff --git a/src/main/scala/org/gerweck/scalafx/akka/AkkaFX.scala b/src/main/scala/org/gerweck/scalafx/akka/AkkaFX.scala index 4a0023d..3d22f69 100644 --- a/src/main/scala/org/gerweck/scalafx/akka/AkkaFX.scala +++ b/src/main/scala/org/gerweck/scalafx/akka/AkkaFX.scala @@ -15,9 +15,9 @@ import scalafx.beans.value.ObservableValue * * @author Sarah Gerweck */ -object AkkaFX extends AkkaStreamFX +object AkkaFX extends AkkaStreamFX with AkkaFXCollections -trait AkkaStreamFX { +trait AkkaStreamFX extends Any { /** A [[akka.stream.scaladsl.Sink]] that sends all values to a * [[scalafx.beans.property.Property]]. * diff --git a/src/main/scala/org/gerweck/scalafx/akka/AkkaFXCollections.scala b/src/main/scala/org/gerweck/scalafx/akka/AkkaFXCollections.scala new file mode 100644 index 0000000..4882539 --- /dev/null +++ b/src/main/scala/org/gerweck/scalafx/akka/AkkaFXCollections.scala @@ -0,0 +1,75 @@ +package org.gerweck.scalafx.akka + +import language.implicitConversions + +import scala.collection.generic.{ Clearable, Growable } +import scala.concurrent._ +import scala.concurrent.duration._ +import scala.util._ + +import akka.{ Done, NotUsed } +import akka.stream.Materializer +import akka.stream.scaladsl._ + +import scalafx.application.Platform.runLater +import scalafx.beans.Observable +import scalafx.beans.property._ + +import org.log4s._ + +import org.gerweck.scalafx.util.FutureObservable + +trait AkkaFXCollections extends Any { + implicit def sourceToRichSource[A, B](source: Source[A, B]) = new AkkaFXCollections.RichSource(source) +} + +object AkkaFXCollections { + def collectionWriterSink[A] + (buffer: Observable with Growable[A] with Clearable, + clearFirst: Boolean, + groupingSize: Int = 50, groupingTimeout: FiniteDuration = 100.milliseconds) + (implicit ec: ExecutionContext) + : Sink[A, Future[Done]] = { + sealed trait PopulateAction + case class InsertRows(data: Seq[A]) extends PopulateAction + case object ClearData extends PopulateAction + + val grouping = { + Flow[A] + .groupedWithin(groupingSize, groupingTimeout) + .map(InsertRows) + .named("GroupInsertActions") + } + + val clearData = { + if (clearFirst) + Source.single(ClearData) + else + Source.empty + }.named("OptionalClearAction") + + val combinedSource: Flow[A, PopulateAction, NotUsed] = grouping.prepend(clearData) + combinedSource .toMat { + Sink .foreach[PopulateAction] { + case ClearData => runLater { buffer.clear() } + case InsertRows(data) => runLater { buffer ++= data } + } .named("BufferWriter") + }(Keep.right) + } + + class RichSource[A, B](val inner: Source[A, B]) extends AnyVal { + def populateCollection[C >: A] + (buffer: Observable with Growable[C] with Clearable, + clearFirst: Boolean, + groupingSize: Int = 50, groupingTimeout: FiniteDuration = 100.milliseconds) + (implicit mat: Materializer, ec: ExecutionContext) + : ReadOnlyObjectProperty[Option[Try[Done]]] = { + + val sink = collectionWriterSink(buffer, clearFirst, groupingSize, groupingTimeout) + + val graph = inner.toMat(sink)(Keep.right) + + FutureObservable.ofTryOption(graph.run()) + } + } +}