-
Notifications
You must be signed in to change notification settings - Fork 40
/
Copy pathQueueScheduler.fs
478 lines (425 loc) · 22.5 KB
/
QueueScheduler.fs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
// Copyright (c) Microsoft Corporation.
/// A scheduler to handle requests posted on multiple concurrent queues with
/// distinct priorities, relying on an abstraction of the queuing API
module Microsoft.FSharpLu.Actor.QueueScheduler
open Microsoft.FSharpLu
/// Status of a request after being processed by a queue request handler.
/// 'm is the type of requests that can be handed back to the scheduler,
/// 't is the type of the result carried by a completed request.
type RequestStatus<'m, 't> =
    /// The request has been processed and can be removed from the scheduling system
    | Completed of 't option
    /// The request has been suspended (e.g. waiting for another call to return)
    /// and should be removed from the scheduling system
    | Suspended
    /// Mark the request as completed and initiate another request
    | Coreturn of 'm
    /// The request must be paused for the specified amount of time;
    /// once the timeout expires, the scheduler is responsible for resuming
    /// processing of the request
    | SleepAndResume of System.TimeSpan
    /// The request must be paused for the specified amount of time.
    /// Once the timeout expires, the scheduler is responsible for processing
    /// the new request supplied alongside the timeout
    | SleepAndResumeWith of System.TimeSpan * 'm
/// Queue interface implemented by the underlying queueing infrastructure
/// (e.g. Azure Queue, mailbox processor, ConcurrentQueue, ...)
/// 'QueueMessage is the handle type identifying a message in the queue,
/// 'QueueContent is the type of the payload carried by a message.
type IQueueingAPI<'QueueMessage, 'QueueContent> =
    /// Queue name
    abstract queueName : string
    /// Update the content and the visibility of a queue message
    abstract update : 'QueueMessage -> 'QueueContent -> System.TimeSpan -> Async<unit>
    /// Update the visibility of a queue message
    abstract updateVisibility : 'QueueMessage -> System.TimeSpan -> Async<unit>
    /// Delete a queue message
    abstract delete : 'QueueMessage -> Async<unit>
    /// Convert a queue message to a string for debugging/logging purposes
    abstract prettyPrintQueueMessage : 'QueueMessage -> string
    /// Extract the content of a queue message, or the exception explaining
    /// why the content could not be obtained
    abstract tryGetContent : 'QueueMessage -> Result<'QueueContent, System.Exception>
    /// Get the insertion time of a queue message
    abstract insertionTime : 'QueueMessage -> System.DateTimeOffset
    /// Try to pop the specified number of messages from the queue.
    /// NOTE(review): the processing loop passes `maxProcessTime` as the time span,
    /// presumably the visibility timeout applied to the popped messages -- confirm
    /// against each implementation.
    abstract tryGetMessageBatch : int -> System.TimeSpan -> Async<'QueueMessage list>
    /// Post a new message onto the queue
    abstract post : 'QueueContent -> Async<unit>
    /// Post a new message onto the queue with the specified visibility
    abstract postIn : 'QueueContent -> System.TimeSpan -> Async<unit>
/// Info returned for a request successfully processed
/// 'Request is the type of request
/// 'Result is the request result type
/// 'QueueMessage is the type representing a queue message in the underlying queueing API
type ProcessedRequestInfo<'Request, 'Result, 'QueueMessage> =
    {
        /// Status of the request execution
        executionStatus : RequestStatus<'Request,'Result>
        /// The parsed request that was processed
        request : 'Request
        /// Date/time when the request was posted on the queue
        requestInsertionTime : System.DateTime
        /// Time taken to process the request
        processingTime : System.TimeSpan
        /// Queue message object from the underlying queue API
        queuedMessage : 'QueueMessage
    }
/// Info returned for a request that could not be processed
/// 'Request is the type of request
/// 'QueueMessage is the type representing a queue message in the underlying queueing API
type FailedRequestInfo<'Request, 'QueueMessage> =
    {
        /// A description of the error that occurred
        errorMessage : string
        /// More details about the error
        details : string
        /// The request being processed, or None when the queue message
        /// could not be parsed into a request
        request : 'Request option
        /// Date/time when the request was posted on the queue
        requestInsertionTime : System.DateTime
        /// Time taken to process the request
        processingTime : System.TimeSpan
        /// Queue message object from the underlying queue API
        queuedMessage : 'QueueMessage
    }
