Skip to content

Commit 3b9c9bd

Browse files
committed
Refactor the Reliable part
- Reduce the code to the consumer/producer part - Add a base record for the configuration - Add CreateNewEntity as abstract method - Add tests for known and unknown exception - Document methods Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
1 parent 0e690e5 commit 3b9c9bd

File tree

5 files changed

+296
-188
lines changed

5 files changed

+296
-188
lines changed

RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
abstract RabbitMQ.Stream.Client.Reliable.ReliableBase.Close() -> System.Threading.Tasks.Task
2-
abstract RabbitMQ.Stream.Client.Reliable.ReliableBase.CloseReliable() -> System.Threading.Tasks.Task
3-
abstract RabbitMQ.Stream.Client.Reliable.ReliableBase.GetNewReliable(bool boot) -> System.Threading.Tasks.Task
2+
abstract RabbitMQ.Stream.Client.Reliable.ReliableBase.CloseEntity() -> System.Threading.Tasks.Task
43
const RabbitMQ.Stream.Client.AMQP.DescribedFormatCode.AmqpValue = 119 -> byte
54
const RabbitMQ.Stream.Client.AMQP.DescribedFormatCode.ApplicationData = 117 -> byte
65
const RabbitMQ.Stream.Client.AMQP.DescribedFormatCode.ApplicationProperties = 116 -> byte
@@ -645,12 +644,19 @@ RabbitMQ.Stream.Client.Reliable.MessagesConfirmation.MessagesConfirmation() -> v
645644
RabbitMQ.Stream.Client.Reliable.MessagesConfirmation.PublishingId.get -> ulong
646645
RabbitMQ.Stream.Client.Reliable.MessagesConfirmation.Status.get -> RabbitMQ.Stream.Client.Reliable.ConfirmationStatus
647646
RabbitMQ.Stream.Client.Reliable.ReliableBase
648-
RabbitMQ.Stream.Client.Reliable.ReliableBase.Init() -> System.Threading.Tasks.Task
647+
RabbitMQ.Stream.Client.Reliable.ReliableBase.Init(RabbitMQ.Stream.Client.Reliable.IReconnectStrategy reconnectStrategy) -> System.Threading.Tasks.Task
649648
RabbitMQ.Stream.Client.Reliable.ReliableBase.IsOpen() -> bool
650649
RabbitMQ.Stream.Client.Reliable.ReliableBase.ReliableBase() -> void
651650
RabbitMQ.Stream.Client.Reliable.ReliableBase.TryToReconnect(RabbitMQ.Stream.Client.Reliable.IReconnectStrategy reconnectStrategy) -> System.Threading.Tasks.Task
652651
RabbitMQ.Stream.Client.Reliable.ReliableBase._inReconnection -> bool
653-
RabbitMQ.Stream.Client.Reliable.ReliableBase._needReconnect -> bool
652+
RabbitMQ.Stream.Client.Reliable.ReliableBase._isOpen -> bool
653+
RabbitMQ.Stream.Client.Reliable.ReliableConfig
654+
RabbitMQ.Stream.Client.Reliable.ReliableConfig.ReconnectStrategy.get -> RabbitMQ.Stream.Client.Reliable.IReconnectStrategy
655+
RabbitMQ.Stream.Client.Reliable.ReliableConfig.ReconnectStrategy.set -> void
656+
RabbitMQ.Stream.Client.Reliable.ReliableConfig.Stream.get -> string
657+
RabbitMQ.Stream.Client.Reliable.ReliableConfig.Stream.set -> void
658+
RabbitMQ.Stream.Client.Reliable.ReliableConfig.StreamSystem.get -> RabbitMQ.Stream.Client.StreamSystem
659+
RabbitMQ.Stream.Client.Reliable.ReliableConfig.StreamSystem.set -> void
654660
RabbitMQ.Stream.Client.Reliable.ReliableConsumer
655661
RabbitMQ.Stream.Client.Reliable.ReliableConsumerConfig
656662
RabbitMQ.Stream.Client.Reliable.ReliableConsumerConfig.ClientProvidedName.get -> string
@@ -659,14 +665,8 @@ RabbitMQ.Stream.Client.Reliable.ReliableConsumerConfig.MessageHandler.get -> Sys
659665
RabbitMQ.Stream.Client.Reliable.ReliableConsumerConfig.MessageHandler.set -> void
660666
RabbitMQ.Stream.Client.Reliable.ReliableConsumerConfig.OffsetSpec.get -> RabbitMQ.Stream.Client.IOffsetType
661667
RabbitMQ.Stream.Client.Reliable.ReliableConsumerConfig.OffsetSpec.set -> void
662-
RabbitMQ.Stream.Client.Reliable.ReliableConsumerConfig.ReconnectStrategy.get -> RabbitMQ.Stream.Client.Reliable.IReconnectStrategy
663-
RabbitMQ.Stream.Client.Reliable.ReliableConsumerConfig.ReconnectStrategy.set -> void
664668
RabbitMQ.Stream.Client.Reliable.ReliableConsumerConfig.Reference.get -> string
665669
RabbitMQ.Stream.Client.Reliable.ReliableConsumerConfig.Reference.set -> void
666-
RabbitMQ.Stream.Client.Reliable.ReliableConsumerConfig.Stream.get -> string
667-
RabbitMQ.Stream.Client.Reliable.ReliableConsumerConfig.Stream.set -> void
668-
RabbitMQ.Stream.Client.Reliable.ReliableConsumerConfig.StreamSystem.get -> RabbitMQ.Stream.Client.StreamSystem
669-
RabbitMQ.Stream.Client.Reliable.ReliableConsumerConfig.StreamSystem.set -> void
670670
RabbitMQ.Stream.Client.Reliable.ReliableProducer
671671
RabbitMQ.Stream.Client.Reliable.ReliableProducer.BatchSend(System.Collections.Generic.List<RabbitMQ.Stream.Client.Message> messages) -> System.Threading.Tasks.ValueTask
672672
RabbitMQ.Stream.Client.Reliable.ReliableProducer.Send(RabbitMQ.Stream.Client.Message message) -> System.Threading.Tasks.ValueTask
@@ -678,14 +678,8 @@ RabbitMQ.Stream.Client.Reliable.ReliableProducerConfig.ConfirmationHandler.get -
678678
RabbitMQ.Stream.Client.Reliable.ReliableProducerConfig.ConfirmationHandler.init -> void
679679
RabbitMQ.Stream.Client.Reliable.ReliableProducerConfig.MaxInFlight.get -> int
680680
RabbitMQ.Stream.Client.Reliable.ReliableProducerConfig.MaxInFlight.set -> void
681-
RabbitMQ.Stream.Client.Reliable.ReliableProducerConfig.ReconnectStrategy.get -> RabbitMQ.Stream.Client.Reliable.IReconnectStrategy
682-
RabbitMQ.Stream.Client.Reliable.ReliableProducerConfig.ReconnectStrategy.set -> void
683681
RabbitMQ.Stream.Client.Reliable.ReliableProducerConfig.Reference.get -> string
684682
RabbitMQ.Stream.Client.Reliable.ReliableProducerConfig.Reference.set -> void
685-
RabbitMQ.Stream.Client.Reliable.ReliableProducerConfig.Stream.get -> string
686-
RabbitMQ.Stream.Client.Reliable.ReliableProducerConfig.Stream.set -> void
687-
RabbitMQ.Stream.Client.Reliable.ReliableProducerConfig.StreamSystem.get -> RabbitMQ.Stream.Client.StreamSystem
688-
RabbitMQ.Stream.Client.Reliable.ReliableProducerConfig.StreamSystem.set -> void
689683
RabbitMQ.Stream.Client.Reliable.ReliableProducerConfig.TimeoutMessageAfter.get -> System.TimeSpan
690684
RabbitMQ.Stream.Client.Reliable.ReliableProducerConfig.TimeoutMessageAfter.init -> void
691685
RabbitMQ.Stream.Client.ResponseCode

