You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
98 lines
2.7 KiB
98 lines
2.7 KiB
package nova.monadic_sfx.util.controls
|
|
|
|
import monix.execution.Cancelable
|
|
import monix.execution.Scheduler
|
|
import monix.execution.cancelables.CompositeCancelable
|
|
import monix.reactive.Observable
|
|
import monix.reactive.Observer
|
|
import monix.{eval => me}
|
|
|
|
class ActionObservableExecutor[T](
|
|
private val delegate: Observable[T]
|
|
) extends AnyVal {
|
|
def -->(sub: Observer[T])(implicit s: Scheduler) =
|
|
delegate
|
|
.doOnNext(el => me.Task.deferFuture(sub.onNext(el)) >> me.Task.unit)
|
|
.subscribe()
|
|
|
|
}
|
|
class ActionObservableBuilder[A](
|
|
private val observableAction: Observable[A]
|
|
) extends AnyVal {
|
|
def useLazyEval[T](v: => me.Task[T]) =
|
|
new ActionObservableExecutor[T](observableAction.mapEval(_ => v))
|
|
|
|
def useEval[T](cb: A => me.Task[T]) =
|
|
new ActionObservableExecutor[T](
|
|
observableAction.mapEval(cb)
|
|
)
|
|
|
|
def useIterableEval[T](cb: A => collection.immutable.Iterable[T]) =
|
|
new ActionObservableExecutor[T](
|
|
observableAction.flatMap(a =>
|
|
Observable.suspend(Observable.fromIterable(cb(a)))
|
|
)
|
|
)
|
|
|
|
def doOnNext(cb: A => me.Task[Unit]): ActionObservableBuilder[A] =
|
|
new ActionObservableBuilder(observableAction.doOnNext(cb))
|
|
|
|
def mapEval[B](cb: A => me.Task[B]) =
|
|
new ActionObservableBuilder(observableAction.mapEval(cb))
|
|
|
|
def underlying = observableAction
|
|
|
|
// Caution: Experimental stuff below..
|
|
|
|
def useEval2[B, C](f: A => me.Task[B], g: A => me.Task[C]) =
|
|
new ActionObservableExecutor[(B, C)](
|
|
observableAction.publishSelector(conn =>
|
|
conn
|
|
.mapEval(f)
|
|
.switchMap(b =>
|
|
conn.mapEval(a =>
|
|
for {
|
|
c <- g(a)
|
|
} yield (b, c)
|
|
)
|
|
)
|
|
)
|
|
)
|
|
|
|
def bifurcate[B, C](
|
|
f: ActionObservableBuilder[A] => B,
|
|
g: ActionObservableBuilder[A] => C
|
|
)(implicit s: Scheduler) =
|
|
observableAction
|
|
.publishSelector(conn =>
|
|
Observable(
|
|
Observable.unit.doOnNext(_ =>
|
|
me.Task(f(new ActionObservableBuilder[A](conn))) >> me.Task.unit
|
|
),
|
|
Observable.unit.doOnNext(_ =>
|
|
me.Task(g(new ActionObservableBuilder[A](conn))) >> me.Task.unit
|
|
)
|
|
).merge
|
|
)
|
|
.subscribe()
|
|
|
|
def split(
|
|
lst: (ActionObservableBuilder[A] => Cancelable)*
|
|
)(implicit s: Scheduler): Cancelable = {
|
|
val comp = CompositeCancelable()
|
|
comp += observableAction
|
|
.publishSelector(conn =>
|
|
Observable(
|
|
lst.map(f =>
|
|
Observable.unit.doOnNext(_ =>
|
|
me.Task(
|
|
comp += f(new ActionObservableBuilder[A](conn))
|
|
) >> me.Task.unit
|
|
)
|
|
): _*
|
|
).merge
|
|
)
|
|
.subscribe()
|
|
}
|
|
|
|
}
|