Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

prevent timeout message loss when not using DTC #2924

Merged
merged 6 commits into from
Oct 2, 2015
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions .gitattributes
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,11 @@
*.fs text
*.fsx text
*.hs text

*.targets text
*.psm1 text
*.ps1 text
*.md text
*.DotSettings text
*.txt text eol=crlf
*.bat text eol=crlf

Expand All @@ -71,6 +73,3 @@
*.snk -text -diff
*.cub -text -diff
*.wixlib -text -diff


*.approved.* binary
67 changes: 57 additions & 10 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,12 @@ nugets
deploy
build32
binaries
obj
bin
*.vshost.*
.nu
_ReSharper.*
_UpgradeReport.*
*.csproj.user
*.resharper.user
*.resharper
*.suo
*.cache
*~
*.swp
*.user
TestResults
TestResult.xml
results
CommonAssemblyInfo.cs
lib/sqlite/System.Data.SQLite.dll
Expand All @@ -36,3 +26,60 @@ _NCrunch_NServiceBus/*
logs
run-git.cmd
src/Chocolatey/Build/*

installer/[F|f]iles
installer/[C|c]ustom[A|a]ctions
installer/ServiceControl-cache

# Created by https://www.gitignore.io

### VisualStudio ###
## Ignore Visual Studio temporary files, build results, and
## files generated by popular Visual Studio add-ons.

# User-specific files
*.suo
*.user
*.userosscache
*.sln.docstates
.vs/

# mac temp file ignore
.DS_Store

# Build results
[Dd]ebug/
[Dd]ebugPublic/
[Rr]elease/
[Rr]eleases/
x64/
x86/
build/
bld/
[Bb]in/
[Oo]bj/

# Roslyn cache directories
*.ide/

# MSTest test Results
[Tt]est[Rr]esult*/
[Bb]uild[Ll]og.*

#NUNIT
*.VisualState.xml
TestResult.xml

# NCrunch
_NCrunch_*
.*crunch*.local.xml

# ReSharper is a .NET coding add-in
_ReSharper*/
*.[Rr]e[Ss]harper
*.DotSettings.user

src/scaffolding.config

