// package org.gerweck.scalafx.akka // import language.implicitConversions // import scala.collection.mutable.{ Clearable, Growable } // import scala.concurrent._ // import scala.concurrent.duration._ // import scala.util._ // import akka.{ Done, NotUsed } // import akka.stream.Materializer // import akka.stream.scaladsl._ // import scalafx.application.Platform.runLater // import scalafx.beans.Observable // import scalafx.beans.property._ // import org.gerweck.scalafx.util.FutureObservable // trait AkkaFXCollections extends Any { // implicit def sourceToRichSource[A, B](source: Source[A, B]) = new AkkaFXCollections.RichSource(source) // } // object AkkaFXCollections { // def collectionWriterSink[A] // (buffer: Observable with Growable[A] with Clearable, // clearFirst: Boolean, // groupingSize: Int = 50, groupingTimeout: FiniteDuration = 100.milliseconds) // : Sink[A, Future[Done]] = { // sealed trait PopulateAction // case class InsertRows(data: Seq[A]) extends PopulateAction // case object ClearData extends PopulateAction // val grouping = { // Flow[A] // .groupedWithin(groupingSize, groupingTimeout) // .map(InsertRows) // .named("GroupInsertActions") // } // val clearData = { // if (clearFirst) // Source.single(ClearData) // else // Source.empty // }.named("OptionalClearAction") // val combinedSource: Flow[A, PopulateAction, NotUsed] = grouping.prepend(clearData) // combinedSource .toMat { // Sink .foreach[PopulateAction] { // case ClearData => runLater { buffer.clear() } // case InsertRows(data) => runLater { buffer ++= data } // } .named("BufferWriter") // }(Keep.right) // } // class RichSource[A, B](val inner: Source[A, B]) extends AnyVal { // def populateCollection[C >: A] // (buffer: Observable with Growable[C] with Clearable, // clearFirst: Boolean, // groupingSize: Int = 50, groupingTimeout: FiniteDuration = 100.milliseconds) // (implicit mat: Materializer, ec: ExecutionContext) // : ReadOnlyObjectProperty[Option[Try[Done]]] = { // val sink = collectionWriterSink(buffer, clearFirst, groupingSize, groupingTimeout) // val graph = inner.toMat(sink)(Keep.right) // FutureObservable.ofTryOption(graph.run()) // } // } // }