Skip to content

Commit ada5666

Browse files
Add Close to Producer and Consumer class
Add Close/0 method as in Java and Go. Implement IDisposable interface. Dispose internally calls Close/0. The Close/0 method removes the Producer/Consumer from the server list and Close the TCP connection if there are no producers and consumers Co-Authored-By: Ivan Maximov <sungam3r@yandex.ru>
1 parent 14df95c commit ada5666

File tree

11 files changed

+357
-120
lines changed

11 files changed

+357
-120
lines changed

.github/workflows/nuget.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ jobs:
3232
- name: Build
3333
run: dotnet build --no-restore
3434
- name: Test
35-
run: dotnet test Tests/Tests.csproj --no-build --verbosity normal
35+
run: dotnet test Tests/Tests.csproj --no-build --logger "console;verbosity=detailed"
3636
- name: Publish RabbitMQ.Stream.Client
3737
uses: brandedoutcast/publish-nuget@v2.5.2
3838
with:

RabbitMQ.Stream.Client/Client.cs

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
using System.Buffers;
44
using System.Collections.Concurrent;
55
using System.Collections.Generic;
6+
using System.Diagnostics;
67
using System.Linq;
78
using System.Linq.Expressions;
89
using System.Net;
@@ -134,26 +135,26 @@ public static async Task<Client> Create(ClientParameters parameters)
134135
// exchange properties
135136
var peerPropertiesResponse = await client.Request<PeerPropertiesRequest, PeerPropertiesResponse>(corr => new PeerPropertiesRequest(corr, parameters.Properties));
136137
foreach (var (k, v) in peerPropertiesResponse.Properties)
137-
Console.WriteLine($"server Props {k} {v}");
138+
Debug.WriteLine($"server Props {k} {v}");
138139

139140
//auth
140141
var saslHandshakeResponse = await client.Request<SaslHandshakeRequest, SaslHandshakeResponse>(corr => new SaslHandshakeRequest(corr));
141142
foreach (var m in saslHandshakeResponse.Mechanisms)
142-
Console.WriteLine($"sasl mechanism: {m}");
143+
Debug.WriteLine($"sasl mechanism: {m}");
143144

144145
var saslData = Encoding.UTF8.GetBytes($"\0{parameters.UserName}\0{parameters.Password}");
145146
var authResponse = await client.Request<SaslAuthenticateRequest, SaslAuthenticateResponse>(corr => new SaslAuthenticateRequest(corr, "PLAIN", saslData));
146-
Console.WriteLine($"auth: {authResponse.ResponseCode} {authResponse.Data}");
147+
Debug.WriteLine($"auth: {authResponse.ResponseCode} {authResponse.Data}");
147148

148149
//tune
149150
var tune = await client.tuneReceived.Task;
150151
await client.Publish(new TuneRequest(0, 0));
151152

152153
// open
153154
var open = await client.Request<OpenRequest, OpenResponse>(corr => new OpenRequest(corr, "/"));
154-
Console.WriteLine($"open: {open.ResponseCode} {open.ConnectionProperties.Count}");
155+
Debug.WriteLine($"open: {open.ResponseCode} {open.ConnectionProperties.Count}");
155156
foreach (var (k, v) in open.ConnectionProperties)
156-
Console.WriteLine($"open prop: {k} {v}");
157+
Debug.WriteLine($"open prop: {k} {v}");
157158

