diff --git a/eng/Packages.Data.props b/eng/Packages.Data.props
index 09b083b85e15..2df14a82f91b 100644
--- a/eng/Packages.Data.props
+++ b/eng/Packages.Data.props
@@ -88,7 +88,7 @@
-
+
diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Config/ServiceBusClientFactory.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Config/ServiceBusClientFactory.cs
index 3966284ffbb6..ddb981cae989 100644
--- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Config/ServiceBusClientFactory.cs
+++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Config/ServiceBusClientFactory.cs
@@ -43,7 +43,7 @@ internal ServiceBusClient CreateClientFromSetting(string connection)
: _messagingProvider.CreateClient(connectionInfo.FullyQualifiedNamespace, connectionInfo.Credential, _options.ToClientOptions());
}
- internal ServiceBusAdministrationClient CreateAdministrationClient(string connection)
+ internal virtual ServiceBusAdministrationClient CreateAdministrationClient(string connection)
{
var connectionInfo = ResolveConnectionInformation(connection);
if (connectionInfo.ConnectionString != null)
diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Listeners/ServiceBusScaleMonitor.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Listeners/ServiceBusScaleMonitor.cs
index 4cc112946928..cd399a1d2e19 100644
--- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Listeners/ServiceBusScaleMonitor.cs
+++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Listeners/ServiceBusScaleMonitor.cs
@@ -57,23 +57,48 @@ async Task IScaleMonitor.GetMetricsAsync()
public async Task GetMetricsAsync()
{
- ServiceBusReceivedMessage message = null;
+ ServiceBusReceivedMessage activeMessage = null;
string entityName = _serviceBusEntityType == ServiceBusEntityType.Queue ? "queue" : "topic";
try
{
- // Peek the first message in the queue without removing it from the queue
- // PeekAsync remembers the sequence number of the last message, so the second call returns the second message instead of the first one
- // Use PeekBySequenceNumberAsync with fromSequenceNumber = 0 to always get the first available message
- message = await _receiver.Value.PeekMessageAsync(fromSequenceNumber: 0).ConfigureAwait(false);
+ // Do a first attempt to peek one message from the head of the queue
+ var peekedMessage = await _receiver.Value.PeekMessageAsync(fromSequenceNumber: 0).ConfigureAwait(false);
+ if (peekedMessage == null)
+ {
+ // ignore it. The Get[Queue|Topic]MetricsAsync methods deal with activeMessage being null
+ }
+ else if (MessageIsActive(peekedMessage))
+ {
+ activeMessage = peekedMessage;
+ }
+ else
+ {
+ // Do another attempt to peek ten message from last peek sequence number
+ var peekedMessages = await _receiver.Value.PeekMessagesAsync(10, fromSequenceNumber: peekedMessage.SequenceNumber).ConfigureAwait(false);
+ foreach (var receivedMessage in peekedMessages )
+ {
+ if (MessageIsActive(receivedMessage))
+ {
+ activeMessage = receivedMessage;
+ break;
+ }
+ }
+
+ // Batch contains messages but none are active in the peeked batch
+ if (peekedMessages.Count > 0 && activeMessage == null)
+ {
+ _logger.LogDebug("{_serviceBusEntityType} {_entityPath} contains multiple messages but none are active in the peeked batch.");
+ }
+ }
if (_serviceBusEntityType == ServiceBusEntityType.Queue)
{
- return await GetQueueMetricsAsync(message).ConfigureAwait(false);
+ return await GetQueueMetricsAsync(activeMessage).ConfigureAwait(false);
}
else
{
- return await GetTopicMetricsAsync(message).ConfigureAwait(false);
+ return await GetTopicMetricsAsync(activeMessage).ConfigureAwait(false);
}
}
catch (ServiceBusException ex)
@@ -95,7 +120,7 @@ public async Task GetMetricsAsync()
}
// Path for connection strings with no manage claim
- return CreateTriggerMetrics(message, 0, 0, 0, _isListeningOnDeadLetterQueue);
+ return CreateTriggerMetrics(activeMessage, 0, 0, 0, _isListeningOnDeadLetterQueue);
}
private async Task GetQueueMetricsAsync(ServiceBusReceivedMessage message)
@@ -163,6 +188,11 @@ internal static ServiceBusTriggerMetrics CreateTriggerMetrics(ServiceBusReceived
};
}
+ private static bool MessageIsActive(ServiceBusReceivedMessage message)
+ {
+ return message.State == ServiceBusMessageState.Active;
+ }
+
ScaleStatus IScaleMonitor.GetScaleStatus(ScaleStatusContext context)
{
return GetScaleStatusCore(context.WorkerCount, context.Metrics?.Cast().ToArray());
diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/Listeners/ServiceBusScaleMonitorTests.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/Listeners/ServiceBusScaleMonitorTests.cs
index 1c77dc1aa179..72f5cb6e7d54 100644
--- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/Listeners/ServiceBusScaleMonitorTests.cs
+++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/Listeners/ServiceBusScaleMonitorTests.cs
@@ -4,8 +4,10 @@
using System;
using System.Collections.Generic;
using System.Linq;
+using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
+using Azure.Messaging.ServiceBus.Administration;
using Microsoft.Azure.WebJobs.Extensions.ServiceBus.Config;
using Microsoft.Azure.WebJobs.Host.Executors;
using Microsoft.Azure.WebJobs.Host.Scale;
@@ -31,6 +33,7 @@ public class ServiceBusScaleMonitorTests
private Mock _mockProvider;
private Mock _mockClientFactory;
private Mock _mockMessageProcessor;
+ private Mock _mockMessageReceiver;
private TestLoggerProvider _loggerProvider;
private LoggerFactory _loggerFactory;
private string _functionId = "test-functionid";
@@ -38,6 +41,7 @@ public class ServiceBusScaleMonitorTests
private string _testConnection = "Endpoint=sb://test.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=abc123=";
private string _connection = "connection";
private ServiceBusClient _client;
+ private Mock _mockAdminClient;
[SetUp]
public void Setup()
@@ -46,7 +50,9 @@ public void Setup()
_client = new ServiceBusClient(_testConnection);
ServiceBusProcessorOptions processorOptions = new ServiceBusProcessorOptions();
ServiceBusProcessor messageProcessor = _client.CreateProcessor(_entityPath);
- ServiceBusReceiver receiver = _client.CreateReceiver(_entityPath);
+
+ _mockAdminClient = new Mock(MockBehavior.Strict);
+
_mockMessageProcessor = new Mock(MockBehavior.Strict, messageProcessor);
var configuration = ConfigurationUtilities.CreateConfiguration(new KeyValuePair(_connection, _testConnection));
@@ -59,6 +65,8 @@ public void Setup()
new AzureEventSourceLogForwarder(new NullLoggerFactory()),
new OptionsWrapper(_serviceBusOptions));
+ _mockMessageReceiver = new Mock();
+
_mockProvider
.Setup(p => p.CreateMessageProcessor(_client, _entityPath, It.IsAny()))
.Returns(_mockMessageProcessor.Object);
@@ -67,6 +75,13 @@ public void Setup()
.Setup(p => p.CreateClient(_testConnection, It.IsAny()))
.Returns(_client);
+ _mockProvider
+ .Setup(p => p.CreateBatchMessageReceiver(_client, _entityPath, It.IsAny()))
+ .Returns(_mockMessageReceiver.Object);
+
+ _mockClientFactory.Setup(p => p.CreateAdministrationClient(_testConnection))
+ .Returns(_mockAdminClient.Object);
+
_loggerFactory = new LoggerFactory();
_loggerProvider = new TestLoggerProvider();
_loggerFactory.AddProvider(_loggerProvider);
@@ -102,7 +117,9 @@ public void ScaleMonitorDescriptor_ReturnsExpectedValue()
[Test]
public void GetMetrics_ReturnsExpectedResult()
{
- // Unable to test QueueTime because of restrictions on creating Message objects
+ var utcNow = DateTimeOffset.UtcNow.Subtract(TimeSpan.FromSeconds(10));
+
+ var message = ServiceBusModelFactory.ServiceBusReceivedMessage(enqueuedTime: utcNow);
// Test base case
var metrics = ServiceBusScaleMonitor.CreateTriggerMetrics(null, 0, 0, 0, false);
@@ -113,19 +130,19 @@ public void GetMetrics_ReturnsExpectedResult()
Assert.AreNotEqual(default(DateTime), metrics.Timestamp);
// Test messages on main queue
- metrics = ServiceBusScaleMonitor.CreateTriggerMetrics(null, 10, 0, 0, false);
+ metrics = ServiceBusScaleMonitor.CreateTriggerMetrics(message, 10, 0, 0, false);
Assert.AreEqual(0, metrics.PartitionCount);
Assert.AreEqual(10, metrics.MessageCount);
- Assert.AreEqual(TimeSpan.FromSeconds(0), metrics.QueueTime);
+ Assert.That(metrics.QueueTime, Is.GreaterThanOrEqualTo(TimeSpan.FromSeconds(10)));
Assert.AreNotEqual(default(DateTime), metrics.Timestamp);
// Test listening on dead letter queue
- metrics = ServiceBusScaleMonitor.CreateTriggerMetrics(null, 10, 100, 0, true);
+ metrics = ServiceBusScaleMonitor.CreateTriggerMetrics(message, 10, 100, 0, true);
Assert.AreEqual(0, metrics.PartitionCount);
Assert.AreEqual(100, metrics.MessageCount);
- Assert.AreEqual(TimeSpan.FromSeconds(0), metrics.QueueTime);
+ Assert.That(metrics.QueueTime, Is.GreaterThanOrEqualTo(TimeSpan.FromSeconds(10)));
Assert.AreNotEqual(default(DateTime), metrics.Timestamp);
// Test partitions
@@ -137,6 +154,220 @@ public void GetMetrics_ReturnsExpectedResult()
Assert.AreNotEqual(default(DateTime), metrics.Timestamp);
}
+ [Test]
+ public async Task GetMetrics_IgnoresScheduledMessages()
+ {
+ var scheduledMessage = ServiceBusModelFactory.ServiceBusReceivedMessage(serviceBusMessageState: ServiceBusMessageState.Scheduled);
+
+ var anotherScheduledMessage = ServiceBusModelFactory.ServiceBusReceivedMessage(serviceBusMessageState: ServiceBusMessageState.Scheduled);
+
+ _mockMessageReceiver.Setup(x => x.PeekMessageAsync(It.IsAny(), It.IsAny()))
+ .ReturnsAsync(scheduledMessage);
+
+ _mockMessageReceiver.Setup(x => x.PeekMessagesAsync(It.IsAny(), It.IsAny(), It.IsAny()))
+ .ReturnsAsync(new List { anotherScheduledMessage });
+
+ ServiceBusListener listener = CreateListener();
+
+ var metrics = await ((ServiceBusScaleMonitor)listener.GetMonitor()).GetMetricsAsync();
+
+ Assert.AreEqual(0, metrics.PartitionCount);
+ Assert.AreEqual(0, metrics.MessageCount);
+ Assert.AreEqual(TimeSpan.FromSeconds(0), metrics.QueueTime);
+ Assert.AreNotEqual(default(DateTime), metrics.Timestamp);
+ }
+
+ [Test]
+ public async Task GetMetrics_IgnoresDeferredMessages()
+ {
+ var deferredMessage = ServiceBusModelFactory.ServiceBusReceivedMessage(serviceBusMessageState: ServiceBusMessageState.Deferred);
+ var anotherDeferredMessage = ServiceBusModelFactory.ServiceBusReceivedMessage(serviceBusMessageState: ServiceBusMessageState.Deferred);
+
+ _mockMessageReceiver.Setup(x => x.PeekMessageAsync(It.IsAny(), It.IsAny()))
+ .ReturnsAsync(deferredMessage);
+
+ _mockMessageReceiver.Setup(x => x.PeekMessagesAsync(It.IsAny(), It.IsAny(), It.IsAny()))
+ .ReturnsAsync(new List { anotherDeferredMessage });
+
+ ServiceBusListener listener = CreateListener();
+
+ var metrics = await ((ServiceBusScaleMonitor)listener.GetMonitor()).GetMetricsAsync();
+
+ Assert.AreEqual(0, metrics.PartitionCount);
+ Assert.AreEqual(0, metrics.MessageCount);
+ Assert.AreEqual(TimeSpan.FromSeconds(0), metrics.QueueTime);
+ Assert.AreNotEqual(default(DateTime), metrics.Timestamp);
+ }
+
+ [Test]
+ public async Task GetMetrics_DoesNotPeekBatchesWhenFirstAttemptReturnsActive()
+ {
+ var activeMessage = ServiceBusModelFactory.ServiceBusReceivedMessage(enqueuedTime: DateTimeOffset.UtcNow.Subtract(TimeSpan.FromSeconds(30)), sequenceNumber: 2);
+
+ _mockMessageReceiver.Setup(x => x.PeekMessageAsync(It.IsAny(), It.IsAny()))
+ .ReturnsAsync(activeMessage);
+
+ _mockMessageReceiver.Verify(x => x.PeekMessagesAsync(It.IsAny(), It.IsAny(), It.IsAny()), Times.Never);
+
+ ServiceBusListener listener = CreateListener();
+
+ var metrics = await ((ServiceBusScaleMonitor)listener.GetMonitor()).GetMetricsAsync();
+
+ Assert.AreEqual(0, metrics.PartitionCount);
+ Assert.AreEqual(1, metrics.MessageCount);
+ Assert.That(metrics.QueueTime, Is.GreaterThanOrEqualTo(TimeSpan.FromSeconds(30)));
+ Assert.AreNotEqual(default(DateTime), metrics.Timestamp);
+ }
+
+ [Test]
+ public async Task GetMetrics_DoesNotPeekBatchesWhenFirstAttemptReturnsNull()
+ {
+ _mockMessageReceiver.Setup(x => x.PeekMessageAsync(It.IsAny(), It.IsAny()))
+ .ReturnsAsync(() => null);
+
+ _mockMessageReceiver.Verify(x => x.PeekMessagesAsync(It.IsAny(), It.IsAny(), It.IsAny()), Times.Never);
+
+ ServiceBusListener listener = CreateListener();
+
+ var metrics = await ((ServiceBusScaleMonitor)listener.GetMonitor()).GetMetricsAsync();
+
+ Assert.AreEqual(0, metrics.PartitionCount);
+ Assert.AreEqual(0, metrics.MessageCount);
+ Assert.AreEqual(TimeSpan.FromSeconds(0), metrics.QueueTime);
+ Assert.AreNotEqual(default(DateTime), metrics.Timestamp);
+ }
+
+ [Test]
+ public async Task GetMetrics_IgnoresEmptyBatch()
+ {
+ var deferredMessage = ServiceBusModelFactory.ServiceBusReceivedMessage(serviceBusMessageState: ServiceBusMessageState.Deferred);
+
+ _mockMessageReceiver.Setup(x => x.PeekMessageAsync(It.IsAny(), It.IsAny()))
+ .ReturnsAsync(deferredMessage);
+
+ _mockMessageReceiver.Setup(x => x.PeekMessagesAsync(It.IsAny(), It.IsAny(), It.IsAny()))
+ .ReturnsAsync(new List());
+
+ ServiceBusListener listener = CreateListener();
+
+ var metrics = await ((ServiceBusScaleMonitor)listener.GetMonitor()).GetMetricsAsync();
+
+ Assert.AreEqual(0, metrics.PartitionCount);
+ Assert.AreEqual(0, metrics.MessageCount);
+ Assert.AreEqual(TimeSpan.FromSeconds(0), metrics.QueueTime);
+ Assert.AreNotEqual(default(DateTime), metrics.Timestamp);
+ }
+
+ [Test]
+ public async Task GetMetrics_UseSequenceNumberToRetrieveBatches()
+ {
+ var deferredMessage = ServiceBusModelFactory.ServiceBusReceivedMessage(sequenceNumber: 12, serviceBusMessageState: ServiceBusMessageState.Deferred);
+ var activeMessage = ServiceBusModelFactory.ServiceBusReceivedMessage(enqueuedTime: DateTimeOffset.UtcNow.Subtract(TimeSpan.FromSeconds(30)), sequenceNumber: 2);
+
+ _mockMessageReceiver.Setup(x => x.PeekMessageAsync(It.IsAny(), It.IsAny()))
+ .ReturnsAsync(deferredMessage);
+
+ _mockMessageReceiver.Setup(x => x.PeekMessagesAsync(It.IsAny(), 12, It.IsAny()))
+ .ReturnsAsync(new List { activeMessage });
+
+ ServiceBusListener listener = CreateListener();
+
+ var metrics = await ((ServiceBusScaleMonitor)listener.GetMonitor()).GetMetricsAsync();
+
+ Assert.AreEqual(0, metrics.PartitionCount);
+ Assert.AreEqual(1, metrics.MessageCount);
+ Assert.That(metrics.QueueTime, Is.GreaterThanOrEqualTo(TimeSpan.FromSeconds(30)));
+ Assert.AreNotEqual(default(DateTime), metrics.Timestamp);
+ }
+
+ [Test]
+ public async Task GetMetrics_PeeksOneFromHeadAndTenWithBatching()
+ {
+ var deferredMessage = ServiceBusModelFactory.ServiceBusReceivedMessage(serviceBusMessageState: ServiceBusMessageState.Deferred);
+ var activeMessage = ServiceBusModelFactory.ServiceBusReceivedMessage(enqueuedTime: DateTimeOffset.UtcNow.Subtract(TimeSpan.FromSeconds(30)), sequenceNumber: 2);
+
+ _mockMessageReceiver.Setup(x => x.PeekMessageAsync(0, It.IsAny()))
+ .ReturnsAsync(deferredMessage);
+
+ _mockMessageReceiver.Setup(x => x.PeekMessagesAsync(10, It.IsAny(), It.IsAny()))
+ .ReturnsAsync(new List { activeMessage, activeMessage });
+
+ ServiceBusListener listener = CreateListener();
+
+ var metrics = await ((ServiceBusScaleMonitor)listener.GetMonitor()).GetMetricsAsync();
+
+ Assert.AreEqual(0, metrics.PartitionCount);
+ Assert.AreEqual(1, metrics.MessageCount);
+ Assert.That(metrics.QueueTime, Is.GreaterThanOrEqualTo(TimeSpan.FromSeconds(30)));
+ Assert.AreNotEqual(default(DateTime), metrics.Timestamp);
+ }
+
+ [Test]
+ public async Task GetMetrics_IgnoresDeferredOrScheduledMessagesUntilItFindsAnActive()
+ {
+ var firstDeferredMessage = ServiceBusModelFactory.ServiceBusReceivedMessage(serviceBusMessageState: ServiceBusMessageState.Deferred);
+ var secondScheduledMessage = ServiceBusModelFactory.ServiceBusReceivedMessage(serviceBusMessageState: ServiceBusMessageState.Scheduled);
+ var activeMessage = ServiceBusModelFactory.ServiceBusReceivedMessage(enqueuedTime: DateTimeOffset.UtcNow.Subtract(TimeSpan.FromSeconds(30)), sequenceNumber: 2);
+
+ _mockMessageReceiver.Setup(x => x.PeekMessageAsync(It.IsAny(), It.IsAny()))
+ .ReturnsAsync(firstDeferredMessage);
+
+ _mockMessageReceiver.Setup(x => x.PeekMessagesAsync(It.IsAny(), It.IsAny(), It.IsAny()))
+ .ReturnsAsync(new List { secondScheduledMessage, activeMessage });
+
+ ServiceBusListener listener = CreateListener();
+
+ var serviceBusScaleMonitor = (ServiceBusScaleMonitor)listener.GetMonitor();
+ var metrics = await serviceBusScaleMonitor.GetMetricsAsync();
+
+ Assert.AreEqual(0, metrics.PartitionCount);
+ Assert.AreEqual(1, metrics.MessageCount);
+ Assert.That(metrics.QueueTime, Is.GreaterThanOrEqualTo(TimeSpan.FromSeconds(30)));
+ Assert.AreNotEqual(default(DateTime), metrics.Timestamp);
+ }
+
+ [Test]
+ public async Task GetMetrics_GiveUpAfterFirstAndBatchPeekDoesntReturnActive()
+ {
+ var firstDeferredMessage = ServiceBusModelFactory.ServiceBusReceivedMessage(serviceBusMessageState: ServiceBusMessageState.Deferred);
+
+ _mockMessageReceiver.Setup(x => x.PeekMessageAsync(It.IsAny(), It.IsAny()))
+ .ReturnsAsync(firstDeferredMessage);
+
+ _mockMessageReceiver.Setup(x => x.PeekMessagesAsync(It.IsAny(), It.IsAny(), It.IsAny()))
+ .ReturnsAsync((int batchSize, long _, CancellationToken __) => Enumerable.Range(1, batchSize)
+ .Select(i => ServiceBusModelFactory.ServiceBusReceivedMessage(serviceBusMessageState: ServiceBusMessageState.Scheduled))
+ .ToList());
+
+ ServiceBusListener listener = CreateListener();
+
+ var serviceBusScaleMonitor = (ServiceBusScaleMonitor)listener.GetMonitor();
+ var metrics = await serviceBusScaleMonitor.GetMetricsAsync();
+
+ Assert.AreEqual(0, metrics.PartitionCount);
+ Assert.AreEqual(0, metrics.MessageCount);
+ Assert.AreEqual(TimeSpan.FromSeconds(0), metrics.QueueTime);
+ Assert.AreNotEqual(default(DateTime), metrics.Timestamp);
+ }
+
+ [Test]
+ public async Task GetMetrics_CalculatesMetrics()
+ {
+ var message = ServiceBusModelFactory.ServiceBusReceivedMessage(enqueuedTime: DateTimeOffset.UtcNow.Subtract(TimeSpan.FromSeconds(30)));
+
+ _mockMessageReceiver.Setup(x => x.PeekMessageAsync(It.IsAny(), It.IsAny()))
+ .ReturnsAsync(message);
+
+ ServiceBusListener listener = CreateListener();
+
+ var metrics = await ((ServiceBusScaleMonitor)listener.GetMonitor()).GetMetricsAsync();
+
+ Assert.AreEqual(0, metrics.PartitionCount);
+ Assert.AreEqual(1, metrics.MessageCount);
+ Assert.That(metrics.QueueTime, Is.GreaterThanOrEqualTo(TimeSpan.FromSeconds(30)));
+ Assert.AreNotEqual(default(DateTime), metrics.Timestamp);
+ }
+
[Test]
public async Task GetMetrics_HandlesExceptions()
{