-
Notifications
You must be signed in to change notification settings - Fork 48
/
Copy pathRabbitMqMandatoryDeliveryTest.cs
140 lines (114 loc) · 4.66 KB
/
RabbitMqMandatoryDeliveryTest.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using NUnit.Framework;
using Rebus.Activation;
using Rebus.Bus;
using Rebus.Config;
using Rebus.Exceptions;
using Rebus.Logging;
using Rebus.Retry.Simple;
using Rebus.Tests.Contracts;
using Rebus.Tests.Contracts.Extensions;
namespace Rebus.RabbitMq.Tests;
/// <summary>
/// Integration tests for messages sent with the <see cref="RabbitMqHeaders.Mandatory"/> header:
/// what happens when such a message is sent to a queue that was deleted in <see cref="SetUp"/>,
/// with and without a basic-return callback configured on the sending client.
/// </summary>
[TestFixture]
public class RabbitMqMandatoryDeliveryTest : FixtureBase
{
    private readonly string _nonExistingQueueName = TestConfig.GetName("non-existing-queue");
    private readonly string _mandatoryQueue = TestConfig.GetName("mandatory-queue");

    /// <summary>
    /// Deletes both queues used by this fixture so that every test starts without them.
    /// </summary>
    protected override void SetUp()
    {
        // The overridden hook returns void, so the asynchronous deletes are awaited by blocking.
        RabbitMqTransportFactory.DeleteQueue(_nonExistingQueueName).GetAwaiter().GetResult();
        RabbitMqTransportFactory.DeleteQueue(_mandatoryQueue).GetAwaiter().GetResult();
    }

    /// <summary>
    /// Sends a mandatory message to the deleted queue from a client configured with a <c>null</c>
    /// basic-return callback and expects the send to fail with <see cref="MandatoryDeliveryException"/>.
    /// </summary>
    [Test]
    public void MandatoryHeaderWithoutHandlerThrows()
    {
        var bus = StartOneWayClient(null);

        Assert.ThrowsAsync<MandatoryDeliveryException>(async () =>
        {
            await bus.Advanced.Routing.Send(_nonExistingQueueName, "I'm mandatory", new Dictionary<string, string>
            {
                [RabbitMqHeaders.Mandatory] = bool.TrueString,
            });
        });
    }

    /// <summary>
    /// Sends a mandatory message with a known message ID to the deleted queue and waits (up to two
    /// seconds) for the configured basic-return callback to be invoked with that same message ID.
    /// </summary>
    [Test]
    public async Task MandatoryHeaderCallback()
    {
        // Rendered once; used both as the outgoing header value and as the value the callback matches on.
        var messageId = Guid.NewGuid().ToString();

        // Registered through Using(...) - the same call this fixture uses for its activators - so the
        // wait handle is not leaked (assumes FixtureBase disposes registered items on cleanup - TODO confirm).
        var gotCallback = Using(new ManualResetEvent(false));

        void Callback(object sender, BasicReturnEventArgs eventArgs)
        {
            // Only react to the message sent by this test.
            if (string.Equals(eventArgs.Message.GetMessageId(), messageId, StringComparison.Ordinal))
            {
                gotCallback.Set();
            }
        }

        var bus = StartOneWayClient(Callback);

        try
        {
            await bus.Advanced.Routing.Send(_nonExistingQueueName, "I'm mandatory", new Dictionary<string, string>
            {
                [RabbitMqHeaders.MessageId] = messageId,
                [RabbitMqHeaders.Mandatory] = bool.TrueString,
            });
        }
        catch (RebusApplicationException)
        {
            // Deliberately ignored: this test asserts only on the callback below.
            // NOTE(review): confirm whether the send is expected to throw when a callback is configured.
        }

        gotCallback.WaitOrDie(TimeSpan.FromSeconds(2));
    }

    /// <summary>
    /// Exposes a potential problem with the mandatory delivery method:
    /// when the mandatory header is used and an exception is thrown in the handler,
    /// the delivery to the error queue throws a MandatoryDeliveryException because a handler
    /// for BasicReturn is not configured on the server.
    ///
    /// sender -> mandatory message -> server
    /// server -> handler throws exception
    /// rebus -> transfer to error queue
    /// rabbit transport -> throws MandatoryDeliveryException since server does not have the required handler
    /// </summary>
    [Test, Ignore("Exposes an issue with the mandatory delivery")]
    public async Task CanTransferMandatoryHeaderToErrorQueue()
    {
        // Registered through Using(...) so the wait handle is not leaked
        // (assumes FixtureBase disposes registered items on cleanup - TODO confirm).
        var receivedMessageOnErrorQueue = Using(new ManualResetEvent(false));

        // Server on the mandatory queue whose handler always fails.
        var inputQueueServer = StartServer(_mandatoryQueue).Handle<string>(str =>
        {
            Console.WriteLine($"Message arrived on queue: {_mandatoryQueue}: {str}");
            throw new Exception("Exception in handler");
        });
        Using(inputQueueServer);

        // Server on the "error" queue that signals once a message shows up there.
        var errorQueueServer = StartServer("error").Handle<string>(str =>
        {
            Console.WriteLine($"Message arrived on queue: error: {str}");
            receivedMessageOnErrorQueue.Set();
            return Task.CompletedTask;
        });
        Using(errorQueueServer);

        var bus = StartOneWayClient((_, _) => { /* Required eventhandler for mandatory delivery */ });

        await bus.Advanced.Routing.Send(_mandatoryQueue, "I'm mandatory and throws exception", new Dictionary<string, string>
        {
            [RabbitMqHeaders.Mandatory] = bool.TrueString,
        });

        receivedMessageOnErrorQueue.WaitOrDie(TimeSpan.FromSeconds(5));
    }

    /// <summary>
    /// Starts a one-way RabbitMQ client bus with mandatory delivery configured.
    /// </summary>
    /// <param name="basicReturnCallback">
    /// Callback handed to <c>Mandatory(...)</c>; one test passes <c>null</c> to configure no callback.
    /// </param>
    /// <returns>The started bus.</returns>
    private IBus StartOneWayClient(Action<object, BasicReturnEventArgs> basicReturnCallback)
    {
        var activator = Using(new BuiltinHandlerActivator());

        return Configure.With(activator)
            .Logging(l => l.Console(minLevel: LogLevel.Warn))
            .Transport(t => t
                .UseRabbitMqAsOneWayClient(RabbitMqTransportFactory.ConnectionString)
                .Mandatory(basicReturnCallback))
            .Start();
    }

    /// <summary>
    /// Starts a bus that receives from <paramref name="queueName"/> with a single delivery attempt
    /// configured on the retry strategy.
    /// </summary>
    /// <param name="queueName">Name of the input queue.</param>
    /// <returns>The activator backing the started bus, so callers can attach handlers to it.</returns>
    private BuiltinHandlerActivator StartServer(string queueName)
    {
        var activator = Using(new BuiltinHandlerActivator());

        Configure.With(activator)
            .Logging(l => l.Console(minLevel: LogLevel.Warn))
            .Transport(t => t.UseRabbitMq(RabbitMqTransportFactory.ConnectionString, queueName))
            .Options(o => o.RetryStrategy(maxDeliveryAttempts: 1))
            .Start();

        return activator;
    }
}