Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Outbox cleanup circuit breaker #242

Merged
merged 3 commits into from
Dec 1, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/NServiceBus.NHibernate/NServiceBus.NHibernate.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
<Compile Include="Outbox\IOutboxRecord.cs" />
<Compile Include="Outbox\OutboxConfig.cs" />
<Compile Include="Outbox\OutboxRecord.cs" />
<Compile Include="RepeatedFailuresOverTimeCircuitBreaker.cs" />
<Compile Include="SynchronizedStorage\INHibernateSynchronizedStorageSession.cs" />
<Compile Include="SynchronizedStorage\NHibernateAmbientTransactionSynchronizedStorageSession.cs" />
<Compile Include="Obsoletes\NHibernateStorageContext.cs" />
Expand Down
42 changes: 38 additions & 4 deletions src/NServiceBus.NHibernate/Outbox/OutboxCleaner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ namespace NServiceBus.Features

class OutboxCleaner : FeatureStartupTask
{
public OutboxCleaner(INHibernateOutboxStorage outboxPersister)
public OutboxCleaner(INHibernateOutboxStorage outboxPersister, CriticalError criticalError)
{
this.outboxPersister = outboxPersister;
this.criticalError = criticalError;
}

protected override Task OnStart(IMessageSession busSession)
Expand All @@ -29,14 +30,31 @@ protected override Task OnStart(IMessageSession busSession)

if (configValue == null)
{
frequencyToRunDeduplicationDataCleanup = TimeSpan.FromMinutes(1);
frequencyToRunDeduplicationDataCleanup = DefaultFrequencyToRunDeduplicationDataCleanup;
}
else if (!TimeSpan.TryParse(configValue, out frequencyToRunDeduplicationDataCleanup))
{
throw new Exception("Invalid value in \"NServiceBus/Outbox/NHibernate/FrequencyToRunDeduplicationDataCleanup\" AppSetting. Please ensure it is a TimeSpan.");
}

cleanupTimer = new Timer(PerformCleanup, null, TimeSpan.FromMinutes(1), frequencyToRunDeduplicationDataCleanup);
configValue = ConfigurationManager.AppSettings.Get("NServiceBus/Outbox/NHibernate/TimeToWaitBeforeTriggeringCriticalErrorWhenCleanupTaskFails");

if (configValue == null)
{
timeToWaitBeforeTriggeringCriticalError = DefaultTimeToWaitBeforeTriggeringCriticalError;
}
else if (!TimeSpan.TryParse(configValue, out timeToWaitBeforeTriggeringCriticalError))
{
throw new Exception("Invalid value in \"NServiceBus/Outbox/NHibernate/TimeToWaitBeforeTriggeringCriticalErrorWhenCleanupTaskFails\" AppSetting. Please ensure it is a TimeSpan.");
}

circuitBreaker = new RepeatedFailuresOverTimeCircuitBreaker(
"OutboxCleanupTaskConnectivity",
timeToWaitBeforeTriggeringCriticalError,
ex => criticalError.Raise("Failed to clean the Oubox.", ex)
);

cleanupTimer = new Timer(PerformCleanup, null, OutboxCleanupStartupDelay, frequencyToRunDeduplicationDataCleanup);

return Task.FromResult(true);
}
Expand All @@ -55,14 +73,30 @@ protected override Task OnStop(IMessageSession busSession)

void PerformCleanup(object state)
{
outboxPersister.RemoveEntriesOlderThan(DateTime.UtcNow - timeToKeepDeduplicationData);
try
{
outboxPersister.RemoveEntriesOlderThan(DateTime.UtcNow - timeToKeepDeduplicationData);
circuitBreaker.Success();
}
catch (Exception ex)
{
circuitBreaker.Failure(ex);
}
}

RepeatedFailuresOverTimeCircuitBreaker circuitBreaker;

