-
Notifications
You must be signed in to change notification settings - Fork 48
/
Copy pathTestRabbitMqReconnection.cs
140 lines (113 loc) · 4.55 KB
/
TestRabbitMqReconnection.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using DotNet.Testcontainers.Containers;
using NUnit.Framework;
using Rebus.Activation;
using Rebus.Bus;
using Rebus.Config;
using Rebus.Logging;
using Rebus.Routing.TypeBased;
using Rebus.Tests.Contracts;
using Testcontainers.RabbitMq;
#pragma warning disable 1998
namespace Rebus.RabbitMq.Tests;
/// <summary>
/// Integration test that restarts a RabbitMQ container while a Rebus endpoint is busy
/// receiving messages, and verifies that every message sent is still handled.
/// </summary>
[TestFixture]
[Description("Simulates a lost connection by restarting RabbitMQ while an endpoint is receiving messages")]
public class TestRabbitMqReconnection : FixtureBase
{
    readonly string _receiverQueueName = TestConfig.GetName("receiver");

    RabbitMqContainer _rabbitMqContainer;
    BuiltinHandlerActivator _receiver;
    string _initialConnectionString;
    IBus _sender;

    /// <summary>
    /// Starts a RabbitMQ container on a fixed host port, purges the receiver queue, and
    /// configures a receiver endpoint (started with zero workers) plus a one-way sender
    /// that routes everything to the receiver queue.
    /// </summary>
    protected override void SetUp()
    {
        // We have to force the port here, otherwise restarting the container will give us a new port
        // which obviously won't work with the previous connection string.
        _rabbitMqContainer = new RabbitMqBuilder().WithPortBinding(25672, 5672).Build();
        _rabbitMqContainer.StartAsync().GetAwaiter().GetResult();

        // NOTE(review): presumably gives the broker extra time to become ready - confirm whether this is still needed
        Thread.Sleep(5000);

        _initialConnectionString = _rabbitMqContainer.GetConnectionString();

        // start from an empty input queue
        using (var transport = new RabbitMqTransport(_initialConnectionString, _receiverQueueName, new NullLoggerFactory()))
        {
            transport.PurgeInputQueue().GetAwaiter().GetResult();
        }

        _receiver = Using(new BuiltinHandlerActivator());

        Configure.With(_receiver)
            .Logging(l => l.Console(LogLevel.Info))
            .Transport(t => t.UseRabbitMq(_initialConnectionString, _receiverQueueName).Prefetch(1))
            .Options(o =>
            {
                // zero workers for now: the test raises the worker count once its handler is registered
                o.SetNumberOfWorkers(0);
                o.SetMaxParallelism(1);
            })
            .Start();

        var senderActivator = Using(new BuiltinHandlerActivator());

        _sender = Configure.With(senderActivator)
            .Logging(l => l.Console(LogLevel.Info))
            .Transport(t => t.UseRabbitMqAsOneWayClient(_initialConnectionString))
            .Routing(r => r.TypeBased().MapFallback(_receiverQueueName))
            .Start();
    }

    /// <summary>
    /// Stops and disposes the RabbitMQ container and then runs the base tear-down.
    /// Each step runs even if the previous one throws, so a failing container shutdown
    /// cannot skip the remaining clean-up.
    /// </summary>
    protected override void TearDown()
    {
        try
        {
            _rabbitMqContainer.StopAsync().GetAwaiter().GetResult();
        }
        finally
        {
            try
            {
                _rabbitMqContainer.DisposeAsync().AsTask().GetAwaiter().GetResult();
            }
            finally
            {
                base.TearDown();
            }
        }
    }

    /// <summary>
    /// Sends 40 messages, restarts RabbitMQ while the receiver is working through them, and
    /// waits (for up to two minutes) until every message has been marked as handled.
    /// </summary>
    /// <exception cref="TimeoutException">Thrown when not all messages were handled within two minutes.</exception>
    [Test]
    public void WeGetAllMessagesEvenThoughRabbitMqRestarts()
    {
        // message text => whether it has been handled
        var messages = new ConcurrentDictionary<string, bool>();

        _receiver.Handle<string>(async message =>
        {
            Console.WriteLine($"Received '{message}'");
            await Task.Delay(500);
            messages[message] = true;
        });

        _receiver.Bus.Advanced.Workers.SetNumberOfWorkers(1);

        Console.WriteLine("Sending messages...");

        foreach (var message in Enumerable.Range(0, 40).Select(i => $"message number {i}"))
        {
            // register the message as "not handled" BEFORE sending, so the handler can never win the race
            messages[message] = false;

            // GetAwaiter().GetResult() rethrows the original exception instead of an AggregateException
            _sender.Send(message).GetAwaiter().GetResult();
        }

        Console.WriteLine("Waiting for all messages to have been handled...");

        // restart RabbitMQ while we are receiving messages - run it as a Task (not an async void
        // thread pool callback) so that exceptions and failed assertions are observed by the test
        var restartTask = Task.Run(async () =>
        {
            await Task.Delay(TimeSpan.FromSeconds(5));

            Console.WriteLine("Stopping RabbitMQ....");
            await _rabbitMqContainer.StopAsync();

            await Task.Delay(TimeSpan.FromSeconds(1));

            Console.WriteLine("Starting RabbitMQ....");
            await _rabbitMqContainer.StartAsync();

            Assert.That(_rabbitMqContainer.GetConnectionString(), Is.EqualTo(_initialConnectionString));
        });

        var stopwatch = Stopwatch.StartNew();

        while (true)
        {
            Thread.Sleep(1000);

            // fail fast with the real cause if the restart blew up, instead of timing out two minutes later
            if (restartTask.IsFaulted)
            {
                restartTask.GetAwaiter().GetResult();
            }

            if (messages.All(kvp => kvp.Value))
            {
                Console.WriteLine("All messages received :)");
                break;
            }

            var received = messages.Count(v => v.Value);
            Console.WriteLine($"Messages correctly received at this point: {received}");

            if ((_rabbitMqContainer.State & TestcontainersStates.Running) != 0)
            {
                Assert.That(_rabbitMqContainer.GetConnectionString(), Is.EqualTo(_initialConnectionString));
            }

            if (stopwatch.Elapsed < TimeSpan.FromMinutes(2))
            {
                continue;
            }

            throw new TimeoutException("Waited too long!");
        }

        // make sure the restart has finished (and did not fail) before the test completes and tear-down begins
        restartTask.GetAwaiter().GetResult();
    }
}