Skip to content

Commit ab437c7

Browse files
Merge pull request #1663 from adrianm64/fb-AsyncMonitor
Alternative interface for the AsyncFtpFolderMonitor
2 parents 544d351 + 74a010e commit ab437c7

File tree

4 files changed

+419
-0
lines changed

4 files changed

+419
-0
lines changed
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
using System;
2+
using System.IO;
3+
using System.Linq;
4+
using System.Threading;
5+
using System.Threading.Tasks;
6+
using FluentFTP;
7+
using FluentFTP.Monitors;
8+
9+
namespace Examples {
10+
11+
internal static class MonitorExample {
12+
13+
// Downloads all PDF files from a folder on an FTP server
14+
// when they are fully uploaded (stable)
15+
public static async Task DownloadStablePdfFilesAsync(CancellationToken token) {
16+
var conn = new AsyncFtpClient("127.0.0.1", "ftptest", "ftptest");
17+
18+
await using var monitor = new AsyncFtpMonitor(conn, "path/to/folder");
19+
20+
monitor.PollInterval = TimeSpan.FromMinutes(5);
21+
monitor.WaitTillFileFullyUploaded = true;
22+
monitor.UnstablePollInterval = TimeSpan.FromSeconds(10);
23+
24+
monitor.SetHandler(static async (_, e) => {
25+
foreach (var file in e.Added
26+
.Where(x => x.Type == FtpObjectType.File)
27+
.Where(x => Path.GetExtension(x.Name) == ".pdf")) {
28+
var localFilePath = Path.Combine(@"C:\LocalFolder", file.Name);
29+
await e.FtpClient.DownloadFile(localFilePath, file.FullName, token: e.CancellationToken);
30+
await e.FtpClient.DeleteFile(file.FullName); // don't cancel this operation
31+
}
32+
});
33+
34+
await conn.Connect(token);
35+
await monitor.Start(token);
36+
}
37+
38+
// How to use the monitor in a console application
39+
public static async Task MainAsync() {
40+
using var tokenSource = new CancellationTokenSource();
41+
Console.CancelKeyPress += (_, e) =>
42+
{
43+
e.Cancel = true; // keep running until monitor is stopped
44+
tokenSource.Cancel(); // stop the monitor
45+
};
46+
47+
await DownloadStablePdfFilesAsync(tokenSource.Token);
48+
}
49+
}
50+
}