// ReSharper disable NotAccessedField.Local
Timer cleanupTimer;
// ReSharper restore NotAccessedField.Local
CriticalError criticalError;
TimeSpan frequencyToRunDeduplicationDataCleanup;
INHibernateOutboxStorage outboxPersister;
TimeSpan timeToKeepDeduplicationData;
TimeSpan timeToWaitBeforeTriggeringCriticalError;

static readonly TimeSpan DefaultFrequencyToRunDeduplicationDataCleanup = TimeSpan.FromMinutes(1);
static readonly TimeSpan DefaultTimeToWaitBeforeTriggeringCriticalError = TimeSpan.FromMinutes(2);
static readonly TimeSpan OutboxCleanupStartupDelay = TimeSpan.FromMinutes(1);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
namespace NServiceBus
{
using System;
using System.Threading;
using Logging;

class RepeatedFailuresOverTimeCircuitBreaker : IDisposable
{
public RepeatedFailuresOverTimeCircuitBreaker(string name, TimeSpan timeToWaitBeforeTriggering, Action<Exception> triggerAction)
{
this.name = name;
this.triggerAction = triggerAction;
this.timeToWaitBeforeTriggering = timeToWaitBeforeTriggering;

timer = new Timer(CircuitBreakerTriggered);
}

public void Success()
{
var oldValue = Interlocked.Exchange(ref failureCount, 0);

if (oldValue == 0)
{
return;
}

timer.Change(System.Threading.Timeout.Infinite, System.Threading.Timeout.Infinite);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bit hard to read and capture the intent. Would it be a good idea to have a properly named/intent capturing private method? This is subjective of course so feel free to ignore if you disagree.

Logger.InfoFormat("The circuit breaker for {0} is now disarmed", name);
}

public void Failure(Exception exception)
{
lastException = exception;
var newValue = Interlocked.Increment(ref failureCount);

if (newValue == 1)
{
timer.Change(timeToWaitBeforeTriggering, NoPeriodicTriggering);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as the above.

Logger.WarnFormat("The circuit breaker for {0} is now in the armed state", name);
}
}

public void Dispose()
{
//Injected
}

void CircuitBreakerTriggered(object state)
{
if (Interlocked.Read(ref failureCount) > 0)
{
Logger.WarnFormat("The circuit breaker for {0} will now be triggered", name);
triggerAction(lastException);
}
}

long failureCount;
Exception lastException;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the last exception enough? or should we go with an AggregateException instead? That'd require a bit more code to ensure exceptions are aggregated correctly and cleared when successful.


string name;
Timer timer;
TimeSpan timeToWaitBeforeTriggering;
Action<Exception> triggerAction;

static TimeSpan NoPeriodicTriggering = TimeSpan.FromMilliseconds(-1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this the same as Timeout.Infinite?

static ILog Logger = LogManager.GetLogger<RepeatedFailuresOverTimeCircuitBreaker>();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ internal NHibernateStorageSession()
s.SetDefault(OutboxMappingSettingsKey, typeof(OutboxRecordMapping));
});

// since the installers are registered even if the feature isn't enabled we need to make
// since the installers are registered even if the feature isn't enabled we need to make
// this a no-op of there is no "schema updater" available
Defaults(c => c.Set<Installer.SchemaUpdater>(new Installer.SchemaUpdater()));
}
Expand Down Expand Up @@ -64,7 +64,7 @@ protected override void Setup(FeatureConfigurationContext context)
var factory = context.Settings.Get<IOutboxPersisterFactory>();
var persister = factory.Create(sessionFactory, context.Settings.EndpointName());
context.Container.ConfigureComponent(b => persister, DependencyLifecycle.SingleInstance);
context.RegisterStartupTask(b => new OutboxCleaner(persister));
context.RegisterStartupTask(b => new OutboxCleaner(persister, b.Build<CriticalError>()));
}

var runInstaller = context.Settings.Get<bool>("NHibernate.Common.AutoUpdateSchema");
Expand Down Expand Up @@ -104,4 +104,4 @@ void ApplyMappings(Configuration config, FeatureConfigurationContext context)
config.AddMapping(mapper.CompileMappingForAllExplicitlyAddedEntities());
}
}
}
}