You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

350 lines
9.9 KiB

  1. package nova.monadic_sfx.implicits
  2. import javafx.beans.property.ObjectProperty
  3. import javafx.collections.ObservableList
  4. import javafx.event.ActionEvent
  5. import javafx.event.EventHandler
  6. import javafx.scene.{input => jfxsi}
  7. import javafx.{event => jfxe}
  8. import monix.bio.Task
  9. import monix.eval.Coeval
  10. import monix.execution.Ack
  11. import monix.execution.Cancelable
  12. import monix.execution.Scheduler
  13. import monix.execution.cancelables.SingleAssignCancelable
  14. import monix.reactive.Observable
  15. import monix.reactive.Observer
  16. import monix.reactive.OverflowStrategy
  17. import monix.tail.Iterant
  18. import monix.{eval => me}
  19. import org.gerweck.scalafx.util._
  20. import scalafx.Includes._
  21. import scalafx.beans.property.Property
  22. import scalafx.beans.property.ReadOnlyProperty
  23. import scalafx.collections.ObservableBuffer
  24. import scalafx.event.subscriptions.Subscription
  25. import scalafx.scene.Scene
  26. import scalafx.scene.control.ButtonBase
  27. import scalafx.scene.control.MenuItem
  28. trait JavaFXMonixObservables {
  29. import JavaFXMonixObservables._
  30. implicit def extendedScene(scene: Scene) = new SceneExt(scene)
  31. implicit def extendedProperty[T, J](
  32. propery: Property[T, J]
  33. ): PropertyExt[T, J] =
  34. new PropertyExt(propery)
  35. implicit def extendedObjectPropety[A](prop: ObjectProperty[A]) =
  36. new ObjectPropertyExt[A](prop)
  37. implicit def extendedReadOnlyObjectPropety[T, J](
  38. prop: ReadOnlyProperty[T, J]
  39. ) =
  40. new ReadOnlyPropertyExt[T, J](prop)
  41. implicit def extendedObservableList[A](
  42. list: ObservableList[A]
  43. ) = new ObservableListExt(list)
  44. implicit def extendedObjectPropertyObservableList[A](
  45. prop: ObjectProperty[ObservableList[A]]
  46. ) = new ObjectPropertyObservableListExt(prop)
  47. implicit def extendedButton(button: ButtonBase) = new ButtonBaseExt(button)
  48. implicit def extendedMenuItem(item: MenuItem) = new MenuItemExt(item)
  49. }
  50. object JavaFXMonixObservables {
  51. final class SceneExt(private val scene: Scene) extends AnyVal {
  52. def observableMousePressed(): Observable[jfxsi.MouseEvent] = {
  53. import monix.execution.cancelables.SingleAssignCancelable
  54. Observable.create(OverflowStrategy.Unbounded) { sub =>
  55. val c = SingleAssignCancelable()
  56. val l = new jfxe.EventHandler[jfxsi.MouseEvent] {
  57. override def handle(event: jfxsi.MouseEvent): Unit = {
  58. if (sub.onNext(event) == Ack.Stop) c.cancel()
  59. }
  60. }
  61. scene.onMousePressed = l
  62. c := Cancelable(() =>
  63. scene.removeEventHandler(
  64. jfxsi.MouseEvent.MOUSE_PRESSED,
  65. l
  66. )
  67. )
  68. c
  69. }
  70. }
  71. def observableMouseDragged(): Observable[jfxsi.MouseEvent] = {
  72. import monix.execution.cancelables.SingleAssignCancelable
  73. Observable.create(OverflowStrategy.Unbounded) { sub =>
  74. val c = SingleAssignCancelable()
  75. val l = new jfxe.EventHandler[jfxsi.MouseEvent] {
  76. override def handle(event: jfxsi.MouseEvent): Unit = {
  77. if (sub.onNext(event) == Ack.Stop) c.cancel()
  78. }
  79. }
  80. scene.onMouseDragged = l
  81. c := Cancelable(() =>
  82. scene.removeEventHandler(
  83. jfxsi.MouseEvent.MOUSE_DRAGGED,
  84. l
  85. )
  86. )
  87. c
  88. }
  89. }
  90. }
  91. final class PropertyExt[T, J](private val prop: Property[T, J])
  92. extends AnyVal {
  93. def -->(op: Observer[T]) = {
  94. op.onNext(prop.value)
  95. }
  96. def ==>(op: Property[T, J]) = {
  97. op <== prop
  98. }
  99. def <--(obs: Observable[T])(implicit s: Scheduler) = {
  100. obs.doOnNextF(v => Coeval(prop.value = v)).subscribe()
  101. }
  102. def asOption = prop.map(Option(_))
  103. def observableChange[J1 >: J]: Observable[J1] = {
  104. import monix.execution.cancelables.SingleAssignCancelable
  105. Observable.create(OverflowStrategy.Unbounded) { sub =>
  106. val c = SingleAssignCancelable()
  107. val canc =
  108. prop.onChange((a, b, c1) =>
  109. if (c1 != null && sub.onNext(c1) == Ack.Stop) c.cancel()
  110. )
  111. c := Cancelable(() => canc.cancel())
  112. c
  113. }
  114. }
  115. }
  116. final class ObjectPropertyExt[A](private val prop: ObjectProperty[A])
  117. extends AnyVal {
  118. def -->(sub: Observer[A]) =
  119. prop.onChange((a, b, c) => if (c != null) sub.onNext(c))
  120. def -->(op: Property[A, A]) = {
  121. prop.onChange((a, b, c) => if (c != null) op() = c)
  122. }
  123. def <--(obs: Observable[A])(implicit s: Scheduler) = {
  124. obs.doOnNextF(v => Coeval(prop() = v)).subscribe()
  125. }
  126. def observableChange[J1 >: A]: Observable[J1] = {
  127. import monix.execution.cancelables.SingleAssignCancelable
  128. Observable.create(OverflowStrategy.Unbounded) { sub =>
  129. val c = SingleAssignCancelable()
  130. val canc = prop.onChange((_, _, c1) =>
  131. if (c1 != null && sub.onNext(c1) == Ack.Stop) c.cancel()
  132. )
  133. c := Cancelable(() => canc.cancel())
  134. c
  135. }
  136. }
  137. }
  138. final class ObservableListExt[A](
  139. private val buffer: ObservableList[A]
  140. ) extends AnyVal {
  141. // def -->(sub: Observer[A]) =
  142. // buffer.onChange((a, b, c) => if (c != null) sub.onNext(c))
  143. // def -->(op: Property[A, A]) = {
  144. // buffer.onChange((a, b, c) => if (c != null) op() = c)
  145. // }
  146. def <--(obs: Observable[A])(implicit s: Scheduler) = {
  147. obs
  148. .doOnNextF(v =>
  149. for {
  150. _ <- Coeval(buffer.clear())
  151. _ <- Coeval(buffer += v)
  152. } yield ()
  153. )
  154. .subscribe()
  155. }
  156. def observableChange[J1 >: A]: Observable[J1] = {
  157. import monix.execution.cancelables.SingleAssignCancelable
  158. Observable.create(OverflowStrategy.Unbounded) { sub =>
  159. val c = SingleAssignCancelable()
  160. implicit val s = sub.scheduler
  161. val canc =
  162. buffer.onChange((buf, _) =>
  163. loop(sub, buf.toIterable.iterator, c).runToFuture
  164. )
  165. c := Cancelable(() => canc.cancel())
  166. c
  167. }
  168. }
  169. private def loop(
  170. sub: Observer[A],
  171. it: Iterator[A],
  172. c: Cancelable
  173. ): Task[Unit] =
  174. if (it.hasNext) {
  175. val next = it.next()
  176. Task.deferFuture(sub.onNext(next)).flatMap {
  177. case Ack.Continue => loop(sub, it, c)
  178. case Ack.Stop => Task(c.cancel())
  179. }
  180. } else Task.unit
  181. }
  182. final class ReadOnlyPropertyExt[T, J](
  183. private val prop: ReadOnlyProperty[T, J]
  184. ) extends AnyVal {
  185. def -->(op: Observer[T]) = {
  186. op.onNext(prop.value)
  187. }
  188. def ==>(op: Property[T, J]) = {
  189. op <== prop
  190. }
  191. def observableChange[J1 >: J]: Observable[J1] = {
  192. import monix.execution.cancelables.SingleAssignCancelable
  193. Observable.create(OverflowStrategy.Unbounded) { sub =>
  194. val c = SingleAssignCancelable()
  195. val canc = prop.onChange((a, b, c1) =>
  196. if (c1 != null && sub.onNext(c1) == Ack.Stop) c.cancel()
  197. )
  198. c := Cancelable(() => canc.cancel())
  199. c
  200. }
  201. }
  202. }
  203. final class ObjectPropertyObservableListExt[A](
  204. private val prop: ObjectProperty[ObservableList[A]]
  205. ) extends AnyVal {
  206. def <--(obs: Observable[Seq[A]])(implicit s: Scheduler) = {
  207. obs.doOnNext(v => me.Task(prop() = ObservableBuffer.from(v))).subscribe()
  208. }
  209. def -->(sub: Observer[A])(implicit s: Scheduler) = {
  210. val c = SingleAssignCancelable()
  211. val subs: Subscription = prop.onChange((a, b, c1) =>
  212. if (c1 != null)
  213. Iterant[Task]
  214. .fromIterable(c1.toIterable)
  215. .consume
  216. .use(consume(sub, c, _))
  217. .runToFuture
  218. )
  219. c := Cancelable(() => subs.cancel())
  220. }
  221. private def loop(sub: Observer[A], it: Iterator[A]): Task[Unit] =
  222. if (it.hasNext) {
  223. val next = it.next()
  224. Task.deferFuture(sub.onNext(next)).flatMap {
  225. case Ack.Continue => loop(sub, it)
  226. case Ack.Stop => Task.unit
  227. }
  228. } else Task.unit
  229. private def consume(
  230. sub: Observer[A],
  231. c: Cancelable,
  232. consumer: Iterant.Consumer[Task, A]
  233. ): Task[Unit] =
  234. consumer.pull.flatMap {
  235. case Left(value) => Task.unit
  236. case Right(value) =>
  237. Task.deferFuture(sub.onNext(value)).flatMap {
  238. case Ack.Continue => consume(sub, c, consumer)
  239. case Ack.Stop => Task(c.cancel())
  240. }
  241. }
  242. }
  243. final class ObjectPropertyActionEvent(
  244. private val prop: ObjectProperty[EventHandler[ActionEvent]]
  245. ) extends AnyVal {
  246. // def <--(obs: Observable[ActionEvent])(implicit s: Scheduler) = {
  247. // obs.doOnNext(v => me.Task(prop() = ObservableBuffer.from(v))).subscribe()
  248. // }
  249. // def -->(sub: Observer[ActionEvent]) =
  250. // prop().
  251. }
  252. // def emit(prop: ObjectProperty[EventHandler[ActionEvent]]) =
  253. final class ButtonBaseExt(
  254. private val button: ButtonBase
  255. ) extends AnyVal {
  256. def observableAction: Observable[jfxe.ActionEvent] = {
  257. import monix.execution.cancelables.SingleAssignCancelable
  258. Observable.create(OverflowStrategy.Unbounded) { sub =>
  259. val c = SingleAssignCancelable()
  260. val l = new jfxe.EventHandler[jfxe.ActionEvent] {
  261. override def handle(event: jfxe.ActionEvent): Unit = {
  262. if (sub.onNext(event) == Ack.Stop) c.cancel()
  263. }
  264. }
  265. button.onAction = l
  266. c := Cancelable(() =>
  267. button.removeEventHandler(
  268. jfxe.ActionEvent.ACTION,
  269. l
  270. )
  271. )
  272. c
  273. }
  274. }
  275. }
  276. final class MenuItemExt(
  277. private val item: MenuItem
  278. ) extends AnyVal {
  279. def observableAction: Observable[jfxe.ActionEvent] = {
  280. import monix.execution.cancelables.SingleAssignCancelable
  281. Observable.create(OverflowStrategy.Unbounded) { sub =>
  282. val c = SingleAssignCancelable()
  283. val l = new jfxe.EventHandler[jfxe.ActionEvent] {
  284. override def handle(event: jfxe.ActionEvent): Unit = {
  285. if (sub.onNext(event) == Ack.Stop) c.cancel()
  286. }
  287. }
  288. item.onAction = l
  289. c := Cancelable(() =>
  290. item.removeEventHandler(
  291. jfxe.ActionEvent.ACTION,
  292. l
  293. )
  294. )
  295. c
  296. }
  297. }
  298. }
  299. }