# Approval tests temp file
*.received.*
12 changes: 12 additions & 0 deletions src/NServiceBus.AcceptanceTests/NServiceBus.AcceptanceTests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,18 @@
<Compile Include="EndpointTemplates\DefaultServer.cs" />
<Compile Include="ScenarioDescriptors\SubscriptionStorages.cs" />
<Compile Include="Scheduling\When_scheduling_a_recurring_task.cs" />
<Compile Include="Timeouts\OutdatedTimeoutPersister.cs" />
<Compile Include="Timeouts\UpdatedTimeoutPersister.cs" />
<Compile Include="Timeouts\When_dispatched_timeout_already_removed_from_timeout_storage.cs" />
<Compile Include="Timeouts\When_dispatching_deferred_message_fails_without_dtc.cs" />
<Compile Include="Timeouts\When_endpoint_uses_no_timeout_persistence.cs" />
<Compile Include="Timeouts\When_endpoint_uses_outdated_sql_transport_with_dtc.cs" />
<Compile Include="Timeouts\When_endpoint_uses_outdated_timeout_persistence_without_dtc.cs" />
<Compile Include="Timeouts\When_endpoint_uses_outdated_timeout_persistence_with_disabled_dtc.cs" />
<Compile Include="Timeouts\When_endpoint_uses_outdated_timeout_persistence_with_dtc.cs" />
<Compile Include="Timeouts\When_endpoint_uses_outdated_sql_transport_with_disabled_dtc.cs" />
<Compile Include="Timeouts\When_endpoint_uses_updated_timeout_persistence.cs" />
<Compile Include="Timeouts\When_endpoint_uses_updated_sql_transport_with_disabled_dtc.cs" />
<Compile Include="Transactions\FakePromotableResourceManager.cs" />
<Compile Include="Transactions\When_receiving_a_message_with_dtc_disabled.cs" />
<Compile Include="Transactions\When_receiving_a_message_with_dtc_enabled.cs" />
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
namespace NServiceBus.AcceptanceTests.Timeouts
{
using System;
using System.Collections.Generic;
using System.Linq;
using NServiceBus.Timeout.Core;

class OutdatedTimeoutPersister : IPersistTimeouts
{
public List<Tuple<string, DateTime>> GetNextChunk(DateTime startSlice, out DateTime nextTimeToRunQuery)
{
nextTimeToRunQuery = DateTime.Now.AddYears(42);
return Enumerable.Empty<Tuple<string, DateTime>>().ToList();
}

public void Add(TimeoutData timeout)
{
}

public bool TryRemove(string timeoutId, out TimeoutData timeoutData)
{
timeoutData = null;
return false;
}

public void RemoveTimeoutBy(Guid sagaId)
{
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
namespace NServiceBus.AcceptanceTests.Timeouts
{
using System;
using System.Collections.Generic;
using System.Linq;
using NServiceBus.Timeout.Core;

class UpdatedTimeoutPersister : IPersistTimeouts, IPersistTimeoutsV2
{
public List<Tuple<string, DateTime>> GetNextChunk(DateTime startSlice, out DateTime nextTimeToRunQuery)
{
nextTimeToRunQuery = DateTime.Now.AddYears(42);
return Enumerable.Empty<Tuple<string, DateTime>>().ToList();
}

public void Add(TimeoutData timeout)
{
}

public bool TryRemove(string timeoutId, out TimeoutData timeoutData)
{
timeoutData = null;
return false;
}

public void RemoveTimeoutBy(Guid sagaId)
{
}

public TimeoutData Peek(string timeoutId)
{
return null;
}

public bool TryRemove(string timeoutId)
{
return true;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
namespace NServiceBus.AcceptanceTests.Timeouts
{
using System;
using System.Collections.Generic;
using System.Transactions;
using NServiceBus.AcceptanceTesting;
using NServiceBus.AcceptanceTests.EndpointTemplates;
using NServiceBus.Config;
using NServiceBus.Timeout.Core;
using NUnit.Framework;

public class When_dispatched_timeout_already_removed_from_timeout_storage : NServiceBusAcceptanceTest
{
[Test]
public void Should_rollback_and_not_deliver_timeout_when_using_dtc()
{
var context = new Context();

Scenario.Define(context)
.WithEndpoint<TimeoutHandlingEndpoint>(b => b
.CustomConfig(configure => Configure.Transactions.Advanced(s => s.EnableDistributedTransactions()))
.Given(bus =>
{
bus.Defer(TimeSpan.FromSeconds(5), new MyMessage());
}))
.Done(c => c.AttemptedToRemoveTimeout || c.MessageReceived)
.Run();

Assert.IsFalse(context.MessageReceived, "Message should not be delivered using dtc");
Assert.AreEqual(2, context.NumberOfProcessingAttempts, "The rollback should cause a retry");
Assert.IsTrue(context.AttemptedToRemoveTimeout);
}

[Test]
public void Should_rollback_and_deliver_timeout_anyway_when_using_native_tx()
{
var context = new Context();

Scenario.Define(context)
.WithEndpoint<TimeoutHandlingEndpoint>(b => b
.CustomConfig(configure => Configure.Transactions.Advanced(s => s.DisableDistributedTransactions()))
.Given(bus =>
{
bus.Defer(TimeSpan.FromSeconds(5), new MyMessage());
}))
.Done(c => c.AttemptedToRemoveTimeout && c.MessageReceived)
.Run();

Assert.IsTrue(context.MessageReceived, "Message should be delivered although transaction has been aborted");
Assert.AreEqual(2, context.NumberOfProcessingAttempts, "The rollback should cause a retry");
Assert.IsTrue(context.AttemptedToRemoveTimeout);
}

[Test]
public void Should_deliver_timeout_anyway_when_using_no_tx()
{
var context = new Context();

Scenario.Define(context)
.WithEndpoint<TimeoutHandlingEndpoint>(b => b
.CustomConfig(configure => Configure.Transactions.Disable())
.Given(bus =>
{
bus.Defer(TimeSpan.FromSeconds(5), new MyMessage());
}))
.Done(c => c.AttemptedToRemoveTimeout && c.MessageReceived)
.Run();

Assert.IsTrue(context.MessageReceived, "Message should be delivered although timeout processing fails");
Assert.AreEqual(1, context.NumberOfProcessingAttempts, "Should not retry without transactions enabled");
Assert.IsTrue(context.AttemptedToRemoveTimeout);
}

public class Context : ScenarioContext
{
public bool MessageReceived { get; set; }

public bool AttemptedToRemoveTimeout { get; set; }

public int NumberOfProcessingAttempts { get; set; }
}

public class TimeoutHandlingEndpoint : EndpointConfigurationBuilder
{
public TimeoutHandlingEndpoint()
{
EndpointSetup<DefaultServer>();
}

public class DelayedMessageHandler : IHandleMessages<MyMessage>
{
Context context;

public DelayedMessageHandler(Context context)
{
this.context = context;
}

public void Handle(MyMessage message)
{
context.MessageReceived = true;
}
}

public class EndpointConfiguration : IWantToRunWhenConfigurationIsComplete
{
Context context;
IPersistTimeouts originalPersister;

public EndpointConfiguration(Context context, IPersistTimeouts originalPersister)
{
this.context = context;
this.originalPersister = originalPersister;
}

public void Run()
{
Configure.Component(b => new TimeoutPersistenceWrapper(originalPersister, originalPersister as IPersistTimeoutsV2, context), DependencyLifecycle.SingleInstance);
}
}

class TimeoutPersistenceWrapper : IPersistTimeouts, IPersistTimeoutsV2
{
IPersistTimeouts originalTimeoutPersister;
IPersistTimeoutsV2 originalTimeoutPersisterV2;
Context context;

public TimeoutPersistenceWrapper(IPersistTimeouts originalTimeoutPersister, IPersistTimeoutsV2 originalTimeoutPersisterV2, Context context)
{
this.originalTimeoutPersister = originalTimeoutPersister;
this.originalTimeoutPersisterV2 = originalTimeoutPersisterV2;
this.context = context;
}

public List<Tuple<string, DateTime>> GetNextChunk(DateTime startSlice, out DateTime nextTimeToRunQuery)
{
return originalTimeoutPersister.GetNextChunk(startSlice, out nextTimeToRunQuery);
}

public void Add(TimeoutData timeout)
{
originalTimeoutPersister.Add(timeout);
}

public bool TryRemove(string timeoutId, out TimeoutData timeoutData)
{
return originalTimeoutPersister.TryRemove(timeoutId, out timeoutData);
}

public void RemoveTimeoutBy(Guid sagaId)
{
originalTimeoutPersister.RemoveTimeoutBy(sagaId);
}

public TimeoutData Peek(string timeoutId)
{
context.NumberOfProcessingAttempts++;
return originalTimeoutPersisterV2.Peek(timeoutId);
}

public bool TryRemove(string timeoutId)
{
context.AttemptedToRemoveTimeout = true;

using (var tx = new TransactionScope(TransactionScopeOption.Suppress))
{
// delete the timeout so it won't be available on retries
originalTimeoutPersisterV2.TryRemove(timeoutId);
tx.Complete();
}

return false;
}
}
}

[Serializable]
public class MyMessage : IMessage
{
}
}
}
Loading