Skip to content

Commit 609c4fa

Browse files
committed
fixup
1 parent 4766002 commit 609c4fa

File tree

1 file changed

+45
-4
lines changed

1 file changed

+45
-4
lines changed

projects/Test/Integration/TestAsyncConsumer.cs

Lines changed: 45 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ public TestAsyncConsumer(ITestOutputHelper output)
5353
public async Task TestBasicRoundtripConcurrent()
5454
{
5555
QueueDeclareOk q = await _channel.QueueDeclareAsync();
56+
5657
string publish1 = GetUniqueString(1024);
5758
byte[] body = _encoding.GetBytes(publish1);
5859
await _channel.BasicPublishAsync("", q.QueueName, body);
@@ -129,8 +130,8 @@ public async Task TestBasicRoundtripConcurrent()
129130
}
130131
finally
131132
{
132-
tokenSource.Dispose();
133133
ctsr.Dispose();
134+
tokenSource.Dispose();
134135
}
135136
}
136137

@@ -188,8 +189,28 @@ public async Task TestBasicRoundtripConcurrentManyMessages()
188189
{
189190
using (IConnection publishConn = await _connFactory.CreateConnectionAsync())
190191
{
192+
publishConn.ConnectionShutdown += (o, ea) =>
193+
{
194+
HandleConnectionShutdown(publishConn, ea, (args) =>
195+
{
196+
if (args.Initiator == ShutdownInitiator.Peer)
197+
{
198+
MaybeSetException(ea, publish1SyncSource, publish2SyncSource);
199+
}
200+
});
201+
};
191202
using (IChannel publishChannel = await publishConn.CreateChannelAsync())
192203
{
204+
publishChannel.ChannelShutdown += (o, ea) =>
205+
{
206+
HandleChannelShutdown(publishChannel, ea, (args) =>
207+
{
208+
if (args.Initiator == ShutdownInitiator.Peer)
209+
{
210+
MaybeSetException(ea, publish1SyncSource, publish2SyncSource);
211+
}
212+
});
213+
};
193214
await publishChannel.ConfirmSelectAsync();
194215

195216
for (int i = 0; i < publish_total; i++)
@@ -210,8 +231,29 @@ public async Task TestBasicRoundtripConcurrentManyMessages()
210231
{
211232
using (IConnection consumeConn = await _connFactory.CreateConnectionAsync())
212233
{
234+
consumeConn.ConnectionShutdown += (o, ea) =>
235+
{
236+
HandleConnectionShutdown(consumeConn, ea, (args) =>
237+
{
238+
if (args.Initiator == ShutdownInitiator.Peer)
239+
{
240+
MaybeSetException(ea, publish1SyncSource, publish2SyncSource);
241+
}
242+
});
243+
};
213244
using (IChannel consumeChannel = await consumeConn.CreateChannelAsync())
214245
{
246+
consumeChannel.ChannelShutdown += (o, ea) =>
247+
{
248+
HandleChannelShutdown(consumeChannel, ea, (args) =>
249+
{
250+
if (args.Initiator == ShutdownInitiator.Peer)
251+
{
252+
MaybeSetException(ea, publish1SyncSource, publish2SyncSource);
253+
}
254+
});
255+
};
256+
215257
var consumer = new AsyncEventingBasicConsumer(consumeChannel);
216258

217259
int publish1_count = 0;
@@ -254,7 +296,6 @@ public async Task TestBasicRoundtripConcurrentManyMessages()
254296

255297
await AssertRanToCompletion(publishTask);
256298

257-
// ensure we get a delivery
258299
await AssertRanToCompletion(publish1SyncSource.Task, publish2SyncSource.Task);
259300
consumerSyncSource.TrySetResult(true);
260301

@@ -266,8 +307,8 @@ public async Task TestBasicRoundtripConcurrentManyMessages()
266307
}
267308
finally
268309
{
269-
tokenSource.Dispose();
270310
ctsr.Dispose();
311+
tokenSource.Dispose();
271312
}
272313
}
273314

@@ -367,8 +408,8 @@ await _channel.BasicConsumeAsync(queue: queueName, autoAck: false,
367408
finally
368409
{
369410
await _channel.QueueDeleteAsync(queue: queueName);
370-
cancellationTokenSource.Dispose();
371411
ctsr.Dispose();
412+
cancellationTokenSource.Dispose();
372413
}
373414
}
374415

0 commit comments

Comments
 (0)