// Manages grouping of concurrent requests (typically within a projection scenario) into batches
// Typically to reduce contention on a target resource
namespace Equinox.Core.Batching

open Equinox.Core
open System
open System.Threading
open System.Threading.Tasks
/// Thread-safe coordinator that batches concurrent requests for a single <c>dispatch</c> invocation such that:
/// - requests arriving together can be coalesced into the batch during the linger period via TryAdd
/// - callers that have had items admitted can concurrently await the shared fate of the dispatch via Await
/// - callers whose TryAdd has been denied can await the completion of the in-flight batch via Await (without observing its outcome)
type internal Batch<'Req, 'Res>() =
    let queue = new System.Collections.Concurrent.BlockingCollection<'Req>()
    let tryEnqueue item =
        if queue.IsAddingCompleted then false
        else
            // there's a race between the IsAddingCompleted check outcome and the CompleteAdding
            // sadly there's no way to detect it without a try/catch
            try queue.TryAdd(item)
            with :? InvalidOperationException -> false
    let mutable attempt = Unchecked.defaultof<Lazy<Task<'Res>>>

    /// Attempt to add a request to the flight
    /// Succeeds during the linger interval (which commences when the first caller triggers the workflow via Await)
    /// Fails if this flight has closed (caller should initialize a fresh Batch, potentially holding off until the current attempt completes)
    member _.TryAdd(req, dispatch: Func<'Req[], CancellationToken, Task<'Res>>, lingerMs: int, limiter: System.Threading.SemaphoreSlim, ct) =
        if not (tryEnqueue req) then false else

        // Prepare a new instance, with cancellation under our control (it won't start until the .Value triggers it though)
        let newInstance: Lazy<Task<'Res>> = lazy task {
            do! Task.Delay(lingerMs, ct)
            if not (isNull limiter) then do! limiter.WaitAsync(ct)
            try queue.CompleteAdding()
                return! dispatch.Invoke(queue.ToArray(), ct)
            finally if not (isNull limiter) then limiter.Release() |> ignore }
        // If there are concurrent executions, the first through the gate wins; everybody else awaits the attempt the winner wrote
        let _ = Interlocked.CompareExchange(&attempt, newInstance, null)
        true

    /// Await the outcome of dispatching the batch (on the basis that the caller has a stake due to a successful TryAdd/tryEnqueue)
    member _.Await() = attempt.Value
/// Manages concurrent work such that concurrent requests targeting a shared resource are dispatched as a series of batched requests
/// Incoming requests are deferred to the next batch if a batch is already in flight (max one per Batcher)
/// Requests are added to the pending batch during the wait period, which consists of two phases:
/// 1. a defined linger period (min 1ms)
/// 2. (optionally) a wait to acquire capacity on a limiter semaphore (e.g. one might have a limit on concurrent dispatches across a pool)
type Batcher<'Req, 'Res>(dispatch: Func<'Req[], CancellationToken, Task<'Res>>) =
    let mutable cell = Batch<'Req, 'Res>()
    let mutable lingerMs = 1 // concurrent waiters need time to add work to the batch across their threads

    new (dispatch: 'Req[] -> Async<'Res>) = Batcher(fun items ct -> Async.StartImmediateAsTask(dispatch items, ct))

    /// Period to wait for others to join the dispatch. NOTE at least 1ms is required so failed concurrent TryAdd calls can join the next dispatch
    member _.Linger with set (value: TimeSpan) =
        match int value.TotalMilliseconds with
        | x when x < 1 -> invalidArg (nameof value) "Minimum linger period is 1ms"
        | x -> lingerMs <- x

    /// Optionally extends the linger phase to include a period during which we await capacity on an externally managed Semaphore
    /// The Batcher doesn't care, but a typical use is to enable limiting the number of concurrent in-flight dispatches
    member val Limiter: System.Threading.SemaphoreSlim = null with get, set

    /// Include an item in the batch; await the collective dispatch (subject to the configured linger time and optional limiter acquisition)
    member x.ExecuteAsync(req, ct) = task {
        let current = cell
        // If current has not yet been dispatched, hop on and join
        if current.TryAdd(req, dispatch, lingerMs, x.Limiter, ct) then return! current.Await()
        else // Any thread that discovers a batch in flight needs to wait for it to conclude first
            do! current.Await().ContinueWith<unit>(fun (_: Task) -> ()) // wait for, but don't observe the exception or result from, the in-flight batch
            // where competing threads discover a closed flight, we only want a single one to regenerate it
            let _ = Interlocked.CompareExchange(&cell, Batch(), current)
            return! x.ExecuteAsync(req, ct) } // but everyone attempts to merge their requests into the batch during the linger period

    /// Include an item in the batch; await the collective dispatch (subject to the configured linger time)
    member x.Execute(req) = Async.call (fun ct -> x.ExecuteAsync(req, ct))
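
// --- Illustrative usage sketch (not part of the library) ---
// Shows concurrent callers funnelling individual ids through a single Batcher so that one (hypothetical)
// loadMany roundtrip serves every caller that lands within the linger window; all names below are assumptions
module private BatcherUsageSketch =

    // Hypothetical bulk read against a shared resource; stands in for e.g. a multi-get against a store
    let private loadMany (ids: string[]) (_ct: CancellationToken): Task<Map<string, int>> = task {
        return ids |> Seq.map (fun id -> id, id.Length) |> Map.ofSeq }

    // All ExecuteAsync callers whose requests land within the linger window share a single loadMany call
    let private batcher = Batcher(Func<string[], CancellationToken, Task<Map<string, int>>>(fun ids ct -> loadMany ids ct))

    // Each caller contributes its id, then projects its own answer out of the shared batch result
    let lookup (id: string) (ct: CancellationToken): Task<int> = task {
        let! byId = batcher.ExecuteAsync(id, ct)
        return byId |> Map.find id }
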
/// <summary>Thread Safe collection intended to manage a collection of <c>Batchers</c> (or instances of an equivalent type)
/// NOTE the memory usage is unbounded; if there is not a small, stable number of entries, it's advised to employ a <c>BatcherCache</c></summary>
type BatcherDictionary<'Id, 'Entry>(create: Func<'Id, 'Entry>) =

    // It's important we don't risk >1 instance https://andrewlock.net/making-getoradd-on-concurrentdictionary-thread-safe-using-lazy/
    // while it would be safe, there would be a risk of incurring the cost of multiple initialization loops
    let entries = System.Collections.Concurrent.ConcurrentDictionary<'Id, Lazy<'Entry>>()
    let build id = lazy create.Invoke id

    member _.GetOrAdd(id: 'Id): 'Entry =
        entries.GetOrAdd(id, build).Value
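
// --- Illustrative usage sketch (not part of the library) ---
// One Batcher per target partition, held in a BatcherDictionary so concurrent writes to the same partition
// coalesce into one dispatch; assumes a small, stable partition count (otherwise prefer BatcherCache below)
module private BatcherDictionarySketch =

    // Hypothetical per-partition bulk write; stands in for e.g. a store-specific batched insert
    let private flush (partition: string) (items: string[]) (_ct: CancellationToken): Task<int> = task {
        return items.Length }

    // Each distinct partition id lazily gets exactly one Batcher instance
    let private batchers =
        BatcherDictionary(Func<string, Batcher<string, int>>(fun partition ->
            Batcher(Func<string[], CancellationToken, Task<int>>(fun items ct -> flush partition items ct))))

    let write (partition: string) (item: string) (ct: CancellationToken): Task<int> =
        batchers.GetOrAdd(partition).ExecuteAsync(item, ct)
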
/// <summary>Thread Safe helper that maintains a set of <c>Batchers</c> (or instances of an equivalent type) within a MemoryCache
/// NOTE if the number of items is bounded, <c>BatcherDictionary</c> is significantly more efficient</summary>
type BatcherCache<'Id, 'Entry>(cache: Cache<'Entry>, toKey: Func<'Id, string>, create: Func<'Id, 'Entry>, ?cacheWindow) =
    let cacheWindow = defaultArg cacheWindow (TimeSpan.FromMinutes 1)
    let cachePolicy = System.Runtime.Caching.CacheItemPolicy(SlidingExpiration = cacheWindow)

    /// Maintains the entries in an internal cache limited to the specified size, with entries identified by "{id}"
    new(name, create: Func<'Id, 'Entry>, sizeMb: int, ?cacheWindow) =
        BatcherCache(Cache<'Entry>.Create(name, sizeMb), Func<'Id, string>(string), create, ?cacheWindow = cacheWindow)

    /// Stores entries in the supplied cache, with entries identified by keys of the form "$Batcher-{id}"
    static member Prefixed(cache: System.Runtime.Caching.MemoryCache, createEntry: Func<'Id, 'Entry>, ?cacheWindow) =
        let mapKey = Func<'Id, string>(fun id -> "$Batcher-" + string id)
        BatcherCache(Cache cache, mapKey, createEntry, ?cacheWindow = cacheWindow)

    member _.GetOrAdd(id: 'Id): 'Entry =
        // Optimise for low allocations on happy path
        let key = toKey.Invoke(id)
        match cache.TryGet key with
        | ValueSome entry -> entry
        | ValueNone ->
            match cache.AddOrGet(key, create.Invoke id, cachePolicy) with
            | Ok entry -> entry
            | Error entry -> entry

and Cache<'Entry>(target: System.Runtime.Caching.MemoryCache) =

    static member Create<'Entry>(name, sizeMb) =
        let config = System.Collections.Specialized.NameValueCollection(1)
        config.Add("cacheMemoryLimitMegabytes", string sizeMb)
        Cache(new System.Runtime.Caching.MemoryCache(name, config))

    member internal _.TryGet key: 'Entry voption =
        match target.Get key with
        | null -> ValueNone
        | existingEntry -> ValueSome (existingEntry :?> 'Entry)

    member internal _.AddOrGet(key, entry, policy) =
        match target.AddOrGetExisting(key, entry, policy = policy) with
        | null -> Ok entry
        | existingEntry -> Error (existingEntry :?> 'Entry)
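
// --- Illustrative usage sketch (not part of the library) ---
// As above, but with an unbounded/churning id space: per-stream Batchers are held in a size-limited MemoryCache
// with a sliding expiration window, so idle entries are evicted rather than accumulating; names are assumptions
module private BatcherCacheSketch =

    // Hypothetical batched append of all pending events for a stream, yielding the resulting version
    let private appendBatch (streamId: string) (events: string[]) (_ct: CancellationToken): Task<int64> = task {
        return int64 events.Length }

    // Idle entries fall out of the MemoryCache once the sliding cacheWindow (default 1m) elapses without a GetOrAdd
    let private batchers =
        BatcherCache<string, Batcher<string, int64>>(
            "AppendBatchers",
            Func<string, Batcher<string, int64>>(fun streamId ->
                Batcher(Func<string[], CancellationToken, Task<int64>>(fun evs ct -> appendBatch streamId evs ct))),
            sizeMb = 10)

    // All writers to the same stream that land within the linger window share a single appendBatch call
    let append (streamId: string) (event: string) (ct: CancellationToken): Task<int64> =
        batchers.GetOrAdd(streamId).ExecuteAsync(event, ct)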