Skip to content

Commit

Permalink
Inline dispatching of individual sends for messages that are too larg…
Browse files Browse the repository at this point in the history
…e to fit into a batch (#1049) (#1050)

* Yield in FakeSender to expose any assumptions we might be making around things happening on the synchronous path

* Fix the problem of not having the individual sends being properly tracked by inlining the dispatching the individual sends.
Rename the method for better clarity

---------

Co-authored-by: Daniel Marbach <daniel.marbach@openplace.net>
Co-authored-by: danielmarbach <danielmarbach@users.noreply.github.com>
  • Loading branch information
3 people authored Sep 24, 2024
1 parent b39dc6b commit f883fdd
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 18 deletions.
14 changes: 7 additions & 7 deletions src/Tests/FakeSender.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,27 +27,27 @@ public IReadOnlyCollection<ServiceBusMessage> this[ServiceBusMessageBatch batch]
set => throw new NotSupportedException();
}

public override ValueTask<ServiceBusMessageBatch> CreateMessageBatchAsync(CancellationToken cancellationToken = default)
public override async ValueTask<ServiceBusMessageBatch> CreateMessageBatchAsync(CancellationToken cancellationToken = default)
{
var batchMessageStore = new List<ServiceBusMessage>();
ServiceBusMessageBatch serviceBusMessageBatch = ServiceBusModelFactory.ServiceBusMessageBatch(256 * 1024, batchMessageStore, tryAddCallback: TryAdd);
batchToBackingStore.Add(serviceBusMessageBatch, batchMessageStore);
return new ValueTask<ServiceBusMessageBatch>(
serviceBusMessageBatch);
await Task.Yield();
return serviceBusMessageBatch;
}

public override Task SendMessageAsync(ServiceBusMessage message, CancellationToken cancellationToken = default)
public override async Task SendMessageAsync(ServiceBusMessage message, CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();
sentMessages.Add(message);
return Task.CompletedTask;
await Task.Yield();
}

public override Task SendMessagesAsync(ServiceBusMessageBatch messageBatch, CancellationToken cancellationToken = default)
public override async Task SendMessagesAsync(ServiceBusMessageBatch messageBatch, CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();
batchedMessages.Add(messageBatch);
return Task.CompletedTask;
await Task.Yield();
}
}
}
38 changes: 27 additions & 11 deletions src/Transport/Sending/MessageDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,6 @@ void AddBatchedOperationsTo(List<Task> dispatchTasks,
var operations = destinationAndOperations.Value;

var messagesToSend = new Queue<ServiceBusMessage>(operations.Count);
// We assume the majority of the messages will be batched and only a few will be sent individually
// and therefore it is OK in those rare cases for the list to grow.
var messagesTooLargeToBeBatched = new List<ServiceBusMessage>(0);
foreach (var operation in operations)
{
var message = operation.Message.ToAzureServiceBusMessage(operation.Properties, azureServiceBusTransportTransaction?.IncomingQueuePartitionKey);
Expand All @@ -131,20 +128,16 @@ void AddBatchedOperationsTo(List<Task> dispatchTasks,
}
// Accessing azureServiceBusTransaction.CommittableTransaction will initialize it if it isn't yet
// doing the access as late as possible but still on the synchronous path.
dispatchTasks.Add(DispatchBatchForDestination(destination, azureServiceBusTransportTransaction?.ServiceBusClient, azureServiceBusTransportTransaction?.Transaction, messagesToSend, messagesTooLargeToBeBatched, cancellationToken));

foreach (var message in messagesTooLargeToBeBatched)
{
dispatchTasks.Add(DispatchForDestination(destination, azureServiceBusTransportTransaction?.ServiceBusClient, azureServiceBusTransportTransaction?.Transaction, message, cancellationToken));
}
dispatchTasks.Add(DispatchBatchOrFallbackToIndividualSendsForDestination(destination, azureServiceBusTransportTransaction?.ServiceBusClient, azureServiceBusTransportTransaction?.Transaction, messagesToSend, cancellationToken));
}
}

async Task DispatchBatchForDestination(string destination, ServiceBusClient? client, Transaction? transaction,
Queue<ServiceBusMessage> messagesToSend, List<ServiceBusMessage> messagesTooLargeToBeBatched,
async Task DispatchBatchOrFallbackToIndividualSendsForDestination(string destination, ServiceBusClient? client, Transaction? transaction,
Queue<ServiceBusMessage> messagesToSend,
CancellationToken cancellationToken)
{
int batchCount = 0;
List<ServiceBusMessage>? messagesTooLargeToBeBatched = null;
var sender = messageSenderRegistry.GetMessageSender(destination, client);
while (messagesToSend.Count > 0)
{
Expand Down Expand Up @@ -176,6 +169,7 @@ async Task DispatchBatchForDestination(string destination, ServiceBusClient? cli
dequeueMessage.ApplicationProperties.TryGetValue(Headers.MessageId, out var messageId);
Log.Debug($"Message '{messageId ?? dequeueMessage.MessageId}' is too large for the batch '{batchCount}' and will be sent individually to destination {destination}.");
}
messagesTooLargeToBeBatched ??= [];
messagesTooLargeToBeBatched.Add(dequeueMessage);
continue;
}
Expand Down Expand Up @@ -210,6 +204,28 @@ async Task DispatchBatchForDestination(string destination, ServiceBusClient? cli
Log.Debug($"Sent batch '{batchCount}' with '{messageBatch.Count}' message ids '{logBuilder!.ToString(0, logBuilder.Length - 1)}' to destination {destination}.");
}
}

if (messagesTooLargeToBeBatched is not null)
{
if (Log.IsDebugEnabled)
{
Log.Debug($"Sending '{messagesTooLargeToBeBatched.Count}' that were too large for the batch individually to destination {destination}.");
}

var individualSendTasks = new List<Task>(messagesTooLargeToBeBatched.Count);
foreach (var message in messagesTooLargeToBeBatched)
{
individualSendTasks.Add(DispatchForDestination(destination, client, transaction, message, cancellationToken));
}

await Task.WhenAll(individualSendTasks)
.ConfigureAwait(false);

if (Log.IsDebugEnabled)
{
Log.Debug($"Sent '{messagesTooLargeToBeBatched.Count}' that were too large for the batch individually to destination {destination}.");
}
}
}

// The parameters of this method are deliberately mutable and of the original collection type to make sure
Expand Down

0 comments on commit f883fdd

Please sign in to comment.