Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add session idle timeout option for session processor #18640

Merged
merged 3 commits into from
Feb 10, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,7 @@ protected ServiceBusSessionProcessor() { }
public virtual int MaxConcurrentSessions { get { throw null; } }
public virtual int PrefetchCount { get { throw null; } }
public virtual Azure.Messaging.ServiceBus.ServiceBusReceiveMode ReceiveMode { get { throw null; } }
public virtual System.TimeSpan? SessionIdleTimeout { get { throw null; } }
public event System.Func<Azure.Messaging.ServiceBus.ProcessErrorEventArgs, System.Threading.Tasks.Task> ProcessErrorAsync { add { } remove { } }
public event System.Func<Azure.Messaging.ServiceBus.ProcessSessionMessageEventArgs, System.Threading.Tasks.Task> ProcessMessageAsync { add { } remove { } }
public event System.Func<Azure.Messaging.ServiceBus.ProcessSessionEventArgs, System.Threading.Tasks.Task> SessionClosingAsync { add { } remove { } }
Expand All @@ -394,6 +395,7 @@ public ServiceBusSessionProcessorOptions() { }
public int MaxConcurrentSessions { get { throw null; } set { } }
public int PrefetchCount { get { throw null; } set { } }
public Azure.Messaging.ServiceBus.ServiceBusReceiveMode ReceiveMode { get { throw null; } set { } }
public System.TimeSpan? SessionIdleTimeout { get { throw null; } set { } }
public System.Collections.Generic.IList<string> SessionIds { get { throw null; } }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
public override bool Equals(object obj) { throw null; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ public class ServiceBusProcessor : IAsyncDisposable
///
/// <value>The maximum number of concurrent calls to the message handler.</value>
public virtual int MaxConcurrentCalls { get; }
internal TimeSpan? MaxReceiveWaitTime { get; }

/// <summary>
/// Gets a value that indicates whether the processor should automatically
Expand Down Expand Up @@ -200,6 +201,7 @@ internal ServiceBusProcessor(
PrefetchCount = _options.PrefetchCount;
MaxAutoLockRenewalDuration = _options.MaxAutoLockRenewalDuration;
MaxConcurrentCalls = _options.MaxConcurrentCalls;
MaxReceiveWaitTime = _options.MaxReceiveWaitTime;
MaxConcurrentSessions = maxConcurrentSessions;
MaxConcurrentCallsPerSession = maxConcurrentCallsPerSession;
_sessionIds = sessionIds ?? Array.Empty<string>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,13 @@ public class ServiceBusSessionProcessor : IAsyncDisposable
/// <inheritdoc cref="ServiceBusProcessor.FullyQualifiedNamespace"/>
public virtual string FullyQualifiedNamespace => _innerProcessor.FullyQualifiedNamespace;

/// <summary>
/// Gets the maximum amount of time the processor will wait before switching to a new session
/// after no messages have been received on the session that is currently being processed.
/// If not specified, the <see cref="ServiceBusRetryOptions.TryTimeout"/> will be used.
/// </summary>
JoshLove-msft marked this conversation as resolved.
Show resolved Hide resolved
public virtual TimeSpan? SessionIdleTimeout => _innerProcessor.MaxReceiveWaitTime;

internal ServiceBusSessionProcessor(
ServiceBusConnection connection,
string entityPath,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,31 +63,31 @@ public TimeSpan MaxAutoLockRenewalDuration
private TimeSpan _maxAutoRenewDuration = TimeSpan.FromMinutes(5);

/// <summary>
/// Gets or sets the maximum amount of time to wait for each Receive call using the processor's underlying
/// receiver.
/// Gets or sets the maximum amount of time to wait before switching to a new session
/// after no messages have been received on the session that is currently being processed.
JoshLove-msft marked this conversation as resolved.
Show resolved Hide resolved
/// If not specified, the <see cref="ServiceBusRetryOptions.TryTimeout"/> will be used.
/// </summary>
///
/// <remarks>If no message is returned for a call
/// to Receive, a new session will be requested by the processor.
/// Hence, if this value is set to be too low, it could cause new sessions to be requested
/// more often than necessary.
/// <remarks>
/// Note, if <see cref="SessionIds"/> is populated and <see cref="MaxConcurrentSessions"/> is greater or equal to
JoshLove-msft marked this conversation as resolved.
Show resolved Hide resolved
/// the number of sessions specified in <see cref="SessionIds"/>, the session will not be closed when the idle timeout elapses.
/// However, it will still control the amount of time each receive call waits.
/// </remarks>
internal TimeSpan? MaxReceiveWaitTime
public TimeSpan? SessionIdleTimeout
{
get => _maxReceiveWaitTime;
get => _sessionIdleTimeout;

set
{
if (value.HasValue)
{
Argument.AssertPositive(value.Value, nameof(MaxReceiveWaitTime));
Argument.AssertPositive(value.Value, nameof(SessionIdleTimeout));
}

_maxReceiveWaitTime = value;
_sessionIdleTimeout = value;
}
}
private TimeSpan? _maxReceiveWaitTime;
private TimeSpan? _sessionIdleTimeout;

/// <summary>
/// Gets or sets the maximum number of sessions that can be processed concurrently by the processor.
Expand Down Expand Up @@ -167,7 +167,7 @@ internal ServiceBusProcessorOptions ToProcessorOptions() =>
PrefetchCount = PrefetchCount,
AutoCompleteMessages = AutoCompleteMessages,
MaxAutoLockRenewalDuration = MaxAutoLockRenewalDuration,
MaxReceiveWaitTime = MaxReceiveWaitTime
MaxReceiveWaitTime = SessionIdleTimeout
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ public async Task SessionProcessorActivities()
new ServiceBusSessionProcessorOptions
{
AutoCompleteMessages = false,
MaxReceiveWaitTime = TimeSpan.FromSeconds(10),
SessionIdleTimeout = TimeSpan.FromSeconds(10),
MaxConcurrentSessions = 1
});
TaskCompletionSource<bool> tcs = new TaskCompletionSource<bool>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public async Task ProcessMessages(int numThreads, bool autoComplete)
enablePartitioning: false,
enableSession: false))
{
await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString);
await using var client = GetClient();
JoshLove-msft marked this conversation as resolved.
Show resolved Hide resolved
ServiceBusSender sender = client.CreateSender(scope.QueueName);

