Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add session idle timeout option for session processor #18640

Merged
merged 3 commits into from
Feb 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,7 @@ protected ServiceBusSessionProcessor() { }
public virtual int MaxConcurrentSessions { get { throw null; } }
public virtual int PrefetchCount { get { throw null; } }
public virtual Azure.Messaging.ServiceBus.ServiceBusReceiveMode ReceiveMode { get { throw null; } }
public virtual System.TimeSpan? SessionIdleTimeout { get { throw null; } }
public event System.Func<Azure.Messaging.ServiceBus.ProcessErrorEventArgs, System.Threading.Tasks.Task> ProcessErrorAsync { add { } remove { } }
public event System.Func<Azure.Messaging.ServiceBus.ProcessSessionMessageEventArgs, System.Threading.Tasks.Task> ProcessMessageAsync { add { } remove { } }
public event System.Func<Azure.Messaging.ServiceBus.ProcessSessionEventArgs, System.Threading.Tasks.Task> SessionClosingAsync { add { } remove { } }
Expand All @@ -394,6 +395,7 @@ public ServiceBusSessionProcessorOptions() { }
public int MaxConcurrentSessions { get { throw null; } set { } }
public int PrefetchCount { get { throw null; } set { } }
public Azure.Messaging.ServiceBus.ServiceBusReceiveMode ReceiveMode { get { throw null; } set { } }
public System.TimeSpan? SessionIdleTimeout { get { throw null; } set { } }
public System.Collections.Generic.IList<string> SessionIds { get { throw null; } }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
public override bool Equals(object obj) { throw null; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ public class ServiceBusProcessor : IAsyncDisposable
///
/// <value>The maximum number of concurrent calls to the message handler.</value>
public virtual int MaxConcurrentCalls { get; }
internal TimeSpan? MaxReceiveWaitTime { get; }

/// <summary>
/// Gets a value that indicates whether the processor should automatically
Expand Down Expand Up @@ -200,6 +201,7 @@ internal ServiceBusProcessor(
PrefetchCount = _options.PrefetchCount;
MaxAutoLockRenewalDuration = _options.MaxAutoLockRenewalDuration;
MaxConcurrentCalls = _options.MaxConcurrentCalls;
MaxReceiveWaitTime = _options.MaxReceiveWaitTime;
MaxConcurrentSessions = maxConcurrentSessions;
MaxConcurrentCallsPerSession = maxConcurrentCallsPerSession;
_sessionIds = sessionIds ?? Array.Empty<string>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,14 @@ public class ServiceBusSessionProcessor : IAsyncDisposable
/// <inheritdoc cref="ServiceBusProcessor.FullyQualifiedNamespace"/>
public virtual string FullyQualifiedNamespace => _innerProcessor.FullyQualifiedNamespace;

/// <summary>
/// Gets the maximum amount of time to wait for a message to be received for the
/// currently active session. After this time has elapsed, the processor will close the session
/// and attempt to process another session.
/// If not specified, the <see cref="ServiceBusRetryOptions.TryTimeout"/> will be used.
/// </summary>
public virtual TimeSpan? SessionIdleTimeout => _innerProcessor.MaxReceiveWaitTime;

internal ServiceBusSessionProcessor(
ServiceBusConnection connection,
string entityPath,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,31 +63,32 @@ public TimeSpan MaxAutoLockRenewalDuration
private TimeSpan _maxAutoRenewDuration = TimeSpan.FromMinutes(5);

/// <summary>
/// Gets or sets the maximum amount of time to wait for each Receive call using the processor's underlying
/// receiver.
/// Gets or sets the maximum amount of time to wait for a message to be received for the
/// currently active session. After this time has elapsed, the processor will close the session
/// and attempt to process another session.
/// If not specified, the <see cref="ServiceBusRetryOptions.TryTimeout"/> will be used.
/// </summary>
///
/// <remarks>If no message is returned for a call
/// to Receive, a new session will be requested by the processor.
/// Hence, if this value is set to be too low, it could cause new sessions to be requested
/// more often than necessary.
/// <remarks>
/// If <see cref="SessionIds"/> is populated and <see cref="MaxConcurrentSessions"/> is greater or equal to
/// the number of sessions specified in <see cref="SessionIds"/>, the session will not be closed when the idle timeout elapses.
/// However, it will still control the amount of time each receive call waits.
/// </remarks>
internal TimeSpan? MaxReceiveWaitTime
public TimeSpan? SessionIdleTimeout
{
get => _maxReceiveWaitTime;
get => _sessionIdleTimeout;

set
{
if (value.HasValue)
{
Argument.AssertPositive(value.Value, nameof(MaxReceiveWaitTime));
Argument.AssertPositive(value.Value, nameof(SessionIdleTimeout));
}

_maxReceiveWaitTime = value;
_sessionIdleTimeout = value;
}
}
private TimeSpan? _maxReceiveWaitTime;
private TimeSpan? _sessionIdleTimeout;

/// <summary>
/// Gets or sets the maximum number of sessions that can be processed concurrently by the processor.
Expand Down Expand Up @@ -167,7 +168,7 @@ internal ServiceBusProcessorOptions ToProcessorOptions() =>
PrefetchCount = PrefetchCount,
AutoCompleteMessages = AutoCompleteMessages,
MaxAutoLockRenewalDuration = MaxAutoLockRenewalDuration,
MaxReceiveWaitTime = MaxReceiveWaitTime
MaxReceiveWaitTime = SessionIdleTimeout
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,20 @@ private string GetConnectionString() =>
TestEnvironment.ServiceBusConnectionString :
TestEnvironment.OverrideServiceBusConnectionString;

private ServiceBusAdministrationClient GetClient() =>
private ServiceBusAdministrationClient CreateClient() =>
InstrumentClient(
new ServiceBusAdministrationClient(
GetConnectionString(),
InstrumentClientOptions(new ServiceBusAdministrationClientOptions())));

private ServiceBusAdministrationClient GetAADClient() =>
private ServiceBusAdministrationClient CreateAADClient() =>
InstrumentClient(
new ServiceBusAdministrationClient(
TestEnvironment.FullyQualifiedNamespace,
GetTokenCredential(),
InstrumentClientOptions(new ServiceBusAdministrationClientOptions())));

private ServiceBusAdministrationClient GetSharedKeyTokenClient()
private ServiceBusAdministrationClient CreateSharedKeyTokenClient()
{
var properties = ServiceBusConnectionStringProperties.Parse(GetConnectionString());
var credential = new ServiceBusSharedAccessKeyCredential(properties.SharedAccessKeyName, properties.SharedAccessKey);
Expand All @@ -58,7 +58,7 @@ private ServiceBusAdministrationClient GetSharedKeyTokenClient()
public async Task BasicQueueCrudOperations()
{
var queueName = nameof(BasicQueueCrudOperations).ToLower() + Recording.Random.NewGuid().ToString("D").Substring(0, 8);
var client = GetClient();
var client = CreateClient();

var queueOptions = new CreateQueueOptions(queueName)
{
Expand Down Expand Up @@ -169,7 +169,7 @@ await client.GetQueueAsync(queueOptions.Name),
public async Task BasicTopicCrudOperations()
{
var topicName = nameof(BasicTopicCrudOperations).ToLower() + Recording.Random.NewGuid().ToString("D").Substring(0, 8);
var client = GetClient();
var client = CreateClient();

var options = new CreateTopicOptions(topicName)
{
Expand Down Expand Up @@ -267,7 +267,7 @@ public async Task BasicSubscriptionCrudOperations()
var topicName = nameof(BasicSubscriptionCrudOperations).ToLower() + Recording.Random.NewGuid().ToString("D").Substring(0, 8);
var subscriptionName = Recording.Random.NewGuid().ToString("D").Substring(0, 8);

var client = GetClient();
var client = CreateClient();

await client.CreateTopicAsync(topicName);

Expand Down Expand Up @@ -338,7 +338,7 @@ public async Task BasicRuleCrudOperations()
{
var topicName = Recording.Random.NewGuid().ToString("D").Substring(0, 8);
var subscriptionName = Recording.Random.NewGuid().ToString("D").Substring(0, 8);
var client = GetClient();
var client = CreateClient();
await client.CreateTopicAsync(topicName);

var rule1 = new CreateRuleOptions
Expand Down Expand Up @@ -426,7 +426,7 @@ await client.GetRuleAsync(topicName, subscriptionName, "rule1"),
public async Task GetQueueRuntimeInfo()
{
var queueName = nameof(GetQueueRuntimeInfo).ToLower() + Recording.Random.NewGuid().ToString("D").Substring(0, 8);
var mgmtClient = GetClient();
var mgmtClient = CreateClient();
await using var sbClient = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString);

QueueProperties queue = await mgmtClient.CreateQueueAsync(queueName);
Expand Down Expand Up @@ -486,7 +486,7 @@ public async Task GetSubscriptionRuntimeInfoTest()
{
var topicName = nameof(GetSubscriptionRuntimeInfoTest).ToLower() + Recording.Random.NewGuid().ToString("D").Substring(0, 8);
var subscriptionName = Recording.Random.NewGuid().ToString("D").Substring(0, 8);
var client = GetClient();
var client = CreateClient();
await using var sbClient = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString);

await client.CreateTopicAsync(topicName);
Expand Down Expand Up @@ -578,7 +578,7 @@ public async Task GetTopicRuntimeInfo()
{
var topicName = nameof(GetTopicRuntimeInfo).ToLower() + Recording.Random.NewGuid().ToString("D").Substring(0, 8);
var subscriptionName = Recording.Random.NewGuid().ToString("D").Substring(0, 8);
var client = GetClient();
var client = CreateClient();

await client.CreateTopicAsync(topicName);

Expand Down Expand Up @@ -620,7 +620,7 @@ public async Task GetTopicRuntimeInfo()
[Test]
public async Task ThrowsIfEntityDoesNotExist()
{
var client = GetClient();
var client = CreateClient();

Assert.That(
async () =>
Expand Down Expand Up @@ -701,7 +701,7 @@ await client.GetTopicAsync(queueName),
[Test]
public async Task ThrowsIfEntityAlreadyExists()
{
var client = GetClient();
var client = CreateClient();
var queueName = Recording.Random.NewGuid().ToString("D").Substring(0, 8);
var topicName = Recording.Random.NewGuid().ToString("D").Substring(0, 8);
var subscriptionName = Recording.Random.NewGuid().ToString("D").Substring(0, 8);
Expand Down Expand Up @@ -737,7 +737,7 @@ public async Task ForwardingEntity()
var queueName = Recording.Random.NewGuid().ToString("D").Substring(0, 8);
var destinationName = Recording.Random.NewGuid().ToString("D").Substring(0, 8);
var dlqDestinationName = Recording.Random.NewGuid().ToString("D").Substring(0, 8);
var mgmtClient = GetClient();
var mgmtClient = CreateClient();

await mgmtClient.CreateQueueAsync(dlqDestinationName);
await mgmtClient.CreateQueueAsync(
Expand Down Expand Up @@ -776,7 +776,7 @@ await mgmtClient.CreateQueueAsync(
[Test]
public async Task SqlFilterParams()
{
var client = GetClient();
var client = CreateClient();
var topicName = Recording.Random.NewGuid().ToString("D").Substring(0, 8);
var subscriptionName = Recording.Random.NewGuid().ToString("D").Substring(0, 8);

Expand Down Expand Up @@ -815,7 +815,7 @@ public async Task CorrelationFilterProperties()
{
var topicName = Recording.Random.NewGuid().ToString("D").Substring(0, 8);
var subscriptionName = Recording.Random.NewGuid().ToString("D").Substring(0, 8);
var client = GetClient();
var client = CreateClient();

await client.CreateTopicAsync(topicName);
await client.CreateSubscriptionAsync(topicName, subscriptionName);
Expand All @@ -838,7 +838,7 @@ public async Task CorrelationFilterProperties()
[Test]
public async Task GetNamespaceProperties()
{
var client = GetClient();
var client = CreateClient();

NamespaceProperties nsInfo = await client.GetNamespacePropertiesAsync();
Assert.NotNull(nsInfo);
Expand All @@ -851,7 +851,7 @@ public async Task AuthenticateWithAAD()
{
var queueName = Recording.Random.NewGuid().ToString("D").Substring(0, 8);
var topicName = Recording.Random.NewGuid().ToString("D").Substring(0, 8);
var client = GetAADClient();
var client = CreateAADClient();

var queueOptions = new CreateQueueOptions(queueName);
QueueProperties createdQueue = await client.CreateQueueAsync(queueOptions);
Expand All @@ -872,7 +872,7 @@ public async Task AuthenticateWithSharedKeyCredential()
{
var queueName = Recording.Random.NewGuid().ToString("D").Substring(0, 8);
var topicName = Recording.Random.NewGuid().ToString("D").Substring(0, 8);
var client = GetSharedKeyTokenClient();
var client = CreateSharedKeyTokenClient();

var queueOptions = new CreateQueueOptions(queueName);
QueueProperties createdQueue = await client.CreateQueueAsync(queueOptions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ public async Task SessionProcessorActivities()
new ServiceBusSessionProcessorOptions
{
AutoCompleteMessages = false,
MaxReceiveWaitTime = TimeSpan.FromSeconds(10),
SessionIdleTimeout = TimeSpan.FromSeconds(10),
MaxConcurrentSessions = 1
});
TaskCompletionSource<bool> tcs = new TaskCompletionSource<bool>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public async Task LogsEvents()
{
await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false))
{
await using var client = GetNoRetryClient();
await using var client = CreateNoRetryClient();
_listener.SingleEventById(ServiceBusEventSource.ClientCreateStartEvent, e => e.Payload.Contains(nameof(ServiceBusClient)) && e.Payload.Contains(client.FullyQualifiedNamespace));
var messageCount = 10;

Expand Down Expand Up @@ -134,7 +134,7 @@ public async Task LogsSessionEvents()
{
await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: true))
{
await using var client = GetNoRetryClient();
await using var client = CreateNoRetryClient();
_listener.SingleEventById(ServiceBusEventSource.ClientCreateStartEvent, e => e.Payload.Contains(nameof(ServiceBusClient)) && e.Payload.Contains(client.FullyQualifiedNamespace));
var messageCount = 10;

Expand Down Expand Up @@ -282,7 +282,7 @@ public async Task LogsProcessorEvents()
{
await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false))
{
await using var client = GetClient();
await using var client = CreateClient();
var sender = client.CreateSender(scope.QueueName);
await sender.SendMessageAsync(GetMessage());
await using var processor = client.CreateProcessor(scope.QueueName);
Expand Down Expand Up @@ -315,7 +315,7 @@ public async Task LogsProcessorExceptionEvent()
{
await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false))
{
await using var client = GetClient();
await using var client = CreateClient();
var sender = client.CreateSender(scope.QueueName);
await sender.SendMessageAsync(GetMessage());
await using var processor = client.CreateProcessor(scope.QueueName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public abstract class ServiceBusLiveTestBase : ServiceBusTestBase
{
public ServiceBusTestEnvironment TestEnvironment { get; } = ServiceBusTestEnvironment.Instance;

protected ServiceBusClient GetNoRetryClient()
protected ServiceBusClient CreateNoRetryClient()
{
var options =
new ServiceBusClientOptions
Expand All @@ -29,7 +29,7 @@ protected ServiceBusClient GetNoRetryClient()
options);
}

protected ServiceBusClient GetClient(int tryTimeout = 15)
protected ServiceBusClient CreateClient(int tryTimeout = 15)
{
var retryOptions = new ServiceBusRetryOptions();
if (tryTimeout != default)
Expand Down
Loading