package wow.doge.mygame.utils

import cats.kernel.Eq
import cats.syntax.show._
import com.typesafe.scalalogging.Logger
import monix.bio.Task
import monix.execution.Ack
import monix.execution.Cancelable
import monix.execution.cancelables.SingleAssignCancelable
import monix.reactive.Observable
import monix.reactive.OverflowStrategy

/** Exposes file-system changes under a directory as a monix `Observable`.
  *
  * The watching itself is delegated to a `RecursiveFileMonitor`; this object
  * only adapts its callbacks into [[MonixDirectoryWatcher.WatchEvent]] values.
  */
object MonixDirectoryWatcher {
  import better.files._
  import io.methvin.better.files._

  private val logger = Logger[MonixDirectoryWatcher.type]

  /** A change reported by the file monitor.
    *
    * Each event carries the `file` and `count` arguments exactly as they were
    * passed to the corresponding monitor callback.
    */
  sealed trait WatchEvent extends Product with Serializable

  /** Emitted from the monitor's `onCreate` callback. */
  final case class CreateEvent(file: File, count: Int) extends WatchEvent

  /** Emitted from the monitor's `onModify` callback. */
  final case class ModifyEvent(file: File, count: Int) extends WatchEvent

  /** Emitted from the monitor's `onDelete` callback. */
  final case class DeleteEvent(file: File, count: Int) extends WatchEvent

  object WatchEvent {

    /** Equality for events, based on universal (`==`) equality. */
    implicit val eq: Eq[WatchEvent] = Eq.fromUniversalEquals[WatchEvent]
  }

  /** Builds an observable of [[WatchEvent]]s for `path`.
    *
    * The observable is created with `OverflowStrategy.DropNew(50)` and is
    * shared through `publish.refCount`. The file monitor is created and
    * started inside the subscribe function, and stopped through the returned
    * cancelable.
    *
    * @param path directory to watch; converted with `toString` into a
    *             better-files `File`
    * @return a task that yields the shared observable of events
    */
  @SuppressWarnings(Array("org.wartremover.warts.Equals"))
  def apply(path: os.Path): Task[Observable[WatchEvent]] =
    Task.deferAction { implicit s =>
      Task {
        val events = Observable.create[WatchEvent](OverflowStrategy.DropNew(50)) { sub =>
          // Created before the monitor so the callbacks can refer to the
          // handle; the real stop action is assigned once the monitor exists.
          val c = SingleAssignCancelable()

          // Pushes one event downstream and cancels the handle when the
          // subscriber answers with Stop.
          def emit(event: WatchEvent): Unit =
            if (sub.onNext(event) == Ack.Stop) c.cancel()

          // NOTE(review): only create/modify/delete are forwarded. If the
          // monitor also exposes an error callback, failures are not
          // propagated to `sub` - confirm whether that is intended.
          val watcher = new RecursiveFileMonitor(
            File(path.toString),
            logger = logger.underlying
          ) {
            override def onCreate(file: File, count: Int): Unit =
              emit(CreateEvent(file, count))

            override def onModify(file: File, count: Int): Unit =
              emit(ModifyEvent(file, count))

            override def onDelete(file: File, count: Int): Unit =
              emit(DeleteEvent(file, count))
          }

          // The subscriber's scheduler is passed explicitly rather than
          // imported, so it cannot clash with the implicit `s` above.
          watcher.start()(sub.scheduler)
          c := Cancelable(() => watcher.stop())
          c
        }

        events.publish.refCount
      }
    }
}