using System.Reactive.Concurrency;
using System.Reactive.Linq;

namespace StarsAssistant.Helpers;

/// <summary>
/// Extension methods for composing Rx observables.
/// </summary>
public static class RxExtensions
{
    /// <summary>
    /// Wraps an observable so that incoming elements are collected and only propagated,
    /// as a single batch, once the source has been quiet for a minimum inactivity period.
    /// </summary>
    /// <remarks>
    /// <para>
    /// Every source element contributes <c>+1</c> to a running total when it arrives and
    /// <c>-1</c> once <paramref name="minimumInactivityPeriod"/> has elapsed. Whenever the
    /// running total returns to zero, the elements buffered so far are emitted as one list.
    /// </para>
    /// <para>
    /// The source is subscribed to only once per subscription to the returned observable;
    /// both the buffering and the inactivity tracking observe that single shared subscription.
    /// </para>
    /// <para>
    /// Courtesy of https://endjin.com/blog/2024/05/observe-file-system-changes-with-rx-dotnet
    /// </para>
    /// </remarks>
    /// <typeparam name="T">The original type being observed.</typeparam>
    /// <param name="src">The source observable to throttle.</param>
    /// <param name="minimumInactivityPeriod">The minimum inactivity period to wait before propagating.</param>
    /// <param name="scheduler">
    /// The scheduler to use; defaults to <see cref="DefaultScheduler.Instance"/> when <c>null</c>.
    /// </param>
    /// <returns>
    /// An observable of lists, each holding the elements received since the previous emission.
    /// </returns>
    public static IObservable<IList<T>> Quiescent<T>(
        this IObservable<T> src,
        TimeSpan minimumInactivityPeriod,
        IScheduler? scheduler = null)
    {
        IScheduler realScheduler = scheduler ?? DefaultScheduler.Instance;

        // Cold "+1 now, -1 after the inactivity period" sequence. Each subscription gets
        // its own timer, so a single instance can be reused for every source element.
        IObservable<int> pulse = Observable
            .Return(1, realScheduler)
            .Concat(Observable
                .Return(-1, realScheduler)
                .Delay(minimumInactivityPeriod, realScheduler));

        // Publish shares ONE subscription to src between Buffer and the boundary signal.
        // Subscribing to src twice would duplicate side effects for cold sources and let
        // the buffered elements and the boundary signal drift apart.
        return src.Publish(shared =>
        {
            IObservable<int> onoffs = shared.SelectMany(_ => pulse);

            // Number of elements whose inactivity window is still open.
            IObservable<int> outstanding = onoffs.Scan(0, (total, delta) => total + delta);

            // Back at zero means every window has closed, i.e. the source went quiet.
            IObservable<int> zeroCrossings = outstanding.Where(total => total == 0);

            return shared.Buffer(zeroCrossings);
        });
    }
}