Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -570,8 +570,8 @@ By default Reliable Producer/Consumer uses an `BackOffReconnectStrategy` to reco
You can customize the behaviour implementing the `IReconnectStrategy` interface:

```csharp
bool WhenDisconnected(string connectionInfo);
void WhenConnected(string connectionInfo);
ValueTask<bool> WhenDisconnected(string connectionInfo);
ValueTask WhenConnected(string connectionInfo);
```

If `WhenDisconnected` return is `true` Producer/Consumer will be reconnected else closed.
Expand Down
8 changes: 4 additions & 4 deletions RabbitMQ.Stream.Client/Consumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,11 @@ await config.MessageHandler(this,
throw new CreateConsumerException($"consumer could not be created code: {response.ResponseCode}");
}

public Task<ResponseCode> Close()
public async Task<ResponseCode> Close()
{
if (client.IsClosed)
{
return Task.FromResult(ResponseCode.Ok);
return ResponseCode.Ok;
}

var result = ResponseCode.Ok;
Expand All @@ -132,7 +132,7 @@ public Task<ResponseCode> Close()
// in this case we reduce the waiting time
// the consumer could be removed because of stream deleted
// so it is not necessary to wait.
deleteConsumerResponseTask.Wait(TimeSpan.FromSeconds(3));
await deleteConsumerResponseTask.WaitAsync(TimeSpan.FromSeconds(3));
if (deleteConsumerResponseTask.IsCompletedSuccessfully)
{
result = deleteConsumerResponseTask.Result.ResponseCode;
Expand All @@ -146,7 +146,7 @@ public Task<ResponseCode> Close()
var closed = client.MaybeClose($"client-close-subscriber: {subscriberId}");
ClientExceptions.MaybeThrowException(closed.ResponseCode, $"client-close-subscriber: {subscriberId}");
_disposed = true;
return Task.FromResult(result);
return result;
}

//
Expand Down
67 changes: 41 additions & 26 deletions RabbitMQ.Stream.Client/HeartBeatHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,18 @@ public class HeartBeatHandler
private DateTime _lastUpdate = DateTime.Now;
private uint _missedHeartbeat;

private readonly Func<ValueTask<bool>> _sendHeartbeatFunc;
private readonly Func<string, Task<CloseResponse>> _close;
private readonly int _heartbeat;

public HeartBeatHandler(Func<ValueTask<bool>> sendHeartbeatFunc,
Func<string, Task<CloseResponse>> close,
int heartbeat)
{
_sendHeartbeatFunc = sendHeartbeatFunc;
_close = close;
_heartbeat = heartbeat;

// the heartbeat is disabled when zero
// so all the timer won't be enabled

Expand All @@ -32,35 +40,42 @@ public HeartBeatHandler(Func<ValueTask<bool>> sendHeartbeatFunc,
{
_timer.Enabled = false;
_timer.Interval = heartbeat * 1000;
_timer.Elapsed += (_, _) =>
{
var f = sendHeartbeatFunc();
f.AsTask().Wait(1000);

var seconds = (DateTime.Now - _lastUpdate).TotalSeconds;
if (seconds < heartbeat)
{
return;
}

// missed the Heartbeat
Interlocked.Increment(ref _missedHeartbeat);
LogEventSource.Log.LogWarning($"Heartbeat missed: {_missedHeartbeat}");
if (_missedHeartbeat <= 3)
{
return;
}

// When the client does not receive the Heartbeat for three times the
// client will be closed
LogEventSource.Log.LogError($"Too many Heartbeat missed: {_missedHeartbeat}");
Close();
close($"Too many Heartbeat missed: {_missedHeartbeat}. " +
$"Client connection will be closed");
};
_timer.Elapsed += TimerElapsed;
}
}

private void TimerElapsed(object sender, System.Timers.ElapsedEventArgs e)
{
_ = PerformHeartBeatAsync();
}

private async Task PerformHeartBeatAsync()
{
var f = _sendHeartbeatFunc();
await f.AsTask().WaitAsync(TimeSpan.FromMilliseconds(1000));

var seconds = (DateTime.Now - _lastUpdate).TotalSeconds;
if (seconds < _heartbeat)
{
return;
}

// missed the Heartbeat
Interlocked.Increment(ref _missedHeartbeat);
LogEventSource.Log.LogWarning($"Heartbeat missed: {_missedHeartbeat}");
if (_missedHeartbeat <= 3)
{
return;
}

// When the client does not receive the Heartbeat for three times the
// client will be closed
LogEventSource.Log.LogError($"Too many Heartbeat missed: {_missedHeartbeat}");
Close();
await _close($"Too many Heartbeat missed: {_missedHeartbeat}. " +
$"Client connection will be closed");
}

internal void UpdateHeartBeat()
{
_lastUpdate = DateTime.Now;
Expand Down
10 changes: 5 additions & 5 deletions RabbitMQ.Stream.Client/Producer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ internal void PreValidateBatch(List<(ulong, Message)> messages)

private async Task SemaphoreWait()
{
if (!semaphore.Wait(0) && !client.IsClosed)
if (!await semaphore.WaitAsync(0) && !client.IsClosed)
{
// Nope, we have maxed our In-Flight messages, let's asynchronously wait for confirms
if (!await semaphore.WaitAsync(TimeSpan.FromSeconds(1)).ConfigureAwait(false))
Expand Down Expand Up @@ -249,11 +249,11 @@ private async Task ProcessBuffer()
}
}

public Task<ResponseCode> Close()
public async Task<ResponseCode> Close()
{
if (client.IsClosed)
{
return Task.FromResult(ResponseCode.Ok);
return ResponseCode.Ok;
}

var result = ResponseCode.Ok;
Expand All @@ -264,7 +264,7 @@ public Task<ResponseCode> Close()
// in this case we reduce the waiting time
// the producer could be removed because of stream deleted
// so it is not necessary to wait.
deletePublisherResponseTask.Wait(TimeSpan.FromSeconds(3));
await deletePublisherResponseTask.WaitAsync(TimeSpan.FromSeconds(3));
if (deletePublisherResponseTask.IsCompletedSuccessfully)
{
result = deletePublisherResponseTask.Result.ResponseCode;
Expand All @@ -277,7 +277,7 @@ public Task<ResponseCode> Close()

var closed = client.MaybeClose($"client-close-publisher: {publisherId}");
ClientExceptions.MaybeThrowException(closed.ResponseCode, $"client-close-publisher: {publisherId}");
return Task.FromResult(result);
return result;
}

public static async Task<Producer> Create(ClientParameters clientParameters,
Expand Down
8 changes: 4 additions & 4 deletions RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ RabbitMQ.Stream.Client.IOffsetType.OffsetType.get -> RabbitMQ.Stream.Client.Offs
RabbitMQ.Stream.Client.IOffsetType.Size.get -> int
RabbitMQ.Stream.Client.IOffsetType.Write(System.Span<byte> span) -> int
RabbitMQ.Stream.Client.IRouting
RabbitMQ.Stream.Client.IRouting.CreateClient(RabbitMQ.Stream.Client.ClientParameters clientParameters) -> RabbitMQ.Stream.Client.IClient
RabbitMQ.Stream.Client.IRouting.CreateClient(RabbitMQ.Stream.Client.ClientParameters clientParameters) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IClient>
RabbitMQ.Stream.Client.IRouting.ValidateDns.get -> bool
RabbitMQ.Stream.Client.IRouting.ValidateDns.set -> void
RabbitMQ.Stream.Client.Keywords
Expand Down Expand Up @@ -633,8 +633,8 @@ RabbitMQ.Stream.Client.Reliable.ConfirmationStatus.StreamNotAvailable = 6 -> Rab
RabbitMQ.Stream.Client.Reliable.ConfirmationStatus.UndefinedError = 200 -> RabbitMQ.Stream.Client.Reliable.ConfirmationStatus
RabbitMQ.Stream.Client.Reliable.ConfirmationStatus.WaitForConfirmation = 0 -> RabbitMQ.Stream.Client.Reliable.ConfirmationStatus
RabbitMQ.Stream.Client.Reliable.IReconnectStrategy
RabbitMQ.Stream.Client.Reliable.IReconnectStrategy.WhenConnected(string connectionInfo) -> void
RabbitMQ.Stream.Client.Reliable.IReconnectStrategy.WhenDisconnected(string connectionInfo) -> bool
RabbitMQ.Stream.Client.Reliable.IReconnectStrategy.WhenConnected(string connectionInfo) -> System.Threading.Tasks.ValueTask
RabbitMQ.Stream.Client.Reliable.IReconnectStrategy.WhenDisconnected(string connectionInfo) -> System.Threading.Tasks.ValueTask<bool>
RabbitMQ.Stream.Client.Reliable.MessagesConfirmation
RabbitMQ.Stream.Client.Reliable.MessagesConfirmation.InsertDateTime.get -> System.DateTime
RabbitMQ.Stream.Client.Reliable.MessagesConfirmation.InsertDateTime.init -> void
Expand Down Expand Up @@ -707,7 +707,7 @@ RabbitMQ.Stream.Client.ResponseCode.SubscriptionIdDoesNotExist = 4 -> RabbitMQ.S
RabbitMQ.Stream.Client.ResponseCode.UnknownFrame = 13 -> RabbitMQ.Stream.Client.ResponseCode
RabbitMQ.Stream.Client.ResponseCode.VirtualHostAccessFailure = 12 -> RabbitMQ.Stream.Client.ResponseCode
RabbitMQ.Stream.Client.Routing
RabbitMQ.Stream.Client.Routing.CreateClient(RabbitMQ.Stream.Client.ClientParameters clientParameters) -> RabbitMQ.Stream.Client.IClient
RabbitMQ.Stream.Client.Routing.CreateClient(RabbitMQ.Stream.Client.ClientParameters clientParameters) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IClient>
RabbitMQ.Stream.Client.Routing.Routing() -> void
RabbitMQ.Stream.Client.Routing.ValidateDns.get -> bool
RabbitMQ.Stream.Client.Routing.ValidateDns.set -> void
Expand Down
13 changes: 7 additions & 6 deletions RabbitMQ.Stream.Client/Reliable/IReconnectStrategy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
// Copyright (c) 2007-2020 VMware, Inc.

using System;
using System.Threading;
using System.Threading.Tasks;

namespace RabbitMQ.Stream.Client.Reliable;

Expand All @@ -18,13 +18,13 @@ public interface IReconnectStrategy
/// </summary>
/// <param name="connectionInfo">Additional connection info. Just for logging</param>
/// <returns>if True the client will be reconnected else closed</returns>
bool WhenDisconnected(string connectionInfo);
ValueTask<bool> WhenDisconnected(string connectionInfo);

/// <summary>
/// It is raised when the TCP client is connected successfully
/// </summary>
/// <param name="connectionInfo">Additional connection info. Just for logging</param>
void WhenConnected(string connectionInfo);
ValueTask WhenConnected(string connectionInfo);
}

/// <summary>
Expand All @@ -36,17 +36,18 @@ internal class BackOffReconnectStrategy : IReconnectStrategy
{
private int Tentatives { get; set; } = 1;

public bool WhenDisconnected(string connectionInfo)
public async ValueTask<bool> WhenDisconnected(string connectionInfo)
{
Tentatives <<= 1;
LogEventSource.Log.LogInformation(
$"{connectionInfo} disconnected, check if reconnection needed in {Tentatives * 100} ms.");
Thread.Sleep(TimeSpan.FromMilliseconds(Tentatives * 100));
await Task.Delay(TimeSpan.FromMilliseconds(Tentatives * 100));
return true;
}

public void WhenConnected(string _)
public ValueTask WhenConnected(string connectionInfo)
{
Tentatives = 1;
return ValueTask.CompletedTask;
}
}
4 changes: 2 additions & 2 deletions RabbitMQ.Stream.Client/Reliable/ReliableBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ protected async Task TryToReconnect(IReconnectStrategy reconnectStrategy)
_inReconnection = true;
try
{
var reconnect = reconnectStrategy.WhenDisconnected(ToString());
var reconnect = await reconnectStrategy.WhenDisconnected(ToString());
var hasToReconnect = reconnect && _needReconnect;
var addInfo = "Client won't reconnect";
if (hasToReconnect)
Expand Down Expand Up @@ -72,7 +72,7 @@ internal async Task HandleMetaDataMaybeReconnect(string stream, StreamSystem sys
{
// This sleep is needed. When a stream is deleted it takes sometime.
// The StreamExists/1 could return true even the stream doesn't exist anymore.
Thread.Sleep(500);
await Task.Delay(500);
if (await system.StreamExists(stream))
{
LogEventSource.Log.LogInformation(
Expand Down
5 changes: 2 additions & 3 deletions RabbitMQ.Stream.Client/Reliable/ReliableConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,7 @@ protected override async Task GetNewReliable(bool boot)
},
MetadataHandler = update =>
{
HandleMetaDataMaybeReconnect(update.Stream,
_reliableConsumerConfig.StreamSystem).Wait();
_ = HandleMetaDataMaybeReconnect(update.Stream, _reliableConsumerConfig.StreamSystem);
},
MessageHandler = async (consumer, ctx, message) =>
{
Expand All @@ -88,7 +87,7 @@ protected override async Task GetNewReliable(bool boot)
}
}
});
_reliableConsumerConfig.ReconnectStrategy.WhenConnected(ToString());
await _reliableConsumerConfig.ReconnectStrategy.WhenConnected(ToString());
}

catch (Exception e)
Expand Down
7 changes: 4 additions & 3 deletions RabbitMQ.Stream.Client/Reliable/ReliableProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,9 @@ protected override async Task GetNewReliable(bool boot)
MaxInFlight = _reliableProducerConfig.MaxInFlight,
MetadataHandler = update =>
{
HandleMetaDataMaybeReconnect(update.Stream,
_reliableProducerConfig.StreamSystem).Wait();
// intentionally fire & forget
_ = HandleMetaDataMaybeReconnect(update.Stream,
_reliableProducerConfig.StreamSystem);
},
ConnectionClosedHandler = async _ =>
{
Expand All @@ -110,7 +111,7 @@ protected override async Task GetNewReliable(bool boot)
confirmationStatus);
}
});
_reliableProducerConfig.ReconnectStrategy.WhenConnected(ToString());
await _reliableProducerConfig.ReconnectStrategy.WhenConnected(ToString());
if (boot)
{
// Init the publishing id
Expand Down
20 changes: 8 additions & 12 deletions RabbitMQ.Stream.Client/RoutingClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,23 @@
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Threading;
using System.Threading.Tasks;

namespace RabbitMQ.Stream.Client
{
public interface IRouting
{
IClient CreateClient(ClientParameters clientParameters);
Task<IClient> CreateClient(ClientParameters clientParameters);
bool ValidateDns { get; set; }
}

public class Routing : IRouting
{
public bool ValidateDns { get; set; } = true;

public IClient CreateClient(ClientParameters clientParameters)
public async Task<IClient> CreateClient(ClientParameters clientParameters)
{
var taskClient = Client.Create(clientParameters);
taskClient.Wait(TimeSpan.FromSeconds(1));
return taskClient.Result;
return await Client.Create(clientParameters);
}
}

Expand Down Expand Up @@ -67,14 +64,14 @@ int maxAttempts
// In this case we just return the node (leader for producer, random for consumer)
// since there is not load balancer configuration

return routing.CreateClient(clientParameters with { Endpoint = endPointNoLb });
return await routing.CreateClient(clientParameters with { Endpoint = endPointNoLb });
}

// here it means that there is a AddressResolver configuration
// so there is a load-balancer or proxy we need to get the right connection
// as first we try with the first node given from the LB
var endPoint = clientParameters.AddressResolver.EndPoint;
var client = routing.CreateClient(clientParameters with { Endpoint = endPoint });
var client = await routing.CreateClient(clientParameters with { Endpoint = endPoint });

var advertisedHost = GetPropertyValue(client.ConnectionProperties, "advertised_host");
var advertisedPort = GetPropertyValue(client.ConnectionProperties, "advertised_port");
Expand All @@ -85,7 +82,7 @@ int maxAttempts
attemptNo++;
await client.Close("advertised_host or advertised_port doesn't match");

client = routing.CreateClient(clientParameters with { Endpoint = endPoint });
client = await routing.CreateClient(clientParameters with { Endpoint = endPoint });

advertisedHost = GetPropertyValue(client.ConnectionProperties, "advertised_host");
advertisedPort = GetPropertyValue(client.ConnectionProperties, "advertised_port");
Expand All @@ -95,7 +92,7 @@ int maxAttempts
$"Could not find broker ({broker.Host}:{broker.Port}) after {maxAttempts} attempts");
}