FluentFTP.VBExamples/Monitor.vb

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
Imports System.IO
2+
Imports System.Threading
3+
Imports FluentFTP
4+
Imports FluentFTP.Monitors
5+
6+
Namespace Examples
7+
Friend Module MonitorExample
8+
9+
' Downloads all PDF files from a folder on an FTP server
10+
' when they are fully uploaded (stable)
11+
Async Function DownloadStablePdfFilesAsync(token As CancellationToken) As Task
12+
Dim conn As New AsyncFtpClient("127.0.0.1", "ftptest", "ftptest")
13+
14+
Using monitor As new AsyncFtpMonitor(conn, "path/to/folder")
15+
16+
monitor.PollInterval = TimeSpan.FromMinutes(5)
17+
monitor.WaitTillFileFullyUploaded = True
18+
monitor.UnstablePollInterval = TimeSpan.FromSeconds(10)
19+
20+
monitor.SetHandler(Async Function(source, e)
21+
For Each file In From listItem In e.Added
22+
Where listItem.Type = FtpObjectType.File
23+
Where Path.GetExtension(listItem.Name) = ".pdf"
24+
25+
Dim localFilePath = Path.Combine("C:\LocalFolder", file.Name)
26+
Await e.FtpClient.DownloadFile(localFilePath, file.FullName, token := e.CancellationToken)
27+
Await e.FtpClient.DeleteFile(file.FullName) ' don't cancel this operation
28+
Next
29+
End Function)
30+
31+
Await conn.Connect(token)
32+
Await monitor.Start(token)
33+
End Using
34+
End Function
35+
36+
' How to use the monitor in a console application
37+
Public Async Function MainAsync() As Task
38+
Using tokenSource = New CancellationTokenSource()
39+
AddHandler Console.CancelKeyPress, Sub (source, e)
40+
e.Cancel = True ' keep running until monitor is stopped
41+
tokenSource.Cancel() ' stop the monitor
42+
End Sub
43+
44+
Await DownloadStablePdfFilesAsync(tokenSource.Token)
45+
End Using
46+
End Function
47+
End Module
48+
End Namespace
Lines changed: 266 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,266 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Linq;
4+
using System.Threading;
5+
using System.Threading.Tasks;
6+
7+
namespace FluentFTP.Monitors {
8+
9+
/// <summary>
10+
/// An async FTP folder monitor that monitors specified remote folder(s) on the FTP server.
11+
/// It triggers events when list items are added, changed or removed.
12+
/// Internally it polls the remote folder(s) every <see cref="M:PollInterval"/> and checks for changed list items.
13+
/// If `WaitTillFileFullyUploaded` is true, then the list items is only detected as an added when the size is stable.
14+
/// </summary>
15+
#if NETSTANDARD2_1_OR_GREATER || NET5_0_OR_GREATER
16+
// IAsyncDisposable can be used
17+
public sealed class AsyncFtpMonitor : IDisposable, IAsyncDisposable {
18+
#else
19+
// IAsyncDisposable is not available
20+
public sealed class AsyncFtpMonitor : IDisposable {
21+
#endif
22+
private readonly IAsyncFtpClient _ftpClient;
23+
24+
private readonly Dictionary<string, long> _unstableListItems = new Dictionary<string, long>();
25+
26+
private Dictionary<string, FtpListItem> _lastListing = new Dictionary<string, FtpListItem>();
27+
28+
// the handler can not be exposed as a public event because it is async
29+
// the handler can not be exposed as a public property because it would allow multiple handlers (+=)
30+
// which does not work well with async handlers
31+
private Func<AsyncFtpMonitor, AsyncFtpMonitorEventArgs, Task> _handler;
32+
33+
private FtpListOption _options = FtpListOption.Modify | FtpListOption.Size;
34+
35+
/// <summary>
36+
/// Create a new FTP monitor.
37+
/// Provide a valid FTP client, and then do not use this client for any other purpose.
38+
/// This FTP client would then be owned and controlled by this class.
39+
/// The client can be used in the handler to perform FTP operations.
40+
/// </summary>
41+
public AsyncFtpMonitor(IAsyncFtpClient ftpClient, params string[] folderPaths) {
42+
_ftpClient = ftpClient ?? throw new ArgumentNullException(nameof(ftpClient));
43+
if (folderPaths == null || folderPaths.Length == 0) {
44+
throw new ArgumentNullException(nameof(folderPaths));
45+
}
46+
FolderPaths = folderPaths;
47+
}
48+
49+
/// <summary>
50+
/// Gets the monitored FTP folder path(s)
51+
/// </summary>
52+
public string[] FolderPaths { get; }
53+
54+
/// <summary>
55+
/// Gets or sets the polling interval. Default is 10 minutes.
56+
/// </summary>
57+
public TimeSpan PollInterval { get; set; } = TimeSpan.FromMinutes(10);
58+
59+
/// <summary>
60+
/// Gets or sets whether to wait for list items to have stable size before reporting them as added.
61+
/// </summary>
62+
public bool WaitTillFileFullyUploaded { get; set; } = true;
63+
64+
/// <summary>
65+
/// Gets or sets the polling interval to check for stable list items sizes
66+
/// when <see cref="P:WaitTillFileFullyUploaded"/> is <see langword="true"/>.
67+
/// <see langword="null"/> (default) to use the <see cref="P:PollInterval"/> as the unstable poll interval.
68+
/// </summary>
69+
public TimeSpan? UnstablePollInterval { get; set; }
70+
71+
/// <summary>
72+
/// Gets or sets the options used when listing the FTP folder
73+
/// Default is <see cref="F:FluentFTP.FtpListOption.Modify"/> and <see cref="F:FluentFTP.FtpListOption.Size"/>
74+
/// </summary>
75+
/// <remarks>Setting this property will reset the change tracking, i.e. all existing list items are assumed added</remarks>
76+
/// <example><code lang="cs">
77+
/// monitor.Options |= FtpListOption.Recursive;
78+
/// </code></example>
79+
public FtpListOption Options {
80+
get => _options;
81+
set {
82+
_options = value;
83+
_lastListing.Clear();
84+
_unstableListItems.Clear();
85+
}
86+
}
87+
88+
/// <summary>
89+
/// Sets the handler that is called when changes are detected in the monitored folder(s)
90+
/// </summary>
91+
/// <param name="handler">The handler to call</param>
92+
public void SetHandler(Func<AsyncFtpMonitor, AsyncFtpMonitorEventArgs, Task> handler) => _handler = handler;
93+
94+
/// <summary>
95+
/// Monitor the FTP folder(s) until the token is cancelled
96+
/// or an exception occurs in the FtpClient or the handler
97+
/// </summary>
98+
public async Task Start(CancellationToken token) {
99+
while (true) {
100+
try {
101+
var startTimeUtc = DateTime.UtcNow;
102+
103+
await PollFolder(token).ConfigureAwait(false);
104+
105+
var pollInterval = _unstableListItems.Count > 0 && UnstablePollInterval != null ? UnstablePollInterval.Value : PollInterval;
106+
var waitTime = pollInterval - (DateTime.UtcNow - startTimeUtc);
107+
108+
if (waitTime > TimeSpan.Zero) {
109+
await Task.Delay(waitTime, token).ConfigureAwait(false);
110+
}
111+
else {
112+
token.ThrowIfCancellationRequested();
113+
}
114+
}
115+
catch (OperationCanceledException)
116+
when (token.IsCancellationRequested) {
117+
break;
118+
}
119+
}
120+
}
121+
122+
public void Dispose() {
123+
_ftpClient?.Dispose();
124+
}
125+
126+
#if NETSTANDARD2_1_OR_GREATER || NET5_0_OR_GREATER
127+
public async ValueTask DisposeAsync() {
128+
if (_ftpClient != null) {
129+
await _ftpClient.DisposeAsync().ConfigureAwait(false);
130+
}
131+
}
132+
#endif
133+
public override string ToString() {
134+
return $"FolderPaths = \"{string.Join("\",\"", FolderPaths)}\" PollInterval = {PollInterval} WaitTillFileFullyUploaded = {WaitTillFileFullyUploaded}";
135+
}
136+
137+
/// <summary>
138+
/// Polls the FTP folder(s) for changes
139+
/// </summary>
140+
private async Task PollFolder(CancellationToken token) {
141+
// Step 1: Get the current listing
142+
var currentListing = await GetCurrentListing(token).ConfigureAwait(false);
143+
144+
// Step 2: Handle unstable list items if WaitTillFileFullyUploaded is true
145+
if (WaitTillFileFullyUploaded) {
146+
currentListing = StableListItems(currentListing);
147+
}
148+
149+
// Step 3: Compare current listing to last listing
150+
var changes = ListItemStatus(currentListing, _lastListing);
151+
152+
// Step 4: Update last listing
153+
_lastListing = currentListing;
154+
155+
if (changes.Added.Count == 0 && changes.Changed.Count == 0 && changes.Deleted.Count == 0) {
156+
return;
157+
}
158+
159+
// Step 5: Raise event
160+
var handler = _handler;
161+
if (handler == null) {
162+
return;
163+
}
164+
165+
try {
166+
var args = new AsyncFtpMonitorEventArgs(FolderPaths, changes.Added, changes.Changed, changes.Deleted, _ftpClient, token);
167+
await handler(this, args).ConfigureAwait(false);
168+
}
169+
catch (OperationCanceledException)
170+
when (token.IsCancellationRequested) {
171+
}
172+
}
173+
174+
private static ListItemChanges ListItemStatus(Dictionary<string, FtpListItem> currentListing,
175+
Dictionary<string, FtpListItem> lastListing)
176+
{
177+
var listItemsAdded = new List<FtpListItem>();
178+
var listItemsChanged = new List<FtpListItem>();
179+
180+
foreach (var listItem in currentListing) {
181+
if (!lastListing.TryGetValue(listItem.Key, out var lastItem)) {
182+
listItemsAdded.Add(listItem.Value);
183+
}
184+
else if (lastItem.Size != listItem.Value.Size || lastItem.Modified != listItem.Value.Modified) {
185+
listItemsChanged.Add(listItem.Value);
186+
}
187+
}
188+
189+
var listItemsDeleted = lastListing.Where(x => !currentListing.ContainsKey(x.Key))
190+
.Select(x => x.Value)
191+
.ToList();
192+
193+
return new ListItemChanges(added: listItemsAdded, changed: listItemsChanged, deleted: listItemsDeleted);
194+
}
195+
196+
private Dictionary<string, FtpListItem> StableListItems(Dictionary<string, FtpListItem> currentListing) {
197+
var stableListItems = new Dictionary<string, FtpListItem>();
198+
199+
foreach (var listItem in currentListing) {
200+
if (_unstableListItems.TryGetValue(listItem.Key, out long previousSize)) {
201+
if (previousSize == listItem.Value.Size) {
202+
// Size has not changed, add to stable
203+
stableListItems.Add(listItem.Key, listItem.Value);
204+
_unstableListItems.Remove(listItem.Key);
205+
}
206+
else {
207+
// Size is still changing, update unstable
208+
_unstableListItems[listItem.Key] = listItem.Value.Size;
209+
}
210+
}
211+
else if (!_lastListing.ContainsKey(listItem.Key)) {
212+
// New listItem, add to unstable
213+
_unstableListItems.Add(listItem.Key, listItem.Value.Size);
214+
}
215+
else {
216+
// Existing unchanged list item, add to stable
217+
stableListItems.Add(listItem.Key, listItem.Value);
218+
}
219+
}
220+
221+
// Remove any unstable that are no longer present
222+
var missingListItems = _unstableListItems.Keys.Except(currentListing.Keys).ToList();
223+
foreach (var listItem in missingListItems) {
224+
_unstableListItems.Remove(listItem);
225+
}
226+
227+
return stableListItems;
228+
}
229+
230+
/// <summary>
231+
/// Gets the current list items from the FTP server
232+
/// </summary>
233+
private async Task<Dictionary<string, FtpListItem>> GetCurrentListing(CancellationToken token) {
234+
FtpListOption options = Options;
235+
236+
if (_ftpClient.Capabilities.Contains(FtpCapability.STAT)) {
237+
options |= FtpListOption.UseStat;
238+
}
239+
240+
var listItems = new Dictionary<string, FtpListItem>();
241+
foreach (var folderPath in FolderPaths) {
242+
var folderListItems = await _ftpClient.GetListing(folderPath, options, token).ConfigureAwait(false);
243+
foreach (var folderListItem in folderListItems) {
244+
listItems[folderListItem.FullName] = folderListItem;
245+
}
246+
}
247+
248+
return listItems;
249+
}
250+
251+
// Tuples are not supported in oldest dotnet version supported
252+
private readonly struct ListItemChanges {
253+
public ListItemChanges(List<FtpListItem> added, List<FtpListItem> changed, List<FtpListItem> deleted) {
254+
Added = added;
255+
Changed = changed;
256+
Deleted = deleted;
257+
}
258+
259+
public List<FtpListItem> Added { get; }
260+
261+
public List<FtpListItem> Changed { get; }
262+
263+
public List<FtpListItem> Deleted { get; }
264+
}
265+
}
266+
}

0 commit comments

Comments
 (0)