using System.Reactive.Concurrency;
using System.Reactive.Linq;
namespace StarsAssistant.Helpers;
public static class RxExtensions
{
    /// <summary>
    /// Wraps an observable so that bursts of events are collected and only propagated once
    /// the source has been quiet for a minimum period. Every event fired is buffered; the
    /// buffer is released as a single list when <paramref name="minimumInactivityPeriod"/>
    /// has elapsed without any further events.
    /// </summary>
    /// <remarks>
    /// Courtesy of https://endjin.com/blog/2024/05/observe-file-system-changes-with-rx-dotnet
    /// <para>
    /// The source is shared through <c>Publish</c>, so each subscription to the returned
    /// observable subscribes to <paramref name="src"/> exactly once, even though the
    /// implementation consumes the events twice (once for counting, once for buffering).
    /// </para>
    /// </remarks>
    /// <typeparam name="T">The original type being observed.</typeparam>
    /// <param name="src">The source observable to throttle.</param>
    /// <param name="minimumInactivityPeriod">The minimum inactivity period to wait before propagating.</param>
    /// <param name="scheduler">The scheduler to use; defaults to <see cref="DefaultScheduler.Instance"/> when <c>null</c>.</param>
    /// <returns>An observable producing the list of events collected during each burst of activity.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="src"/> is <c>null</c>.</exception>
    public static IObservable<IList<T>> Quiescent<T>(
        this IObservable<T> src,
        TimeSpan minimumInactivityPeriod,
        IScheduler? scheduler = null)
    {
        ArgumentNullException.ThrowIfNull(src);

        IScheduler realScheduler = scheduler ?? DefaultScheduler.Instance;

        // Share a single subscription to the source between the activity counter and the
        // buffer. Without this, a cold source would be subscribed (and executed) twice.
        return src.Publish(shared =>
        {
            // Each source event yields +1 immediately and -1 once the inactivity period
            // has passed, i.e. every event is "outstanding" for exactly that long.
            IObservable<int> onoffs =
                from _ in shared
                from delta in
                    Observable.Return(1, realScheduler)
                        .Concat(Observable.Return(-1, realScheduler)
                            .Delay(minimumInactivityPeriod, realScheduler))
                select delta;

            // Running count of events that are still inside their inactivity window.
            IObservable<int> outstanding = onoffs.Scan(0, (total, delta) => total + delta);

            // The count returning to zero means no event arrived within the last period.
            IObservable<int> zeroCrossings = outstanding.Where(total => total == 0);

            // Close the current buffer (and open a new one) at every quiet point.
            return shared.Buffer(zeroCrossings);
        });
    }
}