Thread.Sleep(TimeSpan.FromMilliseconds(200));
await Task.Delay(TimeSpan.FromMilliseconds(200));
}

return client;
Expand Down Expand Up @@ -142,8 +139,7 @@ public static async Task<IClient> LookupRandomConnection(ClientParameters client
{
var brokers = new List<Broker>() { metaDataInfo.Leader };
brokers.AddRange(metaDataInfo.Replicas);
var rnd = new Random();
brokers.Sort((_, _) => rnd.Next(-1, 1));
brokers.Sort((_, _) => Random.Shared.Next(-1, 1));
var exceptions = new List<Exception>();
foreach (var broker in brokers)
{
Expand Down
7 changes: 4 additions & 3 deletions Tests/ReliableTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -448,14 +448,15 @@ public MyReconnection(ITestOutputHelper testOutputHelper)
_testOutputHelper = testOutputHelper;
}

public bool WhenDisconnected(string info)
ValueTask<bool> IReconnectStrategy.WhenDisconnected(string info)
{
_testOutputHelper.WriteLine($"MyReconnection WhenDisconnected {info}");
return false;
return ValueTask.FromResult(false);
}

public void WhenConnected(string _)
ValueTask IReconnectStrategy.WhenConnected(string connectionInfo)
{
return ValueTask.CompletedTask;
}
}

Expand Down
Loading