Skip to content

Commit

Permalink
Service Bus trigger should ignore scheduled/deferred messages (Azure#…
Browse files Browse the repository at this point in the history
…25127)

* Better tests for QueueTime

* Test to verify scheduled messages are ignored

* Ignore scheduled messages for scaling and queue time

* Add additional verification test for metrics

* Extract into method with better name

* Add ability to extract the state from the annotated message

* Test helper

* Modifiers

* Adjust tests

* Change implementation

* Switch tests to assume peeking

* Naive peeking

* Switch to single peek and storing sequence number

* Make sure to not run into actual admin client operations that time out and slow down the tests

* More test scenarios for new algorithm

* New algorithm as discussed

* Better variable names

* Fix styling

* Upgrade to Azure.Messaging.ServiceBus 7.6.0

* Review comments

* Update sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Listeners/ServiceBusScaleMonitor.cs

Co-authored-by: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com>
  • Loading branch information
danielmarbach and JoshLove-msft authored Feb 14, 2022
1 parent 94830e4 commit 372bf0a
Show file tree
Hide file tree
Showing 4 changed files with 277 additions and 16 deletions.
2 changes: 1 addition & 1 deletion eng/Packages.Data.props
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@
<PackageReference Update="Azure.Data.Tables" Version="12.4.0" />
<PackageReference Update="Azure.Messaging.EventHubs" Version="5.6.2" />
<PackageReference Update="Azure.Messaging.EventGrid" Version="4.8.1" />
<PackageReference Update="Azure.Messaging.ServiceBus" Version="7.5.1" />
<PackageReference Update="Azure.Messaging.ServiceBus" Version="7.6.0" />
<PackageReference Update="Azure.Messaging.WebPubSub" Version="1.0.0" />
<PackageReference Update="Azure.Identity" Version="1.5.0" />
<PackageReference Update="Azure.Security.KeyVault.Secrets" Version="4.2.0" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ internal ServiceBusClient CreateClientFromSetting(string connection)
: _messagingProvider.CreateClient(connectionInfo.FullyQualifiedNamespace, connectionInfo.Credential, _options.ToClientOptions());
}

internal ServiceBusAdministrationClient CreateAdministrationClient(string connection)
internal virtual ServiceBusAdministrationClient CreateAdministrationClient(string connection)
{
var connectionInfo = ResolveConnectionInformation(connection);
if (connectionInfo.ConnectionString != null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,23 +57,48 @@ async Task<ScaleMetrics> IScaleMonitor.GetMetricsAsync()

public async Task<ServiceBusTriggerMetrics> GetMetricsAsync()
{
ServiceBusReceivedMessage message = null;
ServiceBusReceivedMessage activeMessage = null;
string entityName = _serviceBusEntityType == ServiceBusEntityType.Queue ? "queue" : "topic";

try
{
// Peek the first message in the queue without removing it from the queue
// PeekAsync remembers the sequence number of the last message, so the second call returns the second message instead of the first one
// Use PeekBySequenceNumberAsync with fromSequenceNumber = 0 to always get the first available message
message = await _receiver.Value.PeekMessageAsync(fromSequenceNumber: 0).ConfigureAwait(false);
// Do a first attempt to peek one message from the head of the queue
var peekedMessage = await _receiver.Value.PeekMessageAsync(fromSequenceNumber: 0).ConfigureAwait(false);
if (peekedMessage == null)
{
// ignore it. The Get[Queue|Topic]MetricsAsync methods deal with activeMessage being null
}
else if (MessageIsActive(peekedMessage))
{
activeMessage = peekedMessage;
}
else
{
// Do another attempt to peek ten message from last peek sequence number
var peekedMessages = await _receiver.Value.PeekMessagesAsync(10, fromSequenceNumber: peekedMessage.SequenceNumber).ConfigureAwait(false);
foreach (var receivedMessage in peekedMessages )
{
if (MessageIsActive(receivedMessage))
{
activeMessage = receivedMessage;
break;
}
}

// Batch contains messages but none are active in the peeked batch
if (peekedMessages.Count > 0 && activeMessage == null)
{
_logger.LogDebug("{_serviceBusEntityType} {_entityPath} contains multiple messages but none are active in the peeked batch.");
}
}

if (_serviceBusEntityType == ServiceBusEntityType.Queue)
{
return await GetQueueMetricsAsync(message).ConfigureAwait(false);
return await GetQueueMetricsAsync(activeMessage).ConfigureAwait(false);
}
else
{
return await GetTopicMetricsAsync(message).ConfigureAwait(false);
return await GetTopicMetricsAsync(activeMessage).ConfigureAwait(false);
}
}
catch (ServiceBusException ex)
Expand All @@ -95,7 +120,7 @@ public async Task<ServiceBusTriggerMetrics> GetMetricsAsync()
}

// Path for connection strings with no manage claim
return CreateTriggerMetrics(message, 0, 0, 0, _isListeningOnDeadLetterQueue);
return CreateTriggerMetrics(activeMessage, 0, 0, 0, _isListeningOnDeadLetterQueue);
}

private async Task<ServiceBusTriggerMetrics> GetQueueMetricsAsync(ServiceBusReceivedMessage message)
Expand Down Expand Up @@ -163,6 +188,11 @@ internal static ServiceBusTriggerMetrics CreateTriggerMetrics(ServiceBusReceived
};
}

private static bool MessageIsActive(ServiceBusReceivedMessage message)
{
return message.State == ServiceBusMessageState.Active;
}

ScaleStatus IScaleMonitor.GetScaleStatus(ScaleStatusContext context)
{
return GetScaleStatusCore(context.WorkerCount, context.Metrics?.Cast<ServiceBusTriggerMetrics>().ToArray());
Expand Down
Loading

0 comments on commit 372bf0a

Please sign in to comment.