Skip to content

Commit c61848b

Browse files
authored
Merge pull request #121 from patrikwlund/avoid-hogging-threads
Async wait in delivery loop to avoid thread pool starvation
2 parents 0da8e0e + 2d7b7f4 commit c61848b

File tree

1 file changed

+23
-14
lines changed

1 file changed

+23
-14
lines changed

src/AddUp.FakeRabbitMQ/FakeModel.cs

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
using System.Collections.Generic;
44
using System.Linq;
55
using System.Threading;
6+
using System.Threading.Channels;
67
using System.Threading.Tasks;
78
using RabbitMQ.Client;
89
using RabbitMQ.Client.Events;
@@ -14,7 +15,11 @@ internal sealed class FakeModel : IModel
1415
{
1516
private readonly ConcurrentDictionary<ulong, RabbitMessage> workingMessages = new ConcurrentDictionary<ulong, RabbitMessage>();
1617
private readonly ConcurrentDictionary<string, IBasicConsumer> consumers = new ConcurrentDictionary<string, IBasicConsumer>();
17-
private readonly BlockingCollection<Action> deliveries = new BlockingCollection<Action>();
18+
private readonly Channel<Action> deliveries = Channel.CreateUnbounded<Action>(new UnboundedChannelOptions
19+
{
20+
SingleReader = true,
21+
SingleWriter = false,
22+
});
1823
private readonly RabbitServer server;
1924
private readonly Task deliveriesTask;
2025
private long lastDeliveryTag;
@@ -122,9 +127,9 @@ void notifyConsumerOfMessage(RabbitMessage message)
122127
_ = consumers.AddOrUpdate(consumerTag, consumer, updateFunction);
123128

124129
foreach (var message in queueInstance.Messages)
125-
deliveries.Add(() => notifyConsumerOfMessage(message));
130+
_ = deliveries.Writer.TryWrite(() => notifyConsumerOfMessage(message));
126131
queueInstance.MessagePublished += (sender, message) =>
127-
deliveries.Add(() => notifyConsumerOfMessage(message));
132+
_ = deliveries.Writer.TryWrite(() => notifyConsumerOfMessage(message));
128133

129134
if (consumer is IAsyncBasicConsumer asyncBasicConsumer)
130135
asyncBasicConsumer.HandleBasicConsumeOk(consumerTag).GetAwaiter().GetResult();
@@ -257,7 +262,7 @@ private void Close(ushort replyCode, string replyText, bool abort)
257262
try
258263
{
259264
CloseReason = reason;
260-
deliveries.CompleteAdding();
265+
deliveries.Writer.TryComplete();
261266
deliveriesTask.Wait();
262267
ModelShutdown?.Invoke(this, reason);
263268
}
@@ -282,7 +287,6 @@ public void ConfirmSelect()
282287
public void Dispose()
283288
{
284289
if (IsOpen) Abort();
285-
deliveries.Dispose();
286290
}
287291

288292
public void ExchangeBind(string destination, string source, string routingKey, IDictionary<string, object> arguments)
@@ -471,20 +475,25 @@ public bool WaitForConfirms(TimeSpan timeout, out bool timedOut)
471475
/// semantics as running delivery callbacks synchronously can cause deadlocks in
472476
/// code under test.
473477
/// </summary>
474-
private void HandleDeliveries()
478+
private async Task HandleDeliveries()
475479
{
476480
try
477481
{
478-
foreach (var delivery in deliveries.GetConsumingEnumerable())
479-
try
480-
{
481-
delivery();
482-
}
483-
catch (Exception ex)
482+
while (await deliveries.Reader.WaitToReadAsync().ConfigureAwait(false))
483+
{
484+
while (deliveries.Reader.TryRead(out var delivery))
484485
{
485-
var callbackArgs = CallbackExceptionEventArgs.Build(ex, "");
486-
CallbackException(this, callbackArgs);
486+
try
487+
{
488+
delivery();
489+
}
490+
catch (Exception ex)
491+
{
492+
var callbackArgs = CallbackExceptionEventArgs.Build(ex, "");
493+
CallbackException(this, callbackArgs);
494+
}
487495
}
496+
}
488497
}
489498
catch
490499
{

0 commit comments

Comments
 (0)