-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathAsyncPump.cs
103 lines (91 loc) · 3.93 KB
/
AsyncPump.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
//-----------------------------------------------------------------------
// <copyright file="AsyncPump.cs" company="Studio A&T s.r.l.">
// Copyright (c) Studio A&T s.r.l. All rights reserved.
// </copyright>
// <author>Nicogis</author>
//-----------------------------------------------------------------------
namespace Studioat.ArcGISServer.UsageReports
{
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
/// <summary>
/// Provides a pump that supports running asynchronous methods on the current thread.
/// </summary>
public static class AsyncPump
{
    /// <summary>
    /// Runs the specified asynchronous function and blocks the calling thread until the
    /// returned task completes, executing on the calling thread every callback that is
    /// posted to the temporary <see cref="SynchronizationContext"/> installed for the call.
    /// </summary>
    /// <param name="func">The asynchronous function to execute.</param>
    /// <exception cref="ArgumentNullException"><paramref name="func"/> is <c>null</c>.</exception>
    /// <exception cref="InvalidOperationException"><paramref name="func"/> returned <c>null</c> instead of a task.</exception>
    /// <remarks>
    /// The previous <see cref="SynchronizationContext"/> is restored before the method returns
    /// or throws. A failure of the returned task is rethrown through
    /// <c>GetAwaiter().GetResult()</c>.
    /// </remarks>
    public static void Run(Func<Task> func)
    {
        if (func == null)
        {
            throw new ArgumentNullException("func");
        }

        var prevCtx = SynchronizationContext.Current;
        try
        {
            // Establish the new context
            var syncCtx = new SingleThreadSynchronizationContext();
            SynchronizationContext.SetSynchronizationContext(syncCtx);

            // Invoke the function and alert the context to when it completes
            var t = func();
            if (t == null)
            {
                throw new InvalidOperationException("No task provided.");
            }

            var completion = t.ContinueWith(delegate { syncCtx.Complete(); }, TaskScheduler.Default);

            // Pump continuations until the queue has been marked complete and drained
            syncCtx.RunOnCurrentThread();

            // The pump loop can exit as soon as the queue is flagged complete, while the
            // continuation is still inside Complete() on a pool thread. Wait for it so the
            // dispose below cannot race with it (BlockingCollection.Dispose is not thread-safe).
            completion.Wait();

            // Only dispose on this path: the queue is already closed for adding here, so a
            // late Post fails with an InvalidOperationException-derived exception either way.
            syncCtx.Dispose();

            // Propagate any exception or cancellation from the task
            t.GetAwaiter().GetResult();
        }
        finally
        {
            SynchronizationContext.SetSynchronizationContext(prevCtx);
        }
    }

    /// <summary>
    /// Provides a <see cref="SynchronizationContext"/> that queues posted callbacks and
    /// executes them on whichever thread calls <see cref="RunOnCurrentThread"/>.
    /// </summary>
    private sealed class SingleThreadSynchronizationContext : SynchronizationContext, IDisposable
    {
        /// <summary>The queue of work items (callback plus its state argument).</summary>
        private readonly BlockingCollection<KeyValuePair<SendOrPostCallback, object>> queue = new BlockingCollection<KeyValuePair<SendOrPostCallback, object>>();

        /// <summary>Dispatches an asynchronous message to the synchronization context.</summary>
        /// <param name="d">The System.Threading.SendOrPostCallback delegate to call.</param>
        /// <param name="state">The object passed to the delegate.</param>
        /// <exception cref="ArgumentNullException"><paramref name="d"/> is <c>null</c>.</exception>
        public override void Post(SendOrPostCallback d, object state)
        {
            if (d == null)
            {
                throw new ArgumentNullException("d");
            }

            this.queue.Add(new KeyValuePair<SendOrPostCallback, object>(d, state));
        }

        /// <summary>
        /// Not supported.
        /// </summary>
        /// <param name="d">The callback that would have been invoked; ignored.</param>
        /// <param name="state">The state for the callback; ignored.</param>
        /// <exception cref="NotSupportedException">Always thrown.</exception>
        public override void Send(SendOrPostCallback d, object state)
        {
            throw new NotSupportedException("Synchronously sending is not supported.");
        }

        /// <summary>
        /// Runs a loop on the calling thread that processes queued work items until
        /// <see cref="Complete"/> has been called and the queue is empty.
        /// </summary>
        public void RunOnCurrentThread()
        {
            foreach (var workItem in this.queue.GetConsumingEnumerable())
            {
                workItem.Key(workItem.Value);
            }
        }

        /// <summary>Notifies the context that no more work will arrive.</summary>
        public void Complete()
        {
            this.queue.CompleteAdding();
        }

        /// <summary>Releases the resources held by the work-item queue.</summary>
        public void Dispose()
        {
            this.queue.Dispose();
        }
    }
}
}