11using System ;
22using System . Collections . Concurrent ;
33using System . Threading ;
4+ using System . Threading . Channels ;
45using System . Threading . Tasks ;
56
67namespace RabbitMQ . Client . Impl
@@ -61,20 +62,16 @@ internal Task StopWorkAsync(IModel model)
6162
6263 class WorkPool
6364 {
64- readonly ConcurrentQueue < Action > _actions ;
65- readonly CancellationTokenSource _tokenSource ;
66- readonly CancellationTokenRegistration _tokenRegistration ;
67- volatile TaskCompletionSource < bool > _syncSource = new TaskCompletionSource < bool > ( TaskCreationOptions . RunContinuationsAsynchronously ) ;
65+ private readonly Channel < Action > _channel ;
6866 private readonly int _concurrency ;
6967 private Task _worker ;
68+ CancellationTokenSource _tokenSource ;
7069 private SemaphoreSlim _limiter ;
7170
7271 public WorkPool ( int concurrency )
7372 {
7473 _concurrency = concurrency ;
75- _actions = new ConcurrentQueue < Action > ( ) ;
76- _tokenSource = new CancellationTokenSource ( ) ;
77- _tokenRegistration = _tokenSource . Token . Register ( ( ) => _syncSource . TrySetCanceled ( ) ) ;
74+ _channel = Channel . CreateUnbounded < Action > ( new UnboundedChannelOptions { SingleReader = true , SingleWriter = false , AllowSynchronousContinuations = false } ) ;
7875 }
7976
8077 public void Start ( )
@@ -86,37 +83,27 @@ public void Start()
8683 else
8784 {
8885 _limiter = new SemaphoreSlim ( _concurrency ) ;
86+ _tokenSource = new CancellationTokenSource ( ) ;
8987 _worker = Task . Run ( ( ) => LoopWithConcurrency ( _tokenSource . Token ) , CancellationToken . None ) ;
9088 }
9189 }
9290
9391 public void Enqueue ( Action action )
9492 {
95- _actions . Enqueue ( action ) ;
96- _syncSource . TrySetResult ( true ) ;
93+ _channel . Writer . TryWrite ( action ) ;
9794 }
9895
9996 async Task Loop ( )
10097 {
101- while ( _tokenSource . IsCancellationRequested == false )
98+ while ( await _channel . Reader . WaitToReadAsync ( ) . ConfigureAwait ( false ) )
10299 {
103- try
104- {
105- await _syncSource . Task . ConfigureAwait ( false ) ;
106- _syncSource = new TaskCompletionSource < bool > ( TaskCreationOptions . RunContinuationsAsynchronously ) ;
107- }
108- catch ( TaskCanceledException )
109- {
110- // Swallowing the task cancellation exception for the semaphore in case we are stopping.
111- }
112-
113- while ( _actions . TryDequeue ( out Action action ) )
100+ while ( _channel . Reader . TryRead ( out Action work ) )
114101 {
115102 try
116103 {
117- action ( ) ;
104+ work ( ) ;
118105 }
119- catch ( Exception )
106+ catch ( Exception )
120107 {
121108 // ignored
122109 }
@@ -126,36 +113,37 @@ async Task Loop()
126113
127114 async Task LoopWithConcurrency ( CancellationToken cancellationToken )
128115 {
129- while ( _tokenSource . IsCancellationRequested == false )
116+ try
130117 {
131- try
132- {
133- await _syncSource . Task . ConfigureAwait ( false ) ;
134- _syncSource = new TaskCompletionSource < bool > ( TaskCreationOptions . RunContinuationsAsynchronously ) ;
135- }
136- catch ( TaskCanceledException )
118+ while ( await _channel . Reader . WaitToReadAsync ( cancellationToken ) . ConfigureAwait ( false ) )
137119 {
138- // Swallowing the task cancellation exception for the semaphore in case we are stopping.
139- }
140-
141- while ( _actions . TryDequeue ( out Action action ) )
142- {
143- // Do a quick synchronous check before we resort to async/await with the state-machine overhead.
144- if ( ! _limiter . Wait ( 0 ) )
120+ while ( _channel . Reader . TryRead ( out Action action ) )
145121 {
146- await _limiter . WaitAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
147- }
122+ // Do a quick synchronous check before we resort to async/await with the state-machine overhead.
123+ if ( ! _limiter . Wait ( 0 ) )
124+ {
125+ await _limiter . WaitAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
126+ }
148127
149- _ = OffloadToWorkerThreadPool ( action , _limiter ) ;
128+ _ = OffloadToWorkerThreadPool ( action , _limiter ) ;
129+ }
150130 }
151131 }
132+ catch ( OperationCanceledException )
133+ {
134+ // ignored
135+ }
152136 }
153137
154138 static async Task OffloadToWorkerThreadPool ( Action action , SemaphoreSlim limiter )
155139 {
156140 try
157141 {
158- await Task . Run ( ( ) => action ( ) ) ;
142+ // like Task.Run but doesn't closure allocate
143+ await Task . Factory . StartNew ( state =>
144+ {
145+ ( ( Action ) state ) ( ) ;
146+ } , action , CancellationToken . None , TaskCreationOptions . DenyChildAttach , TaskScheduler . Default ) ;
159147 }
160148 catch ( Exception )
161149 {
@@ -169,8 +157,8 @@ static async Task OffloadToWorkerThreadPool(Action action, SemaphoreSlim limiter
169157
170158 public Task Stop ( )
171159 {
172- _tokenSource . Cancel ( ) ;
173- _tokenRegistration . Dispose ( ) ;
160+ _channel . Writer . Complete ( ) ;
161+ _tokenSource ? . Cancel ( ) ;
174162 _limiter ? . Dispose ( ) ;
175163 return _worker ;
176164 }
0 commit comments