From a9c7dc4a949a949ddf664d32aeb8fb6c611d8c80 Mon Sep 17 00:00:00 2001 From: Torben Nehmer Date: Fri, 23 Aug 2024 17:56:46 +0200 Subject: [PATCH] Add FSWatcher using RxNEt --- Stars Assistant/Helpers/FsWatcher.cs | 93 +++++++++++++++++++++++++ Stars Assistant/Helpers/RxExtensions.cs | 38 ++++++++++ 2 files changed, 131 insertions(+) create mode 100644 Stars Assistant/Helpers/FsWatcher.cs create mode 100644 Stars Assistant/Helpers/RxExtensions.cs diff --git a/Stars Assistant/Helpers/FsWatcher.cs b/Stars Assistant/Helpers/FsWatcher.cs new file mode 100644 index 0000000..d2587da --- /dev/null +++ b/Stars Assistant/Helpers/FsWatcher.cs @@ -0,0 +1,93 @@ +using System.Reactive.Linq; +using System.Reactive.Concurrency; + +namespace StarsAssistant.Helpers; + +public class FsWatcher +{ + /// + /// Throttle a FileSystemObserver and eliminate all duplicates. + /// + /// + /// Curtesy of https://endjin.com/blog/2024/05/observe-file-system-changes-with-rx-dotnet + /// + /// An observer created with ObserveFileSystem + /// Throttling window, defaults to 1 second. + /// Scheduler to user for throttling, default is set by Quiescent + /// A throttled observer with duplicate elimination. + public static IObservable> ThrottleAndDistinctObserver ( + IObservable watcher, + int inactivitySeconds = 1, + IScheduler? scheduler = null) + { + return watcher + .Quiescent(TimeSpan.FromSeconds(inactivitySeconds), scheduler) + .Select(changes => changes.DistinctBy(x => (x.ChangeType, x.FullPath))); + } + + /// + /// Helper to convert a FileSystemWatcher into an obervable stream. See FileSystemWatcher + /// documentation for further details on the parameters. + /// + /// + /// Curtesy of https://endjin.com/blog/2024/05/observe-file-system-changes-with-rx-dotnet + /// + /// The base directory to watch over. + /// An optional list of filter expressions to look for, default is no filter. + /// Scan subdirectories, default is exclude subdirs + /// Set this to override the default event selection to filter. + /// An observable for further use, usually throttled by Quiescent. + public static IObservable ObserveFileSystem( + string directoryPath, + IEnumerable? filters = null, + bool includeSubdirectories = false, + NotifyFilters? notifyFilter = null) + { + return + // Observable.Defer enables us to avoid doing any work + // until we have a subscriber. + Observable.Defer(() => + { + FileSystemWatcher fsw = new(directoryPath); + if (filters != null) + foreach (string filter in filters) + fsw.Filters.Add(filter); + if (notifyFilter != null) + fsw.NotifyFilter = (NotifyFilters) notifyFilter; + fsw.EnableRaisingEvents = true; + fsw.IncludeSubdirectories = includeSubdirectories; + + return Observable.Return(fsw); + }) + // Once the preceding part emits the FileSystemWatcher + // (which will happen when someone first subscribes), we + // want to wrap all the events as IObservables, for which + // we'll use a projection. To avoid ending up with an + // IObservable>, we use + // SelectMany, which effectively flattens it by one level. + .SelectMany(fsw => + Observable.Merge(new[] + { + Observable.FromEventPattern( + h => fsw.Created += h, h => fsw.Created -= h), + Observable.FromEventPattern( + h => fsw.Changed += h, h => fsw.Changed -= h), + Observable.FromEventPattern( + h => fsw.Renamed += h, h => fsw.Renamed -= h), + Observable.FromEventPattern( + h => fsw.Deleted += h, h => fsw.Deleted -= h) + }) + // FromEventPattern supplies both the sender and the event + // args. Extract just the latter. + .Select(ep => ep.EventArgs) + // The Finally here ensures the watcher gets shut down once + // we have no subscribers. + .Finally(fsw.Dispose)) + // This combination of Publish and RefCount means that multiple + // subscribers will get to share a single FileSystemWatcher, + // but that it gets shut down if all subscribers unsubscribe. + .Publish() + .RefCount(); + } + +} diff --git a/Stars Assistant/Helpers/RxExtensions.cs b/Stars Assistant/Helpers/RxExtensions.cs new file mode 100644 index 0000000..92df866 --- /dev/null +++ b/Stars Assistant/Helpers/RxExtensions.cs @@ -0,0 +1,38 @@ +using System.Reactive.Concurrency; +using System.Reactive.Linq; + +namespace StarsAssistant.Helpers; + +public static class RxExtensions +{ + /// + /// Wrap an observerble to throttle the incoming changes into a given window. Will delay and collect + /// any events fired until the minimum inactivity period has elapsed without events. + /// + /// Curtesy of https://endjin.com/blog/2024/05/observe-file-system-changes-with-rx-dotnet + /// The original type being observed + /// The source observable to throttle. + /// The minimum inactivity period to wait before propagating. + /// The scheduler to use, defaults to the default scheduler. + /// Wrapped observable. + public static IObservable> Quiescent( + this IObservable src, + TimeSpan minimumInactivityPeriod, + IScheduler? scheduler = null) + { + IScheduler realScheduler = scheduler ?? DefaultScheduler.Instance; + IObservable onoffs = + from _ in src + from delta in + Observable.Return(1, realScheduler) + .Concat(Observable.Return(-1, realScheduler) + .Delay(minimumInactivityPeriod, realScheduler)) + select delta; + + IObservable outstanding = onoffs.Scan(0, (total, delta) => total + delta); + IObservable zeroCrossings = outstanding.Where(total => total == 0); + + return src.Buffer(zeroCrossings); + } + +}