From f883fdd891b9dc2af5d2de8b7192071df941af60 Mon Sep 17 00:00:00 2001 From: David Boike Date: Tue, 24 Sep 2024 12:35:34 -0500 Subject: [PATCH] Inline dispatching of individual sends for messages that are too large to fit into a batch (#1049) (#1050) * Yield in FakeSender to expose any assumptions we might be making around things happening on the synchronous path * Fix the problem of not having the individual sends being properly tracked by inlining the dispatching the individual sends. Rename the method for better clarity --------- Co-authored-by: Daniel Marbach Co-authored-by: danielmarbach --- src/Tests/FakeSender.cs | 14 ++++---- src/Transport/Sending/MessageDispatcher.cs | 38 +++++++++++++++------- 2 files changed, 34 insertions(+), 18 deletions(-) diff --git a/src/Tests/FakeSender.cs b/src/Tests/FakeSender.cs index 65b07241..78e69da5 100644 --- a/src/Tests/FakeSender.cs +++ b/src/Tests/FakeSender.cs @@ -27,27 +27,27 @@ public IReadOnlyCollection this[ServiceBusMessageBatch batch] set => throw new NotSupportedException(); } - public override ValueTask CreateMessageBatchAsync(CancellationToken cancellationToken = default) + public override async ValueTask CreateMessageBatchAsync(CancellationToken cancellationToken = default) { var batchMessageStore = new List(); ServiceBusMessageBatch serviceBusMessageBatch = ServiceBusModelFactory.ServiceBusMessageBatch(256 * 1024, batchMessageStore, tryAddCallback: TryAdd); batchToBackingStore.Add(serviceBusMessageBatch, batchMessageStore); - return new ValueTask( - serviceBusMessageBatch); + await Task.Yield(); + return serviceBusMessageBatch; } - public override Task SendMessageAsync(ServiceBusMessage message, CancellationToken cancellationToken = default) + public override async Task SendMessageAsync(ServiceBusMessage message, CancellationToken cancellationToken = default) { cancellationToken.ThrowIfCancellationRequested(); sentMessages.Add(message); - return Task.CompletedTask; + await Task.Yield(); } - public override Task SendMessagesAsync(ServiceBusMessageBatch messageBatch, CancellationToken cancellationToken = default) + public override async Task SendMessagesAsync(ServiceBusMessageBatch messageBatch, CancellationToken cancellationToken = default) { cancellationToken.ThrowIfCancellationRequested(); batchedMessages.Add(messageBatch); - return Task.CompletedTask; + await Task.Yield(); } } } \ No newline at end of file diff --git a/src/Transport/Sending/MessageDispatcher.cs b/src/Transport/Sending/MessageDispatcher.cs index ff2abf17..bebaf86d 100644 --- a/src/Transport/Sending/MessageDispatcher.cs +++ b/src/Transport/Sending/MessageDispatcher.cs @@ -120,9 +120,6 @@ void AddBatchedOperationsTo(List dispatchTasks, var operations = destinationAndOperations.Value; var messagesToSend = new Queue(operations.Count); - // We assume the majority of the messages will be batched and only a few will be sent individually - // and therefore it is OK in those rare cases for the list to grow. - var messagesTooLargeToBeBatched = new List(0); foreach (var operation in operations) { var message = operation.Message.ToAzureServiceBusMessage(operation.Properties, azureServiceBusTransportTransaction?.IncomingQueuePartitionKey); @@ -131,20 +128,16 @@ void AddBatchedOperationsTo(List dispatchTasks, } // Accessing azureServiceBusTransaction.CommittableTransaction will initialize it if it isn't yet // doing the access as late as possible but still on the synchronous path. - dispatchTasks.Add(DispatchBatchForDestination(destination, azureServiceBusTransportTransaction?.ServiceBusClient, azureServiceBusTransportTransaction?.Transaction, messagesToSend, messagesTooLargeToBeBatched, cancellationToken)); - - foreach (var message in messagesTooLargeToBeBatched) - { - dispatchTasks.Add(DispatchForDestination(destination, azureServiceBusTransportTransaction?.ServiceBusClient, azureServiceBusTransportTransaction?.Transaction, message, cancellationToken)); - } + dispatchTasks.Add(DispatchBatchOrFallbackToIndividualSendsForDestination(destination, azureServiceBusTransportTransaction?.ServiceBusClient, azureServiceBusTransportTransaction?.Transaction, messagesToSend, cancellationToken)); } } - async Task DispatchBatchForDestination(string destination, ServiceBusClient? client, Transaction? transaction, - Queue messagesToSend, List messagesTooLargeToBeBatched, + async Task DispatchBatchOrFallbackToIndividualSendsForDestination(string destination, ServiceBusClient? client, Transaction? transaction, + Queue messagesToSend, CancellationToken cancellationToken) { int batchCount = 0; + List? messagesTooLargeToBeBatched = null; var sender = messageSenderRegistry.GetMessageSender(destination, client); while (messagesToSend.Count > 0) { @@ -176,6 +169,7 @@ async Task DispatchBatchForDestination(string destination, ServiceBusClient? cli dequeueMessage.ApplicationProperties.TryGetValue(Headers.MessageId, out var messageId); Log.Debug($"Message '{messageId ?? dequeueMessage.MessageId}' is too large for the batch '{batchCount}' and will be sent individually to destination {destination}."); } + messagesTooLargeToBeBatched ??= []; messagesTooLargeToBeBatched.Add(dequeueMessage); continue; } @@ -210,6 +204,28 @@ async Task DispatchBatchForDestination(string destination, ServiceBusClient? cli Log.Debug($"Sent batch '{batchCount}' with '{messageBatch.Count}' message ids '{logBuilder!.ToString(0, logBuilder.Length - 1)}' to destination {destination}."); } } + + if (messagesTooLargeToBeBatched is not null) + { + if (Log.IsDebugEnabled) + { + Log.Debug($"Sending '{messagesTooLargeToBeBatched.Count}' that were too large for the batch individually to destination {destination}."); + } + + var individualSendTasks = new List(messagesTooLargeToBeBatched.Count); + foreach (var message in messagesTooLargeToBeBatched) + { + individualSendTasks.Add(DispatchForDestination(destination, client, transaction, message, cancellationToken)); + } + + await Task.WhenAll(individualSendTasks) + .ConfigureAwait(false); + + if (Log.IsDebugEnabled) + { + Log.Debug($"Sent '{messagesTooLargeToBeBatched.Count}' that were too large for the batch individually to destination {destination}."); + } + } } // The parameters of this method are deliberately mutable and of the original collection type to make sure