jmonkey-test/src/main/scala/wow/doge/mygame/utils/MonixDirectoryWatcher.scala
2021-02-27 11:36:32 +05:30

66 lines
2.2 KiB
Scala

package wow.doge.mygame.utils
import cats.kernel.Eq
import cats.syntax.show._
import com.typesafe.scalalogging.Logger
import monix.bio.Task
import monix.execution.Ack
import monix.execution.Cancelable
import monix.execution.cancelables.SingleAssignCancelable
import monix.reactive.Observable
import monix.reactive.OverflowStrategy
/**
  * Adapts a `RecursiveFileMonitor` rooted at a directory into a Monix
  * `Observable` of [[MonixDirectoryWatcher.WatchEvent]]s.
  */
object MonixDirectoryWatcher {
  import better.files._
  import io.methvin.better.files._

  private val logger = Logger[MonixDirectoryWatcher.type]

  /**
    * A change reported by the file monitor. `file` and `count` are forwarded
    * unchanged from the corresponding monitor callback.
    */
  sealed trait WatchEvent extends Product with Serializable
  final case class CreateEvent(file: File, count: Int) extends WatchEvent
  final case class ModifyEvent(file: File, count: Int) extends WatchEvent
  final case class DeleteEvent(file: File, count: Int) extends WatchEvent

  object WatchEvent {
    // Explicit type: implicit definitions without one are fragile during
    // implicit resolution and are rejected outright by Scala 3.
    implicit val eq: Eq[WatchEvent] = Eq.fromUniversalEquals[WatchEvent]
  }

  /**
    * Describes a watcher over `path`.
    *
    * Nothing is started when this method is called; evaluating the returned
    * task only builds the observable. The file monitor itself is started when
    * the underlying source is subscribed to, and stopped when that
    * subscription is cancelled or when a downstream `onNext` answers
    * `Ack.Stop`. The source is multicast via `publish.refCount`.
    *
    * Events are buffered with `OverflowStrategy.DropNew(50)`.
    *
    * @param path root directory handed to the `RecursiveFileMonitor`
    * @return a task producing the observable of watch events
    */
  @SuppressWarnings(Array("org.wartremover.warts.Equals"))
  def apply(path: os.Path): Task[Observable[WatchEvent]] =
    Task.deferAction(implicit s =>
      Task(
        Observable
          .create[WatchEvent](OverflowStrategy.DropNew(50)) { sub =>
            import sub.scheduler
            val c = SingleAssignCancelable()

            // Single place for the "push downstream, stop watching once the
            // subscriber signals Stop" logic shared by all three callbacks.
            def emitEvent(event: WatchEvent): Unit =
              if (sub.onNext(event) == Ack.Stop) c.cancel()

            val watcher =
              new RecursiveFileMonitor(
                File(path.toString),
                logger = logger.underlying
              ) {
                override def onCreate(file: File, count: Int): Unit =
                  emitEvent(CreateEvent(file, count))

                override def onModify(file: File, count: Int): Unit = {
                  // Qualified on purpose: inside this anonymous subclass a
                  // bare `logger` may resolve to a member inherited from the
                  // monitor rather than this module's logger.
                  MonixDirectoryWatcher.logger.debug(
                    show"${file.toString} got modified $count times"
                  )
                  emitEvent(ModifyEvent(file, count))
                }

                override def onDelete(file: File, count: Int): Unit =
                  emitEvent(DeleteEvent(file, count))
              }

            watcher.start()(scheduler)
            // If a callback already cancelled `c` before this assignment, the
            // SingleAssignCancelable cancels the assigned value right away,
            // so the watcher is still stopped.
            c := Cancelable(() => watcher.stop())
            c
          }
          .publish
          .refCount
      )
    )
}