Skip to content

Commit

Permalink
fixed timeout persistence
Browse files Browse the repository at this point in the history
  • Loading branch information
Tim Bussmann committed Sep 28, 2015
1 parent 4630075 commit ac28c79
Show file tree
Hide file tree
Showing 50 changed files with 1,429 additions and 93 deletions.
7 changes: 3 additions & 4 deletions .gitattributes
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,11 @@
*.fs text
*.fsx text
*.hs text

*.targets text
*.psm1 text
*.ps1 text
*.md text
*.DotSettings text
*.txt text eol=crlf
*.bat text eol=crlf

Expand All @@ -71,6 +73,3 @@
*.snk -text -diff
*.cub -text -diff
*.wixlib -text -diff


*.approved.* binary
67 changes: 57 additions & 10 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,12 @@ nugets
deploy
build32
binaries
obj
bin
*.vshost.*
.nu
_ReSharper.*
_UpgradeReport.*
*.csproj.user
*.resharper.user
*.resharper
*.suo
*.cache
*~
*.swp
*.user
TestResults
TestResult.xml
results
CommonAssemblyInfo.cs
lib/sqlite/System.Data.SQLite.dll
Expand All @@ -36,3 +26,60 @@ _NCrunch_NServiceBus/*
logs
run-git.cmd
src/Chocolatey/Build/*

installer/[F|f]iles
installer/[C|c]ustom[A|a]ctions
installer/ServiceControl-cache

# Created by https://www.gitignore.io

### VisualStudio ###
## Ignore Visual Studio temporary files, build results, and
## files generated by popular Visual Studio add-ons.

# User-specific files
*.suo
*.user
*.userosscache
*.sln.docstates
.vs/

# mac temp file ignore
.DS_Store

# Build results
[Dd]ebug/
[Dd]ebugPublic/
[Rr]elease/
[Rr]eleases/
x64/
x86/
build/
bld/
[Bb]in/
[Oo]bj/

# Roslyn cache directories
*.ide/

# MSTest test Results
[Tt]est[Rr]esult*/
[Bb]uild[Ll]og.*

#NUNIT
*.VisualState.xml
TestResult.xml

# NCrunch
_NCrunch_*
.*crunch*.local.xml

# ReSharper is a .NET coding add-in
_ReSharper*/
*.[Rr]e[Ss]harper
*.DotSettings.user

src/scaffolding.config

