Skip to content

Commit

Permalink
When a batch of events or commands fails to send as the batch is too …
Browse files Browse the repository at this point in the history
…big, will try to send them individually
  • Loading branch information
cdmdotnet committed Oct 22, 2024
1 parent bdbf8cc commit 8864c80
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 1 deletion.
62 changes: 62 additions & 0 deletions Framework/Azure/Cqrs.Azure.ServiceBus/AzureCommandBusPublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,7 @@ await CreateBrokeredMessageAsync
try
{
int count = 1;
bool batchSuccess = true;
do
{
try
Expand All @@ -553,11 +554,41 @@ await CreateBrokeredMessageAsync
Logger.LogDebug("An empty collection of public commands to publish post validation.");
break;
}
#if NETSTANDARD2_0 || NET48_OR_GREATER
catch (ServiceBusException exception)
{
if (exception.Reason == ServiceBusFailureReason.MessageSizeExceeded)
{
Logger.LogDebug("The size of the event being sent was too large. Will try sending individually", exception: exception, metaData: new Dictionary<string, object> { { "Command", publicBrokeredMessages } });
batchSuccess = false;
}
else
throw;
}
#else
catch (QuotaExceededException exception)
{
Logger.LogDebug("The size of the event being sent was too large. Will try sending individually", exception: exception, metaData: new Dictionary<string, object> { { "Command", publicBrokeredMessages } });
batchSuccess = false;
}
#endif
catch (TimeoutException)
{
if (count >= TimeoutOnSendRetryMaximumCount)
throw;
}
if (!batchSuccess)
{
foreach(var publicBrokeredMessage in publicBrokeredMessages)
{
#if NETSTANDARD2_0 || NET48_OR_GREATER
await PublicServiceBusPublisher.SendMessageAsync(publicBrokeredMessage);
#else
PublicServiceBusPublisher.Send(publicBrokeredMessage);
#endif
}
break;
}
count++;
} while (true);
wasSuccessfull = true;
Expand Down Expand Up @@ -593,6 +624,7 @@ await CreateBrokeredMessageAsync
try
{
int count = 1;
bool batchSuccess = true;
do
{
try
Expand All @@ -609,11 +641,41 @@ await CreateBrokeredMessageAsync
Logger.LogDebug("An empty collection of private commands to publish post validation.");
break;
}
#if NETSTANDARD2_0 || NET48_OR_GREATER
catch (ServiceBusException exception)
{
if (exception.Reason == ServiceBusFailureReason.MessageSizeExceeded)
{
Logger.LogDebug("The size of the event being sent was too large. Will try sending individually", exception: exception, metaData: new Dictionary<string, object> { { "Command", publicBrokeredMessages } });
batchSuccess = false;
}
else
throw;
}
#else
catch (QuotaExceededException exception)
{
Logger.LogDebug("The size of the event being sent was too large. Will try sending individually", exception: exception, metaData: new Dictionary<string, object> { { "Command", publicBrokeredMessages } });
batchSuccess = false;
}
#endif
catch (TimeoutException)
{
if (count >= TimeoutOnSendRetryMaximumCount)
throw;
}
if (!batchSuccess)
{
foreach (var publicBrokeredMessage in publicBrokeredMessages)
{
#if NETSTANDARD2_0 || NET48_OR_GREATER
await PublicServiceBusPublisher.SendMessageAsync(publicBrokeredMessage);
#else
PublicServiceBusPublisher.Send(publicBrokeredMessage);
#endif
}
break;
}
count++;
} while (true);
wasSuccessfull = true;
Expand Down
64 changes: 63 additions & 1 deletion Framework/Azure/Cqrs.Azure.ServiceBus/AzureEventBusPublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using Chinchilla.Logging;
using Cqrs.Authentication;
using Cqrs.Bus;
Expand All @@ -19,6 +18,7 @@
using Cqrs.Messages;

#if NETSTANDARD2_0 || NET48_OR_GREATER
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using BrokeredMessage = Azure.Messaging.ServiceBus.ServiceBusMessage;
#else
Expand Down Expand Up @@ -435,6 +435,7 @@ await CreateBrokeredMessageAsync
try
{
int count = 1;
bool batchSuccess = true;
do
{
try
Expand All @@ -451,11 +452,41 @@ await CreateBrokeredMessageAsync
Logger.LogDebug("An empty collection of public events to publish post validation.");
break;
}
#if NETSTANDARD2_0 || NET48_OR_GREATER
catch (ServiceBusException exception)
{
if (exception.Reason == ServiceBusFailureReason.MessageSizeExceeded)
{
Logger.LogDebug("The size of the event being sent was too large. Will try sending individually", exception: exception, metaData: new Dictionary<string, object> { { "Command", publicBrokeredMessages } });
batchSuccess = false;
}
else
throw;
}
#else
catch (QuotaExceededException exception)
{
Logger.LogDebug("The size of the event being sent was too large. Will try sending individually", exception: exception, metaData: new Dictionary<string, object> { { "Command", publicBrokeredMessages } });
batchSuccess = false;
}
#endif
catch (TimeoutException)
{
if (count >= TimeoutOnSendRetryMaximumCount)
throw;
}
if (!batchSuccess)
{
foreach (var publicBrokeredMessage in publicBrokeredMessages)
{
#if NETSTANDARD2_0 || NET48_OR_GREATER
await PublicServiceBusPublisher.SendMessageAsync(publicBrokeredMessage);
#else
PublicServiceBusPublisher.Send(publicBrokeredMessage);
#endif
}
break;
}
count++;
} while (true);
wasSuccessfull = true;
Expand Down Expand Up @@ -491,6 +522,7 @@ await CreateBrokeredMessageAsync
try
{
int count = 1;
bool batchSuccess = true;
do
{
try
Expand All @@ -507,11 +539,41 @@ await CreateBrokeredMessageAsync
Logger.LogDebug("An empty collection of private events to publish post validation.");
break;
}
#if NETSTANDARD2_0 || NET48_OR_GREATER
catch (ServiceBusException exception)
{
if (exception.Reason == ServiceBusFailureReason.MessageSizeExceeded)
{
Logger.LogDebug("The size of the event being sent was too large. Will try sending individually", exception: exception, metaData: new Dictionary<string, object> { { "Command", publicBrokeredMessages } });
batchSuccess = false;
}
else
throw;
}
#else
catch (QuotaExceededException exception)
{
Logger.LogDebug("The size of the event being sent was too large. Will try sending individually", exception: exception, metaData: new Dictionary<string, object> { { "Command", publicBrokeredMessages } });
batchSuccess = false;
}
#endif
catch (TimeoutException)
{
if (count >= TimeoutOnSendRetryMaximumCount)
throw;
}
if (!batchSuccess)
{
foreach (var publicBrokeredMessage in publicBrokeredMessages)
{
#if NETSTANDARD2_0 || NET48_OR_GREATER
await PublicServiceBusPublisher.SendMessageAsync(publicBrokeredMessage);
#else
PublicServiceBusPublisher.Send(publicBrokeredMessage);
#endif
}
break;
}
count++;
} while (true);
wasSuccessfull = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
* Refactored .NET Standard 2.0 to be far now async/await able... means some breaking changes as method names and return types change.
* Added the ability to delay sending message publishing
* Introduced setting the SessionID (https://learn.microsoft.com/en-us/azure/service-bus-messaging/message-sessions) if IMessageWithOrderingKey is implemented on a message
* When a batch of events or commands fails to send as the batch is too big, will try to send them individually

Version 5.0

Expand Down

0 comments on commit 8864c80

Please sign in to comment.