From 4d2d9f025941886baa3a3ba3ea68f9da848df3cf Mon Sep 17 00:00:00 2001 From: Grover <135006+cdmdotnet@users.noreply.github.com> Date: Tue, 22 Oct 2024 16:41:51 +1300 Subject: [PATCH] When a batch of events or commands fails to send as the batch is too big, will try to send them individually --- .../AzureCommandBusPublisher.cs | 62 ++++++++++++++++++ .../AzureEventBusPublisher.cs | 64 ++++++++++++++++++- .../Cqrs.Azure.ServiceBus.csproj | 1 + 3 files changed, 126 insertions(+), 1 deletion(-) diff --git a/Framework/Azure/Cqrs.Azure.ServiceBus/AzureCommandBusPublisher.cs b/Framework/Azure/Cqrs.Azure.ServiceBus/AzureCommandBusPublisher.cs index fb4758cf8..e2e496de4 100644 --- a/Framework/Azure/Cqrs.Azure.ServiceBus/AzureCommandBusPublisher.cs +++ b/Framework/Azure/Cqrs.Azure.ServiceBus/AzureCommandBusPublisher.cs @@ -537,6 +537,7 @@ await CreateBrokeredMessageAsync try { int count = 1; + bool batchSuccess = true; do { try @@ -553,11 +554,41 @@ await CreateBrokeredMessageAsync Logger.LogDebug("An empty collection of public commands to publish post validation."); break; } +#if NETSTANDARD2_0 || NET48_OR_GREATER + catch (ServiceBusException exception) + { + if (exception.Reason == ServiceBusFailureReason.MessageSizeExceeded) + { + Logger.LogDebug("The size of the event being sent was too large. Will try sending individually", exception: exception, metaData: new Dictionary { { "Command", publicBrokeredMessages } }); + batchSuccess = false; + } + else + throw; + } +#else + catch (QuotaExceededException exception) + { + Logger.LogDebug("The size of the event being sent was too large. Will try sending individually", exception: exception, metaData: new Dictionary { { "Command", publicBrokeredMessages } }); + batchSuccess = false; + } +#endif catch (TimeoutException) { if (count >= TimeoutOnSendRetryMaximumCount) throw; } + if (!batchSuccess) + { + foreach(var publicBrokeredMessage in publicBrokeredMessages) + { +#if NETSTANDARD2_0 || NET48_OR_GREATER + await PublicServiceBusPublisher.SendMessageAsync(publicBrokeredMessage); +#else + PublicServiceBusPublisher.Send(publicBrokeredMessage); +#endif + } + break; + } count++; } while (true); wasSuccessfull = true; @@ -593,6 +624,7 @@ await CreateBrokeredMessageAsync try { int count = 1; + bool batchSuccess = true; do { try @@ -609,11 +641,41 @@ await CreateBrokeredMessageAsync Logger.LogDebug("An empty collection of private commands to publish post validation."); break; } +#if NETSTANDARD2_0 || NET48_OR_GREATER + catch (ServiceBusException exception) + { + if (exception.Reason == ServiceBusFailureReason.MessageSizeExceeded) + { + Logger.LogDebug("The size of the event being sent was too large. Will try sending individually", exception: exception, metaData: new Dictionary { { "Command", publicBrokeredMessages } }); + batchSuccess = false; + } + else + throw; + } +#else + catch (QuotaExceededException exception) + { + Logger.LogDebug("The size of the event being sent was too large. Will try sending individually", exception: exception, metaData: new Dictionary { { "Command", publicBrokeredMessages } }); + batchSuccess = false; + } +#endif catch (TimeoutException) { if (count >= TimeoutOnSendRetryMaximumCount) throw; } + if (!batchSuccess) + { + foreach (var publicBrokeredMessage in publicBrokeredMessages) + { +#if NETSTANDARD2_0 || NET48_OR_GREATER + await PublicServiceBusPublisher.SendMessageAsync(publicBrokeredMessage); +#else + PublicServiceBusPublisher.Send(publicBrokeredMessage); +#endif + } + break; + } count++; } while (true); wasSuccessfull = true; diff --git a/Framework/Azure/Cqrs.Azure.ServiceBus/AzureEventBusPublisher.cs b/Framework/Azure/Cqrs.Azure.ServiceBus/AzureEventBusPublisher.cs index 25ac8800c..8bd5d4b99 100644 --- a/Framework/Azure/Cqrs.Azure.ServiceBus/AzureEventBusPublisher.cs +++ b/Framework/Azure/Cqrs.Azure.ServiceBus/AzureEventBusPublisher.cs @@ -10,7 +10,6 @@ using System.Collections.Generic; using System.Diagnostics; using System.Linq; -using System.Threading.Tasks; using Chinchilla.Logging; using Cqrs.Authentication; using Cqrs.Bus; @@ -19,6 +18,7 @@ using Cqrs.Messages; #if NETSTANDARD2_0 || NET48_OR_GREATER +using System.Threading.Tasks; using Azure.Messaging.ServiceBus; using BrokeredMessage = Azure.Messaging.ServiceBus.ServiceBusMessage; #else @@ -435,6 +435,7 @@ await CreateBrokeredMessageAsync try { int count = 1; + bool batchSuccess = true; do { try @@ -451,11 +452,41 @@ await CreateBrokeredMessageAsync Logger.LogDebug("An empty collection of public events to publish post validation."); break; } +#if NETSTANDARD2_0 || NET48_OR_GREATER + catch (ServiceBusException exception) + { + if (exception.Reason == ServiceBusFailureReason.MessageSizeExceeded) + { + Logger.LogDebug("The size of the event being sent was too large. Will try sending individually", exception: exception, metaData: new Dictionary { { "Command", publicBrokeredMessages } }); + batchSuccess = false; + } + else + throw; + } +#else + catch (QuotaExceededException exception) + { + Logger.LogDebug("The size of the event being sent was too large. Will try sending individually", exception: exception, metaData: new Dictionary { { "Command", publicBrokeredMessages } }); + batchSuccess = false; + } +#endif catch (TimeoutException) { if (count >= TimeoutOnSendRetryMaximumCount) throw; } + if (!batchSuccess) + { + foreach (var publicBrokeredMessage in publicBrokeredMessages) + { +#if NETSTANDARD2_0 || NET48_OR_GREATER + await PublicServiceBusPublisher.SendMessageAsync(publicBrokeredMessage); +#else + PublicServiceBusPublisher.Send(publicBrokeredMessage); +#endif + } + break; + } count++; } while (true); wasSuccessfull = true; @@ -491,6 +522,7 @@ await CreateBrokeredMessageAsync try { int count = 1; + bool batchSuccess = true; do { try @@ -507,11 +539,41 @@ await CreateBrokeredMessageAsync Logger.LogDebug("An empty collection of private events to publish post validation."); break; } +#if NETSTANDARD2_0 || NET48_OR_GREATER + catch (ServiceBusException exception) + { + if (exception.Reason == ServiceBusFailureReason.MessageSizeExceeded) + { + Logger.LogDebug("The size of the event being sent was too large. Will try sending individually", exception: exception, metaData: new Dictionary { { "Command", publicBrokeredMessages } }); + batchSuccess = false; + } + else + throw; + } +#else + catch (QuotaExceededException exception) + { + Logger.LogDebug("The size of the event being sent was too large. Will try sending individually", exception: exception, metaData: new Dictionary { { "Command", publicBrokeredMessages } }); + batchSuccess = false; + } +#endif catch (TimeoutException) { if (count >= TimeoutOnSendRetryMaximumCount) throw; } + if (!batchSuccess) + { + foreach (var publicBrokeredMessage in publicBrokeredMessages) + { +#if NETSTANDARD2_0 || NET48_OR_GREATER + await PublicServiceBusPublisher.SendMessageAsync(publicBrokeredMessage); +#else + PublicServiceBusPublisher.Send(publicBrokeredMessage); +#endif + } + break; + } count++; } while (true); wasSuccessfull = true; diff --git a/Framework/Azure/Cqrs.Azure.ServiceBus/Cqrs.Azure.ServiceBus.csproj b/Framework/Azure/Cqrs.Azure.ServiceBus/Cqrs.Azure.ServiceBus.csproj index be5d79739..2f0ba7b8d 100644 --- a/Framework/Azure/Cqrs.Azure.ServiceBus/Cqrs.Azure.ServiceBus.csproj +++ b/Framework/Azure/Cqrs.Azure.ServiceBus/Cqrs.Azure.ServiceBus.csproj @@ -15,6 +15,7 @@ * Refactored .NET Standard 2.0 to be far now async/await able... means some breaking changes as method names and return types change. * Added the ability to delay sending message publishing * Introduced setting the SessionID (https://learn.microsoft.com/en-us/azure/service-bus-messaging/message-sessions) if IMessageWithOrderingKey is implemented on a message + * When a batch of events or commands fails to send as the batch is too big, will try to send them individually Version 5.0