# Approval tests temp file
*.received.*
12 changes: 12 additions & 0 deletions src/NServiceBus.AcceptanceTests/NServiceBus.AcceptanceTests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,18 @@
<Compile Include="EndpointTemplates\DefaultServer.cs" />
<Compile Include="ScenarioDescriptors\SubscriptionStorages.cs" />
<Compile Include="Scheduling\When_scheduling_a_recurring_task.cs" />
<Compile Include="Timeouts\OutdatedTimeoutPersister.cs" />
<Compile Include="Timeouts\UpdatedTimeoutPersister.cs" />
<Compile Include="Timeouts\When_dispatched_timeout_already_removed_from_timeout_storage.cs" />
<Compile Include="Timeouts\When_dispatching_deferred_message_fails_without_dtc.cs" />
<Compile Include="Timeouts\When_endpoint_uses_no_timeout_persistence.cs" />
<Compile Include="Timeouts\When_endpoint_uses_outdated_sql_transport_with_dtc.cs" />
<Compile Include="Timeouts\When_endpoint_uses_outdated_timeout_persistence_without_dtc.cs" />
<Compile Include="Timeouts\When_endpoint_uses_outdated_timeout_persistence_with_disabled_dtc.cs" />
<Compile Include="Timeouts\When_endpoint_uses_outdated_timeout_persistence_with_dtc.cs" />
<Compile Include="Timeouts\When_endpoint_uses_outdated_sql_transport_with_disabled_dtc.cs" />
<Compile Include="Timeouts\When_endpoint_uses_updated_timeout_persistence.cs" />
<Compile Include="Timeouts\When_endpoint_uses_updated_sql_transport_with_disabled_dtc.cs" />
<Compile Include="Transactions\FakePromotableResourceManager.cs" />
<Compile Include="Transactions\When_receiving_a_message_with_dtc_disabled.cs" />
<Compile Include="Transactions\When_receiving_a_message_with_dtc_enabled.cs" />
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
namespace NServiceBus.AcceptanceTests.Timeouts
{
using System;
using System.Collections.Generic;
using System.Linq;
using NServiceBus.Timeout.Core;

class OutdatedTimeoutPersister : IPersistTimeouts
{
public List<Tuple<string, DateTime>> GetNextChunk(DateTime startSlice, out DateTime nextTimeToRunQuery)
{
nextTimeToRunQuery = DateTime.Now.AddYears(42);
return Enumerable.Empty<Tuple<string, DateTime>>().ToList();
}

public void Add(TimeoutData timeout)
{
}

public bool TryRemove(string timeoutId, out TimeoutData timeoutData)
{
timeoutData = null;
return false;
}

public void RemoveTimeoutBy(Guid sagaId)
{
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
namespace NServiceBus.AcceptanceTests.Timeouts
{
using System;
using System.Collections.Generic;
using System.Linq;
using NServiceBus.Timeout.Core;

class UpdatedTimeoutPersister : IPersistTimeouts, IPersistTimeoutsV2
{
public List<Tuple<string, DateTime>> GetNextChunk(DateTime startSlice, out DateTime nextTimeToRunQuery)
{
nextTimeToRunQuery = DateTime.Now.AddYears(42);
return Enumerable.Empty<Tuple<string, DateTime>>().ToList();
}

public void Add(TimeoutData timeout)
{
}

public bool TryRemove(string timeoutId, out TimeoutData timeoutData)
{
timeoutData = null;
return false;
}

public void RemoveTimeoutBy(Guid sagaId)
{
}

public TimeoutData Peek(string timeoutId)
{
return null;
}

public bool TryRemove(string timeoutId)
{
return true;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
namespace NServiceBus.AcceptanceTests.Timeouts
{
using System;
using System.Collections.Generic;
using System.Transactions;
using NServiceBus.AcceptanceTesting;
using NServiceBus.AcceptanceTests.EndpointTemplates;
using NServiceBus.Config;
using NServiceBus.Timeout.Core;
using NUnit.Framework;

public class When_dispatched_timeout_already_removed_from_timeout_storage
{
[Test]
public void Should_rollback_and_not_deliver_timeout_when_using_dtc()
{
var context = new Context();

Scenario.Define(context)
.WithEndpoint<TimeoutHandlingEndpoint>(b => b
.CustomConfig(configure => Configure.Transactions.Advanced(s => s.EnableDistributedTransactions()))
.Given(bus =>
{
bus.Defer(TimeSpan.FromSeconds(5), new MyMessage());
}))
.Done(c => c.AttemptedToRemoveTimeout || c.MessageReceived)
.Run();

Assert.IsFalse(context.MessageReceived, "Message should not be delivered using dtc");
Assert.AreEqual(2, context.NumberOfProcessingAttempts, "The rollback should cause a retry");
Assert.IsTrue(context.AttemptedToRemoveTimeout);
}

[Test]
public void Should_rollback_and_deliver_timeout_anyway_when_using_native_tx()
{
var context = new Context();

Scenario.Define(context)
.WithEndpoint<TimeoutHandlingEndpoint>(b => b
.CustomConfig(configure => Configure.Transactions.Advanced(s => s.DisableDistributedTransactions()))
.Given(bus =>
{
bus.Defer(TimeSpan.FromSeconds(5), new MyMessage());
}))
.Done(c => c.AttemptedToRemoveTimeout && c.MessageReceived)
.Run();

Assert.IsTrue(context.MessageReceived, "Message should only be delivered although transaction has been aborted");
Assert.AreEqual(2, context.NumberOfProcessingAttempts, "The rollback should cause a retry");
Assert.IsTrue(context.AttemptedToRemoveTimeout);
}

[Test]
public void Should_deliver_timeout_anyway_when_using_no_tx()
{
var context = new Context();

Scenario.Define(context)
.WithEndpoint<TimeoutHandlingEndpoint>(b => b
.CustomConfig(configure => Configure.Transactions.Disable())
.Given(bus =>
{
bus.Defer(TimeSpan.FromSeconds(5), new MyMessage());
}))
.Done(c => c.AttemptedToRemoveTimeout && c.MessageReceived)
.Run();

Assert.IsTrue(context.MessageReceived, "Message should be delivered although timeout processing fails");
Assert.AreEqual(1, context.NumberOfProcessingAttempts, "Should not retry without transactions enabled");
Assert.IsTrue(context.AttemptedToRemoveTimeout);
}

public class Context : ScenarioContext
{
public bool MessageReceived { get; set; }

public bool AttemptedToRemoveTimeout { get; set; }

public int NumberOfProcessingAttempts { get; set; }
}

public class TimeoutHandlingEndpoint : EndpointConfigurationBuilder
{
public TimeoutHandlingEndpoint()
{
EndpointSetup<DefaultServer>();
}

public class DelayedMessageHandler : IHandleMessages<MyMessage>
{
Context context;

public DelayedMessageHandler(Context context)
{
this.context = context;
}

public void Handle(MyMessage message)
{
context.MessageReceived = true;
}
}

public class EndpointConfiguration : IWantToRunWhenConfigurationIsComplete
{
Context context;
IPersistTimeouts originalPersister;

public EndpointConfiguration(Context context, IPersistTimeouts originalPersister)
{
this.context = context;
this.originalPersister = originalPersister;
}

public void Run()
{
Configure.Component(b => new TimeoutPersistenceWrapper(originalPersister, originalPersister as IPersistTimeoutsV2, context), DependencyLifecycle.SingleInstance);
}
}

class TimeoutPersistenceWrapper : IPersistTimeouts, IPersistTimeoutsV2
{
IPersistTimeouts originalTimeoutPersister;
IPersistTimeoutsV2 originalTimeoutPersisterV2;
Context context;

public TimeoutPersistenceWrapper(IPersistTimeouts originalTimeoutPersister, IPersistTimeoutsV2 originalTimeoutPersisterV2, Context context)
{
this.originalTimeoutPersister = originalTimeoutPersister;
this.originalTimeoutPersisterV2 = originalTimeoutPersisterV2;
this.context = context;
}

public List<Tuple<string, DateTime>> GetNextChunk(DateTime startSlice, out DateTime nextTimeToRunQuery)
{
return originalTimeoutPersister.GetNextChunk(startSlice, out nextTimeToRunQuery);
}

public void Add(TimeoutData timeout)
{
originalTimeoutPersister.Add(timeout);
}

public bool TryRemove(string timeoutId, out TimeoutData timeoutData)
{
return originalTimeoutPersister.TryRemove(timeoutId, out timeoutData);
}

public void RemoveTimeoutBy(Guid sagaId)
{
originalTimeoutPersister.RemoveTimeoutBy(sagaId);
}

public TimeoutData Peek(string timeoutId)
{
context.NumberOfProcessingAttempts++;
return originalTimeoutPersisterV2.Peek(timeoutId);
}

public bool TryRemove(string timeoutId)
{
context.AttemptedToRemoveTimeout = true;

using (var tx = new TransactionScope(TransactionScopeOption.Suppress))
{
// delete the timeout so it won't be available on retries
originalTimeoutPersisterV2.TryRemove(timeoutId);
tx.Complete();
}

return false;
}
}
}

[Serializable]
public class MyMessage : IMessage
{
}
}
}
Loading

0 comments on commit ac28c79

Please sign in to comment.