Skip to content

Commit

Permalink
Track messages that successfully completed the message or error pipel…
Browse files Browse the repository at this point in the history
…ine but failed to get acknowledged due to expired leases (#1212) (#1215)

* Tests for the receive strategy

* Make ACK throw a LeaseTimeoutException consistently

* Add Bitfaster caching

* Implement fix

* Cleanup

* Some explanations and better expressiveness

* Cover additional scenario of ACK failing

* Make sure we keep track of to be completed message ids

* Cleanup exception

* Keep track of QueueMessage

* Better log statement

* Add acceptance test for visibility expiry

* Acceptance test to verify expiry

* Fix incorrect log statement

* Tone down immediate retry to debug since immediate retries is a common case that should cause warns

* Exception cosmetics

* Limit concurrency instead

* Properly group dependencies

* Evil whitespace has crept in

* Let the exception bubble up

Co-authored-by: Jayanthi <88632084+soujay@users.noreply.github.com>

* Improve the log statements

---------

Co-authored-by: danielmarbach <danielmarbach@users.noreply.github.com>
Co-authored-by: Jayanthi <88632084+soujay@users.noreply.github.com>
(cherry picked from commit 5872d55)

(cherry picked from commit c32a694)
(cherry picked from commit 78bb31a)

Co-authored-by: danielmarbach <danielmarbach@users.noreply.github.com>
  • Loading branch information
danielmarbach and danielmarbach authored Sep 18, 2024
1 parent 26dd9bc commit f1f3e42
Show file tree
Hide file tree
Showing 11 changed files with 410 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,20 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="GitHubActionsTestLogger" Version="2.3.3" />
<PackageReference Include="Azure.Data.Tables" Version="12.7.1" />
<PackageReference Include="Azure.Storage.Queues" Version="12.12.0" />
<PackageReference Include="Azure.Storage.Blobs" Version="12.14.1" />
<PackageReference Include="Azure.Data.Tables" Version="12.9.0" />
<PackageReference Include="Azure.Identity" Version="1.12.0" />
<PackageReference Include="Azure.Storage.Blobs" Version="12.21.2" />
<PackageReference Include="Azure.Storage.Queues" Version="12.19.1" />
<PackageReference Include="BitFaster.Caching" Version="2.5.2" />
<PackageReference Include="NServiceBus.AcceptanceTests.Sources" Version="8.0.0" />
<PackageReference Include="NServiceBus.Newtonsoft.Json" Version="3.0.0" />
<PackageReference Include="NUnit3TestAdapter" Version="4.5.0" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="GitHubActionsTestLogger" Version="2.4.1" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.11.1" />
<PackageReference Include="NUnit" Version="3.14.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.9.0" />
<PackageReference Include="NUnit3TestAdapter" Version="4.6.0" />
</ItemGroup>

</Project>
119 changes: 119 additions & 0 deletions src/AcceptanceTests/Receiving/When_message_visibility_expired.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
namespace NServiceBus.Transport.AzureStorageQueues.AcceptanceTests
{
using System;
using System.Linq;
using System.Threading.Tasks;
using AcceptanceTesting;
using AcceptanceTesting.Customization;
using global::Azure.Storage.Queues;
using global::Azure.Storage.Queues.Models;
using NServiceBus.AcceptanceTests;
using NServiceBus.AcceptanceTests.EndpointTemplates;
using NUnit.Framework;
using Testing;

public class When_message_visibility_expired : NServiceBusAcceptanceTest
{
[Test]
public async Task Should_complete_message_on_next_receive_when_pipeline_successful()
{
var ctx = await Scenario.Define<Context>()
.WithEndpoint<Receiver>(b =>
{
b.CustomConfig(c =>
{
// Limiting the concurrency for this test to make sure messages that are made available again are
// not concurrently processed. This is not necessary for the test to pass but it makes
// reasoning about the test easier.
c.LimitMessageProcessingConcurrencyTo(1);
});
b.When((session, _) => session.SendLocal(new MyMessage()));
})
.Done(c => c.MessageId is not null && c.Logs.Any(l => WasMarkedAsSuccessfullyCompleted(l, c)))
.Run();

var items = ctx.Logs.Where(l => WasMarkedAsSuccessfullyCompleted(l, ctx)).ToArray();

Assert.That(items, Is.Not.Empty);
}

[Test]
public async Task Should_complete_message_on_next_receive_when_error_pipeline_handled_the_message()
{
var ctx = await Scenario.Define<Context>(c =>
{
c.ShouldThrow = true;
})
.WithEndpoint<Receiver>(b =>
{
b.DoNotFailOnErrorMessages();
b.CustomConfig(c =>
{
var recoverability = c.Recoverability();
recoverability.AddUnrecoverableException<InvalidOperationException>();
// Limiting the concurrency for this test to make sure messages that are made available again are
// not concurrently processed. This is not necessary for the test to pass but it makes
// reasoning about the test easier.
c.LimitMessageProcessingConcurrencyTo(1);
});
b.When((session, _) => session.SendLocal(new MyMessage()));
})
.Done(c => c.MessageId is not null && c.Logs.Any(l => WasMarkedAsSuccessfullyCompleted(l, c)))
.Run();

var items = ctx.Logs.Where(l => WasMarkedAsSuccessfullyCompleted(l, ctx)).ToArray();

Assert.That(items, Is.Not.Empty);
}

static bool WasMarkedAsSuccessfullyCompleted(ScenarioContext.LogItem l, Context c)
=> l.Message.StartsWith($"Received message (ID: '{c.MessageId}') was marked as successfully completed");

class Context : ScenarioContext
{
public bool ShouldThrow { get; set; }

public string MessageId { get; set; }
}

class Receiver : EndpointConfigurationBuilder
{
public Receiver() => EndpointSetup<DefaultServer>(c =>
{
var transport = c.ConfigureTransport<AzureStorageQueueTransport>();
// Explicitly setting the transport transaction mode to ReceiveOnly because the message
// tracking only is implemented for this mode.
transport.TransportTransactionMode = TransportTransactionMode.ReceiveOnly;
});
}

public class MyMessage : IMessage
{
}

class MyMessageHandler : IHandleMessages<MyMessage>
{
readonly Context testContext;

public MyMessageHandler(Context testContext) => this.testContext = testContext;

public async Task Handle(MyMessage message, IMessageHandlerContext context)
{
var receiverQueue = BackwardsCompatibleQueueNameSanitizerForTests.Sanitize(Conventions.EndpointNamingConvention(typeof(Receiver)));
var queueClient = new QueueClient(Utilities.GetEnvConfiguredConnectionString(), receiverQueue);
var rawMessage = context.Extensions.Get<QueueMessage>();
// By setting the visibility timeout zero, the message will be "immediately available" for retrieval again and effectively the message pump
// has lost the message visibility timeout because any ACK or NACK will be rejected by the storage service.
await queueClient.UpdateMessageAsync(rawMessage.MessageId, rawMessage.PopReceipt, visibilityTimeout: TimeSpan.Zero, cancellationToken: context.CancellationToken);

testContext.MessageId = context.MessageId;

if (testContext.ShouldThrow)
{
throw new InvalidOperationException("Simulated exception");
}
}
}
}
}
152 changes: 152 additions & 0 deletions src/Tests/AtLeastOnceReceiveStrategyTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
namespace NServiceBus.Transport.AzureStorageQueues.Tests
{
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Azure.Transports.WindowsAzureStorageQueues;
using global::Azure;
using global::Azure.Storage.Queues.Models;
using NUnit.Framework;

[TestFixture]
public class AtLeastOnceReceiveStrategyTests
{
[Test]
public async Task Should_complete_message_on_next_receive_when_pipeline_successful_but_completion_failed_due_to_expired_lease()
{
var fakeQueueClient = new FakeQueueClient();
var onMessageCalled = 0;
var onErrorCalled = 0;

var receiveStrategy = new AtLeastOnceReceiveStrategy((_, _) =>
{
onMessageCalled++;
return Task.CompletedTask;
}, (_, _) =>
{
onErrorCalled++;
return Task.FromResult(ErrorHandleResult.Handled);
}, (_, _, _) => { });

var messageId = Guid.NewGuid().ToString();

var rawMessageThatIsExpired = QueuesModelFactory.QueueMessage("RawMessageId1", "PopReceipt1", "", 1, nextVisibleOn: DateTimeOffset.UtcNow.Subtract(TimeSpan.FromSeconds(30)));
var messageRetrieved1 = new MessageRetrieved(null, null, rawMessageThatIsExpired, fakeQueueClient, null);
var messageWrapper1 = new MessageWrapper { Id = messageId, Headers = new Dictionary<string, string>() };

Assert.ThrowsAsync<LeaseTimeoutException>(async () => await receiveStrategy.Receive(messageRetrieved1, messageWrapper1, "queue"));

var rawMessageThatIsValid = QueuesModelFactory.QueueMessage("RawMessageId2", "PopReceipt2", "", 1, nextVisibleOn: DateTimeOffset.UtcNow.Add(TimeSpan.FromSeconds(30)));
var messageRetrieved2 = new MessageRetrieved(null, null, rawMessageThatIsValid, fakeQueueClient, null);
var messageWrapper2 = new MessageWrapper { Id = messageId, Headers = new Dictionary<string, string>() };

await receiveStrategy.Receive(messageRetrieved2, messageWrapper2, "queue");

Assert.Multiple(() =>
{
Assert.That(fakeQueueClient.DeletedMessages, Has.Count.EqualTo(1).And.Contains(("RawMessageId2", "PopReceipt2")));
Assert.That(onMessageCalled, Is.EqualTo(1));
Assert.That(onErrorCalled, Is.Zero);
});
}

[Test]
public void Should_rethrow_on_next_receive_when_message_could_not_be_completed()
{
var fakeQueueClient = new FakeQueueClient();

var receiveStrategy = new AtLeastOnceReceiveStrategy((_, _) => Task.CompletedTask, (_, _) => Task.FromResult(ErrorHandleResult.Handled), (_, _, _) => { });

var messageId = Guid.NewGuid().ToString();

var rawMessageThatIsExpired = QueuesModelFactory.QueueMessage("RawMessageId1", "PopReceipt1", "", 1, nextVisibleOn: DateTimeOffset.UtcNow.Subtract(TimeSpan.FromSeconds(30)));
var messageRetrieved1 = new MessageRetrieved(null, null, rawMessageThatIsExpired, fakeQueueClient, null);
var messageWrapper1 = new MessageWrapper { Id = messageId, Headers = new Dictionary<string, string>() };

Assert.ThrowsAsync<LeaseTimeoutException>(async () => await receiveStrategy.Receive(messageRetrieved1, messageWrapper1, "queue"));

fakeQueueClient.DeleteMessageCallback = () => throw new RequestFailedException(404, "MessageNotFound", QueueErrorCode.MessageNotFound.ToString(), null);

var rawMessageThatIsValid = QueuesModelFactory.QueueMessage("RawMessageId2", "PopReceipt2", "", 1, nextVisibleOn: DateTimeOffset.UtcNow.Add(TimeSpan.FromSeconds(30)));
var messageRetrieved2 = new MessageRetrieved(null, null, rawMessageThatIsValid, fakeQueueClient, null);
var messageWrapper2 = new MessageWrapper { Id = messageId, Headers = new Dictionary<string, string>() };

Assert.ThrowsAsync<LeaseTimeoutException>(async () => await receiveStrategy.Receive(messageRetrieved2, messageWrapper2, "queue"));
Assert.That(fakeQueueClient.DeletedMessages, Is.Empty);
}

[Test]
public async Task Should_eventually_complete_the_message_even_when_next_receive_threw_when_message_could_not_be_completed()
{
var fakeQueueClient = new FakeQueueClient();

var receiveStrategy = new AtLeastOnceReceiveStrategy((_, _) => Task.CompletedTask, (_, _) => Task.FromResult(ErrorHandleResult.Handled), (_, _, _) => { });

var messageId = Guid.NewGuid().ToString();

var rawMessageThatIsExpired = QueuesModelFactory.QueueMessage("RawMessageId1", "PopReceipt1", "", 1, nextVisibleOn: DateTimeOffset.UtcNow.Subtract(TimeSpan.FromSeconds(30)));
var messageRetrieved1 = new MessageRetrieved(null, null, rawMessageThatIsExpired, fakeQueueClient, null);
var messageWrapper1 = new MessageWrapper { Id = messageId, Headers = new Dictionary<string, string>() };

Assert.ThrowsAsync<LeaseTimeoutException>(async () => await receiveStrategy.Receive(messageRetrieved1, messageWrapper1, "queue"));

fakeQueueClient.DeleteMessageCallback = () => throw new RequestFailedException(404, "MessageNotFound", QueueErrorCode.MessageNotFound.ToString(), null);

var rawMessageThatCannotBeCompleted = QueuesModelFactory.QueueMessage("RawMessageId2", "PopReceipt2", "", 1, nextVisibleOn: DateTimeOffset.UtcNow.Add(TimeSpan.FromSeconds(30)));
var messageRetrieved2 = new MessageRetrieved(null, null, rawMessageThatCannotBeCompleted, fakeQueueClient, null);
var messageWrapper2 = new MessageWrapper { Id = messageId, Headers = new Dictionary<string, string>() };

Assert.ThrowsAsync<LeaseTimeoutException>(async () => await receiveStrategy.Receive(messageRetrieved2, messageWrapper2, "queue"));

fakeQueueClient.DeleteMessageCallback = () => Task.FromResult(default(Response));

var rawMessageThatIsValid = QueuesModelFactory.QueueMessage("RawMessageId3", "PopReceipt3", "", 1, nextVisibleOn: DateTimeOffset.UtcNow.Add(TimeSpan.FromSeconds(30)));
var messageRetrieved3 = new MessageRetrieved(null, null, rawMessageThatIsValid, fakeQueueClient, null);
var messageWrapper3 = new MessageWrapper { Id = messageId, Headers = new Dictionary<string, string>() };

await receiveStrategy.Receive(messageRetrieved3, messageWrapper3, "queue");

Assert.That(fakeQueueClient.DeletedMessages, Has.Count.EqualTo(1).And.Contains(("RawMessageId3", "PopReceipt3")));
}

[Test]
public async Task Should_complete_message_on_next_receive_when_error_pipeline_successful_but_completion_failed_due_to_expired_lease()
{
var fakeQueueClient = new FakeQueueClient();

var onMessageCalled = 0;
var onErrorCalled = 0;

var receiveStrategy = new AtLeastOnceReceiveStrategy((_, _) =>
{
onMessageCalled++;
return Task.FromException<InvalidOperationException>(new InvalidOperationException());
}, (_, _) =>
{
onErrorCalled++;
return Task.FromResult(ErrorHandleResult.Handled);
}, (_, _, _) => { });

var messageId = Guid.NewGuid().ToString();

var rawMessageThatIsExpired = QueuesModelFactory.QueueMessage("RawMessageId1", "PopReceipt1", "", 1, nextVisibleOn: DateTimeOffset.UtcNow.Subtract(TimeSpan.FromSeconds(30)));
var messageRetrieved1 = new MessageRetrieved(null, null, rawMessageThatIsExpired, fakeQueueClient, null);
var messageWrapper1 = new MessageWrapper { Id = messageId, Headers = new Dictionary<string, string>() };

await receiveStrategy.Receive(messageRetrieved1, messageWrapper1, "queue");

var rawMessageThatIsValid = QueuesModelFactory.QueueMessage("RawMessageId2", "PopReceipt2", "", 1, nextVisibleOn: DateTimeOffset.UtcNow.Add(TimeSpan.FromSeconds(30)));
var messageRetrieved2 = new MessageRetrieved(null, null, rawMessageThatIsValid, fakeQueueClient, null);
var messageWrapper2 = new MessageWrapper { Id = messageId, Headers = new Dictionary<string, string>() };

await receiveStrategy.Receive(messageRetrieved2, messageWrapper2, "queue");

Assert.Multiple(() =>
{
Assert.That(fakeQueueClient.DeletedMessages, Has.Count.EqualTo(1).And.Contains(("RawMessageId2", "PopReceipt2")));
Assert.That(onMessageCalled, Is.EqualTo(1));
Assert.That(onErrorCalled, Is.EqualTo(1));
});
}
}
}
30 changes: 30 additions & 0 deletions src/Tests/FakeQueueClient.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
namespace NServiceBus.Transport.AzureStorageQueues.Tests
{
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using global::Azure;
using global::Azure.Storage.Queues;

public class FakeQueueClient : QueueClient
{
public FakeQueueClient() : base(new Uri("http://localhost"))
{
}

public IReadOnlyCollection<(string messageId, string popReceipt)> DeletedMessages => deletedMessages;

public Func<Task<Response>> DeleteMessageCallback = () => Task.FromResult(default(Response));

public override async Task<Response> DeleteMessageAsync(string messageId, string popReceipt,
CancellationToken cancellationToken = default)
{
var response = await DeleteMessageCallback();
deletedMessages.Add((messageId, popReceipt));
return response;
}

readonly List<(string messageId, string popReceipt)> deletedMessages = new List<(string messageId, string popReceipt)>();
}
}
22 changes: 14 additions & 8 deletions src/Tests/NServiceBus.Transport.AzureStorageQueues.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,29 @@
<TargetFrameworks>net481;net8.0</TargetFrameworks>
<SignAssembly>true</SignAssembly>
<AssemblyOriginatorKeyFile>$(SolutionDir)NServiceBusTests.snk</AssemblyOriginatorKeyFile>
<LangVersion>10.0</LangVersion>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\Transport\NServiceBus.Transport.AzureStorageQueues.csproj" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="GitHubActionsTestLogger" Version="2.3.3" />
<PackageReference Include="Azure.Storage.Queues" Version="12.12.0" />
<PackageReference Include="Azure.Storage.Blobs" Version="12.14.1" />
<PackageReference Include="Microsoft.Azure.Cosmos.Table" Version="1.0.8" />
<PackageReference Include="Particular.Approvals" Version="0.3.0" />
<PackageReference Include="Azure.Data.Tables" Version="12.9.0" />
<PackageReference Include="Azure.Identity" Version="1.12.0" />
<PackageReference Include="Azure.Storage.Blobs" Version="12.21.2" />
<PackageReference Include="Azure.Storage.Queues" Version="12.19.1" />
<PackageReference Include="BitFaster.Caching" Version="2.5.2" />
<PackageReference Include="NServiceBus" Version="8.0.0" />
<PackageReference Include="NUnit3TestAdapter" Version="4.5.0" />
<PackageReference Include="Particular.Approvals" Version="1.0.0" />
<PackageReference Include="PublicApiGenerator" Version="11.1.0" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="GitHubActionsTestLogger" Version="2.4.1" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.11.1" />
<PackageReference Include="NUnit" Version="3.14.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.9.0" />
<PackageReference Include="PublicApiGenerator" Version="10.3.0" />
<PackageReference Include="NUnit3TestAdapter" Version="4.6.0" />
</ItemGroup>

</Project>
Loading

0 comments on commit f1f3e42

Please sign in to comment.