@@ -30,14 +30,13 @@ sealed class MessagePump : IPushMessages, IDisposable
3030 PushSettings settings ;
3131 CriticalError criticalError ;
3232 MessagePumpConnectionFailedCircuitBreaker circuitBreaker ;
33- TaskScheduler exclusiveScheduler ;
3433
3534 // Start
3635 int maxConcurrency ;
37- SemaphoreSlim semaphore ;
36+ long numberOfExecutingReceives ;
3837 CancellationTokenSource messageProcessing ;
3938 IConnection connection ;
40- EventingBasicConsumer consumer ;
39+ AsyncEventingBasicConsumer consumer ;
4140
4241 // Stop
4342 TaskCompletionSource < bool > connectionShutdownCompleted ;
@@ -63,8 +62,6 @@ public Task Init(Func<MessageContext, Task> onMessage, Func<ErrorContext, Task<E
6362
6463 circuitBreaker = new MessagePumpConnectionFailedCircuitBreaker ( $ "'{ settings . InputQueue } MessagePump'", timeToWaitBeforeTriggeringCircuitBreaker , criticalError ) ;
6564
66- exclusiveScheduler = new ConcurrentExclusiveSchedulerPair ( ) . ExclusiveScheduler ;
67-
6865 if ( settings . PurgeOnStartup )
6966 {
7067 queuePurger . Purge ( settings . InputQueue ) ;
@@ -76,10 +73,9 @@ public Task Init(Func<MessageContext, Task> onMessage, Func<ErrorContext, Task<E
7673 public void Start ( PushRuntimeSettings limitations )
7774 {
7875 maxConcurrency = limitations . MaxConcurrency ;
79- semaphore = new SemaphoreSlim ( limitations . MaxConcurrency , limitations . MaxConcurrency ) ;
8076 messageProcessing = new CancellationTokenSource ( ) ;
8177
82- connection = connectionFactory . CreateConnection ( $ "{ settings . InputQueue } MessagePump") ;
78+ connection = connectionFactory . CreateConnection ( $ "{ settings . InputQueue } MessagePump", consumerDispatchConcurrency : maxConcurrency ) ;
8379
8480 var channel = connection . CreateModel ( ) ;
8581
@@ -102,7 +98,7 @@ public void Start(PushRuntimeSettings limitations)
10298
10399 channel . BasicQos ( 0 , ( ushort ) Math . Min ( prefetchCount , ushort . MaxValue ) , false ) ;
104100
105- consumer = new EventingBasicConsumer ( channel ) ;
101+ consumer = new AsyncEventingBasicConsumer ( channel ) ;
106102
107103 consumer . Registered += Consumer_Registered ;
108104 connection . ConnectionShutdown += Connection_ConnectionShutdown ;
@@ -117,7 +113,7 @@ public async Task Stop()
117113 consumer . Received -= Consumer_Received ;
118114 messageProcessing . Cancel ( ) ;
119115
120- while ( semaphore . CurrentCount != maxConcurrency )
116+ while ( Interlocked . Read ( ref numberOfExecutingReceives ) > 0 )
121117 {
122118 await Task . Delay ( 50 ) . ConfigureAwait ( false ) ;
123119 }
@@ -136,9 +132,10 @@ public async Task Stop()
136132 await connectionShutdownCompleted . Task . ConfigureAwait ( false ) ;
137133 }
138134
139- void Consumer_Registered ( object sender , ConsumerEventArgs e )
135+ Task Consumer_Registered ( object sender , ConsumerEventArgs e )
140136 {
141137 circuitBreaker . Success ( ) ;
138+ return Task . CompletedTask ;
142139 }
143140
144141 void Connection_ConnectionShutdown ( object sender , ShutdownEventArgs e )
@@ -153,10 +150,16 @@ void Connection_ConnectionShutdown(object sender, ShutdownEventArgs e)
153150 }
154151 }
155152
156- async void Consumer_Received ( object sender , BasicDeliverEventArgs eventArgs )
153+ async Task Consumer_Received ( object sender , BasicDeliverEventArgs eventArgs )
157154 {
158- var eventRaisingThreadId = Thread . CurrentThread . ManagedThreadId ;
155+ if ( messageProcessing . Token . IsCancellationRequested )
156+ {
157+ return ;
158+ }
159+
160+ Interlocked . Increment ( ref numberOfExecutingReceives ) ;
159161
162+ // technically we don't need this anymore
160163 var messageBody = eventArgs . Body . ToArray ( ) ;
161164
162165 var eventArgsCopy = new BasicDeliverEventArgs (
@@ -171,49 +174,16 @@ async void Consumer_Received(object sender, BasicDeliverEventArgs eventArgs)
171174
172175 try
173176 {
174- await semaphore . WaitAsync ( messageProcessing . Token ) . ConfigureAwait ( false ) ;
175- }
176- catch ( OperationCanceledException )
177- {
178- return ;
179- }
180-
181- try
182- {
183- // The current thread will be the event-raising thread if either:
184- //
185- // a) the semaphore was entered synchronously (did not have to wait).
186- // b) the event was raised on a thread pool thread,
187- // and the semaphore was entered asynchronously (had to wait),
188- // and the continuation happened to be scheduled back onto the same thread.
189- if ( Thread . CurrentThread . ManagedThreadId == eventRaisingThreadId )
190- {
191- // In RabbitMQ.Client 4.1.0, the event is raised by reusing a single, explicitly created thread,
192- // so we are in scenario (a) described above.
193- // We must yield to allow the thread to raise more events while we handle this one,
194- // otherwise we will never process messages concurrently.
195- //
196- // If a future version of RabbitMQ.Client changes its threading model, then either:
197- //
198- // 1) we are in scenario (a), but we *may not* need to yield.
199- // E.g. the client may raise the event on a new, explicitly created thread each time.
200- // 2) we cannot tell whether we are in scenario (a) or scenario (b).
201- // E.g. the client may raise the event on a thread pool thread.
202- //
203- // In both cases, we cannot tell whether we need to yield or not, so we must yield.
204- await Task . Yield ( ) ;
205- }
206-
207177 await Process ( eventArgsCopy , messageBody ) . ConfigureAwait ( false ) ;
208178 }
209179 catch ( Exception ex )
210180 {
211181 Logger . Warn ( "Failed to process message. Returning message to queue..." , ex ) ;
212- await consumer . Model . BasicRejectAndRequeueIfOpen ( eventArgs . DeliveryTag , exclusiveScheduler ) . ConfigureAwait ( false ) ;
182+ await consumer . Model . BasicRejectAndRequeueIfOpen ( eventArgs . DeliveryTag ) . ConfigureAwait ( false ) ;
213183 }
214184 finally
215185 {
216- semaphore . Release ( ) ;
186+ Interlocked . Decrement ( ref numberOfExecutingReceives ) ;
217187 }
218188 }
219189
@@ -286,7 +256,7 @@ async Task Process(BasicDeliverEventArgs message, byte[] messageBody)
286256 catch ( Exception ex )
287257 {
288258 criticalError . Raise ( $ "Failed to execute recoverability policy for message with native ID: `{ messageId } `", ex ) ;
289- await consumer . Model . BasicRejectAndRequeueIfOpen ( message . DeliveryTag , exclusiveScheduler ) . ConfigureAwait ( false ) ;
259+ await consumer . Model . BasicRejectAndRequeueIfOpen ( message . DeliveryTag ) . ConfigureAwait ( false ) ;
290260
291261 return ;
292262 }
@@ -295,13 +265,13 @@ async Task Process(BasicDeliverEventArgs message, byte[] messageBody)
295265
296266 if ( processed && tokenSource . IsCancellationRequested )
297267 {
298- await consumer . Model . BasicRejectAndRequeueIfOpen ( message . DeliveryTag , exclusiveScheduler ) . ConfigureAwait ( false ) ;
268+ await consumer . Model . BasicRejectAndRequeueIfOpen ( message . DeliveryTag ) . ConfigureAwait ( false ) ;
299269 }
300270 else
301271 {
302272 try
303273 {
304- await consumer . Model . BasicAckSingle ( message . DeliveryTag , exclusiveScheduler ) . ConfigureAwait ( false ) ;
274+ await consumer . Model . BasicAckSingle ( message . DeliveryTag ) . ConfigureAwait ( false ) ;
305275 }
306276 catch ( AlreadyClosedException ex )
307277 {
@@ -329,14 +299,14 @@ async Task MovePoisonMessage(BasicDeliverEventArgs message, string queue)
329299 catch ( Exception ex )
330300 {
331301 Logger . Error ( $ "Failed to move poison message to queue '{ queue } '. Returning message to original queue...", ex ) ;
332- await consumer . Model . BasicRejectAndRequeueIfOpen ( message . DeliveryTag , exclusiveScheduler ) . ConfigureAwait ( false ) ;
302+ await consumer . Model . BasicRejectAndRequeueIfOpen ( message . DeliveryTag ) . ConfigureAwait ( false ) ;
333303
334304 return ;
335305 }
336306
337307 try
338308 {
339- await consumer . Model . BasicAckSingle ( message . DeliveryTag , exclusiveScheduler ) . ConfigureAwait ( false ) ;
309+ await consumer . Model . BasicAckSingle ( message . DeliveryTag ) . ConfigureAwait ( false ) ;
340310 }
341311 catch ( AlreadyClosedException ex )
342312 {
@@ -347,7 +317,6 @@ async Task MovePoisonMessage(BasicDeliverEventArgs message, string queue)
347317 public void Dispose ( )
348318 {
349319 circuitBreaker ? . Dispose ( ) ;
350- semaphore ? . Dispose ( ) ;
351320 messageProcessing ? . Dispose ( ) ;
352321 connection ? . Dispose ( ) ;
353322 }
0 commit comments