158159
client.correlationId = 100;
159160
return client;
@@ -222,7 +223,7 @@ private async ValueTask<TOut> Request<TIn, TOut>(Func<uint, TIn> request, int ti
222223
await Publish(request(corr));
223224
using (CancellationTokenSource cts = new CancellationTokenSource(timeout))
224225
{
225-
using (cts.Token.Register(valueTaskSource => ((ManualResetValueTaskSource<TOut>)valueTaskSource).SetException(new TimeoutException()), tcs))
226+
await using (cts.Token.Register(valueTaskSource => ((ManualResetValueTaskSource<TOut>)valueTaskSource).SetException(new TimeoutException()), tcs))
226227
{
227228
var valueTask = new ValueTask<TOut>(tcs, tcs.Version);
228229
var result = await valueTask;
@@ -383,6 +384,19 @@ public async Task<CloseResponse> Close(string reason)
383384

384385
return result;
385386
}
387+
388+
// Safe close
389+
// the client can be closed only if the publishers are == 0
390+
// not a public method used internally by producers and consumers
391+
internal CloseResponse MaybeClose(string reason)
392+
{
393+
if (publishers.Count == 0 && consumers.Count ==0)
394+
{
395+
return this.Close(reason).Result;
396+
}
397+
var result = new CloseResponse(0, ResponseCode.Ok);
398+
return result;
399+
}
386400

387401
public async ValueTask<QueryPublisherResponse> QueryPublisherSequence(string publisherRef, string stream)
388402
{

RabbitMQ.Stream.Client/Consumer.cs

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55

66
namespace RabbitMQ.Stream.Client
77
{
8-
98
public struct MessageContext
109
{
1110
public ulong Offset { get; }
@@ -18,20 +17,21 @@ public MessageContext(ulong offset, TimeSpan timestamp)
1817
this.Timestamp = timestamp;
1918
}
2019
}
21-
20+
2221
public record ConsumerConfig
2322
{
2423
public string Stream { get; set; }
2524
public string Reference { get; set; }
2625
public Func<Consumer, MessageContext, Message, Task> MessageHandler { get; set; }
2726
public IOffsetType OffsetSpec { get; set; } = new OffsetTypeNext();
2827
}
29-
30-
public class Consumer
28+
29+
public class Consumer : IDisposable
3130
{
3231
private readonly Client client;
3332
private readonly ConsumerConfig config;
3433
private byte subscriberId;
34+
private bool _disposed;
3535

3636
private Consumer(Client client, ConsumerConfig config)
3737
{
@@ -77,8 +77,45 @@ await config.MessageHandler(this,
7777
this.subscriberId = consumerId;
7878
return;
7979
}
80-
80+
8181
throw new CreateConsumerException($"consumer could not be created code: {response.ResponseCode}");
8282
}
83+
84+
public async Task<ResponseCode> Close()
85+
{
86+
if (_disposed)
87+
return ResponseCode.Ok;
88+
89+
var deleteConsumerResponse = await this.client.Unsubscribe(this.subscriberId);
90+
var result = deleteConsumerResponse.ResponseCode;
91+
var closed = this.client.MaybeClose($"client-close-subscriber: {this.subscriberId}");
92+
if (closed.ResponseCode != ResponseCode.Ok)
93+
{
94+
// TODO replace with new logger
95+
Console.WriteLine($"Error during close tcp connection. Subscriber: {this.subscriberId}");
96+
}
97+
98+
_disposed = true;
99+
return result;
100+
}
101+
102+
//
103+
private void Dispose(bool disposing)
104+
{
105+
if (!disposing) return;
106+
107+
var closeProducer = this.Close();
108+
if (closeProducer.Result != ResponseCode.Ok)
109+
{
110+
// TODO replace with new logger
111+
Console.WriteLine($"Error during remove producer. Subscriber: {this.subscriberId}");
112+
}
113+
}
114+
115+
public void Dispose()
116+
{
117+
Dispose(true);
118+
GC.SuppressFinalize(this);
119+
}
83120
}
84121
}

RabbitMQ.Stream.Client/Producer.cs

Lines changed: 48 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ public record ProducerConfig
2222
public Action<Confirmation> ConfirmHandler { get; set; } = _ => { };
2323
}
2424

25-
public class Producer
25+
public class Producer : IDisposable
2626
{
2727
private readonly Client client;
2828
private byte publisherId;
@@ -34,12 +34,17 @@ public class Producer
3434

3535
private readonly ConcurrentQueue<TaskCompletionSource> flows = new();
3636
private Task publishTask;
37+
private bool _disposed;
3738

3839
private Producer(Client client, ProducerConfig config)
3940
{
4041
this.client = client;
4142
this.config = config;
42-
this.messageBuffer = Channel.CreateBounded<OutgoingMsg>(new BoundedChannelOptions(10000) { AllowSynchronousContinuations = false, SingleReader = true, SingleWriter = false, FullMode = BoundedChannelFullMode.Wait });
43+
this.messageBuffer = Channel.CreateBounded<OutgoingMsg>(new BoundedChannelOptions(10000)
44+
{
45+
AllowSynchronousContinuations = false, SingleReader = true, SingleWriter = false,
46+
FullMode = BoundedChannelFullMode.Wait
47+
});
4348
this.publishTask = Task.Run(ProcessBuffer);
4449
this.semaphore = new(config.MaxInFlight, config.MaxInFlight);
4550
}
@@ -84,7 +89,7 @@ private async Task Init()
8489
public async ValueTask Send(ulong publishingId, Message message)
8590
{
8691
// Let's see if we can get a semaphore without having to wait, which should be the case most of the time
87-
if(!semaphore.Wait(0))
92+
if (!semaphore.Wait(0))
8893
{
8994
// Nope, we have maxed our In-Flight messages, let's asynchrnously wait for confirms
9095
if (!await semaphore.WaitAsync(1000).ConfigureAwait(false))
@@ -94,11 +99,10 @@ public async ValueTask Send(ulong publishingId, Message message)
9499
}
95100

96101
var msg = new OutgoingMsg(publisherId, publishingId, message);
97-
98-
// Let's see if we can write a message to the channel without having to wait
99-
if(!messageBuffer.Writer.TryWrite(msg))
100-
{
101102

103+
// Let's see if we can write a message to the channel without having to wait
104+
if (!messageBuffer.Writer.TryWrite(msg))
105+
{
102106
// Nope, channel is full and being processed, let's asynchronously wait until we can buffer the message
103107
await messageBuffer.Writer.WriteAsync(msg).ConfigureAwait(false);
104108
}
@@ -137,12 +141,49 @@ async Task SendMessages(List<(ulong, Message)> messages)
137141
}
138142
}
139143

144+
public async Task<ResponseCode> Close()
145+
{
146+
if (_disposed)
147+
return ResponseCode.Ok;
148+
149+
var deletePublisherResponse = await this.client.DeletePublisher(this.publisherId);
150+
var result = deletePublisherResponse.ResponseCode;
151+
var closed = this.client.MaybeClose($"client-close-publisher: {this.publisherId}");
152+
if (closed.ResponseCode != ResponseCode.Ok)
153+
{
154+
// TODO replace with new logger
155+
Console.WriteLine($"Error during close tcp connection. Producer: {this.publisherId}");
156+
}
157+
158+
_disposed = true;
159+
return result;
160+
}
161+
140162
public static async Task<Producer> Create(ClientParameters clientParameters, ProducerConfig config)
141163
{
142164
var client = await Client.Create(clientParameters);
143165
var producer = new Producer(client, config);
144166
await producer.Init();
145167
return producer;
146168
}
169+
170+
private void Dispose(bool disposing)
171+
{
172+
if (!disposing) return;
173+
174+
var closeProducer = this.Close();
175+
if (closeProducer.Result != ResponseCode.Ok)
176+
{
177+
// TODO replace with new logger
178+
Console.WriteLine($"Error during remove producer. Producer: {this.publisherId}");
179+
}
180+
}
181+
182+
183+
public void Dispose()
184+
{
185+
Dispose(true);
186+
GC.SuppressFinalize(this);
187+
}
147188
}
148189
}

