Skip to content

Commit 1e9b93e

Browse files
committed
Adjust to client changes embracing the new concurrency setting
1 parent 5fea02d commit 1e9b93e

File tree

4 files changed

+32
-60
lines changed

4 files changed

+32
-60
lines changed

src/NServiceBus.Transport.RabbitMQ/Connection/ConnectionFactory.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@ public ConnectionFactory(string endpointName, ConnectionConfiguration connection
4848
Password = connectionConfiguration.Password,
4949
RequestedHeartbeat = heartbeatInterval ?? connectionConfiguration.RequestedHeartbeat,
5050
NetworkRecoveryInterval = networkRecoveryInterval ?? connectionConfiguration.RetryDelay,
51-
UseBackgroundThreadsForIO = true
51+
UseBackgroundThreadsForIO = true,
52+
DispatchConsumersAsync = true
5253
};
5354

5455
connectionFactory.Ssl.ServerName = connectionConfiguration.Host;
@@ -82,12 +83,14 @@ public ConnectionFactory(string endpointName, ConnectionConfiguration connection
8283

8384
public IConnection CreateAdministrationConnection() => CreateConnection($"{endpointName} Administration", false);
8485

85-
public IConnection CreateConnection(string connectionName, bool automaticRecoveryEnabled = true)
86+
public IConnection CreateConnection(string connectionName, bool automaticRecoveryEnabled = true, int consumerDispatchConcurrency = 1)
8687
{
8788
lock (lockObject)
8889
{
8990
connectionFactory.AutomaticRecoveryEnabled = automaticRecoveryEnabled;
9091
connectionFactory.ClientProperties["connected"] = DateTime.Now.ToString("G");
92+
// it is OK to modify as long as we are under a lock
93+
connectionFactory.ConsumerDispatchConcurrency = consumerDispatchConcurrency;
9194

9295
var connection = connectionFactory.CreateConnection(connectionName);
9396

src/NServiceBus.Transport.RabbitMQ/NServiceBus.Transport.RabbitMQ.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
<PackageReference Include="Obsolete.Fody" Version="5.2.1" PrivateAssets="All" />
1313
<PackageReference Include="Particular.CodeRules" Version="0.3.0" PrivateAssets="All" />
1414
<PackageReference Include="Particular.Packaging" Version="0.9.0" PrivateAssets="All" />
15-
<PackageReference Include="RabbitMQ.Client" Version="[6.1.0, 7.0.0)" />
15+
<PackageReference Include="RabbitMQ.Client" Version="[6.2.1, 7.0.0)" />
1616
</ItemGroup>
1717

1818
<PropertyGroup>

src/NServiceBus.Transport.RabbitMQ/Receiving/MessagePump.cs

Lines changed: 22 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,13 @@ sealed class MessagePump : IPushMessages, IDisposable
3030
PushSettings settings;
3131
CriticalError criticalError;
3232
MessagePumpConnectionFailedCircuitBreaker circuitBreaker;
33-
TaskScheduler exclusiveScheduler;
3433

3534
// Start
3635
int maxConcurrency;
37-
SemaphoreSlim semaphore;
36+
int numberOfExecutingReceives;
3837
CancellationTokenSource messageProcessing;
3938
IConnection connection;
40-
EventingBasicConsumer consumer;
39+
AsyncEventingBasicConsumer consumer;
4140

4241
// Stop
4342
TaskCompletionSource<bool> connectionShutdownCompleted;
@@ -63,8 +62,6 @@ public Task Init(Func<MessageContext, Task> onMessage, Func<ErrorContext, Task<E
6362

6463
circuitBreaker = new MessagePumpConnectionFailedCircuitBreaker($"'{settings.InputQueue} MessagePump'", timeToWaitBeforeTriggeringCircuitBreaker, criticalError);
6564

66-
exclusiveScheduler = new ConcurrentExclusiveSchedulerPair().ExclusiveScheduler;
67-
6865
if (settings.PurgeOnStartup)
6966
{
7067
queuePurger.Purge(settings.InputQueue);
@@ -76,10 +73,9 @@ public Task Init(Func<MessageContext, Task> onMessage, Func<ErrorContext, Task<E
7673
public void Start(PushRuntimeSettings limitations)
7774
{
7875
maxConcurrency = limitations.MaxConcurrency;
79-
semaphore = new SemaphoreSlim(limitations.MaxConcurrency, limitations.MaxConcurrency);
8076
messageProcessing = new CancellationTokenSource();
8177

82-
connection = connectionFactory.CreateConnection($"{settings.InputQueue} MessagePump");
78+
connection = connectionFactory.CreateConnection($"{settings.InputQueue} MessagePump", consumerDispatchConcurrency: maxConcurrency);
8379

8480
var channel = connection.CreateModel();
8581

@@ -102,7 +98,7 @@ public void Start(PushRuntimeSettings limitations)
10298

10399
channel.BasicQos(0, (ushort)Math.Min(prefetchCount, ushort.MaxValue), false);
104100

105-
consumer = new EventingBasicConsumer(channel);
101+
consumer = new AsyncEventingBasicConsumer(channel);
106102

107103
consumer.Registered += Consumer_Registered;
108104
connection.ConnectionShutdown += Connection_ConnectionShutdown;
@@ -117,7 +113,7 @@ public async Task Stop()
117113
consumer.Received -= Consumer_Received;
118114
messageProcessing.Cancel();
119115

120-
while (semaphore.CurrentCount != maxConcurrency)
116+
while (Volatile.Read(ref numberOfExecutingReceives) != maxConcurrency)
121117
{
122118
await Task.Delay(50).ConfigureAwait(false);
123119
}
@@ -136,9 +132,10 @@ public async Task Stop()
136132
await connectionShutdownCompleted.Task.ConfigureAwait(false);
137133
}
138134

139-
void Consumer_Registered(object sender, ConsumerEventArgs e)
135+
Task Consumer_Registered(object sender, ConsumerEventArgs e)
140136
{
141137
circuitBreaker.Success();
138+
return Task.CompletedTask;
142139
}
143140

144141
void Connection_ConnectionShutdown(object sender, ShutdownEventArgs e)
@@ -153,10 +150,16 @@ void Connection_ConnectionShutdown(object sender, ShutdownEventArgs e)
153150
}
154151
}
155152

156-
async void Consumer_Received(object sender, BasicDeliverEventArgs eventArgs)
153+
async Task Consumer_Received(object sender, BasicDeliverEventArgs eventArgs)
157154
{
158-
var eventRaisingThreadId = Thread.CurrentThread.ManagedThreadId;
155+
if (messageProcessing.Token.IsCancellationRequested)
156+
{
157+
return;
158+
}
159+
160+
Interlocked.Increment(ref numberOfExecutingReceives);
159161

162+
// technically we don't need this anymore
160163
var messageBody = eventArgs.Body.ToArray();
161164

162165
var eventArgsCopy = new BasicDeliverEventArgs(
@@ -171,49 +174,16 @@ async void Consumer_Received(object sender, BasicDeliverEventArgs eventArgs)
171174

172175
try
173176
{
174-
await semaphore.WaitAsync(messageProcessing.Token).ConfigureAwait(false);
175-
}
176-
catch (OperationCanceledException)
177-
{
178-
return;
179-
}
180-
181-
try
182-
{
183-
// The current thread will be the event-raising thread if either:
184-
//
185-
// a) the semaphore was entered synchronously (did not have to wait).
186-
// b) the event was raised on a thread pool thread,
187-
// and the semaphore was entered asynchronously (had to wait),
188-
// and the continuation happened to be scheduled back onto the same thread.
189-
if (Thread.CurrentThread.ManagedThreadId == eventRaisingThreadId)
190-
{
191-
// In RabbitMQ.Client 4.1.0, the event is raised by reusing a single, explicitly created thread,
192-
// so we are in scenario (a) described above.
193-
// We must yield to allow the thread to raise more events while we handle this one,
194-
// otherwise we will never process messages concurrently.
195-
//
196-
// If a future version of RabbitMQ.Client changes its threading model, then either:
197-
//
198-
// 1) we are in scenario (a), but we *may not* need to yield.
199-
// E.g. the client may raise the event on a new, explicitly created thread each time.
200-
// 2) we cannot tell whether we are in scenario (a) or scenario (b).
201-
// E.g. the client may raise the event on a thread pool thread.
202-
//
203-
// In both cases, we cannot tell whether we need to yield or not, so we must yield.
204-
await Task.Yield();
205-
}
206-
207177
await Process(eventArgsCopy, messageBody).ConfigureAwait(false);
208178
}
209179
catch (Exception ex)
210180
{
211181
Logger.Warn("Failed to process message. Returning message to queue...", ex);
212-
await consumer.Model.BasicRejectAndRequeueIfOpen(eventArgs.DeliveryTag, exclusiveScheduler).ConfigureAwait(false);
182+
await consumer.Model.BasicRejectAndRequeueIfOpen(eventArgs.DeliveryTag).ConfigureAwait(false);
213183
}
214184
finally
215185
{
216-
semaphore.Release();
186+
Interlocked.Decrement(ref numberOfExecutingReceives);
217187
}
218188
}
219189

@@ -286,7 +256,7 @@ async Task Process(BasicDeliverEventArgs message, byte[] messageBody)
286256
catch (Exception ex)
287257
{
288258
criticalError.Raise($"Failed to execute recoverability policy for message with native ID: `{messageId}`", ex);
289-
await consumer.Model.BasicRejectAndRequeueIfOpen(message.DeliveryTag, exclusiveScheduler).ConfigureAwait(false);
259+
await consumer.Model.BasicRejectAndRequeueIfOpen(message.DeliveryTag).ConfigureAwait(false);
290260

291261
return;
292262
}
@@ -295,13 +265,13 @@ async Task Process(BasicDeliverEventArgs message, byte[] messageBody)
295265

296266
if (processed && tokenSource.IsCancellationRequested)
297267
{
298-
await consumer.Model.BasicRejectAndRequeueIfOpen(message.DeliveryTag, exclusiveScheduler).ConfigureAwait(false);
268+
await consumer.Model.BasicRejectAndRequeueIfOpen(message.DeliveryTag).ConfigureAwait(false);
299269
}
300270
else
301271
{
302272
try
303273
{
304-
await consumer.Model.BasicAckSingle(message.DeliveryTag, exclusiveScheduler).ConfigureAwait(false);
274+
await consumer.Model.BasicAckSingle(message.DeliveryTag).ConfigureAwait(false);
305275
}
306276
catch (AlreadyClosedException ex)
307277
{
@@ -329,14 +299,14 @@ async Task MovePoisonMessage(BasicDeliverEventArgs message, string queue)
329299
catch (Exception ex)
330300
{
331301
Logger.Error($"Failed to move poison message to queue '{queue}'. Returning message to original queue...", ex);
332-
await consumer.Model.BasicRejectAndRequeueIfOpen(message.DeliveryTag, exclusiveScheduler).ConfigureAwait(false);
302+
await consumer.Model.BasicRejectAndRequeueIfOpen(message.DeliveryTag).ConfigureAwait(false);
333303

334304
return;
335305
}
336306

337307
try
338308
{
339-
await consumer.Model.BasicAckSingle(message.DeliveryTag, exclusiveScheduler).ConfigureAwait(false);
309+
await consumer.Model.BasicAckSingle(message.DeliveryTag).ConfigureAwait(false);
340310
}
341311
catch (AlreadyClosedException ex)
342312
{
@@ -347,7 +317,6 @@ async Task MovePoisonMessage(BasicDeliverEventArgs message, string queue)
347317
public void Dispose()
348318
{
349319
circuitBreaker?.Dispose();
350-
semaphore?.Dispose();
351320
messageProcessing?.Dispose();
352321
connection?.Dispose();
353322
}

src/NServiceBus.Transport.RabbitMQ/Receiving/ModelExtensions.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,17 +13,17 @@ class MessageState
1313
public ulong DeliveryTag { get; set; }
1414
}
1515

16-
public static Task BasicAckSingle(this IModel channel, ulong deliveryTag, TaskScheduler scheduler) =>
16+
public static Task BasicAckSingle(this IModel channel, ulong deliveryTag) =>
1717
TaskEx.StartNew(
1818
new MessageState { Channel = channel, DeliveryTag = deliveryTag },
1919
state =>
2020
{
2121
var messageState = (MessageState)state;
2222
messageState.Channel.BasicAck(messageState.DeliveryTag, false);
2323
},
24-
scheduler);
24+
TaskScheduler.Default);
2525

26-
public static Task BasicRejectAndRequeueIfOpen(this IModel channel, ulong deliveryTag, TaskScheduler scheduler) =>
26+
public static Task BasicRejectAndRequeueIfOpen(this IModel channel, ulong deliveryTag) =>
2727
TaskEx.StartNew(
2828
new MessageState { Channel = channel, DeliveryTag = deliveryTag },
2929
state =>
@@ -41,6 +41,6 @@ public static Task BasicRejectAndRequeueIfOpen(this IModel channel, ulong delive
4141
}
4242
}
4343
},
44-
scheduler);
44+
TaskScheduler.Default);
4545
}
4646
}

0 commit comments

Comments
 (0)