Skip to content

Commit 057bff3

Browse files
committed
* Add a default consumer to see if perhaps that's why these tests sometimes fail.
1 parent afafcbc commit 057bff3

File tree

2 files changed

+57
-2
lines changed

2 files changed

+57
-2
lines changed

projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherBase.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ namespace RabbitMQ.Client.ConsumerDispatching
77
#nullable enable
88
internal abstract class ConsumerDispatcherBase
99
{
10-
private static readonly FallbackConsumer fallbackConsumer = new FallbackConsumer();
10+
private static readonly FallbackConsumer s_fallbackConsumer = new FallbackConsumer();
1111
private readonly Dictionary<string, IBasicConsumer> _consumers = new Dictionary<string, IBasicConsumer>();
1212

1313
public IBasicConsumer? DefaultConsumer { get; set; }
@@ -74,7 +74,7 @@ private void DoShutdownConsumers(ShutdownEventArgs reason)
7474
[MethodImpl(MethodImplOptions.NoInlining)]
7575
private IBasicConsumer GetDefaultOrFallbackConsumer()
7676
{
77-
return DefaultConsumer ?? fallbackConsumer;
77+
return DefaultConsumer ?? s_fallbackConsumer;
7878
}
7979
}
8080
}

projects/Test/Integration/TestAsyncConsumer.cs

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ public TestAsyncConsumer(ITestOutputHelper output)
5252
[Fact]
5353
public async Task TestBasicRoundtripConcurrent()
5454
{
55+
_channel.DefaultConsumer = new DefaultAsyncConsumer("_channel,", _output);
56+
5557
QueueDeclareOk q = await _channel.QueueDeclareAsync();
5658

5759
const int length = 4096;
@@ -129,6 +131,8 @@ public async Task TestBasicRoundtripConcurrent()
129131
[Fact]
130132
public async Task TestBasicRoundtripConcurrentManyMessages()
131133
{
134+
_channel.DefaultConsumer = new DefaultAsyncConsumer("_channel,", _output);
135+
132136
const int publish_total = 4096;
133137
const int length = 512;
134138
string queueName = GenerateQueueName();
@@ -185,6 +189,7 @@ public async Task TestBasicRoundtripConcurrentManyMessages()
185189
};
186190
using (IChannel publishChannel = await publishConn.CreateChannelAsync())
187191
{
192+
publishChannel.DefaultConsumer = new DefaultAsyncConsumer("publishChannel,", _output);
188193
publishChannel.ChannelShutdown += (o, ea) =>
189194
{
190195
HandleChannelShutdown(publishChannel, ea, (args) =>
@@ -221,6 +226,7 @@ public async Task TestBasicRoundtripConcurrentManyMessages()
221226
};
222227
using (IChannel consumeChannel = await consumeConn.CreateChannelAsync())
223228
{
229+
consumeChannel.DefaultConsumer = new DefaultAsyncConsumer("consumeChannel,", _output);
224230
consumeChannel.ChannelShutdown += (o, ea) =>
225231
{
226232
HandleChannelShutdown(consumeChannel, ea, (args) =>
@@ -659,5 +665,54 @@ private static bool ByteArraysEqual(ReadOnlySpan<byte> a1, ReadOnlySpan<byte> a2
659665
{
660666
return a1.SequenceEqual(a2);
661667
}
668+
669+
private class DefaultAsyncConsumer : AsyncDefaultBasicConsumer
670+
{
671+
private readonly string _logPrefix;
672+
private readonly ITestOutputHelper _output;
673+
674+
public DefaultAsyncConsumer(string logPrefix, ITestOutputHelper output)
675+
{
676+
_logPrefix = logPrefix;
677+
_output = output;
678+
}
679+
680+
public override Task HandleBasicCancel(string consumerTag)
681+
{
682+
_output.WriteLine("[ERROR] {0} HandleBasicCancel {1}", _logPrefix, consumerTag);
683+
return base.HandleBasicCancel(consumerTag);
684+
}
685+
686+
public override Task HandleBasicCancelOk(string consumerTag)
687+
{
688+
_output.WriteLine("[ERROR] {0} HandleBasicCancelOk {1}", _logPrefix, consumerTag);
689+
return base.HandleBasicCancelOk(consumerTag);
690+
}
691+
692+
public override Task HandleBasicConsumeOk(string consumerTag)
693+
{
694+
_output.WriteLine("[ERROR] {0} HandleBasicConsumeOk {1}", _logPrefix, consumerTag);
695+
return base.HandleBasicConsumeOk(consumerTag);
696+
}
697+
698+
public override Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered,
699+
string exchange, string routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body)
700+
{
701+
_output.WriteLine("[ERROR] {0} HandleBasicDeliver {1}", _logPrefix, consumerTag);
702+
return base.HandleBasicDeliver(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);
703+
}
704+
705+
public override Task HandleChannelShutdown(object channel, ShutdownEventArgs reason)
706+
{
707+
_output.WriteLine("[ERROR] {0} HandleChannelShutdown", _logPrefix);
708+
return base.HandleChannelShutdown(channel, reason);
709+
}
710+
711+
public override Task OnCancel(params string[] consumerTags)
712+
{
713+
_output.WriteLine("[ERROR] {0} OnCancel {1}", _logPrefix, consumerTags[0]);
714+
return base.OnCancel(consumerTags);
715+
}
716+
}
662717
}
663718
}

0 commit comments

Comments
 (0)