public static WorkStealingQueue [ ] Queues => _queues ;

// Track whether the WorkStealingQueueList may contain work.
// Three states simplify the race conditions. They may be thought of as:
//   Now Active (2) --> Maybe Inactive (1) --> Confirmed Inactive (0)
// Producers write WsqNowActive after a local push; consumers that scan all
// queues and find nothing use Interlocked.CompareExchange to step the value
// down one state at a time (see Dequeue), so a single empty scan can never
// jump straight from "active" to "confirmed inactive".
public const int WsqNowActive = 2 ;
public static int wsqActive ;
public static void Add ( WorkStealingQueue queue )
51
57
{
52
58
Debug . Assert ( queue != null ) ;
@@ -441,6 +447,18 @@ public void Enqueue(IThreadPoolWorkItem callback, bool forceGlobal)
441
447
if ( null != tl )
442
448
{
443
449
tl . workStealingQueue . LocalPush ( callback ) ;
450
+
451
+ // We must guarantee wsqActive is set to WsqNowActive after we push
452
+ // The ordering must be global because we rely on other threads
453
+ // observing in this order
454
+ Interlocked . MemoryBarrier ( ) ;
455
+
456
+ // We do not want to simply write. We want to prevent unnecessary writes
457
+ // which would invalidate reader's caches
458
+ if ( WorkStealingQueueList . wsqActive != WorkStealingQueueList . WsqNowActive )
459
+ {
460
+ Volatile . Write ( ref WorkStealingQueueList . wsqActive , WorkStealingQueueList . WsqNowActive ) ;
461
+ }
444
462
}
445
463
else
446
464
{
/// <summary>
/// Attempts to find a work item for the calling thread, consulting (in order)
/// the thread's local work-stealing queue, the global queue, and finally the
/// other threads' local queues (stealing). Returns null when nothing is found.
/// When <c>WorkStealingQueueList.wsqActive</c> is observed as 0 (confirmed
/// inactive), the local and steal passes are skipped entirely and only the
/// global queue is checked.
/// </summary>
/// <param name="tl">Thread-local state for the calling thread (its queue and RNG).</param>
/// <param name="missedSteal">
/// Set by <c>TrySteal</c> when a steal attempt could not complete; when true we
/// refuse to decrement <c>wsqActive</c>, since work may still be present.
/// NOTE(review): exact TrySteal semantics are not visible here — confirm.
/// </param>
/// <returns>A work item, or null if none was found.</returns>
public IThreadPoolWorkItem Dequeue ( ThreadPoolWorkQueueThreadLocals tl , ref bool missedSteal )
{
    IThreadPoolWorkItem callback ;
    // Plain (non-volatile) read of the activity state. The same observed value
    // is later used as the CompareExchange comparand so overlapping scans by
    // multiple threads cannot double-decrement.
    // NOTE(review): this relies on the Interlocked.MemoryBarrier the producer
    // issues in Enqueue after LocalPush for cross-thread visibility; confirm a
    // Volatile.Read is not required here.
    int wsqActiveObserved = WorkStealingQueueList . wsqActive ;
    if ( wsqActiveObserved > 0 )
    {
        WorkStealingQueue localWsq = tl . workStealingQueue ;

        if ( ( callback = localWsq . LocalPop ( ) ) == null && // first try the local queue
            ! workItems . TryDequeue ( out callback ) ) // then try the global queue
        {
            // finally try to steal from another thread's local queue
            WorkStealingQueue [ ] queues = WorkStealingQueueList . Queues ;
            int c = queues . Length ;
            Debug . Assert ( c > 0 , "There must at least be a queue for this thread." ) ;
            int maxIndex = c - 1 ;
            // Start the scan at a random queue to spread steal pressure, then
            // walk the array circularly, visiting each queue exactly once.
            int i = tl . random . Next ( c ) ;
            while ( c > 0 )
            {
                i = ( i < maxIndex ) ? i + 1 : 0 ;
                WorkStealingQueue otherQueue = queues [ i ] ;
                if ( otherQueue != localWsq && otherQueue . CanSteal )
                {
                    callback = otherQueue . TrySteal ( ref missedSteal ) ;
                    if ( callback != null )
                    {
                        break ;
                    }
                }
                c -- ;
            }
            if ( ( callback == null ) && ! missedSteal )
            {
                // Only decrement if the value is unchanged since we started looking for work.
                // This prevents multiple threads decrementing based on overlapping scans.
                //
                // When we decrement from Now Active, the producer may have inserted a queue
                // item during our scan, therefore we cannot transition to empty (we only
                // reach Maybe Inactive).
                //
                // When we decrement from Maybe Inactive, if the producer inserted a queue
                // item during our scan, the producer must write Now Active. We may
                // transition to empty briefly if we beat the producer's write, but the
                // producer will then overwrite us before waking threads. So effectively we
                // cannot mark the queue empty while an item is in the queue.
                Interlocked . CompareExchange ( ref WorkStealingQueueList . wsqActive , wsqActiveObserved - 1 , wsqActiveObserved ) ;
            }
        }
    }
    else
    {
        // We only need to look at the global queue since the
        // WorkStealingQueueList is confirmed inactive.
        // NOTE(review): this skips tl's own local queue; that is safe only if a
        // local push on this thread always publishes wsqActive before this
        // thread dequeues again — confirm against Enqueue's barrier ordering.
        workItems . TryDequeue ( out callback ) ;
    }

    return callback ;
}
0 commit comments