// use double the number of threads so we can make sure we test that we don't
Expand Down Expand Up @@ -96,7 +96,7 @@ public async Task UserSettlingWithAutoCompleteDoesNotThrow(int numThreads)
enablePartitioning: false,
enableSession: false))
{
await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString);
await using var client = GetClient();
ServiceBusSender sender = client.CreateSender(scope.QueueName);

// use double the number of threads so we can make sure we test that we don't
Expand Down Expand Up @@ -319,7 +319,7 @@ public async Task CanStopProcessingFromHandler(int numThreads)
enablePartitioning: false,
enableSession: false))
{
await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString);
await using var client = GetClient();
ServiceBusSender sender = client.CreateSender(scope.QueueName);
int numMessages = 100;
using ServiceBusMessageBatch batch = await sender.CreateMessageBatchAsync();
Expand Down Expand Up @@ -370,7 +370,7 @@ public async Task OnMessageExceptionHandlerCalled()
{
var invalidQueueName = "nonexistentqueuename";
var exceptionReceivedHandlerCalled = false;
var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString);
await using var client = GetClient();
ServiceBusProcessor processor = client.CreateProcessor(invalidQueueName);
TaskCompletionSource<bool> taskCompletionSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
processor.ProcessMessageAsync += ProcessMessage;
Expand Down Expand Up @@ -424,7 +424,7 @@ Task ProcessErrors(ProcessErrorEventArgs args)
public async Task StartStopMultipleTimes()
{
var invalidQueueName = "nonexistentqueuename";
var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString);
await using var client = GetClient();
await using ServiceBusProcessor processor = client.CreateProcessor(invalidQueueName);
TaskCompletionSource<bool> taskCompletionSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
processor.ProcessMessageAsync += eventArgs => Task.CompletedTask;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ public void ProcessorOptionsSetOnClient()
Assert.AreEqual(options.PrefetchCount, processor.PrefetchCount);
Assert.AreEqual(options.ReceiveMode, processor.ReceiveMode);
Assert.AreEqual(options.MaxAutoLockRenewalDuration, processor.MaxAutoLockRenewalDuration);
Assert.AreEqual(options.MaxReceiveWaitTime, processor.MaxReceiveWaitTime);
Assert.AreEqual(fullyQualifiedNamespace, processor.FullyQualifiedNamespace);
Assert.IsFalse(processor.IsClosed);
Assert.IsFalse(processor.IsProcessing);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ public async Task OnSessionExceptionHandlerCalledWhenRegisteredOnNonSessionfulQu
await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false))
{
var exceptionReceivedHandlerCalled = false;
var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString);
await using var client = GetClient();

