Skip to content

Commit

Permalink
[ServiceBus] ServicebusRetryPolicy remove locking (Azure#31206)
Browse files Browse the repository at this point in the history
* Remove the lock from the retry policy

* Method visibility

* Switch to simpler implementation

* Formatting

* Allow controlling the server busy base sleep time

* A few tests to get started

* Rename field

* Greater than zero
  • Loading branch information
danielmarbach authored Sep 20, 2022
1 parent 3968018 commit f5b32f9
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,22 +26,30 @@ namespace Azure.Messaging.ServiceBus
///
public abstract class ServiceBusRetryPolicy
{
private static readonly TimeSpan ServerBusyBaseSleepTime = TimeSpan.FromSeconds(10);

private readonly object serverBusyLock = new object();
/// <summary>
/// Represents a state flag that is used to make sure the server busy value can be observed with
/// reasonable fresh values without having to acquire a lock.
/// </summary>
private volatile int _serverBusyState;

// This is a volatile copy of IsServerBusy. IsServerBusy is synchronized with a lock, whereas encounteredServerBusy is kept volatile for performance reasons.
private volatile bool encounteredServerBusy;
private const int ServerNotBusyState = 0; // default value of serverBusy
private const int ServerBusyState = 1;

/// <summary>
/// Determines whether or not the server returned a busy error.
/// </summary>
private bool IsServerBusy { get; set; }
internal bool IsServerBusy => _serverBusyState == ServerBusyState;

/// <summary>
/// Gets the exception message when a server busy error is returned.
/// </summary>
private string ServerBusyExceptionMessage { get; set; }
internal string ServerBusyExceptionMessage { get; set; }

/// <summary>
/// Gets the server busy base sleep time
/// </summary>
/// <remarks>Defaults to TimeSpan.FromSeconds(10)</remarks>
internal TimeSpan ServerBusyBaseSleepTime { get; set; } = TimeSpan.FromSeconds(10);

/// <summary>
/// The instance of <see cref="ServiceBusEventSource" /> which can be mocked for testing.
Expand Down Expand Up @@ -144,14 +152,13 @@ internal async ValueTask<TResult> RunOperation<T1, TResult>(
{
return await operation(t1, tryTimeout, cancellationToken).ConfigureAwait(false);
}

catch (Exception ex)
{
Exception activeEx = AmqpExceptionHelper.TranslateException(ex);

if (activeEx is ServiceBusException { Reason: ServiceBusFailureReason.ServiceBusy })
{
SetServerBusy(activeEx.Message);
SetServerBusy(activeEx.Message, cancellationToken);
}

// Determine if there should be a retry for the next attempt; if so enforce the delay but do not quit the loop.
Expand Down Expand Up @@ -185,49 +192,42 @@ internal async ValueTask<TResult> RunOperation<T1, TResult>(
throw new TaskCanceledException();
}

internal void SetServerBusy(string exceptionMessage)
private void SetServerBusy(string exceptionMessage, CancellationToken cancellationToken)
{
// multiple call to this method will not prolong the timer.
if (encounteredServerBusy)
if (_serverBusyState == ServerBusyState)
{
return;
}

lock (serverBusyLock)
{
if (!encounteredServerBusy)
{
encounteredServerBusy = true;
ServerBusyExceptionMessage = string.IsNullOrWhiteSpace(exceptionMessage) ?
Resources.DefaultServerBusyException : exceptionMessage;
IsServerBusy = true;
_ = ScheduleResetServerBusy();
}
}
ServerBusyExceptionMessage = string.IsNullOrWhiteSpace(exceptionMessage) ?
Resources.DefaultServerBusyException : exceptionMessage;
Interlocked.Exchange(ref _serverBusyState, ServerBusyState);
_ = ScheduleResetServerBusy(cancellationToken);
}

internal void ResetServerBusy()
private void ResetServerBusy()
{
if (!encounteredServerBusy)
if (_serverBusyState == ServerNotBusyState)
{
return;
}

lock (serverBusyLock)
{
if (encounteredServerBusy)
{
encounteredServerBusy = false;
ServerBusyExceptionMessage = Resources.DefaultServerBusyException;
IsServerBusy = false;
}
}
ServerBusyExceptionMessage = Resources.DefaultServerBusyException;
Interlocked.Exchange(ref _serverBusyState, ServerNotBusyState);
}

private async Task ScheduleResetServerBusy()
private async Task ScheduleResetServerBusy(CancellationToken cancellationToken)
{
await Task.Delay(ServerBusyBaseSleepTime).ConfigureAwait(false);
ResetServerBusy();
try
{
await Task.Delay(ServerBusyBaseSleepTime, cancellationToken).ConfigureAwait(false);
ResetServerBusy();
}
catch (OperationCanceledException)
{
// ignored
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System;
using System.Threading;
using System.Threading.Tasks;
using NUnit.Framework;

namespace Azure.Messaging.ServiceBus.Tests
{
/// <summary>
/// The suite of tests for the <see cref="ServiceBusRetryPolicy" />
/// class.
/// </summary>
///
[TestFixture]
public class ServiceBusRetryPolicyTests
{
[Test]
public void HasDefaultValues()
{
var policy = new MockServiceBusRetryPolicy();

Assert.That(policy.IsServerBusy, Is.False);
Assert.That(policy.ServerBusyBaseSleepTime, Is.GreaterThan(TimeSpan.FromSeconds(0)));
Assert.That(policy.ServerBusyExceptionMessage, Is.Null);
}

[Test]
public async Task ExecutesRunOperationWithState()
{
var policy = new MockServiceBusRetryPolicy();

var operationResult = await policy.RunOperation((state, timeout, token) => new ValueTask<bool>(!state), false, null,
CancellationToken.None);

Assert.That(operationResult, Is.True);
}

[Test]
public void SetsServerBusy()
{
var policy = new MockServiceBusRetryPolicy();

Assert.ThrowsAsync<ServiceBusException>(async () =>
{
await policy.RunOperation((state, timeout, token) =>
{
throw new ServiceBusException("Busy", ServiceBusFailureReason.ServiceBusy);
}, false, null,
CancellationToken.None);
});
Assert.That(policy.IsServerBusy, Is.True);
}

[Test]
public void ResetsServerBusyAfterBaseSleepTime()
{
var policy = new MockServiceBusRetryPolicy
{
ServerBusyBaseSleepTime = TimeSpan.FromMilliseconds(10)
};

Assert.ThrowsAsync<ServiceBusException>(async () =>
{
await policy.RunOperation((state, timeout, token) =>
{
throw new ServiceBusException("Busy", ServiceBusFailureReason.ServiceBusy);
}, false, null,
CancellationToken.None);
});

var serverNoLongerBusy = SpinWait.SpinUntil(() => policy.IsServerBusy == false, TimeSpan.FromSeconds(1));
Assert.That(serverNoLongerBusy, Is.True);
}

private class MockServiceBusRetryPolicy : ServiceBusRetryPolicy
{
public override TimeSpan CalculateTryTimeout(int attemptCount)
{
return TimeSpan.Zero;
}

public override TimeSpan? CalculateRetryDelay(Exception lastException, int attemptCount)
{
return null;
}
}
}
}

0 comments on commit f5b32f9

Please sign in to comment.