Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions RabbitMQ.Stream.Client/Consts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ internal static class Consts
internal static readonly TimeSpan MidWait = TimeSpan.FromSeconds(3);
internal static readonly TimeSpan LongWait = TimeSpan.FromSeconds(10);
internal const ushort ConsumerInitialCredits = 2;

internal const byte Version1 = 1;
internal const byte Version2 = 2;
internal const string SubscriptionPropertyFilterPrefix = "filter.";
Expand Down
47 changes: 39 additions & 8 deletions RabbitMQ.Stream.Client/IConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,17 +74,15 @@ public ushort InitialCredits
// It is enabled by default. You can disable it by setting it to null.
// It is recommended to keep it enabled. Disable it only for performance reasons.
public ICrc32 Crc32 { get; set; } = new StreamCrc32();
}

public class ConsumerInfo : Info
{
public string Reference { get; }
public FlowControl FlowControl { get; set; } = new FlowControl();
}

public ConsumerInfo(string stream, string reference, string identifier, List<string> partitions) : base(stream,
public class ConsumerInfo(string stream, string reference, string identifier, List<string> partitions)
: Info(stream,
identifier, partitions)
{
Reference = reference;
}
{
public string Reference { get; } = reference;

public override string ToString()
{
Expand All @@ -93,3 +91,36 @@ public override string ToString()
$"ConsumerInfo(Stream={Stream}, Reference={Reference}, Identifier={Identifier}, Partitions={string.Join(",", partitions)})";
}
}

public enum ConsumerFlowStrategy
{
/// <summary>
/// Request credits before parsing the chunk.
/// Default strategy. The best for performance.
/// </summary>
CreditsBeforeParseChunk,

/// <summary>
/// Request credits after parsing the chunk.
/// It can be useful if the parsing is expensive and you want to avoid requesting credits too early.
/// Useful for slow processing of chunks.
/// </summary>
CreditsAfterParseChunk,

/// <summary>
/// The user manually requests credits with <see cref="RawConsumer.Credits()"/>
/// Everything is done manually, so the user has full control over the flow of the consumer.
/// </summary>
ConsumerCredits
}

/// <summary>
/// FlowControl is used to control the flow of the consumer.
/// See <see cref="ConsumerFlowStrategy"/> for the available strategies.
/// Open for future extensions.
/// </summary>ra
public class FlowControl
{
public ConsumerFlowStrategy Strategy { get; set; } = ConsumerFlowStrategy.CreditsBeforeParseChunk;

}
2 changes: 1 addition & 1 deletion RabbitMQ.Stream.Client/ICrc32.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,6 @@ public interface ICrc32
/// It is possible to add custom logic to handle the failure, such as logging.
/// The code here should be safe
/// </summary>
Func<IConsumer, ChunkAction> FailAction { get; set; }
Func<IConsumer, ChunkAction> FailAction { get; init; }
}
}
17 changes: 15 additions & 2 deletions RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ RabbitMQ.Stream.Client.ConsumerFilter.PostFilter.get -> System.Func<RabbitMQ.Str
RabbitMQ.Stream.Client.ConsumerFilter.PostFilter.set -> void
RabbitMQ.Stream.Client.ConsumerFilter.Values.get -> System.Collections.Generic.List<string>
RabbitMQ.Stream.Client.ConsumerFilter.Values.set -> void
RabbitMQ.Stream.Client.ConsumerFlowStrategy
RabbitMQ.Stream.Client.ConsumerFlowStrategy.ConsumerCredits = 2 -> RabbitMQ.Stream.Client.ConsumerFlowStrategy
RabbitMQ.Stream.Client.ConsumerFlowStrategy.CreditsAfterParseChunk = 1 -> RabbitMQ.Stream.Client.ConsumerFlowStrategy
RabbitMQ.Stream.Client.ConsumerFlowStrategy.CreditsBeforeParseChunk = 0 -> RabbitMQ.Stream.Client.ConsumerFlowStrategy
RabbitMQ.Stream.Client.ConsumerInfo
RabbitMQ.Stream.Client.ConsumerInfo.ConsumerInfo(string stream, string reference, string identifier, System.Collections.Generic.List<string> partitions) -> void
RabbitMQ.Stream.Client.ConsumerInfo.Reference.get -> string
Expand Down Expand Up @@ -137,6 +141,10 @@ RabbitMQ.Stream.Client.EntityCommonConfig.Identifier.get -> string
RabbitMQ.Stream.Client.EntityCommonConfig.Identifier.set -> void
RabbitMQ.Stream.Client.EntityCommonConfig.MetadataHandler.get -> System.Func<RabbitMQ.Stream.Client.MetaDataUpdate, System.Threading.Tasks.Task>
RabbitMQ.Stream.Client.EntityCommonConfig.MetadataHandler.set -> void
RabbitMQ.Stream.Client.FlowControl
RabbitMQ.Stream.Client.FlowControl.FlowControl() -> void
RabbitMQ.Stream.Client.FlowControl.Strategy.get -> RabbitMQ.Stream.Client.ConsumerFlowStrategy
RabbitMQ.Stream.Client.FlowControl.Strategy.set -> void
RabbitMQ.Stream.Client.HashRoutingMurmurStrategy.Route(RabbitMQ.Stream.Client.Message message, System.Collections.Generic.List<string> partitions) -> System.Threading.Tasks.Task<System.Collections.Generic.List<string>>
RabbitMQ.Stream.Client.HeartBeatHandler.HeartBeatHandler(System.Func<System.Threading.Tasks.ValueTask<bool>> sendHeartbeatFunc, System.Func<string, string, System.Threading.Tasks.Task<RabbitMQ.Stream.Client.CloseResponse>> close, int heartbeat, Microsoft.Extensions.Logging.ILogger<RabbitMQ.Stream.Client.HeartBeatHandler> logger = null) -> void
RabbitMQ.Stream.Client.IClient.ClientId.get -> string
Expand Down Expand Up @@ -173,11 +181,13 @@ RabbitMQ.Stream.Client.ICommandVersions.Command.get -> ushort
RabbitMQ.Stream.Client.ICommandVersions.MaxVersion.get -> ushort
RabbitMQ.Stream.Client.ICommandVersions.MinVersion.get -> ushort
RabbitMQ.Stream.Client.IConsumerConfig.ConsumerFilter.set -> void
RabbitMQ.Stream.Client.IConsumerConfig.FlowControl.get -> RabbitMQ.Stream.Client.FlowControl
RabbitMQ.Stream.Client.IConsumerConfig.FlowControl.set -> void
RabbitMQ.Stream.Client.IConsumerConfig.InitialCredits.get -> ushort
RabbitMQ.Stream.Client.IConsumerConfig.InitialCredits.set -> void
RabbitMQ.Stream.Client.ICrc32
RabbitMQ.Stream.Client.ICrc32.FailAction.get -> System.Func<RabbitMQ.Stream.Client.IConsumer, RabbitMQ.Stream.Client.ChunkAction>
RabbitMQ.Stream.Client.ICrc32.FailAction.set -> void
RabbitMQ.Stream.Client.ICrc32.FailAction.init -> void
RabbitMQ.Stream.Client.ICrc32.Hash(byte[] data) -> byte[]
RabbitMQ.Stream.Client.Info
RabbitMQ.Stream.Client.Info.Identifier.get -> string
Expand Down Expand Up @@ -228,6 +238,7 @@ RabbitMQ.Stream.Client.PublishFilter.PublishFilter() -> void
RabbitMQ.Stream.Client.PublishFilter.PublishFilter(byte publisherId, System.Collections.Generic.List<(ulong, RabbitMQ.Stream.Client.Message)> messages, System.Func<RabbitMQ.Stream.Client.Message, string> filterValueExtractor, Microsoft.Extensions.Logging.ILogger logger) -> void
RabbitMQ.Stream.Client.PublishFilter.SizeNeeded.get -> int
RabbitMQ.Stream.Client.PublishFilter.Write(System.Span<byte> span) -> int
RabbitMQ.Stream.Client.RawConsumer.Credits() -> System.Threading.Tasks.Task
RabbitMQ.Stream.Client.RawConsumer.Info.get -> RabbitMQ.Stream.Client.ConsumerInfo
RabbitMQ.Stream.Client.RawConsumerConfig.ConnectionClosedHandler.get -> System.Func<string, System.Threading.Tasks.Task>
RabbitMQ.Stream.Client.RawConsumerConfig.ConnectionClosedHandler.set -> void
Expand Down Expand Up @@ -257,6 +268,8 @@ RabbitMQ.Stream.Client.Reliable.ConsumerConfig.Crc32.get -> RabbitMQ.Stream.Clie
RabbitMQ.Stream.Client.Reliable.ConsumerConfig.Crc32.set -> void
RabbitMQ.Stream.Client.Reliable.ConsumerConfig.Filter.get -> RabbitMQ.Stream.Client.ConsumerFilter
RabbitMQ.Stream.Client.Reliable.ConsumerConfig.Filter.set -> void
RabbitMQ.Stream.Client.Reliable.ConsumerConfig.FlowControl.get -> RabbitMQ.Stream.Client.FlowControl
RabbitMQ.Stream.Client.Reliable.ConsumerConfig.FlowControl.set -> void
RabbitMQ.Stream.Client.Reliable.ConsumerConfig.InitialCredits.get -> ushort
RabbitMQ.Stream.Client.Reliable.ConsumerConfig.InitialCredits.set -> void
RabbitMQ.Stream.Client.Reliable.ConsumerFactory._consumer -> RabbitMQ.Stream.Client.IConsumer
Expand Down Expand Up @@ -327,7 +340,7 @@ RabbitMQ.Stream.Client.RoutingStrategyType.Hash = 0 -> RabbitMQ.Stream.Client.Ro
RabbitMQ.Stream.Client.RoutingStrategyType.Key = 1 -> RabbitMQ.Stream.Client.RoutingStrategyType
RabbitMQ.Stream.Client.StreamCrc32
RabbitMQ.Stream.Client.StreamCrc32.FailAction.get -> System.Func<RabbitMQ.Stream.Client.IConsumer, RabbitMQ.Stream.Client.ChunkAction>
RabbitMQ.Stream.Client.StreamCrc32.FailAction.set -> void
RabbitMQ.Stream.Client.StreamCrc32.FailAction.init -> void
RabbitMQ.Stream.Client.StreamCrc32.Hash(byte[] data) -> byte[]
RabbitMQ.Stream.Client.StreamCrc32.StreamCrc32() -> void
RabbitMQ.Stream.Client.StreamStats
Expand Down
96 changes: 70 additions & 26 deletions RabbitMQ.Stream.Client/RawConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ internal void Validate()
case { Values.Count: 0 }:
throw new ArgumentException("Values must be provided when Filter is set");
}