/// Outcome of the processing of a request
/// 'Request is the type of request
/// 'Result is the request result type
/// 'QueueMessage is the type representing a queue message in the underlying queueing API
type RequestOutcome<'Request, 'Result, 'QueueMessage> =
    /// The handler returned an execution status for the request
    | Processed of ProcessedRequestInfo<'Request, 'Result, 'QueueMessage>
    /// The request failed without an exception to report
    /// (processRequest uses this when the queue message cannot be parsed)
    | Error of FailedRequestInfo<'Request, 'QueueMessage>
    /// The request was rejected
    /// (processRequest uses this when the handler raises RejectedMessage)
    | Rejected of FailedRequestInfo<'Request, 'QueueMessage>
    /// An exception was thrown while the request was being processed
    | ExceptionThrown of System.Exception * FailedRequestInfo<'Request, 'QueueMessage>
/// A continuation that handles processing of an agent execution status.
/// Defining this as an interface with a generic method rather than a let-binding
/// is a "trick" to prevent the F# typechecker from unifying the ghost generic
/// type parameter 'Result against the first occurrence of k.
/// This means that IContinuation.k can be used in the same function
/// for different instances of the type parameter 'Result.
type IContinuation<'Request> =
    /// Consume the asynchronously-computed status of a request and carry out
    /// whatever remains to be done for that request
    abstract k<'Request, 'Result> : Async<RequestStatus<'Request, 'Result>> -> Async<unit>
/// Logger interface for request outcomes
/// 'Request is the type of request
/// 'QueueMessage is the type representing a queue message in the underlying queueing API
type IOutcomeLogger<'Request, 'QueueMessage> =
    /// Record the outcome of a request. 'Result is not a parameter of the interface,
    /// so a single logger accepts outcomes for any result type.
    abstract log : RequestOutcome<'Request, 'Result, 'QueueMessage> -> unit
/// Handler for queued requests
/// 'Request is the type of queued request
/// 'Context is a custom context parameter that will get passed to every request handler
/// The handler receives the context, a continuation to which it hands the
/// request's execution status, and the request itself.
type Handler<'Context, 'Request> = 'Context -> IContinuation<'Request> -> 'Request -> Async<unit>
/// Defines a processor consuming queued requests.
/// 'QId defines an enumeration of possible queue Ids
/// 'Request is the type of requests handled by the processor
/// 'Context is a custom context parameter that will get passed to every request handler
[<NoEquality;NoComparison>]
type QueueProcessor<'QId, 'Request, 'Context> =
    {
        /// Identifier of the queue
        queueId : 'QId

        /// Function handling a request received on the queue
        handler : Handler<'Context, 'Request>

        /// Initial value of the maximum expected processing time.
        ///
        /// Note for Azure queues: After this timeout expires Azure automatically reposts the message onto the queue.
        /// We want this value to be high enough to cover requests with long processing time (e.g. creating a VM in Azure)
        /// but not too high to avoid delays in case a real backend failure occurs.
        ///
        /// A request handler can dynamically override this value if more time is needed to process a request.
        /// State machine agents (QueueRequestHandling module) facilitate this by automatically updating the expected
        /// processing time on each transition of the state machine.
        maxProcessTime : System.TimeSpan

        /// Number of messages to pull at once from the queue
        ///
        /// For Azure Queues, keep this number low for maximum utilization of concurrency between worker role instances
        /// Note: maximum allowed is 32 (if greater you'll get a warning followed by an obscure exception
        /// from Azure, see https://msdn.microsoft.com/en-us/library/azure/dd179474.aspx)
        messageBatchSize : int
    }
/// Queue request processor options
type Options =
    {
        /// Amount of time to sleep when all request queues are empty
        SleepDurationWhenAllQueuesAreEmpty : System.TimeSpan
        /// Time interval after which a new heartbeat event may be sent
        HeartBeatIntervals : System.TimeSpan
        /// Number of requests that may be processed concurrently
        ConcurrentRequestWorkers : int
        /// Timeout period when waiting for an available processor from the pool.
        /// After this time, it is assumed that at least one processor is dead
        /// and a new one is allocated.
        WorkerReplacementTimeout : System.TimeSpan
    }
