Add FSWatcher using RxNEt
This commit is contained in:
		
							
								
								
									
										93
									
								
								Stars Assistant/Helpers/FsWatcher.cs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										93
									
								
								Stars Assistant/Helpers/FsWatcher.cs
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,93 @@
 | 
			
		||||
using System.Reactive.Linq;
 | 
			
		||||
using System.Reactive.Concurrency;
 | 
			
		||||
 | 
			
		||||
namespace StarsAssistant.Helpers;
 | 
			
		||||
 | 
			
		||||
public class FsWatcher
 | 
			
		||||
{
 | 
			
		||||
    /// <summary>
 | 
			
		||||
    /// Throttle a FileSystemObserver and eliminate all duplicates.
 | 
			
		||||
    /// </summary>
 | 
			
		||||
    /// <remarks>
 | 
			
		||||
    /// Curtesy of https://endjin.com/blog/2024/05/observe-file-system-changes-with-rx-dotnet
 | 
			
		||||
    /// </remarks>
 | 
			
		||||
    /// <param name="watcher">An observer created with <c>ObserveFileSystem</c></param>
 | 
			
		||||
    /// <param name="inactivitySeconds">Throttling window, defaults to 1 second.</param>
 | 
			
		||||
    /// <param name="scheduler">Scheduler to user for throttling, default is set by <c>Quiescent</c></param>
 | 
			
		||||
    /// <returns>A throttled observer with duplicate elimination.</returns>
 | 
			
		||||
    public static IObservable<IEnumerable<FileSystemEventArgs>> ThrottleAndDistinctObserver (
 | 
			
		||||
        IObservable<FileSystemEventArgs> watcher, 
 | 
			
		||||
        int inactivitySeconds = 1,
 | 
			
		||||
        IScheduler? scheduler = null)
 | 
			
		||||
    {
 | 
			
		||||
        return watcher
 | 
			
		||||
                .Quiescent(TimeSpan.FromSeconds(inactivitySeconds), scheduler)
 | 
			
		||||
                .Select(changes => changes.DistinctBy(x => (x.ChangeType, x.FullPath)));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// <summary>
 | 
			
		||||
    /// Helper to convert a FileSystemWatcher into an obervable stream. See FileSystemWatcher
 | 
			
		||||
    /// documentation for further details on the parameters.
 | 
			
		||||
    /// </summary>
 | 
			
		||||
    /// <remarks>
 | 
			
		||||
    /// Curtesy of https://endjin.com/blog/2024/05/observe-file-system-changes-with-rx-dotnet
 | 
			
		||||
    /// </remarks>
 | 
			
		||||
    /// <param name="directoryPath">The base directory to watch over.</param>
 | 
			
		||||
    /// <param name="filters">An optional list of filter expressions to look for, default is no filter.</param>
 | 
			
		||||
    /// <param name="includeSubdirectories">Scan subdirectories, default is exclude subdirs</param>
 | 
			
		||||
    /// <param name="notifyFilter">Set this to override the default event selection to filter.</param>
 | 
			
		||||
    /// <returns>An observable for further use, usually throttled by Quiescent.</returns>
 | 
			
		||||
    public static IObservable<FileSystemEventArgs> ObserveFileSystem(
 | 
			
		||||
        string directoryPath, 
 | 
			
		||||
        IEnumerable<string>? filters = null, 
 | 
			
		||||
        bool includeSubdirectories = false, 
 | 
			
		||||
        NotifyFilters? notifyFilter = null)
 | 
			
		||||
    {
 | 
			
		||||
        return
 | 
			
		||||
            // Observable.Defer enables us to avoid doing any work
 | 
			
		||||
            // until we have a subscriber.
 | 
			
		||||
            Observable.Defer(() =>
 | 
			
		||||
                {
 | 
			
		||||
                    FileSystemWatcher fsw = new(directoryPath);
 | 
			
		||||
                    if (filters != null)
 | 
			
		||||
                        foreach (string filter in filters)
 | 
			
		||||
                            fsw.Filters.Add(filter);
 | 
			
		||||
                    if (notifyFilter != null)
 | 
			
		||||
                        fsw.NotifyFilter = (NotifyFilters) notifyFilter;
 | 
			
		||||
                    fsw.EnableRaisingEvents = true;
 | 
			
		||||
                    fsw.IncludeSubdirectories = includeSubdirectories;
 | 
			
		||||
 | 
			
		||||
                    return Observable.Return(fsw);
 | 
			
		||||
                })
 | 
			
		||||
                // Once the preceding part emits the FileSystemWatcher
 | 
			
		||||
                // (which will happen when someone first subscribes), we
 | 
			
		||||
                // want to wrap all the events as IObservable<T>s, for which
 | 
			
		||||
                // we'll use a projection. To avoid ending up with an
 | 
			
		||||
                // IObservable<IObservable<FileSystemEventArgs>>, we use
 | 
			
		||||
                // SelectMany, which effectively flattens it by one level.
 | 
			
		||||
                .SelectMany(fsw =>
 | 
			
		||||
                    Observable.Merge(new[]
 | 
			
		||||
                        {
 | 
			
		||||
                            Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(
 | 
			
		||||
                                h => fsw.Created += h, h => fsw.Created -= h),
 | 
			
		||||
                            Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(
 | 
			
		||||
                                h => fsw.Changed += h, h => fsw.Changed -= h),
 | 
			
		||||
                            Observable.FromEventPattern<RenamedEventHandler, FileSystemEventArgs>(
 | 
			
		||||
                                h => fsw.Renamed += h, h => fsw.Renamed -= h),
 | 
			
		||||
                            Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(
 | 
			
		||||
                                h => fsw.Deleted += h, h => fsw.Deleted -= h)
 | 
			
		||||
                        })
 | 
			
		||||
                        // FromEventPattern supplies both the sender and the event
 | 
			
		||||
                        // args. Extract just the latter.
 | 
			
		||||
                        .Select(ep => ep.EventArgs)
 | 
			
		||||
                        // The Finally here ensures the watcher gets shut down once
 | 
			
		||||
                        // we have no subscribers.
 | 
			
		||||
                        .Finally(fsw.Dispose))
 | 
			
		||||
                // This combination of Publish and RefCount means that multiple
 | 
			
		||||
                // subscribers will get to share a single FileSystemWatcher,
 | 
			
		||||
                // but that it gets shut down if all subscribers unsubscribe.
 | 
			
		||||
                .Publish()
 | 
			
		||||
                .RefCount();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										38
									
								
								Stars Assistant/Helpers/RxExtensions.cs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										38
									
								
								Stars Assistant/Helpers/RxExtensions.cs
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,38 @@
 | 
			
		||||
using System.Reactive.Concurrency;
 | 
			
		||||
using System.Reactive.Linq;
 | 
			
		||||
    
 | 
			
		||||
namespace StarsAssistant.Helpers;
 | 
			
		||||
 | 
			
		||||
public static class RxExtensions
 | 
			
		||||
{
 | 
			
		||||
    /// <summary>
 | 
			
		||||
    /// Wrap an observerble to throttle the incoming changes into a given window. Will delay and collect
 | 
			
		||||
    /// any events fired until the minimum inactivity period has elapsed without events.
 | 
			
		||||
    /// </summary>
 | 
			
		||||
    /// <remarks>Curtesy of https://endjin.com/blog/2024/05/observe-file-system-changes-with-rx-dotnet</remarks>
 | 
			
		||||
    /// <typeparam name="T">The original type being observed</typeparam>
 | 
			
		||||
    /// <param name="src">The source observable to throttle.</param>
 | 
			
		||||
    /// <param name="minimumInactivityPeriod">The minimum inactivity period to wait before propagating.</param>
 | 
			
		||||
    /// <param name="scheduler">The scheduler to use, defaults to the default scheduler.</param>
 | 
			
		||||
    /// <returns>Wrapped observable.</returns>
 | 
			
		||||
    public static IObservable<IList<T>> Quiescent<T>(
 | 
			
		||||
        this IObservable<T> src, 
 | 
			
		||||
        TimeSpan minimumInactivityPeriod, 
 | 
			
		||||
        IScheduler? scheduler = null)
 | 
			
		||||
    {
 | 
			
		||||
        IScheduler realScheduler = scheduler ?? DefaultScheduler.Instance;
 | 
			
		||||
        IObservable<int> onoffs =
 | 
			
		||||
            from _ in src
 | 
			
		||||
            from delta in
 | 
			
		||||
                Observable.Return(1, realScheduler)
 | 
			
		||||
                    .Concat(Observable.Return(-1, realScheduler)
 | 
			
		||||
                        .Delay(minimumInactivityPeriod, realScheduler))
 | 
			
		||||
            select delta;
 | 
			
		||||
 | 
			
		||||
        IObservable<int> outstanding = onoffs.Scan(0, (total, delta) => total + delta);
 | 
			
		||||
        IObservable<int> zeroCrossings = outstanding.Where(total => total == 0);
 | 
			
		||||
 | 
			
		||||
        return src.Buffer(zeroCrossings);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
		Reference in New Issue
	
	Block a user