FlowControl ??= new FlowControl();

}

internal bool IsFiltering => ConsumerFilter is { Values.Count: > 0 };
Expand Down Expand Up @@ -177,7 +180,7 @@ private RawConsumer(Client client, RawConsumerConfig config, ILogger logger = nu
ProcessChunks();
}

// if a user specify a custom offset
// if a user specifies a custom offset,
// the _client must filter messages
// and dispatch only the messages starting from the
// user offset.
Expand All @@ -196,17 +199,12 @@ protected override string GetStream()
return _config.Stream;
}

public async Task StoreOffset(ulong offset)
{
await _client.StoreOffset(_config.Reference, _config.Stream, offset).ConfigureAwait(false);
}

////// *********************
// IsPromotedAsActive is needed to understand if the consumer is active or not
// by default is active
// in case of single active consumer can be not active
// it is important to skip the messages in the chunk that
// it is in progress. In this way the promotion will be faster
// it is in progress. In this way, the promotion will be faster
// avoiding to block the consumer handler if the user put some
// long task
private bool IsPromotedAsActive { get; set; }
Expand All @@ -219,14 +217,14 @@ public async Task StoreOffset(ulong offset)

/// <summary>
/// MaybeLockDispatch locks the dispatch of the messages
/// it is needed only when the consumer is single active consumer
/// MaybeLockDispatch is an optimization to avoid to lock the dispatch
/// it is needed only when the consumer is single active consumer.
/// MaybeLockDispatch is an optimization to avoid lock the dispatch
/// when the consumer is not single active consumer
/// </summary>
private async Task MaybeLockDispatch()
{
if (_config.IsSingleActiveConsumer)
await PromotionLock.WaitAsync(Token).ConfigureAwait(false);
await PromotionLock.WaitAsync(TimeSpan.FromSeconds(5), Token).ConfigureAwait(false);
}

