Skip to content

Commit 0e690e5

Browse files
committed
Make reconnection async
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
1 parent 213ca9c commit 0e690e5

File tree

3 files changed

+20
-4
lines changed

3 files changed

+20
-4
lines changed

RabbitMQ.Stream.Client/Reliable/ReliableBase.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ internal async Task HandleMetaDataMaybeReconnect(string stream, StreamSystem sys
8282
$" Client: {ToString()}");
8383
// Here we just close the producer connection
8484
// the func TryToReconnect/0 will be called.
85+
8586
await CloseReliable();
8687
}
8788
else
@@ -100,6 +101,13 @@ internal static bool IsAKnownException(Exception exception)
100101
return exception is (SocketException or TimeoutException or LeaderNotFoundException);
101102
}
102103

104+
internal void LogException(Exception exception)
105+
{
106+
LogEventSource.Log.LogError(IsAKnownException(exception)
107+
? $"Trying to reconnect {ToString()} due of: {exception.Message}"
108+
: $"Error during initialization {ToString()} due of: {exception.Message}");
109+
}
110+
103111
protected abstract Task CloseReliable();
104112
public abstract Task Close();
105113

RabbitMQ.Stream.Client/Reliable/ReliableConsumer.cs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,11 @@ protected override async Task GetNewReliable(bool boot)
7676
},
7777
MetadataHandler = update =>
7878
{
79-
HandleMetaDataMaybeReconnect(update.Stream,
80-
_reliableConsumerConfig.StreamSystem).Wait();
79+
Task.Run(() =>
80+
{
81+
HandleMetaDataMaybeReconnect(update.Stream,
82+
_reliableConsumerConfig.StreamSystem).WaitAsync(CancellationToken.None);
83+
});
8184
},
8285
MessageHandler = async (consumer, ctx, message) =>
8386
{
@@ -95,6 +98,7 @@ protected override async Task GetNewReliable(bool boot)
9598
catch (Exception e)
9699
{
97100
reconnect = IsAKnownException(e);
101+
LogException(e);
98102
if (!reconnect)
99103
{
100104
throw;

RabbitMQ.Stream.Client/Reliable/ReliableProducer.cs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,8 +90,11 @@ protected override async Task GetNewReliable(bool boot)
9090
MaxInFlight = _reliableProducerConfig.MaxInFlight,
9191
MetadataHandler = update =>
9292
{
93-
HandleMetaDataMaybeReconnect(update.Stream,
94-
_reliableProducerConfig.StreamSystem).Wait();
93+
Task.Run(() =>
94+
{
95+
HandleMetaDataMaybeReconnect(update.Stream,
96+
_reliableProducerConfig.StreamSystem).WaitAsync(CancellationToken.None);
97+
});
9598
},
9699
ConnectionClosedHandler = async _ =>
97100
{
@@ -130,6 +133,7 @@ protected override async Task GetNewReliable(bool boot)
130133
catch (Exception e)
131134
{
132135
reconnect = IsAKnownException(e);
136+
LogException(e);
133137
if (!reconnect)
134138
{
135139
throw;

0 commit comments

Comments
 (0)