using System.Reactive.Concurrency;
using System.Reactive.Linq;
namespace StarsAssistant.Helpers;
public static class RxExtensions
{
///
/// Wrap an observerble to throttle the incoming changes into a given window. Will delay and collect
/// any events fired until the minimum inactivity period has elapsed without events.
///
/// Curtesy of https://endjin.com/blog/2024/05/observe-file-system-changes-with-rx-dotnet
/// The original type being observed
/// The source observable to throttle.
/// The minimum inactivity period to wait before propagating.
/// The scheduler to use, defaults to the default scheduler.
/// Wrapped observable.
public static IObservable> Quiescent(
this IObservable src,
TimeSpan minimumInactivityPeriod,
IScheduler? scheduler = null)
{
IScheduler realScheduler = scheduler ?? DefaultScheduler.Instance;
IObservable onoffs =
from _ in src
from delta in
Observable.Return(1, realScheduler)
.Concat(Observable.Return(-1, realScheduler)
.Delay(minimumInactivityPeriod, realScheduler))
select delta;
IObservable outstanding = onoffs.Scan(0, (total, delta) => total + delta);
IObservable zeroCrossings = outstanding.Where(total => total == 0);
return src.Buffer(zeroCrossings);
}
///
/// Throttle a FileSystemObserver and eliminate all duplicates.
///
///
/// Curtesy of https://endjin.com/blog/2024/05/observe-file-system-changes-with-rx-dotnet
///
/// An observer created with ObserveFileSystem
/// Throttling window, defaults to 1 second.
/// Scheduler to user for throttling, default is set by Quiescent
/// A throttled observer with duplicate elimination.
public static IObservable> ThrottleAndDistinct (
this IObservable watcher,
int inactivitySeconds = 1,
IScheduler? scheduler = null)
{
return watcher
.Quiescent(TimeSpan.FromSeconds(inactivitySeconds), scheduler)
.Select(changes => changes.DistinctBy(x => (x.ChangeType, x.FullPath)));
}
}