RabbitMQ.Stream.Client/StreamSystem.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,8 @@ public CreateConsumerException(string s) : base(s)
102102
{
103103
}
104104
}
105-
105+
106+
106107
public class CreateStreamException : Exception
107108
{
108109
public CreateStreamException(string s) : base(s) { }
@@ -112,6 +113,7 @@ public class CreateProducerException : Exception
112113
{
113114
public CreateProducerException(string s) : base(s) { }
114115
}
116+
115117

116118
public struct Properties
117119
{

RabbitMQ.Stream.Client/Unubscribe.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ private UnsubscribeResponse(uint correlationId, ResponseCode responseCode)
1919

2020
public uint CorrelationId => correlationId;
2121

22-
public ResponseCode Code => responseCode;
22+
public ResponseCode ResponseCode => responseCode;
2323

2424
public int Write(Span<byte> span)
2525
{

Tests/ClientTests.cs

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
using RabbitMQ.Stream.Client;
1010
using Xunit;
1111
using Xunit.Abstractions;
12+
[assembly: CollectionBehavior(DisableTestParallelization = true)]
1213

1314
namespace Tests
1415
{
@@ -86,6 +87,7 @@ public async void MetadataUpdateIsHandled()
8687
var mdu = testPassed.Task.Result;
8788
Assert.Equal(stream, mdu.Stream);
8889
Assert.Equal(ResponseCode.StreamNotAvailable, mdu.Code);
90+
await client.Close("done");
8991
}
9092

9193
[Fact]
@@ -100,6 +102,7 @@ public async void DeclarePublisherShouldReturnErrorCode()
100102

101103
var (publisherId, result) = await client.DeclarePublisher(publisherRef, "this-stream-does-not-exist", confirmed, errored);
102104
Assert.Equal(ResponseCode.StreamDoesNotExist, result.ResponseCode);
105+
await client.Close("done");
103106
}
104107

105108
[Fact]
@@ -131,13 +134,16 @@ public async void PublishShouldError()
131134
var msgData = new Message(Encoding.UTF8.GetBytes("hi"));
132135
await client.Publish(new Publish(publisherId, new List<(ulong, Message)> { (100, msgData) }));
133136

134-
Assert.True(testPassed.Task.Wait(5000));
137+
new Utils<bool>(testOutputHelper).WaitUntilTaskCompletes(testPassed);
138+
135139
Assert.True(testPassed.Task.Result);
140+
await client.Close("done");
136141
}
137142

138143
[Fact]
139144
public async void PublishShouldConfirm()
140145
{
146+
testOutputHelper.WriteLine("PublishShouldConfirm");
141147
var stream = Guid.NewGuid().ToString();
142148
var clientParameters = new ClientParameters { };
143149
var client = await Client.Create(clientParameters);
@@ -171,14 +177,16 @@ public async void PublishShouldConfirm()
171177
if ((int)i - numConfirmed > 1000)
172178
await Task.Delay(10);
173179
}
174-
Assert.True(testPassed.Task.Wait(10000));
180+
new Utils<bool>(testOutputHelper).WaitUntilTaskCompletes(testPassed);
181+
175182
Debug.WriteLine($"num confirmed {numConfirmed}");
176183
Assert.True(testPassed.Task.Result);
177184
await client.DeleteStream(stream);
178185
var closeResponse = await client.Close("finished");
179186
Assert.Equal(ResponseCode.Ok, closeResponse.ResponseCode);
180187
Assert.True(client.IsClosed);
181188
//Assert.Throws<AggregateException>(() => client.Close("finished").Result);
189+
await client.Close("done");
182190
}
183191

