You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

300 lines
8.6 KiB

  1. package nova.monadic_sfx.implicits
  2. import javafx.beans.property.ObjectProperty
  3. import javafx.collections.ObservableList
  4. import javafx.scene.{input => jfxsi}
  5. import javafx.{event => jfxe}
  6. import monix.bio.Task
  7. import monix.execution.Ack
  8. import monix.execution.Cancelable
  9. import monix.execution.Scheduler
  10. import monix.reactive.Observable
  11. import monix.reactive.Observer
  12. import monix.reactive.OverflowStrategy
  13. import monix.tail.Iterant
  14. import monix.{eval => me}
  15. import scalafx.Includes._
  16. import scalafx.beans.property.Property
  17. import scalafx.beans.value.ObservableValue
  18. import scalafx.collections.ObservableBuffer
  19. import scalafx.scene.Scene
  20. import scalafx.scene.control.ButtonBase
  21. import scalafx.beans.property.ReadOnlyProperty
  22. import javafx.event.ActionEvent
  23. import javafx.event.EventHandler
  24. import javafx.scene.{control => jfxsc}
  25. import scalafx.scene.control.MenuItem
  26. import org.gerweck.scalafx.util._
  27. object JavaFXMonixObservables {
  28. implicit final class SceneObservables(private val scene: Scene)
  29. extends AnyVal {
  30. def observableMousePressed(): Observable[jfxsi.MouseEvent] = {
  31. import monix.execution.cancelables.SingleAssignCancelable
  32. Observable.create(OverflowStrategy.Unbounded) { sub =>
  33. val c = SingleAssignCancelable()
  34. val l = new jfxe.EventHandler[jfxsi.MouseEvent] {
  35. override def handle(event: jfxsi.MouseEvent): Unit = {
  36. if (sub.onNext(event) == Ack.Stop) c.cancel()
  37. }
  38. }
  39. scene.onMousePressed = l
  40. c := Cancelable(() =>
  41. scene.removeEventHandler(
  42. jfxsi.MouseEvent.MOUSE_PRESSED,
  43. l
  44. )
  45. )
  46. c
  47. }
  48. }
  49. def observableMouseDragged(): Observable[jfxsi.MouseEvent] = {
  50. import monix.execution.cancelables.SingleAssignCancelable
  51. Observable.create(OverflowStrategy.Unbounded) { sub =>
  52. val c = SingleAssignCancelable()
  53. val l = new jfxe.EventHandler[jfxsi.MouseEvent] {
  54. override def handle(event: jfxsi.MouseEvent): Unit = {
  55. if (sub.onNext(event) == Ack.Stop) c.cancel()
  56. }
  57. }
  58. scene.onMouseDragged = l
  59. c := Cancelable(() =>
  60. scene.removeEventHandler(
  61. jfxsi.MouseEvent.MOUSE_DRAGGED,
  62. l
  63. )
  64. )
  65. c
  66. }
  67. }
  68. }
  69. implicit final class BindObs[T, J](private val prop: Property[T, J])
  70. extends AnyVal {
  71. def -->(op: Observer[T]) = {
  72. op.onNext(prop.value)
  73. }
  74. def -->(op: Property[T, J]) = {
  75. op() = prop.value
  76. }
  77. def <--(obs: Observable[T])(implicit s: Scheduler) = {
  78. obs.doOnNext(v => me.Task(prop.value = v)).subscribe()
  79. }
  80. def asOption = prop.map(Option(_))
  81. def observableChange[J1 >: J]()
  82. : Observable[(ObservableValue[T, J], J1, J1)] = {
  83. import monix.execution.cancelables.SingleAssignCancelable
  84. Observable.create(OverflowStrategy.Unbounded) { sub =>
  85. val c = SingleAssignCancelable()
  86. val canc = prop.onChange((a, b, c) => sub.onNext((a, b, c)))
  87. c := Cancelable(() => canc.cancel())
  88. c
  89. }
  90. }
  91. }
  92. implicit final class BindObs2[A](private val prop: ObjectProperty[A])
  93. extends AnyVal {
  94. // def -->(sub: Var[A]) =
  95. // prop.onChange((a, b, c) => sub := c)
  96. def -->(sub: Observer[A]) =
  97. prop.onChange((a, b, c) => if (c != null) sub.onNext(c))
  98. // def -->[J1 >: A, T](
  99. // op: Observable[J1] => me.Task[T]
  100. // )(implicit s: Scheduler) = {
  101. // op(prop.observableChange().map(_._3)).runToFuture
  102. // }
  103. def -->(op: Property[A, A]) = {
  104. prop.onChange((a, b, c) => if (c != null) op() = c)
  105. }
  106. def <--(obs: Observable[A])(implicit s: Scheduler) = {
  107. obs.doOnNext(v => me.Task(prop() = v)).subscribe()
  108. }
  109. def observableChange[J1 >: A]()
  110. : Observable[(ObservableValue[A, A], J1, J1)] = {
  111. import monix.execution.cancelables.SingleAssignCancelable
  112. Observable.create(OverflowStrategy.Unbounded) { sub =>
  113. val c = SingleAssignCancelable()
  114. val canc = prop.onChange((a, b, c) => sub.onNext((a, b, c)))
  115. c := Cancelable(() => canc.cancel())
  116. c
  117. }
  118. }
  119. }
  120. implicit final class BindObs3[T, J](private val prop: ReadOnlyProperty[T, J])
  121. extends AnyVal {
  122. def -->(op: Observer[T]) = {
  123. op.onNext(prop.value)
  124. }
  125. def -->(op: Property[T, J]) = {
  126. op <== prop
  127. }
  128. // def <--(obs: Observable[T])(implicit s: Scheduler) = {
  129. // obs.doOnNext(v => me.Task(prop.value = v)).subscribe()
  130. // }
  131. def observableChange[J1 >: J]()
  132. : Observable[(ObservableValue[T, J], J1, J1)] = {
  133. import monix.execution.cancelables.SingleAssignCancelable
  134. Observable.create(OverflowStrategy.Unbounded) { sub =>
  135. val c = SingleAssignCancelable()
  136. val canc = prop.onChange((a, b, c) => sub.onNext((a, b, c)))
  137. c := Cancelable(() => canc.cancel())
  138. c
  139. }
  140. }
  141. }
  142. implicit final class ObjectPropertyObservableListExt[A](
  143. private val prop: ObjectProperty[ObservableList[A]]
  144. ) extends AnyVal {
  145. def <--(obs: Observable[Seq[A]])(implicit s: Scheduler) = {
  146. obs.doOnNext(v => me.Task(prop() = ObservableBuffer.from(v))).subscribe()
  147. }
  148. def -->(sub: Observer[A])(implicit s: Scheduler) =
  149. prop.onChange((a, b, c) =>
  150. if (c != null)
  151. Iterant[Task]
  152. .fromIterable(c.toIterable)
  153. .consume
  154. .use(consume(sub, _))
  155. .runToFuture
  156. )
  157. private def loop(sub: Observer[A], it: Iterator[A]): Task[Unit] =
  158. if (it.hasNext) {
  159. val next = it.next()
  160. Task.deferFuture(sub.onNext(next)).flatMap {
  161. case Ack.Continue => loop(sub, it)
  162. case Ack.Stop => Task.unit
  163. }
  164. } else Task.unit
  165. private def consume(
  166. sub: Observer[A],
  167. consumer: Iterant.Consumer[Task, A]
  168. ): Task[Unit] =
  169. consumer.pull.flatMap {
  170. case Left(value) => Task.unit
  171. case Right(value) =>
  172. Task.deferFuture(sub.onNext(value)).flatMap {
  173. case Ack.Continue => consume(sub, consumer)
  174. case Ack.Stop => Task.unit
  175. }
  176. }
  177. }
  178. implicit final class ObjectPropertyActionEvent(
  179. private val prop: ObjectProperty[EventHandler[ActionEvent]]
  180. ) extends AnyVal {
  181. // def <--(obs: Observable[ActionEvent])(implicit s: Scheduler) = {
  182. // obs.doOnNext(v => me.Task(prop() = ObservableBuffer.from(v))).subscribe()
  183. // }
  184. // def -->(sub: Observer[ActionEvent]) =
  185. // prop().
  186. }
  187. // def emit(prop: ObjectProperty[EventHandler[ActionEvent]]) =
  188. implicit final class OnActionObservable(
  189. private val button: ButtonBase
  190. ) extends AnyVal {
  191. // def -->[T](
  192. // op: Observable[jfxe.ActionEvent] => me.Task[T]
  193. // )(implicit s: Scheduler) = {
  194. // op(button.observableAction()).runToFuture
  195. // }
  196. // def -->(
  197. // sub: ConcurrentSubject[jfxe.ActionEvent, jfxe.ActionEvent]
  198. // ) = {
  199. // button.onAction = value => sub.onNext(value)
  200. // }
  201. def observableAction(): Observable[jfxe.ActionEvent] = {
  202. import monix.execution.cancelables.SingleAssignCancelable
  203. Observable.create(OverflowStrategy.Unbounded) { sub =>
  204. val c = SingleAssignCancelable()
  205. val l = new jfxe.EventHandler[jfxe.ActionEvent] {
  206. override def handle(event: jfxe.ActionEvent): Unit = {
  207. if (sub.onNext(event) == Ack.Stop) c.cancel()
  208. }
  209. }
  210. button.onAction = l
  211. c := Cancelable(() =>
  212. button.removeEventHandler(
  213. jfxe.ActionEvent.ACTION,
  214. l
  215. )
  216. )
  217. c
  218. }
  219. }
  220. }
  221. implicit final class MenuItemActionObservable(
  222. private val et: MenuItem
  223. ) extends AnyVal {
  224. // def -->[T](
  225. // op: Observable[jfxe.ActionEvent] => me.Task[T]
  226. // )(implicit s: Scheduler) = {
  227. // op(button.observableAction()).runToFuture
  228. // }
  229. // def -->(
  230. // sub: ConcurrentSubject[jfxe.ActionEvent, jfxe.ActionEvent]
  231. // ) = {
  232. // button.onAction = value => sub.onNext(value)
  233. // }
  234. def observableAction(): Observable[jfxe.ActionEvent] = {
  235. import monix.execution.cancelables.SingleAssignCancelable
  236. Observable.create(OverflowStrategy.Unbounded) { sub =>
  237. val c = SingleAssignCancelable()
  238. val l = new jfxe.EventHandler[jfxe.ActionEvent] {
  239. override def handle(event: jfxe.ActionEvent): Unit = {
  240. if (sub.onNext(event) == Ack.Stop) c.cancel()
  241. }
  242. }
  243. et.onAction = l
  244. c := Cancelable(() =>
  245. et.removeEventHandler(
  246. jfxe.ActionEvent.ACTION,
  247. l
  248. )
  249. )
  250. c
  251. }
  252. }
  253. }
  254. }