You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
205 lines
6.0 KiB
205 lines
6.0 KiB
package nova.monadic_sfx.implicits
|
|
|
|
import javafx.beans.property.ObjectProperty
|
|
import javafx.collections.ObservableList
|
|
import javafx.scene.{input => jfxsi}
|
|
import javafx.{event => jfxe}
|
|
import monix.bio.Task
|
|
import monix.execution.Ack
|
|
import monix.execution.Cancelable
|
|
import monix.execution.Scheduler
|
|
import monix.reactive.Observable
|
|
import monix.reactive.Observer
|
|
import monix.reactive.OverflowStrategy
|
|
import monix.tail.Iterant
|
|
import monix.{eval => me}
|
|
import scalafx.Includes._
|
|
import scalafx.beans.property.Property
|
|
import scalafx.beans.value.ObservableValue
|
|
import scalafx.collections.ObservableBuffer
|
|
import scalafx.scene.Scene
|
|
import scalafx.scene.control.ButtonBase
|
|
|
|
object JavaFXMonixObservables {
|
|
|
|
/** Extension methods exposing mouse events of a [[scalafx.scene.Scene]] as
  * Monix `Observable`s.
  *
  * Every subscription registers its own JavaFX event handler with
  * `addEventHandler` and removes that same handler again when the
  * subscription is cancelled or when the downstream answers `Ack.Stop`.
  */
implicit final class SceneObservables(private val scene: Scene)
    extends AnyVal {

  /** Stream of `MOUSE_PRESSED` events delivered to the scene.
    *
    * @return an observable that emits each mouse-pressed event, using an
    *         unbounded overflow buffer
    */
  def observableMousePressed(): Observable[jfxsi.MouseEvent] =
    mouseEvents(jfxsi.MouseEvent.MOUSE_PRESSED)

  /** Stream of `MOUSE_DRAGGED` events delivered to the scene.
    *
    * @return an observable that emits each mouse-dragged event, using an
    *         unbounded overflow buffer
    */
  def observableMouseDragged(): Observable[jfxsi.MouseEvent] =
    mouseEvents(jfxsi.MouseEvent.MOUSE_DRAGGED)

  /** Builds an observable of the scene's mouse events of the given type. */
  private def mouseEvents(
      eventType: jfxe.EventType[jfxsi.MouseEvent]
  ): Observable[jfxsi.MouseEvent] = {
    import monix.execution.cancelables.SingleAssignCancelable
    Observable.create(OverflowStrategy.Unbounded) { sub =>
      val c = SingleAssignCancelable()
      val l = new jfxe.EventHandler[jfxsi.MouseEvent] {
        override def handle(event: jfxsi.MouseEvent): Unit =
          // Stop listening as soon as the downstream signals it is done.
          if (sub.onNext(event) == Ack.Stop) c.cancel()
      }

      // Register through addEventHandler rather than the onMouseXxx property
      // setter: removeEventHandler only detaches handlers that were added
      // this way, and the setter would replace any handler already installed
      // on the scene (including one from another subscription).
      scene.addEventHandler(eventType, l)
      c := Cancelable(() => scene.removeEventHandler(eventType, l))
      c
    }
  }
}
|
|
|
|
// implicit final class BindObs[T, J](private val prop: Property[T, J])
|
|
// extends AnyVal {
|
|
// def -->(op: Observer[T]) = {
|
|
// op.onNext(prop.value)
|
|
// }
|
|
|
|
// def <--(obs: Observable[T])(implicit s: Scheduler) = {
|
|
// obs.doOnNext(v => me.Task(prop.value = v)).subscribe()
|
|
// }
|
|
|
|
// def observableChange[J1 >: J]()
|
|
// : Observable[(ObservableValue[T, J], J1, J1)] = {
|
|
// import monix.execution.cancelables.SingleAssignCancelable
|
|
// Observable.create(OverflowStrategy.Unbounded) { sub =>
|
|
// val c = SingleAssignCancelable()
|
|
|
|
// val canc = prop.onChange((a, b, c) => sub.onNext((a, b, c)))
|
|
|
|
// c := Cancelable(() => canc.cancel())
|
|
// c
|
|
// }
|
|
// }
|
|
// }
|
|
|
|
/** Binding operators between a JavaFX `ObjectProperty` and Monix
  * observers / observables.
  */
implicit final class BindObs2[A](private val prop: ObjectProperty[A])
    extends AnyVal {

  // def -->(sub: Var[A]) =
  // prop.onChange((a, b, c) => sub := c)

  /** Pushes every non-null new value of the property into `sub`.
    *
    * The result of `sub.onNext` is not inspected.
    *
    * @param sub observer receiving the property's new values
    * @return whatever `onChange` returns for the installed listener
    */
  def -->(sub: Observer[A]) =
    prop.onChange { (_, _, newValue) =>
      // Option(...) is empty for null, so null updates are skipped.
      Option(newValue).foreach(v => sub.onNext(v))
    }

  // def -->[J1 >: A, T](
  // op: Observable[J1] => me.Task[T]
  // )(implicit s: Scheduler) = {
  // op(prop.observableChange().map(_._3)).runToFuture
  // }

  /** Subscribes to `obs` and writes each emitted element into the property.
    *
    * @param obs source of values to assign to the property
    * @param s   scheduler used to run the subscription
    * @return the result of `subscribe()` on the derived observable
    */
  def <--(obs: Observable[A])(implicit s: Scheduler) = {
    val writes = obs.doOnNext(value => me.Task(prop() = value))
    writes.subscribe()
  }

  /** Exposes the property's change notifications as an observable of
    * `(source, oldValue, newValue)` triples, buffered without bound.
    *
    * Cancelling a subscription cancels the underlying change listener.
    */
  def observableChange[J1 >: A]()
      : Observable[(ObservableValue[A, A], J1, J1)] = {
    import monix.execution.cancelables.SingleAssignCancelable
    Observable.create(OverflowStrategy.Unbounded) { subscriber =>
      val cancelable = SingleAssignCancelable()

      val listener = prop.onChange { (source, oldValue, newValue) =>
        subscriber.onNext((source, oldValue, newValue))
      }

      cancelable := Cancelable(() => listener.cancel())
      cancelable
    }
  }
}
|
|
|
|
/** Binding operators for an `ObjectProperty` that holds an
  * `ObservableList`.
  */
implicit final class ObjectPropertyObservableListExt[A](
    private val prop: ObjectProperty[ObservableList[A]]
) extends AnyVal {

  /** Subscribes to `obs` and, for each emitted sequence, replaces the
    * property's value with a new `ObservableBuffer` built from it.
    *
    * @param obs source of sequences to assign to the property
    * @param s   scheduler used to run the subscription
    * @return the result of `subscribe()` on the derived observable
    */
  def <--(obs: Observable[Seq[A]])(implicit s: Scheduler) = {
    val writes =
      obs.doOnNext(items => me.Task(prop() = ObservableBuffer.from(items)))
    writes.subscribe()
  }

  /** Whenever the property is set to a non-null list, feeds that list's
    * elements one by one into `sub`, stopping early if `sub` answers
    * `Ack.Stop`.
    *
    * The future produced by `runToFuture` is not retained.
    *
    * @param sub observer receiving the elements of each new list
    * @param s   scheduler on which the element feed is run
    * @return whatever `onChange` returns for the installed listener
    */
  def -->(sub: Observer[A])(implicit s: Scheduler) =
    prop.onChange { (_, _, updated) =>
      // Option(...) is empty for null, so a null list is skipped.
      Option(updated).foreach { items =>
        Iterant[Task]
          .fromIterable(items.toIterable)
          .consume
          .use(consume(sub, _))
          .runToFuture
      }
    }

  /** Feeds the remaining elements of `it` into `sub`, waiting for each
    * acknowledgement before continuing and stopping on `Ack.Stop`.
    *
    * Not referenced by the other members of this class.
    */
  private def loop(sub: Observer[A], it: Iterator[A]): Task[Unit] =
    if (!it.hasNext) Task.unit
    else {
      val element = it.next()
      Task.deferFuture(sub.onNext(element)).flatMap { ack =>
        if (ack == Ack.Continue) loop(sub, it) else Task.unit
      }
    }

  /** Pulls elements from `consumer` and forwards them to `sub` until the
    * consumer yields a `Left` or `sub` answers `Ack.Stop`.
    *
    * The content of the `Left` is ignored.
    */
  private def consume(
      sub: Observer[A],
      consumer: Iterant.Consumer[Task, A]
  ): Task[Unit] =
    consumer.pull.flatMap {
      case Right(element) =>
        Task.deferFuture(sub.onNext(element)).flatMap { ack =>
          if (ack == Ack.Continue) consume(sub, consumer) else Task.unit
        }
      case Left(_) => Task.unit
    }

}
|
|
|
|
/** Extension methods exposing the action events of a ScalaFX `ButtonBase`
  * as a Monix `Observable`.
  */
implicit final class OnActionObservable(
    private val button: ButtonBase
) extends AnyVal {
  // def -->[T](
  // op: Observable[jfxe.ActionEvent] => me.Task[T]
  // )(implicit s: Scheduler) = {
  // op(button.observableAction()).runToFuture
  // }

  // def -->(
  // sub: ConcurrentSubject[jfxe.ActionEvent, jfxe.ActionEvent]
  // ) = {
  // button.onAction = value => sub.onNext(value)
  // }

  /** Stream of `ActionEvent.ACTION` events fired by the button.
    *
    * Every subscription registers its own handler with `addEventHandler`
    * and removes that same handler when the subscription is cancelled or
    * when the downstream answers `Ack.Stop`.
    *
    * @return an observable that emits each action event, using an
    *         unbounded overflow buffer
    */
  def observableAction(): Observable[jfxe.ActionEvent] = {
    import monix.execution.cancelables.SingleAssignCancelable
    Observable.create(OverflowStrategy.Unbounded) { sub =>
      val c = SingleAssignCancelable()
      val l = new jfxe.EventHandler[jfxe.ActionEvent] {
        override def handle(event: jfxe.ActionEvent): Unit =
          // Stop listening as soon as the downstream signals it is done.
          if (sub.onNext(event) == Ack.Stop) c.cancel()
      }

      // Register through addEventHandler rather than the onAction property
      // setter: removeEventHandler only detaches handlers that were added
      // this way, and the setter would replace any onAction handler already
      // installed on the button (including one from another subscription).
      button.addEventHandler(jfxe.ActionEvent.ACTION, l)
      c := Cancelable(() =>
        button.removeEventHandler(jfxe.ActionEvent.ACTION, l)
      )
      c
    }
  }
}
|
|
}
|