/// Exception raised by message handlers when a message is rejected.
/// processRequest reports it as a Rejected outcome and deletes the message.
exception RejectedMessage
/// Process a single message pulled from a queue.
///
/// The message content is extracted with `queueSystem.tryGetContent`. When that fails,
/// an Error outcome is logged and the message is deleted. Otherwise the handler is
/// invoked with a continuation that applies the resulting `RequestStatus` to the queue
/// (delete, update visibility, update content, or post a follow-up request) and then
/// logs a Processed outcome. When the handler raises, a Rejected outcome
/// (for `RejectedMessage`) or an ExceptionThrown outcome (for anything else) is logged
/// and the message is deleted.
let inline processRequest<'Context, 'Request, 'QueueMessage>
        (trace:Microsoft.FSharpLu.Logging.Interfaces.ITagsTracer)
        (context:'Context)
        (queueSystem:IQueueingAPI<'QueueMessage, 'Request>)
        (queuedMessage:'QueueMessage)
        (handler:Handler<'Context, 'Request>)
        (getTags:'Request -> (string*string) list)
        (logger: IOutcomeLogger<'Request, 'QueueMessage>) =
    async {
        let postedAt = (queueSystem.insertionTime queuedMessage).UtcDateTime

        // Builds the record shared by every failure outcome
        let failure message details request elapsed : FailedRequestInfo<'Request, 'QueueMessage> =
            {
                errorMessage = message
                details = details
                request = request
                requestInsertionTime = postedAt
                processingTime = elapsed
                queuedMessage = queuedMessage
            }

        match queueSystem.tryGetContent queuedMessage with
        | Result.Ok parsed ->
            let tags = getTags parsed
            trace.info "Processing queued message"
                       (tags @ ["parsedMessage", (sprintf "%A" parsed)])

            let stopwatch = System.Diagnostics.Stopwatch()
            try
                stopwatch.Start()

                // Continuation invoked by the handler with the (asynchronous) status of the request
                let continuation =
                    { new IContinuation<'Request> with
                        member __.k executionStatusAsync =
                            async {
                                let! status = executionStatusAsync
                                stopwatch.Stop()

                                // Reflect the status onto the underlying queue
                                match status with
                                | RequestStatus.Completed _
                                | RequestStatus.Suspended ->
                                    do! queueSystem.delete queuedMessage
                                | RequestStatus.SleepAndResume delay ->
                                    do! queueSystem.updateVisibility queuedMessage delay
                                | RequestStatus.SleepAndResumeWith (delay, followUp) ->
                                    do! queueSystem.update queuedMessage followUp delay
                                | RequestStatus.Coreturn followUp ->
                                    do! queueSystem.post followUp
                                    do! queueSystem.delete queuedMessage

                                logger.log <|
                                    RequestOutcome.Processed
                                        {
                                            executionStatus = status
                                            request = parsed
                                            requestInsertionTime = postedAt
                                            processingTime = stopwatch.Elapsed
                                            queuedMessage = queuedMessage
                                        }
                            }
                    }

                do! handler context continuation parsed
            with
            | RejectedMessage ->
                stopwatch.Stop()
                logger.log <|
                    RequestOutcome.Rejected
                        (failure
                            "This request type was rejected from queue handler"
                            (sprintf "Queue name: %s" queueSystem.queueName)
                            (Some parsed)
                            stopwatch.Elapsed)
                do! queueSystem.delete queuedMessage
            | e ->
                stopwatch.Stop()
                let elapsed = stopwatch.Elapsed
                trace.trackException e (tags @ [
                                            "postedTime", postedAt.ToString()
                                            "processingTime", elapsed.ToString()
                                            "request", queueSystem.prettyPrintQueueMessage queuedMessage
                                        ])
                logger.log <|
                    RequestOutcome.ExceptionThrown
                        (e,
                         failure
                            "Exception raised while processing request"
                            (sprintf "Exception: %O" e)
                            (Some parsed)
                            elapsed)
                do! queueSystem.delete queuedMessage

        | Result.Error parseError ->
            // The message cannot be interpreted: report it and drop it from the queue
            logger.log <|
                RequestOutcome.Error
                    (failure
                        "Could not parse queued message"
                        (parseError.ToString())
                        None
                        System.TimeSpan.Zero)
            do! queueSystem.delete queuedMessage
    }
/// Exception raised by the processing loop when it observes that
/// cancellation has been requested on its termination token
exception ProcessingLoopCancelled
/// A processing loop handling requests posted on multiple
/// queues with different assigned priorities.
///
/// On every iteration the loop signals a heartbeat if `options.HeartBeatIntervals` has
/// elapsed since the previous one, raises `ProcessingLoopCancelled` if termination was
/// requested, then polls the queues in the order given and processes the first
/// non-empty batch it finds. When every queue is empty it sleeps for
/// `options.SleepDurationWhenAllQueuesAreEmpty`.
let inline processingLoopMultipleQueues<'QueueId, 'Request, 'Context, 'QueueMessage>
        (trace:Microsoft.FSharpLu.Logging.Interfaces.ITagsTracer)
        (options:Options)
        (queueProcessorsOrderedByPriority: (QueueProcessor<'QueueId, 'Request, 'Context>) list)
        (getQueue:'QueueId -> IQueueingAPI<_,_>)
        (createRequestContext : IQueueingAPI<_,_> -> 'QueueMessage -> 'Context)
        (signalHeartBeat : unit -> unit)
        (terminationRequested: System.Threading.CancellationToken)
        getTags
        (logger:IOutcomeLogger<'Request, 'QueueMessage>)
        =
    async {
        use processingPool = new Microsoft.FSharpLu.Async.Synchronization.Pool(options.ConcurrentRequestWorkers)
        let pool = processingPool :> Microsoft.FSharpLu.Async.Synchronization.IPool

        // Durations converted once to the millisecond integers expected by the APIs below
        let workerTimeoutMs = options.WorkerReplacementTimeout.TotalMilliseconds |> int
        let idleSleepMs = options.SleepDurationWhenAllQueuesAreEmpty.TotalMilliseconds |> int

        /// Walk the queues in priority order and return the first non-empty batch, if any
        let rec firstNonEmptyBatch (candidates:(IQueueingAPI<_,_>*QueueProcessor<'QueueId, 'Request, 'Context>) list) =
            async {
                match candidates with
                | (queue, handling : QueueProcessor<'QueueId, 'Request, 'Context>)::lowerPriority ->
                    let! batch = queue.tryGetMessageBatch handling.messageBatchSize handling.maxProcessTime
                    if List.isEmpty batch then
                        return! firstNonEmptyBatch lowerPriority
                    else
                        return Some (queue, handling, batch)
                | [] ->
                    return None
            }

        /// Process a batch of messages in parallel, each under a slot acquired from the pool
        let inline processMessageBatch handler queue (queueMessageBatch:'QueueMessage list) =
            let processOne queuedMessage =
                async {
                    use! resourceAvailable = pool.AcquireAsync(Some workerTimeoutMs)
                    if terminationRequested.IsCancellationRequested then
                        raise ProcessingLoopCancelled
                    let context = createRequestContext queue queuedMessage
                    do! processRequest trace context queue queuedMessage handler getTags logger
                }
                // Individual requests should be able to fail independently without taking down the entire batch
                |> Async.Catch

            queueMessageBatch
            |> List.map processOne
            |> Async.Parallel
            |> Async.Ignore

        // Resolve every queue up front, announcing each monitored queue in the trace
        let queuesOrderedByPriority =
            queueProcessorsOrderedByPriority
            |> List.map (fun handling ->
                let queue = getQueue handling.queueId
                trace.info "Monitoring queue" ["queueName", queue.queueName]
                queue, handling)

        /// Outermost loop: process requests from each channel by priority
        /// Note: One drawback is that higher-priority queues may starve lower-priority ones
        let rec processChannels (heartBeatWatch:System.Diagnostics.Stopwatch) =
            async {
                // Send a heartbeat if sufficient time has passed since the last one
                if heartBeatWatch.Elapsed > options.HeartBeatIntervals then
                    signalHeartBeat ()
                    heartBeatWatch.Restart()

                if terminationRequested.IsCancellationRequested then
                    raise ProcessingLoopCancelled

                let! pending = firstNonEmptyBatch queuesOrderedByPriority
                match pending with
                | Some (queue, handling, batch) ->
                    do! processMessageBatch handling.handler queue batch
                | None ->
                    // all channels are empty: we rest for a bit
                    do! Async.Sleep(idleSleepMs)

                return! processChannels heartBeatWatch
            }

        do! processChannels (System.Diagnostics.Stopwatch.StartNew())
    }
/// In-memory queue system implemented with System.Collections.Concurrent.ConcurrentDictionary,
/// with no fault tolerance (if the process crashes the queue content is lost)
/// and no visibility timeout when consuming messages
module InMemoryQueue =

    /// An entry held by the in-memory queue
    type QueueEntry<'QueueContent> =
        {
            /// Time at which the entry was created
            insertionTime : System.DateTimeOffset
            /// Payload of the message
            content : 'QueueContent
            /// Whether the entry may still be handed out by tryGetMessageBatch
            visible : bool
        }

    /// In-memory implementation of IQueueingAPI backed by a concurrent dictionary.
    /// Messages are identified by the Guid under which their entry is stored.
    type InMemoryQueueProcessor<'QueueId, 'QueueContent>(queueNamePrefix, queueId:'QueueId) =
        let queueName = sprintf "%s-%A" queueNamePrefix queueId
        let queue = new System.Collections.Concurrent.ConcurrentDictionary<System.Guid, QueueEntry<'QueueContent>>()

        /// Create a fresh, visible entry stamped with the current time
        let newEntry (content:'QueueContent) =
            {
                insertionTime = System.DateTimeOffset.UtcNow
                content = content
                visible = true
            }

        /// Look up an entry, failing if the message is not (or no longer) in the queue
        let findEntry (id:System.Guid) =
            match queue.TryGetValue id with
            | true, entry -> entry
            | false, _ -> failwith "could not find queue entry"

        interface IQueueingAPI<System.Guid, 'QueueContent> with

            member __.queueName = queueName

            /// Replace the content of a message (the entry is created if missing).
            /// NOTE(review): the visibility argument is ignored and the `visible` flag of an
            /// existing entry is left untouched, so a message already handed out by
            /// tryGetMessageBatch is not handed out again after an update.
            member __.update queuedRequest queueContent visibility = async {
                queue.AddOrUpdate(
                    queuedRequest,
                    (fun _ -> newEntry queueContent),
                    (fun _ c -> { c with content = queueContent }))
                |> ignore
                return ()
            }

            // no-op: visibility timeout is infinite in this implementation
            // (only checks that the message exists)
            member __.updateVisibility queuedRequest visibilityTimeout = async {
                if not <| queue.ContainsKey(queuedRequest) then
                    failwith "could not find queue entry"
            }

            /// Remove a message from the queue, failing if it is not present
            member __.delete queuedRequest =
                async {
                    let success, _ = queue.TryRemove(queuedRequest)
                    if not success then
                        failwith "could not remove entry from queue"
                }

            member __.insertionTime queueMessage =
                (findEntry queueMessage).insertionTime

            member __.prettyPrintQueueMessage queueMessage =
                sprintf "%A" (findEntry queueMessage).content

            member __.tryGetContent k : Result<'QueueContent, System.Exception> =
                match queue.TryGetValue k with
                | true, c -> Result.Ok c.content
                | false, _ -> Result.Error (System.ArgumentException("Specified message queue ID not found") :> System.Exception)

            /// Hand out up to batchSize visible messages, oldest first, marking each of them
            /// as no longer visible. The second argument is ignored.
            member __.tryGetMessageBatch batchSize _ =
                async {
                    let nextBatch =
                        // The lock only serializes concurrent batch retrievals; other members do not take it
                        lock queue (fun () ->
                            queue :> seq<_>
                            |> Seq.where (fun m -> m.Value.visible)
                            |> Seq.sortBy (fun m -> m.Value.insertionTime)
                            // Select the batch first so that only selected entries get hidden below
                            |> Seq.truncate batchSize
                            |> Seq.toList
                            |> List.map (fun c ->
                                if not <| queue.TryUpdate(c.Key, { c.Value with visible = false }, c.Value) then
                                    failwith "impossible: queue entry disappeard!"
                                c.Key)
                        )
                    Microsoft.FSharpLu.Logging.Trace.info "Batch: %A" nextBatch
                    return nextBatch
                }

            /// Add a new, immediately visible message to the queue
            member __.post (content:'QueueContent) =
                async {
                    let id = System.Guid.NewGuid()
                    if not <| queue.TryAdd(id, newEntry content) then
                        failwith "impossible: guid collision"
                }

            /// Add a new message to the queue once the specified delay has elapsed.
            /// The returned workflow completes as soon as the delayed insertion has been
            /// started; it does not wait for the delay. The entry keeps the insertion time
            /// of the call, not of the moment it becomes available.
            member __.postIn (content:'QueueContent) delay =
                async {
                    let id = System.Guid.NewGuid()
                    let queueEntry = newEntry content
                    let child = async {
                        do! Async.Sleep (int <| delay.TotalMilliseconds)
                        if not <| queue.TryAdd(id, queueEntry) then
                            failwith "impossible: guid collision"
                    }
                    // Async.StartChild only describes the start of the child: it must be bound
                    // with let! to run, otherwise the message is never enqueued.
                    let! _ = Async.StartChild child
                    return ()
                }