Add new mechanism for streaming to a collection

This commit is contained in:
Sarah Gerweck 2016-07-30 15:05:45 -07:00
parent a16adb85c5
commit e50159b82e
2 changed files with 77 additions and 2 deletions

View File

@ -15,9 +15,9 @@ import scalafx.beans.value.ObservableValue
*
* @author Sarah Gerweck <sarah.a180@gmail.com>
*/
object AkkaFX extends AkkaStreamFX
object AkkaFX extends AkkaStreamFX with AkkaFXCollections
trait AkkaStreamFX {
trait AkkaStreamFX extends Any {
/** A [[akka.stream.scaladsl.Sink]] that sends all values to a
* [[scalafx.beans.property.Property]].
*

View File

@ -0,0 +1,75 @@
package org.gerweck.scalafx.akka
import language.implicitConversions
import scala.collection.generic.{ Clearable, Growable }
import scala.concurrent._
import scala.concurrent.duration._
import scala.util._
import akka.{ Done, NotUsed }
import akka.stream.Materializer
import akka.stream.scaladsl._
import scalafx.application.Platform.runLater
import scalafx.beans.Observable
import scalafx.beans.property._
import org.log4s._
import org.gerweck.scalafx.util.FutureObservable
trait AkkaFXCollections extends Any {
implicit def sourceToRichSource[A, B](source: Source[A, B]) = new AkkaFXCollections.RichSource(source)
}
object AkkaFXCollections {
def collectionWriterSink[A]
(buffer: Observable with Growable[A] with Clearable,
clearFirst: Boolean,
groupingSize: Int = 50, groupingTimeout: FiniteDuration = 100.milliseconds)
(implicit ec: ExecutionContext)
: Sink[A, Future[Done]] = {
sealed trait PopulateAction
case class InsertRows(data: Seq[A]) extends PopulateAction
case object ClearData extends PopulateAction
val grouping = {
Flow[A]
.groupedWithin(groupingSize, groupingTimeout)
.map(InsertRows)
.named("GroupInsertActions")
}
val clearData = {
if (clearFirst)
Source.single(ClearData)
else
Source.empty
}.named("OptionalClearAction")
val combinedSource: Flow[A, PopulateAction, NotUsed] = grouping.prepend(clearData)
combinedSource .toMat {
Sink .foreach[PopulateAction] {
case ClearData => runLater { buffer.clear() }
case InsertRows(data) => runLater { buffer ++= data }
} .named("BufferWriter")
}(Keep.right)
}
class RichSource[A, B](val inner: Source[A, B]) extends AnyVal {
def populateCollection[C >: A]
(buffer: Observable with Growable[C] with Clearable,
clearFirst: Boolean,
groupingSize: Int = 50, groupingTimeout: FiniteDuration = 100.milliseconds)
(implicit mat: Materializer, ec: ExecutionContext)
: ReadOnlyObjectProperty[Option[Try[Done]]] = {
val sink = collectionWriterSink(buffer, clearFirst, groupingSize, groupingTimeout)
val graph = inner.toMat(sink)(Keep.right)
FutureObservable.ofTryOption(graph.run())
}
}
}