sa/Stars Assistant/Helpers/RxExtensions.cs

38 lines
1.6 KiB
C#
Raw Normal View History

2024-08-23 15:56:46 +00:00
using System.Reactive.Concurrency;
using System.Reactive.Linq;
namespace StarsAssistant.Helpers;
/// <summary>
/// Extension methods for composing reactive (Rx) observables.
/// </summary>
public static class RxExtensions
{
    /// <summary>
    /// Wraps an observable so that its events are collected into batches, each batch being released
    /// once the minimum inactivity period has elapsed without any further events.
    /// </summary>
    /// <remarks>
    /// Courtesy of https://endjin.com/blog/2024/05/observe-file-system-changes-with-rx-dotnet.
    /// The source is shared via <c>Publish</c>, so each subscription to the returned observable
    /// subscribes to <paramref name="src"/> only once, even though the source feeds both the
    /// quiet-period detection and the buffering.
    /// </remarks>
    /// <typeparam name="T">The original type being observed.</typeparam>
    /// <param name="src">The source observable to throttle.</param>
    /// <param name="minimumInactivityPeriod">The minimum inactivity period to wait before propagating.</param>
    /// <param name="scheduler">
    /// The scheduler to use; falls back to <see cref="DefaultScheduler.Instance"/> when <c>null</c>.
    /// </param>
    /// <returns>An observable of lists, each holding the events collected since the previous quiet period.</returns>
    public static IObservable<IList<T>> Quiescent<T>(
        this IObservable<T> src,
        TimeSpan minimumInactivityPeriod,
        IScheduler? scheduler = null)
    {
        IScheduler realScheduler = scheduler ?? DefaultScheduler.Instance;

        // Share a single subscription to the source between the activity counter and the buffer.
        return src.Publish(shared =>
        {
            // Every source event yields +1 right away and -1 once the inactivity period has passed.
            IObservable<int> onoffs =
                from _ in shared
                from delta in
                    Observable.Return(1, realScheduler)
                        .Concat(Observable.Return(-1, realScheduler)
                            .Delay(minimumInactivityPeriod, realScheduler))
                select delta;

            // Running total of events whose inactivity period has not yet expired.
            IObservable<int> outstanding = onoffs.Scan(0, (total, delta) => total + delta);

            // The total returning to zero means no event arrived within the last inactivity period.
            IObservable<int> zeroCrossings = outstanding.Where(total => total == 0);

            // Close the current buffer every time the source goes quiet.
            return shared.Buffer(zeroCrossings);
        });
    }
}