184192
[Fact]
@@ -209,8 +217,10 @@ public async void ConsumerShouldReceiveDelivery()
209217
(0, new Message(Encoding.UTF8.GetBytes("hi"))),
210218
(1, new Message(Encoding.UTF8.GetBytes("hi")))
211219
}));
212-
Assert.True(testPassed.Task.Wait(10000));
220+
new Utils<Deliver>(testOutputHelper).WaitUntilTaskCompletes(testPassed);
221+
213222
Assert.Equal(2, msgs.Count());
223+
await client.Close("done");
214224
}
215225

216226

@@ -314,13 +324,16 @@ public async void ConsumerShouldReceiveDeliveryAfterCredit()
314324
var publisherRef = Guid.NewGuid().ToString();
315325
var (publisherId, declarePubResp) = await client.DeclarePublisher(publisherRef, stream, _ => { }, _ => { });
316326
await client.Publish(new Publish(publisherId, new List<(ulong, Message)> { (0, new Message(Array.Empty<byte>())) }));
317-
Assert.False(testPassed.Task.Wait(1000));
327+
new Utils<Deliver>(testOutputHelper).WaitUntilTaskCompletes(testPassed, false);
328+
318329
//We have not credited yet
319330
await client.Credit(subId, 1);
320331

321-
Assert.True(testPassed.Task.Wait(10000));
332+
new Utils<Deliver>(testOutputHelper).WaitUntilTaskCompletes(testPassed);
333+
322334
var delivery = testPassed.Task.Result;
323335
Assert.Single(delivery.Messages);
336+
await client.Close("done");
324337
}
325338

326339
[Fact]
@@ -344,12 +357,14 @@ public async void UnsubscribedConsumerShouldNotReceiveDelivery()
344357

345358
var unsubscribeResponse = await client.Unsubscribe(subId);
346359

347-
Assert.Equal(ResponseCode.Ok, unsubscribeResponse.Code);
360+
Assert.Equal(ResponseCode.Ok, unsubscribeResponse.ResponseCode);
348361
var publisherRef = Guid.NewGuid().ToString();
349362
var (publisherId, declarePubResp) = await client.DeclarePublisher(publisherRef, stream, _ => { }, _ => { });
350363
await client.Publish(new Publish(publisherId, new List<(ulong, Message)> { (0, new Message(Array.Empty<byte>())) }));
351364
// 1s should be enough to catch this at least some of the time
352-
Assert.False(testPassed.Task.Wait(1000));
365+
new Utils<Deliver>(testOutputHelper).WaitUntilTaskCompletes(testPassed, false);
366+
await client.Close("done");
367+
353368
}
354369
}
355370
}

0 commit comments

Comments
 (0)