RabbitMQ.Stream.Client/Reliable/ReliableBase.cs

Lines changed: 109 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -9,48 +9,104 @@
99

1010
namespace RabbitMQ.Stream.Client.Reliable;
1111

12+
public record ReliableConfig
13+
{
14+
public IReconnectStrategy ReconnectStrategy { get; set; } = new BackOffReconnectStrategy();
15+
public StreamSystem StreamSystem { get; set; }
16+
public string Stream { get; set; }
17+
}
18+
1219
/// <summary>
1320
/// Base class for Reliable producer/ consumer
21+
/// With the term Entity we mean a Producer or a Consumer
1422
/// </summary>
1523
public abstract class ReliableBase
1624
{
1725
protected readonly SemaphoreSlim SemaphoreSlim = new(1);
18-
protected bool _needReconnect = true;
26+
27+
protected bool _isOpen;
1928
protected bool _inReconnection;
2029

21-
protected async Task Init()
30+
public async Task Init(IReconnectStrategy reconnectStrategy)
2231
{
23-
await GetNewReliable(true);
32+
await Init(true, reconnectStrategy);
2433
}
2534

26-
// boot is the first time is called.
27-
// used to init the producer/consumer
28-
protected abstract Task GetNewReliable(bool boot);
29-
30-
protected async Task TryToReconnect(IReconnectStrategy reconnectStrategy)
35+
// <summary>
36+
/// Init the reliable client
37+
/// <param name="boot"> If it is the First boot for the reliable P/C </param>
38+
/// <param name="reconnectStrategy">IReconnectStrategy</param>
39+
/// Try to Init the Entity, if it fails, it will try to reconnect
40+
/// only if the exception is a known exception
41+
// </summary>
42+
private async Task Init(bool boot, IReconnectStrategy reconnectStrategy)
3143
{
32-
_inReconnection = true;
44+
var reconnect = false;
45+
await SemaphoreSlim.WaitAsync();
3346
try
3447
{
35-
var reconnect = reconnectStrategy.WhenDisconnected(ToString());
36-
var hasToReconnect = reconnect && _needReconnect;
37-
var addInfo = "Client won't reconnect";
38-
if (hasToReconnect)
48+
_isOpen = true;
49+
await CreateNewEntity(boot);
50+
}
51+
52+
catch (Exception e)
53+
{
54+
reconnect = IsAKnownException(e);
55+
LogException(e);
56+
if (!reconnect)
3957
{
40-
addInfo = "Client will try reconnect";
58+
// We consider the client as closed
59+
// since the exception is raised to the caller
60+
_isOpen = false;
61+
throw;
4162
}
63+
}
64+
finally
65+
{
66+
SemaphoreSlim.Release();
67+
}
4268

43-
LogEventSource.Log.LogInformation(
44-
$"{ToString()} is disconnected. {addInfo}");
69+
if (reconnect)
70+
{
71+
await TryToReconnect(reconnectStrategy);
72+
}
73+
}
4574

46-
if (hasToReconnect)
47-
{
48-
await GetNewReliable(false);
49-
}
50-
else
75+
// <summary>
76+
/// Init the a new Entity (Producer/Consumer)
77+
/// <param name="boot"> If it is the First boot for the reliable P/C </param>
78+
/// Called by Init method
79+
// </summary>
80+
internal abstract Task CreateNewEntity(bool boot);
81+
82+
// <summary>
83+
/// Try to reconnect to the broker
84+
/// Based on the retry strategy
85+
/// <param name="reconnectStrategy"> The Strategy for the reconnection
86+
/// by default it is exponential backoff.
87+
/// It it possible to change implementing the IReconnectStrategy interface </param>
88+
// </summary>
89+
protected async Task TryToReconnect(IReconnectStrategy reconnectStrategy)
90+
{
91+
_inReconnection = true;
92+
try
93+
{
94+
// The reconnected strategy cloud decide to stop the reconnection
95+
// for some reason
96+
// _isOpen==true is when the user specified that the client must stop
97+
98+
switch (reconnectStrategy.WhenDisconnected(ToString()) && _isOpen)
5199
{
52-
_needReconnect = false;
53-
await Close();
100+
case true:
101+
LogEventSource.Log.LogInformation(
102+
$"{ToString()} is disconnected. Client will try reconnect");
103+
await Init(false, reconnectStrategy);
104+
break;
105+
case false:
106+
LogEventSource.Log.LogInformation(
107+
$"{ToString()} is asked to be closed");
108+
await Close();
109+
break;
54110
}
55111
}
56112
finally
@@ -83,7 +139,7 @@ internal async Task HandleMetaDataMaybeReconnect(string stream, StreamSystem sys
83139
// Here we just close the producer connection
84140
// the func TryToReconnect/0 will be called.
85141

86-
await CloseReliable();
142+
await CloseEntity();
87143
}
88144
else
89145
{
@@ -96,23 +152,49 @@ internal async Task HandleMetaDataMaybeReconnect(string stream, StreamSystem sys
96152
}
97153
}
98154

155+
// <summary>
156+
/// IsAKnownException returns true if the exception is a known exception
157+
/// We need it to reconnect when the producer/consumer.
158+
/// - LeaderNotFoundException is a temporary exception
159+
/// It means that the leader is not available and the client can't reconnect.
160+
/// Especially the Producer that needs to know the leader.
161+
/// - SocketException
162+
/// Client is trying to connect in a not ready endpoint.
163+
/// It is usually a temporary situation.
164+
/// - TimeoutException
165+
/// Some call went in timeout. Maybe a temporary DNS problem.
166+
/// In this case we can try to reconnect.
167+
///
168+
/// For the other kind of exception, we just throw back the exception.
169+
//</summary>
99170
internal static bool IsAKnownException(Exception exception)
100171
{
101172
return exception is (SocketException or TimeoutException or LeaderNotFoundException);
102173
}
103174

104-
internal void LogException(Exception exception)
175+
private void LogException(Exception exception)
105176
{
106177
LogEventSource.Log.LogError(IsAKnownException(exception)
107178
? $"Trying to reconnect {ToString()} due of: {exception.Message}"
108179
: $"Error during initialization {ToString()} due of: {exception.Message}");
109180
}
110181

111-
protected abstract Task CloseReliable();
182+
// <summary>
183+
/// ONLY close the current Entity (Producer/Consumer)
184+
/// without closing the Reliable(Producer/Consumer) instance.
185+
/// It happens when the stream change topology, and the entity
186+
/// must be recreated. In the producer case for example when the
187+
/// leader changes.
188+
// </summary>
189+
protected abstract Task CloseEntity();
190+
191+
// <summary>
192+
/// Close the Reliable(Producer/Consumer) instance.
193+
// </summary>
112194
public abstract Task Close();
113195

114196
public bool IsOpen()
115197
{
116-
return _needReconnect;
198+
return _isOpen;
117199
}
118200
}

0 commit comments

Comments
 (0)