await using var processor = client.CreateSessionProcessor(scope.QueueName, new ServiceBusSessionProcessorOptions
{
Expand Down Expand Up @@ -472,7 +472,7 @@ public async Task OnSessionExceptionHandlerCalledWhenRegisteredOnNonSessionfulTo
await using (var scope = await ServiceBusScope.CreateWithTopic(enablePartitioning: false, enableSession: false))
{
var exceptionReceivedHandlerCalled = false;
var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString);
await using var client = GetClient();
await using var processor = client.CreateSessionProcessor(scope.TopicName, scope.SubscriptionNames.First(), new ServiceBusSessionProcessorOptions
{
MaxConcurrentSessions = 1
Expand Down Expand Up @@ -611,7 +611,8 @@ public async Task AutoLockRenewalWorks(int numThreads, int maxCallsPerSession)
{
MaxConcurrentSessions = numThreads,
AutoCompleteMessages = false,
MaxConcurrentCallsPerSession = maxCallsPerSession
MaxConcurrentCallsPerSession = maxCallsPerSession,
SessionIdleTimeout = TimeSpan.FromSeconds(30)
};
await using var processor = client.CreateSessionProcessor(scope.QueueName, options);
int messageCt = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ public void ProcessorOptionsSetOnClient()
Assert.AreEqual(options.PrefetchCount, processor.PrefetchCount);
Assert.AreEqual(options.ReceiveMode, processor.ReceiveMode);
Assert.AreEqual(options.MaxAutoLockRenewalDuration, processor.MaxAutoLockRenewalDuration);
Assert.AreEqual(options.SessionIdleTimeout, processor.SessionIdleTimeout);
Assert.AreEqual(fullyQualifiedNamespace, processor.FullyQualifiedNamespace);
Assert.IsFalse(processor.IsClosed);
Assert.IsFalse(processor.IsProcessing);
Expand All @@ -181,15 +182,15 @@ public void ProcessorOptionsValidation()
() => options.MaxAutoLockRenewalDuration = TimeSpan.FromSeconds(-1),
Throws.InstanceOf<ArgumentOutOfRangeException>());
Assert.That(
() => options.MaxReceiveWaitTime = TimeSpan.FromSeconds(0),
() => options.SessionIdleTimeout = TimeSpan.FromSeconds(0),
Throws.InstanceOf<ArgumentOutOfRangeException>());
Assert.That(
() => options.MaxReceiveWaitTime = TimeSpan.FromSeconds(-1),
() => options.SessionIdleTimeout = TimeSpan.FromSeconds(-1),
Throws.InstanceOf<ArgumentOutOfRangeException>());

// should not throw
options.PrefetchCount = 0;
options.MaxReceiveWaitTime = TimeSpan.FromSeconds(1);
options.SessionIdleTimeout = TimeSpan.FromSeconds(1);
options.MaxAutoLockRenewalDuration = TimeSpan.FromSeconds(0);
}

Expand Down