-
Notifications
You must be signed in to change notification settings - Fork 48
/
Copy pathRabbitMqTransportFactory.cs
151 lines (125 loc) · 5.23 KB
/
RabbitMqTransportFactory.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using RabbitMQ.Client;
using Rebus.Logging;
using Rebus.Tests.Contracts.Transports;
using Rebus.Transport;
namespace Rebus.RabbitMq.Tests;
public class RabbitMqTransportFactory : ITransportFactory
{
// connection string for default docker instance
public static string ConnectionString => RabbitMqTestContainerManager.GetConnectionString();// "amqp://guest:guest@localhost:5672";
readonly List<IDisposable> _disposables = new();
private readonly List<IAsyncDisposable> _asyncDisposables = new();
readonly HashSet<string> _queuesToDelete = new();
public ITransport CreateOneWayClient()
{
return Create(null);
}
public ITransport Create(string inputQueueAddress)
{
var transport = CreateRabbitMqTransport(inputQueueAddress);
_asyncDisposables.Add(transport);
if (inputQueueAddress != null)
{
transport.PurgeInputQueue().GetAwaiter().GetResult();
}
transport.Initialize();
if (inputQueueAddress != null)
{
_queuesToDelete.Add(inputQueueAddress);
}
return transport;
}
public void CleanUp()
{
foreach (var disposable in _disposables)
{
disposable.Dispose();
}
_disposables.Clear();
foreach (var asyncDisposable in _asyncDisposables)
{
asyncDisposable.DisposeAsync().AsTask().GetAwaiter().GetResult();
}
foreach (var queue in _queuesToDelete)
{
DeleteQueue(queue).GetAwaiter().GetResult();
}
_queuesToDelete.Clear();
}
public static async Task CreateQueue(string queueName)
{
var connectionFactory = new ConnectionFactory { Uri = new Uri(ConnectionString) };
await using var connection = await connectionFactory.CreateConnectionAsync();
await using var model = await connection.CreateChannelAsync();
await model.QueueDeclareAsync(queueName, true, false, false);
}
public static async Task DeleteQueue(string queueName)
{
var connectionFactory = new ConnectionFactory { Uri = new Uri(ConnectionString) };
await using var connection = await connectionFactory.CreateConnectionAsync();
await using var model = await connection.CreateChannelAsync();
await model.QueueDeleteAsync(queueName);
}
public static async Task<bool> QueueExists(string queueName)
{
var connectionFactory = new ConnectionFactory { Uri = new Uri(ConnectionString) };
await using var connection = await connectionFactory.CreateConnectionAsync();
await using var model = await connection.CreateChannelAsync();
try
{
await model.QueueDeclarePassiveAsync(queueName);
return true;
}
catch (RabbitMQ.Client.Exceptions.OperationInterruptedException)
{
return false;
}
}
public static async Task DeleteExchange(string exchangeName)
{
var connectionFactory = new ConnectionFactory { Uri = new Uri(ConnectionString) };
await using var connection = await connectionFactory.CreateConnectionAsync();
await using var model = await connection.CreateChannelAsync();
await model.ExchangeDeleteAsync(exchangeName);
}
/// <summary>
/// We check for the existence of the exchange with the name <paramref name="exchangeName"/> by creating another
/// randomly named exchange and trying to bind from the randomly named one to the one we want to check the existence of.
/// This causes an exception if the exchange with the name <paramref name="exchangeName"/> does not exists.
/// </summary>
public static async Task<bool> ExchangeExists(string exchangeName)
{
var connectionFactory = new ConnectionFactory { Uri = new Uri(ConnectionString) };
await using var connection = await connectionFactory.CreateConnectionAsync();
await using var model = await connection.CreateChannelAsync();
try
{
const string nonExistentTopic = "6BE38CB8-089A-4B65-BA86-0801BBC064E9------DELETE-ME";
const string fakeExchange = "FEBC2512-CEC6-46EB-A058-37F1A9642B35------DELETE-ME";
await model.ExchangeDeclareAsync(fakeExchange, ExchangeType.Direct);
try
{
await model.ExchangeBindAsync(exchangeName, fakeExchange, nonExistentTopic);
await model.ExchangeUnbindAsync(exchangeName, fakeExchange, nonExistentTopic);
return true;
}
finally
{
await model.ExchangeDeleteAsync(fakeExchange);
}
}
catch
{
return false;
}
}
protected virtual RabbitMqTransport CreateRabbitMqTransport(string inputQueueAddress)
{
var rabbitMqTransport = new RabbitMqTransport(ConnectionString, inputQueueAddress, new ConsoleLoggerFactory(false));
rabbitMqTransport.SetBlockOnReceive(blockOnReceive: false);
return rabbitMqTransport;
}
}