/// <summary>
Expand Down Expand Up @@ -309,9 +307,14 @@ async Task DispatchMessage(Message message, ulong i)
{
if (!Token.IsCancellationRequested)
{
// we need to lock the dispatch only if the consumer is single active consumer
await MaybeLockDispatch().ConfigureAwait(false);
var lockedIsPromotedAsActive = IsPromotedAsActive;
MaybeReleaseLock();

// it is usually active
// it is useful only in single active consumer
if (IsPromotedAsActive)
if (lockedIsPromotedAsActive)
{
if (_status != EntityStatus.Open)
{
Expand Down Expand Up @@ -423,15 +426,7 @@ await _config.MessageHandler(this,
for (ulong z = 0; z < subEntryChunk.NumRecordsInBatch; z++)
{
var message = MessageFromSequence(ref unCompressedData, ref compressOffset);
await MaybeLockDispatch().ConfigureAwait(false);
try
{
await DispatchMessage(message, messageOffset++).ConfigureAwait(false);
}
finally
{
MaybeReleaseLock();
}
await DispatchMessage(message, messageOffset++).ConfigureAwait(false);
}

numRecords -= subEntryChunk.NumRecordsInBatch;
Expand Down Expand Up @@ -479,13 +474,23 @@ await _chunksBuffer.Reader.WaitToReadAsync(Token).ConfigureAwait(false)) //
{
if (Token.IsCancellationRequested)
break;
await _client.Credit(EntityId, 1).ConfigureAwait(false);
// Request the credit to the server
if (_config.FlowControl.Strategy ==
ConsumerFlowStrategy.CreditsBeforeParseChunk)
{
// Request the credit before processing the chunk
// this is the default behavior
// it is useful to keep the network busy
// and avoid to wait for the next chunk
await _client.Credit(EntityId, 1)
.ConfigureAwait(false);
}
}
catch (InvalidOperationException)
{
// The client has been closed
// Suppose a scenario where the client is closed and the ProcessChunks task is still running
// we remove the the subscriber from the client and we close the client
// we remove the subscriber from the client and we close the client
// The ProcessChunks task will try to send the credit to the server
// The client will throw an InvalidOperationException
// since the connection is closed
Expand Down Expand Up @@ -514,6 +519,14 @@ await _chunksBuffer.Reader.WaitToReadAsync(Token).ConfigureAwait(false)) //
case ChunkAction.TryToProcess:
// That's what happens most of the time, and this is the default action
await ParseChunk(chunk).ConfigureAwait(false);

if (_config.FlowControl.Strategy == ConsumerFlowStrategy.CreditsAfterParseChunk)
{
// it avoids flooding the network with credits
await _client.Credit(EntityId, 1)
.ConfigureAwait(false);
}

break;
default:
throw new ArgumentOutOfRangeException();
Expand Down Expand Up @@ -598,7 +611,7 @@ private async Task Init()
chunkConsumed++;
// Send the chunk to the _chunksBuffer
// in this way the chunks are processed in a separate thread
// this wont' block the socket thread
// this won't block the socket thread
// introduced https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/pull/250
if (Token.IsCancellationRequested)
{
Expand Down Expand Up @@ -642,7 +655,7 @@ await _chunksBuffer.Writer.WriteAsync((deliver.Chunk, chunkAction), Token)
{
// The consumer is closing from the user but some chunks are still in the buffer
// simply skip the chunk since the Token.IsCancellationRequested is true
// the catch is needed to avoid to propagate the exception to the socket thread.
// the catch is needed to avoid propagating the exception to the socket thread.
Logger?.LogWarning(
"OperationCanceledException. {EntityInfo} has been closed while consuming messages. " +
"Token.IsCancellationRequested: {IsCancellationRequested}",
Expand Down Expand Up @@ -719,7 +732,7 @@ private ClientParameters.MetadataUpdateHandler OnMetadataUpdate() =>
// at this point the server has removed the consumer from the list
// and the unsubscribe is not needed anymore (ignoreIfClosed = true)
// we call the Close to re-enter to the standard behavior
// ignoreIfClosed is an optimization to avoid to send the unsubscribe
// ignoreIfClosed is an optimization to avoid sending the unsubscribe
_config.Pool.RemoveConsumerEntityFromStream(_client.ClientId, EntityId, _config.Stream);
await Shutdown(_config, true).ConfigureAwait(false);
_config.MetadataHandler?.Invoke(metaDataUpdate);
Expand Down Expand Up @@ -770,7 +783,7 @@ protected override async Task<ResponseCode> DeleteEntityFromTheServer(bool ignor
public override async Task<ResponseCode> Close()
{
// when the consumer is closed we must be sure that the
// the subscription is completed to avoid problems with the connection
// subscription is completed to avoid problems with the connection
// It could happen when the closing is called just after the creation
_completeSubscription.Task.Wait();
return await Shutdown(_config).ConfigureAwait(false);
Expand All @@ -790,5 +803,36 @@ public void Dispose()
}

public ConsumerInfo Info { get; }

public async Task StoreOffset(ulong offset)
{
await _client.StoreOffset(_config.Reference, _config.Stream, offset).ConfigureAwait(false);
}

/// <summary>
/// Request credits from the server.
/// Valid only if the ConsumerFlowStrategy is set to ConsumerFlowStrategy.ConsumerCredits.
/// </summary>
public async Task Credits()
{
await Credits(1).ConfigureAwait(false);
}

private async Task Credits(ushort credits)
{
if (credits < 1)
{
throw new ArgumentException(
$"Credits must be greater than 0");
}

if (_config.FlowControl.Strategy != ConsumerFlowStrategy.ConsumerCredits)
{
throw new InvalidOperationException(
"RequestCredits can be used only with ConsumerFlowStrategy.ManualRequestCredit.");
}

await _client.Credit(EntityId, credits).ConfigureAwait(false);
}
}
}
17 changes: 3 additions & 14 deletions RabbitMQ.Stream.Client/Reliable/Consumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -115,21 +115,10 @@ public record ConsumerConfig : ReliableConfig
/// <summary>
/// Enable the check of the crc on the delivery when set to an implementation
/// of <see cref="ICrc32"><code>ICrc32</code></see>.
/// The server will send the crc for each chunk and the client will check it.
/// It is not enabled by default. In some case it is could reduce the performance.
/// ICrc32 is an interface that can be implemented by the user with the desired implementation.
/// The client is tested with the System.IO.Hashing.Crc32 implementation, like:
///<c>
/// private class UserCrc32 : ICrc32
/// {
/// public byte[] Hash(byte[] data)
/// {
/// return System.IO.Hashing.Crc32.Hash(data);
/// }
/// }
/// </c>
/// </summary>
public ICrc32 Crc32 { get; set; } = null;
public ICrc32 Crc32 { get; set; } = new StreamCrc32();

public FlowControl FlowControl { get; set; } = new FlowControl();

public ConsumerConfig(StreamSystem streamSystem, string stream) : base(streamSystem, stream)
{
Expand Down
Loading