forked from traderinteractive/mongo-queue-csharp
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Queue.cs
667 lines (577 loc) · 29.7 KB
/
Queue.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
using System;
using System.Configuration;
using System.Threading;
using System.Linq;
using MongoDB.Bson;
using MongoDB.Driver;
using System.Reflection;
using System.Security.Cryptography;
using System.Collections.Generic;
using MongoDB.Driver.GridFS;
using System.IO;
using System.Runtime.CompilerServices;
[assembly: AssemblyVersion("2.0.0.*"), InternalsVisibleTo("DominionEnterprises.Mongo.Tests")]
namespace DominionEnterprises.Mongo
{
/// <summary>
/// Abstraction of mongo db collection as priority queue.
/// </summary>
/// <remarks>
/// Tied priorities are then ordered by time. So you may use a single priority for normal queuing (overloads exist for this purpose).
/// Using a random priority achieves random Get()
/// </remarks>
public sealed class Queue
{
internal const int ACK_MULTI_BATCH_SIZE = 1000;
private readonly MongoCollection collection;
private readonly MongoGridFS gridfs;
/// <summary>
/// Construct MongoQueue with url, db name and collection name from app settings keys mongoQueueUrl, mongoQueueDb and mongoQueueCollection
/// </summary>
public Queue()
: this(ConfigurationManager.AppSettings["mongoQueueUrl"], ConfigurationManager.AppSettings["mongoQueueDb"], ConfigurationManager.AppSettings["mongoQueueCollection"])
{ }
/// <summary>
/// Construct MongoQueue
/// </summary>
/// <param name="url">mongo url like mongodb://localhost</param>
/// <param name="db">db name</param>
/// <param name="collection">collection name</param>
/// <exception cref="ArgumentNullException">url, db or collection is null</exception>
public Queue(string url, string db, string collection)
{
if (url == null) throw new ArgumentNullException("url");
if (db == null) throw new ArgumentNullException("db");
if (collection == null) throw new ArgumentNullException("collection");
this.collection = new MongoClient(url).GetServer().GetDatabase(db).GetCollection(collection);
this.gridfs = this.collection.Database.GetGridFS(MongoGridFSSettings.Defaults);
}
/// <summary>
/// Construct MongoQueue
/// </summary>
/// <param name="collection">collection</param>
/// <exception cref="ArgumentNullException">collection is null</exception>
public Queue(MongoCollection collection)
{
if (collection == null) throw new ArgumentNullException("collection");
this.collection = collection;
this.gridfs = collection.Database.GetGridFS(MongoGridFSSettings.Defaults);
}
#region EnsureGetIndex
/// <summary>
/// Ensure index for Get() method with no fields before or after sort fields
/// </summary>
public void EnsureGetIndex()
{
EnsureGetIndex(new IndexKeysDocument());
}
/// <summary>
/// Ensure index for Get() method with no fields after sort fields
/// </summary>
/// <param name="beforeSort">fields in Get() call that should be before the sort fields in the index</param>
/// <exception cref="ArgumentNullException">beforeSort is null</exception>
/// <exception cref="ArgumentException">beforeSort field value is not 1 or -1</exception>
public void EnsureGetIndex(IndexKeysDocument beforeSort)
{
EnsureGetIndex(beforeSort, new IndexKeysDocument());
}
/// <summary>
/// Ensure index for Get() method
/// </summary>
/// <param name="beforeSort">fields in Get() call that should be before the sort fields in the index</param>
/// <param name="afterSort">fields in Get() call that should be after the sort fields in the index</param>
/// <exception cref="ArgumentNullException">beforeSort or afterSort is null</exception>
/// <exception cref="ArgumentException">beforeSort or afterSort field value is not 1 or -1</exception>
public void EnsureGetIndex(IndexKeysDocument beforeSort, IndexKeysDocument afterSort)
{
if (beforeSort == null) throw new ArgumentNullException("beforeSort");
if (afterSort == null) throw new ArgumentNullException("afterSort");
//using general rule: equality, sort, range or more equality tests in that order for index
var completeIndex = new IndexKeysDocument("running", 1);
foreach (var field in beforeSort)
{
if (field.Value != 1 && field.Value != -1) throw new ArgumentException("field values must be 1 or -1 for ascending or descending", "beforeSort");
completeIndex.Add("payload." + field.Name, field.Value);
}
completeIndex.Add("priority", 1);
completeIndex.Add("created", 1);
foreach (var field in afterSort)
{
if (field.Value != -1 && field.Value != 1) throw new ArgumentException("field values must be 1 or -1 for ascending or descending", "afterSort");
completeIndex.Add("payload." + field.Name, field.Value);
}
completeIndex.Add("earliestGet", 1);
EnsureIndex(completeIndex);//main query in Get()
EnsureIndex(new IndexKeysDocument { { "running", 1 }, { "resetTimestamp", 1 } });//for the stuck messages query in Get()
}
#endregion
/// <summary>
/// Ensure index for Count() method
/// Is a no-op if the generated index is a prefix of an existing one. If you have a similar EnsureGetIndex call, call it first.
/// </summary>
/// <param name="index">fields in Count() call</param>
/// <param name="includeRunning">whether running was given to Count() or not</param>
/// <exception cref="ArgumentNullException">index was null</exception>
/// <exception cref="ArgumentException">index field value is not 1 or -1</exception>
public void EnsureCountIndex(IndexKeysDocument index, bool includeRunning)
{
if (index == null) throw new ArgumentNullException("index");
var completeFields = new IndexKeysDocument();
if (includeRunning)
completeFields.Add("running", 1);
foreach (var field in index)
{
if (field.Value != 1 && field.Value != -1) throw new ArgumentException("field values must be 1 or -1 for ascending or descending", "index");
completeFields.Add("payload." + field.Name, field.Value);
}
EnsureIndex(completeFields);
}
#region Get
/// <summary>
/// Get a non running message from queue with a wait of 3 seconds and poll of 200 milliseconds
/// </summary>
/// <param name="query">query where top level fields do not contain operators. Lower level fields can however. eg: valid {a: {$gt: 1}, "b.c": 3}, invalid {$and: [{...}, {...}]}</param>
/// <param name="resetRunning">duration before this message is considered abandoned and will be given with another call to Get()</param>
/// <returns>message or null</returns>
/// <exception cref="ArgumentNullException">query is null</exception>
public Message Get(QueryDocument query, TimeSpan resetRunning)
{
return Get(query, resetRunning, TimeSpan.FromSeconds(3));
}
/// <summary>
/// Get a non running message from queue with a poll of 200 milliseconds
/// </summary>
/// <param name="query">query where top level fields do not contain operators. Lower level fields can however. eg: valid {a: {$gt: 1}, "b.c": 3}, invalid {$and: [{...}, {...}]}</param>
/// <param name="resetRunning">duration before this message is considered abandoned and will be given with another call to Get()</param>
/// <param name="wait">duration to keep polling before returning null</param>
/// <returns>message or null</returns>
/// <exception cref="ArgumentNullException">query is null</exception>
public Message Get(QueryDocument query, TimeSpan resetRunning, TimeSpan wait)
{
return Get(query, resetRunning, wait, TimeSpan.FromMilliseconds(200));
}
/// <summary>
/// Get a non running message from queue with an approxiate wait.
/// </summary>
/// <param name="query">query where top level fields do not contain operators. Lower level fields can however. eg: valid {a: {$gt: 1}, "b.c": 3}, invalid {$and: [{...}, {...}]}</param>
/// <param name="resetRunning">duration before this message is considered abandoned and will be given with another call to Get()</param>
/// <param name="wait">duration to keep polling before returning null</param>
/// <param name="poll">duration between poll attempts</param>
/// <returns>message or null</returns>
/// <exception cref="ArgumentNullException">query is null</exception>
public Message Get(QueryDocument query, TimeSpan resetRunning, TimeSpan wait, TimeSpan poll)
{
return Get(query, resetRunning, wait, poll, true);
}
/// <summary>
/// Get a non running message from queue
/// </summary>
/// <param name="query">query where top level fields do not contain operators. Lower level fields can however. eg: valid {a: {$gt: 1}, "b.c": 3}, invalid {$and: [{...}, {...}]}</param>
/// <param name="resetRunning">duration before this message is considered abandoned and will be given with another call to Get()</param>
/// <param name="wait">duration to keep polling before returning null</param>
/// <param name="poll">duration between poll attempts</param>
/// <param name="approximateWait">whether to fluctuate the wait time randomly by +-10 percent. This ensures Get() calls seperate in time when multiple Queues are used in loops started at the same time</param>
/// <returns>message or null</returns>
/// <exception cref="ArgumentNullException">query is null</exception>
public Message Get(QueryDocument query, TimeSpan resetRunning, TimeSpan wait, TimeSpan poll, bool approximateWait)
{
if (query == null)
throw new ArgumentNullException ("query");
//reset stuck messages
collection.Update(
new QueryDocument { { "running", true }, { "resetTimestamp", new BsonDocument("$lte", DateTime.UtcNow) } },
new UpdateDocument("$set", new BsonDocument("running", false)),
UpdateFlags.Multi
);
var builtQuery = new QueryDocument("running", false);
foreach (var field in query)
builtQuery.Add("payload." + field.Name, field.Value);
builtQuery.Add("earliestGet", new BsonDocument("$lte", DateTime.UtcNow));
var resetTimestamp = DateTime.UtcNow;
try
{
resetTimestamp += resetRunning;
}
catch (ArgumentOutOfRangeException)
{
resetTimestamp = resetRunning > TimeSpan.Zero ? DateTime.MaxValue : DateTime.MinValue;
}
var sort = new SortByDocument { { "priority", 1 }, { "created", 1 } };
var update = new UpdateDocument("$set", new BsonDocument { { "running", true }, { "resetTimestamp", resetTimestamp } });
var fields = new FieldsDocument { { "payload", 1 }, { "streams", 1 } };
var end = DateTime.UtcNow;
try
{
if (approximateWait)
//fluctuate randomly by 10 percent
wait += TimeSpan.FromMilliseconds(wait.TotalMilliseconds * GetRandomDouble(-0.1, 0.1));
end += wait;
}
catch (Exception e)
{
if (!(e is OverflowException) && !(e is ArgumentOutOfRangeException))
throw e;//cant cover
end = wait > TimeSpan.Zero ? DateTime.MaxValue : DateTime.MinValue;
}
while (true)
{
var findModifyArgs = new FindAndModifyArgs { Query = builtQuery, SortBy = sort, Update = update, Fields = fields, Upsert = false };
var message = collection.FindAndModify(findModifyArgs).ModifiedDocument;
if (message != null)
{
var handleStreams = new List<KeyValuePair<BsonValue, Stream>>();
var messageStreams = new Dictionary<string, Stream>();
foreach (var streamId in message["streams"].AsBsonArray)
{
var fileInfo = gridfs.FindOneById(streamId);
var stream = fileInfo.OpenRead();
handleStreams.Add(new KeyValuePair<BsonValue, Stream>(streamId, stream));
messageStreams.Add(fileInfo.Name, stream);
}
var handle = new Handle(message["_id"].AsObjectId, handleStreams);
return new Message(handle, message["payload"].AsBsonDocument, messageStreams);
}
if (DateTime.UtcNow >= end)
return null;
try
{
Thread.Sleep(poll);
}
catch (ArgumentOutOfRangeException)
{
if (poll < TimeSpan.Zero)
poll = TimeSpan.Zero;
else
poll = TimeSpan.FromMilliseconds(int.MaxValue);
Thread.Sleep(poll);
}
if (DateTime.UtcNow >= end)
return null;
}
}
#endregion
#region Count
/// <summary>
/// Count in queue, running true or false
/// </summary>
/// <param name="query">query where top level fields do not contain operators. Lower level fields can however. eg: valid {a: {$gt: 1}, "b.c": 3}, invalid {$and: [{...}, {...}]}</param>
/// <returns>count</returns>
/// <exception cref="ArgumentNullException">query is null</exception>
public long Count(QueryDocument query)
{
if (query == null) throw new ArgumentNullException("query");
var completeQuery = new QueryDocument();
foreach (var field in query)
completeQuery.Add("payload." + field.Name, field.Value);
return collection.Count(completeQuery);
}
/// <summary>
/// Count in queue
/// </summary>
/// <param name="query">query where top level fields do not contain operators. Lower level fields can however. eg: valid {a: {$gt: 1}, "b.c": 3}, invalid {$and: [{...}, {...}]}</param>
/// <param name="running">count running messages or not running</param>
/// <returns>count</returns>
/// <exception cref="ArgumentNullException">query is null</exception>
public long Count(QueryDocument query, bool running)
{
if (query == null) throw new ArgumentNullException("query");
var completeQuery = new QueryDocument("running", running);
foreach (var field in query)
completeQuery.Add("payload." + field.Name, field.Value);
return collection.Count(completeQuery);
}
#endregion
/// <summary>
/// Acknowledge a handle was processed and remove from queue.
/// </summary>
/// <param name="handle">handle received from Get()</param>
/// <exception cref="ArgumentNullException">handle is null</exception>
public void Ack(Handle handle)
{
if (handle == null) throw new ArgumentNullException("handle");
collection.Remove(new QueryDocument("_id", handle.Id));
foreach (var stream in handle.Streams)
{
stream.Value.Dispose();
gridfs.DeleteById(stream.Key);
}
}
/// <summary>
/// Acknowledge multiple handles were processed and remove from queue.
/// </summary>
/// <param name="handles">handles received from Get()</param>
/// <exception cref="ArgumentNullException">handles is null</exception>
public void AckMulti(IEnumerable<Handle> handles)
{
if (handles == null) throw new ArgumentNullException("handles");
var ids = new BsonArray();
foreach (var handle in handles)
{
ids.Add(handle.Id);
if (ids.Count != ACK_MULTI_BATCH_SIZE)
continue;
collection.Remove(new QueryDocument("_id", new BsonDocument("$in", ids)));
ids.Clear();
}
if (ids.Count > 0)
collection.Remove(new QueryDocument("_id", new BsonDocument("$in", ids)));
foreach (var handle in handles)
{
foreach (var stream in handle.Streams)
{
stream.Value.Dispose();
gridfs.DeleteById(stream.Key);
}
}
}
#region AckSend
/// <summary>
/// Ack handle and send payload to queue, atomically, with earliestGet as Now, 0.0 priority, new timestamp and no gridfs streams
/// </summary>
/// <param name="handle">handle to ack received from Get()</param>
/// <param name="payload">payload to send</param>
/// <exception cref="ArgumentNullException">handle or payload is null</exception>
public void AckSend(Handle handle, BsonDocument payload)
{
AckSend(handle, payload, DateTime.UtcNow);
}
/// <summary>
/// Ack handle and send payload to queue, atomically, with 0.0 priority, new timestamp and no gridfs streams
/// </summary>
/// <param name="handle">handle to ack received from Get()</param>
/// <param name="payload">payload to send</param>
/// <param name="earliestGet">earliest instant that a call to Get() can return message</param>
/// <exception cref="ArgumentNullException">handle or payload is null</exception>
public void AckSend(Handle handle, BsonDocument payload, DateTime earliestGet)
{
AckSend(handle, payload, earliestGet, 0.0);
}
/// <summary>
/// Ack handle and send payload to queue, atomically, with new timestamp and no gridfs streams
/// </summary>
/// <param name="handle">handle to ack received from Get()</param>
/// <param name="payload">payload to send</param>
/// <param name="earliestGet">earliest instant that a call to Get() can return message</param>
/// <param name="priority">priority for order out of Get(). 0 is higher priority than 1</param>
/// <exception cref="ArgumentNullException">handle or payload is null</exception>
/// <exception cref="ArgumentException">priority was NaN</exception>
public void AckSend(Handle handle, BsonDocument payload, DateTime earliestGet, double priority)
{
AckSend(handle, payload, earliestGet, priority, true);
}
/// <summary>
/// Ack handle and send payload to queue, atomically, with no gridfs streams
/// </summary>
/// <param name="handle">handle to ack received from Get()</param>
/// <param name="payload">payload to send</param>
/// <param name="earliestGet">earliest instant that a call to Get() can return message</param>
/// <param name="priority">priority for order out of Get(). 0 is higher priority than 1</param>
/// <param name="newTimestamp">true to give the payload a new timestamp or false to use given message timestamp</param>
/// <exception cref="ArgumentNullException">handle or payload is null</exception>
/// <exception cref="ArgumentException">priority was NaN</exception>
public void AckSend(Handle handle, BsonDocument payload, DateTime earliestGet, double priority, bool newTimestamp)
{
AckSend(handle, payload, earliestGet, priority, newTimestamp, new KeyValuePair<string, Stream>[0]);
}
/// <summary>
/// Ack handle and send payload to queue, atomically.
/// </summary>
/// <param name="handle">handle to ack received from Get()</param>
/// <param name="payload">payload to send</param>
/// <param name="earliestGet">earliest instant that a call to Get() can return message</param>
/// <param name="priority">priority for order out of Get(). 0 is higher priority than 1</param>
/// <param name="newTimestamp">true to give the payload a new timestamp or false to use given message timestamp</param>
/// <param name="streams">streams to upload into gridfs or null to forward handle's streams</param>
/// <exception cref="ArgumentNullException">handle or payload is null</exception>
/// <exception cref="ArgumentException">priority was NaN</exception>
public void AckSend(Handle handle, BsonDocument payload, DateTime earliestGet, double priority, bool newTimestamp, IEnumerable<KeyValuePair<string, Stream>> streams)
{
if (handle == null) throw new ArgumentNullException("handle");
if (payload == null) throw new ArgumentNullException("payload");
if (Double.IsNaN(priority)) throw new ArgumentException("priority was NaN", "priority");
var toSet = new BsonDocument
{
{"payload", payload},
{"running", false},
{"resetTimestamp", DateTime.MaxValue},
{"earliestGet", earliestGet},
{"priority", priority},
};
if (newTimestamp)
toSet["created"] = DateTime.UtcNow;
if (streams != null)
{
var streamIds = new BsonArray();
foreach (var stream in streams)
streamIds.Add(gridfs.Upload(stream.Value, stream.Key).Id);
toSet["streams"] = streamIds;
}
//using upsert because if no documents found then the doc was removed (SHOULD ONLY HAPPEN BY SOMEONE MANUALLY) so we can just send
collection.Update(new QueryDocument("_id", handle.Id), new UpdateDocument("$set", toSet), UpdateFlags.Upsert);
foreach (var existingStream in handle.Streams)
existingStream.Value.Dispose();
if (streams != null)
{
foreach (var existingStream in handle.Streams)
gridfs.DeleteById(existingStream.Key);
}
}
#endregion
#region Send
/// <summary>
/// Send message to queue with earliestGet as Now, 0.0 priority and no gridfs streams
/// </summary>
/// <param name="payload">payload</param>
/// <exception cref="ArgumentNullException">payload is null</exception>
public void Send(BsonDocument payload)
{
Send(payload, DateTime.UtcNow, 0.0);
}
/// <summary>
/// Send message to queue with 0.0 priority and no gridfs streams
/// </summary>
/// <param name="payload">payload</param>
/// <param name="earliestGet">earliest instant that a call to Get() can return message</param>
/// <exception cref="ArgumentNullException">payload is null</exception>
public void Send(BsonDocument payload, DateTime earliestGet)
{
Send(payload, earliestGet, 0.0);
}
/// <summary>
/// Send message to queue with no gridfs streams
/// </summary>
/// <param name="payload">payload</param>
/// <param name="earliestGet">earliest instant that a call to Get() can return message</param>
/// <param name="priority">priority for order out of Get(). 0 is higher priority than 1</param>
/// <exception cref="ArgumentNullException">payload is null</exception>
/// <exception cref="ArgumentException">priority was NaN</exception>
public void Send(BsonDocument payload, DateTime earliestGet, double priority)
{
Send(payload, earliestGet, priority, new List<KeyValuePair<string, Stream>>());
}
/// <summary>
/// Send message to queue.
/// </summary>
/// <param name="payload">payload</param>
/// <param name="earliestGet">earliest instant that a call to Get() can return message</param>
/// <param name="priority">priority for order out of Get(). 0 is higher priority than 1</param>
/// <param name="streams">streams to upload into gridfs</param>
/// <exception cref="ArgumentNullException">payload is null</exception>
/// <exception cref="ArgumentException">priority was NaN</exception>
/// <exception cref="ArgumentNullException">streams is null</exception>
public void Send(BsonDocument payload, DateTime earliestGet, double priority, IEnumerable<KeyValuePair<string, Stream>> streams)
{
if (payload == null) throw new ArgumentNullException("payload");
if (Double.IsNaN(priority)) throw new ArgumentException("priority was NaN", "priority");
if (streams == null) throw new ArgumentNullException("streams");
var streamIds = new BsonArray();
foreach (var stream in streams)
streamIds.Add(gridfs.Upload(stream.Value, stream.Key).Id);
var message = new BsonDocument
{
{"payload", payload},
{"running", false},
{"resetTimestamp", DateTime.MaxValue},
{"earliestGet", earliestGet},
{"priority", priority},
{"created", DateTime.UtcNow},
{"streams", streamIds},
};
collection.Insert(message);
}
#endregion
private void EnsureIndex(IndexKeysDocument index)
{
//if index is a prefix of any existing index we are good
foreach (var existingIndex in collection.GetIndexes())
{
var names = index.Names;
var values = index.Values;
var existingNamesPrefix = existingIndex.Key.Names.Take(names.Count());
var existingValuesPrefix = existingIndex.Key.Values.Take(values.Count());
if (Enumerable.SequenceEqual(names, existingNamesPrefix) && Enumerable.SequenceEqual(values, existingValuesPrefix))
return;
}
for (var i = 0; i < 5; ++i)
{
for (var name = Guid.NewGuid().ToString(); name.Length > 0; name = name.Substring(0, name.Length - 1))
{
//creating an index with the same name and different spec does nothing.
//creating an index with same spec and different name does nothing.
//so we use any generated name, and then find the right spec after we have called, and just go with that name.
try
{
collection.CreateIndex(index, new IndexOptionsDocument { {"name", name }, { "background", true } });
}
catch (MongoCommandException)
{
//this happens when the name was too long
}
foreach (var existingIndex in collection.GetIndexes())
{
if (existingIndex.Key == index)
return;
}
}
}
throw new Exception("couldnt create index after 5 attempts");
}
/// <summary>
/// Gets a random double between min and max using RNGCryptoServiceProvider
/// </summary>
/// <returns>
/// random double.
/// </returns>
internal static double GetRandomDouble(double min, double max)
{
if (Double.IsNaN(min)) throw new ArgumentException("min cannot be NaN");
if (Double.IsNaN(max)) throw new ArgumentException("max cannot be NaN");
if (max < min) throw new ArgumentException("max cannot be less than min");
var buffer = new byte[8];
new RNGCryptoServiceProvider().GetBytes(buffer);
var randomULong = BitConverter.ToUInt64(buffer, 0);
var fraction = (double)randomULong / (double)ulong.MaxValue;
var fractionOfNewRange = fraction * (max - min);
return min + fractionOfNewRange;
}
}
/// <summary>
/// Message to be given out of Get()
/// </summary>
public sealed class Message
{
public readonly Handle Handle;
public readonly BsonDocument Payload;
public readonly IDictionary<string, Stream> Streams;
/// <summary>
/// Construct Message
/// </summary>
/// <param name="handle">handle</param>
/// <param name="payload">payload</param>
/// <param name="streams">streams</param>
internal Message(Handle handle, BsonDocument payload, IDictionary<string, Stream> streams)
{
this.Handle = handle;
this.Payload = payload;
this.Streams = streams;
}
}
/// <summary>
/// Message handle to be given to Ack() and AckSend().
/// </summary>
public sealed class Handle
{
internal readonly BsonObjectId Id;
internal readonly IEnumerable<KeyValuePair<BsonValue, Stream>> Streams;
/// <summary>
/// Construct Handle
/// </summary>
/// <param name="id">id</param>
/// <param name="streams">streams</param>
internal Handle(BsonObjectId id, IEnumerable<KeyValuePair<BsonValue, Stream>> streams)
{
this.Id = id;
this.Streams = streams;
}
}
}