-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Track messages that successfully completed the message or error pipel…
…ine but failed to get acknowledged due to expired leases (#1212) (#1220) * Tests for the receive strategy * Make ACK throw a LeaseTimeoutException consistently * Add Bitfaster caching * Implement fix * Cleanup * Some explanations and better expressiveness * Cover additional scenario of ACK failing * Make sure we keep track of to be completed message ids * Cleanup exception * Keep track of QueueMessage * Better log statement * Add acceptance test for visibility expiry * Acceptance test to verify expiry * Fix incorrect log statement * Tone down immediate retry to debug since immediate retries is a common case that should cause warns * Exception cosmetics * Limit concurrency instead * Properly group dependencies * Evil whitespace has crept in * Let the exception bubble up Co-authored-by: Jayanthi <88632084+soujay@users.noreply.github.com> * Improve the log statements --------- Co-authored-by: danielmarbach <danielmarbach@users.noreply.github.com> Co-authored-by: Jayanthi <88632084+soujay@users.noreply.github.com> (cherry picked from commit 5872d55) (cherry picked from commit c32a694) (cherry picked from commit 78bb31a) # Conflicts: # src/AcceptanceTests/NServiceBus.Transport.AzureStorageQueues.AcceptanceTests.csproj # src/Tests/NServiceBus.Transport.AzureStorageQueues.Tests.csproj # src/Transport/AtLeastOnceReceiveStrategy.cs # src/Transport/AtMostOnceReceiveStrategy.cs # src/Transport/MessageReceiver.cs # src/Transport/MessageRetrieved.cs # src/Transport/NServiceBus.Transport.AzureStorageQueues.csproj # src/TransportTests/NServiceBus.Transport.AzureStorageQueues.TransportTests.csproj Co-authored-by: danielmarbach <danielmarbach@users.noreply.github.com>
- Loading branch information
1 parent
f11fed8
commit 53f1d18
Showing
13 changed files
with
463 additions
and
74 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
119 changes: 119 additions & 0 deletions
119
src/AcceptanceTests/Receiving/When_message_visibility_expired.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,119 @@ | ||
namespace NServiceBus.Transport.AzureStorageQueues.AcceptanceTests | ||
{ | ||
using System; | ||
using System.Linq; | ||
using System.Threading.Tasks; | ||
using AcceptanceTesting; | ||
using AcceptanceTesting.Customization; | ||
using global::Azure.Storage.Queues; | ||
using global::Azure.Storage.Queues.Models; | ||
using NServiceBus.AcceptanceTests; | ||
using NServiceBus.AcceptanceTests.EndpointTemplates; | ||
using NUnit.Framework; | ||
using Testing; | ||
|
||
public class When_message_visibility_expired : NServiceBusAcceptanceTest | ||
{ | ||
[Test] | ||
public async Task Should_complete_message_on_next_receive_when_pipeline_successful() | ||
{ | ||
var ctx = await Scenario.Define<Context>() | ||
.WithEndpoint<Receiver>(b => | ||
{ | ||
b.CustomConfig(c => | ||
{ | ||
// Limiting the concurrency for this test to make sure messages that are made available again are | ||
// not concurrently processed. This is not necessary for the test to pass but it makes | ||
// reasoning about the test easier. | ||
c.LimitMessageProcessingConcurrencyTo(1); | ||
}); | ||
b.When((session, _) => session.SendLocal(new MyMessage())); | ||
}) | ||
.Done(c => c.MessageId != null && c.Logs.Any(l => WasMarkedAsSuccessfullyCompleted(l, c))) | ||
.Run(); | ||
|
||
var items = ctx.Logs.Where(l => WasMarkedAsSuccessfullyCompleted(l, ctx)).ToArray(); | ||
|
||
Assert.That(items, Is.Not.Empty); | ||
} | ||
|
||
[Test] | ||
public async Task Should_complete_message_on_next_receive_when_error_pipeline_handled_the_message() | ||
{ | ||
var ctx = await Scenario.Define<Context>(c => | ||
{ | ||
c.ShouldThrow = true; | ||
}) | ||
.WithEndpoint<Receiver>(b => | ||
{ | ||
b.DoNotFailOnErrorMessages(); | ||
b.CustomConfig(c => | ||
{ | ||
var recoverability = c.Recoverability(); | ||
recoverability.AddUnrecoverableException<InvalidOperationException>(); | ||
// Limiting the concurrency for this test to make sure messages that are made available again are | ||
// not concurrently processed. This is not necessary for the test to pass but it makes | ||
// reasoning about the test easier. | ||
c.LimitMessageProcessingConcurrencyTo(1); | ||
}); | ||
b.When((session, _) => session.SendLocal(new MyMessage())); | ||
}) | ||
.Done(c => c.MessageId != null && c.Logs.Any(l => WasMarkedAsSuccessfullyCompleted(l, c))) | ||
.Run(); | ||
|
||
var items = ctx.Logs.Where(l => WasMarkedAsSuccessfullyCompleted(l, ctx)).ToArray(); | ||
|
||
Assert.That(items, Is.Not.Empty); | ||
} | ||
|
||
static bool WasMarkedAsSuccessfullyCompleted(ScenarioContext.LogItem l, Context c) | ||
=> l.Message.StartsWith($"Received message (ID: '{c.MessageId}') was marked as successfully completed"); | ||
|
||
class Context : ScenarioContext | ||
{ | ||
public bool ShouldThrow { get; set; } | ||
|
||
public string MessageId { get; set; } | ||
} | ||
|
||
class Receiver : EndpointConfigurationBuilder | ||
{ | ||
public Receiver() => EndpointSetup<DefaultServer>(c => | ||
{ | ||
var transport = c.ConfigureAsqTransport(); | ||
// Explicitly setting the transport transaction mode to ReceiveOnly because the message | ||
// tracking only is implemented for this mode. | ||
transport.Transactions(TransportTransactionMode.ReceiveOnly); | ||
}); | ||
} | ||
|
||
public class MyMessage : IMessage | ||
{ | ||
} | ||
|
||
class MyMessageHandler : IHandleMessages<MyMessage> | ||
{ | ||
readonly Context testContext; | ||
|
||
public MyMessageHandler(Context testContext) => this.testContext = testContext; | ||
|
||
public async Task Handle(MyMessage message, IMessageHandlerContext context) | ||
{ | ||
var receiverQueue = BackwardsCompatibleQueueNameSanitizerForTests.Sanitize(Conventions.EndpointNamingConvention(typeof(Receiver))); | ||
var queueClient = new QueueClient(Utilities.GetEnvConfiguredConnectionString(), receiverQueue); | ||
var rawMessage = context.Extensions.Get<QueueMessage>(); | ||
// By setting the visibility timeout zero, the message will be "immediately available" for retrieval again and effectively the message pump | ||
// has lost the message visibility timeout because any ACK or NACK will be rejected by the storage service. | ||
await queueClient.UpdateMessageAsync(rawMessage.MessageId, rawMessage.PopReceipt, visibilityTimeout: TimeSpan.Zero); | ||
|
||
testContext.MessageId = context.MessageId; | ||
|
||
if (testContext.ShouldThrow) | ||
{ | ||
throw new InvalidOperationException("Simulated exception"); | ||
} | ||
} | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.