From dd43abf01327195411fc2c12e0ea2be7a9942511 Mon Sep 17 00:00:00 2001 From: Jesse Squire Date: Wed, 3 Feb 2021 18:07:59 -0500 Subject: [PATCH] [Event Hubs Client] Move Blob Checkpoint Store to Shared (#18395) The focus of these changes is to complete migration of the Blobs Checkpoint Store from the `Azure.Messaging.EventHubs.Processor` project to the `Azure.Messaging.EventHubs.Shared` project. To facilitate sharing the checkpoint store between the processor and the Functions extensions, the implementation for the store was moved into the shared project, though its tests remained in the processor. In order to follow the pattern of keeping locality between shared code items and their tests, the checkpoint store tests, both unit and live, have been migrated to the shared testing project. In support of this, some of the live testing infrastructure specific to managing storage resources has also been migrated into the shared project as part of the `BlobTesting` category of shared items. Some refactoring and reformatting of the storge tests has also been performed to improve consistency with other Event Hubs code, trim dead areas, and improve readability. --- .../BlobsCheckpointStore.Diagnostics.cs | 6 +- ...Messaging.EventHubs.Processor.Tests.csproj | 9 +- .../BlobsCheckpointStoreDiagnosticsTests.cs | 771 ++++++++++++++++++ ...ntHubs.Shared.BlobStorageTesting.projitems | 16 + .../Azure.Messaging.EventHubs.Shared.shproj | 4 +- .../src/BlobStorageTesting}/StorageScope.cs | 13 +- .../StorageTestEnvironment.cs | 2 +- .../src/Testing/EventHubScope.cs | 12 +- .../src/Testing/LiveResourceManager.cs | 2 +- ...re.Messaging.EventHubs.Shared.Tests.csproj | 20 +- .../BlobsCheckpointStoreLiveTests.cs | 413 ++++------ .../BlobsCheckpointStoreTests.cs | 361 ++++---- .../BlobsCheckpointStore.Diagnostics.cs | 319 ++++++++ .../tests/Infrastructure/IBlobEventLogger.cs | 293 +++++++ .../PartitionOwnershipExtensions.cs | 2 +- .../PartitionOwnershipExtensionsTests.cs | 2 +- .../tests/Infrastructure/TestRunFixture.cs | 55 ++ .../src/Azure.Messaging.EventHubs.csproj | 3 +- .../Azure.Messaging.EventHubs.Tests.csproj | 1 - ....WebJobs.Extensions.EventHubs.Tests.csproj | 3 +- .../tests/WebJobsEventHubTestBase.cs | 1 - 21 files changed, 1850 insertions(+), 458 deletions(-) rename sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/{Storage => Diagnostics}/BlobsCheckpointStore.Diagnostics.cs (98%) create mode 100644 sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Diagnostics/BlobsCheckpointStoreDiagnosticsTests.cs create mode 100644 sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Azure.Messaging.EventHubs.Shared.BlobStorageTesting.projitems rename sdk/eventhub/{Azure.Messaging.EventHubs.Processor/tests/Infrastructure => Azure.Messaging.EventHubs.Shared/src/BlobStorageTesting}/StorageScope.cs (93%) rename sdk/eventhub/{Azure.Messaging.EventHubs.Processor/tests/Infrastructure => Azure.Messaging.EventHubs.Shared/src/BlobStorageTesting}/StorageTestEnvironment.cs (99%) rename sdk/eventhub/{Azure.Messaging.EventHubs.Processor/tests/CheckpointStore => Azure.Messaging.EventHubs.Shared/tests/BlobCheckpointStore}/BlobsCheckpointStoreLiveTests.cs (75%) rename sdk/eventhub/{Azure.Messaging.EventHubs.Processor/tests/CheckpointStore => Azure.Messaging.EventHubs.Shared/tests/BlobCheckpointStore}/BlobsCheckpointStoreTests.cs (89%) create mode 100644 sdk/eventhub/Azure.Messaging.EventHubs.Shared/tests/Infrastructure/BlobsCheckpointStore.Diagnostics.cs create mode 100644 sdk/eventhub/Azure.Messaging.EventHubs.Shared/tests/Infrastructure/IBlobEventLogger.cs rename sdk/eventhub/{Azure.Messaging.EventHubs.Processor => Azure.Messaging.EventHubs.Shared}/tests/Infrastructure/PartitionOwnershipExtensions.cs (97%) rename sdk/eventhub/{Azure.Messaging.EventHubs.Processor => Azure.Messaging.EventHubs.Shared}/tests/Infrastructure/PartitionOwnershipExtensionsTests.cs (99%) create mode 100644 sdk/eventhub/Azure.Messaging.EventHubs.Shared/tests/Infrastructure/TestRunFixture.cs diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/Storage/BlobsCheckpointStore.Diagnostics.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/Diagnostics/BlobsCheckpointStore.Diagnostics.cs similarity index 98% rename from sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/Storage/BlobsCheckpointStore.Diagnostics.cs rename to sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/Diagnostics/BlobsCheckpointStore.Diagnostics.cs index 217bac1b9a3b6..fd2d50cee24bd 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/Storage/BlobsCheckpointStore.Diagnostics.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/Diagnostics/BlobsCheckpointStore.Diagnostics.cs @@ -241,7 +241,11 @@ partial void OwnershipNotClaimable(string partitionId, /// The name of the consumer group the ownership is associated with. /// The identifier of the processor that attempted to claim the ownership for. /// - partial void OwnershipClaimed(string partitionId, string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string ownerIdentifier) => + partial void OwnershipClaimed(string partitionId, + string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup, + string ownerIdentifier) => Logger.OwnershipClaimed(partitionId, fullyQualifiedNamespace, eventHubName, consumerGroup, ownerIdentifier); /// diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Azure.Messaging.EventHubs.Processor.Tests.csproj b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Azure.Messaging.EventHubs.Processor.Tests.csproj index e8de971090606..addcf9717b11c 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Azure.Messaging.EventHubs.Processor.Tests.csproj +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Azure.Messaging.EventHubs.Processor.Tests.csproj @@ -17,12 +17,10 @@ + - - - @@ -30,10 +28,7 @@ - - - - + diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Diagnostics/BlobsCheckpointStoreDiagnosticsTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Diagnostics/BlobsCheckpointStoreDiagnosticsTests.cs new file mode 100644 index 0000000000000..f1e799fb15cef --- /dev/null +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Diagnostics/BlobsCheckpointStoreDiagnosticsTests.cs @@ -0,0 +1,771 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Azure.Messaging.EventHubs.Core; +using Azure.Messaging.EventHubs.Primitives; +using Azure.Messaging.EventHubs.Processor.Diagnostics; +using Azure.Storage; +using Azure.Storage.Blobs; +using Azure.Storage.Blobs.Models; +using Moq; +using NUnit.Framework; + +namespace Azure.Messaging.EventHubs.Processor.Tests +{ + /// + /// The suite of tests for the + /// class. + /// + /// + [TestFixture] + public class BlobsCheckpointStoreDiagnosticsTests + { + private const string FullyQualifiedNamespace = "fqns"; + private const string EventHubName = "name"; + private const string ConsumerGroup = "group"; + private const string MatchingEtag = "etag"; + private const string WrongEtag = "wrongEtag"; + private const string PartitionId = "1"; + + private readonly EventHubsRetryPolicy DefaultRetryPolicy = new BasicRetryPolicy(new EventHubsRetryOptions()); + private readonly string OwnershipIdentifier = Guid.NewGuid().ToString(); + + /// + /// Verifies basic functionality of ListOwnershipAsync and ensures the appropriate events are emitted on success. + /// + /// + [Test] + public async Task ListOwnershipLogsStartAndComplete() + { + var blobList = new List + { + BlobsModelFactory.BlobItem($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/ownership/{Guid.NewGuid().ToString()}", + false, + BlobsModelFactory.BlobItemProperties(true, lastModified: DateTime.UtcNow, eTag: new ETag(MatchingEtag)), + "snapshot", + new Dictionary {{BlobMetadataKey.OwnerIdentifier, Guid.NewGuid().ToString()}}) + }; + + var target = new BlobsCheckpointStore(new MockBlobContainerClient() { Blobs = blobList }, DefaultRetryPolicy); + + var mockLog = new Mock(); + target.Logger = mockLog.Object; + + await target.ListOwnershipAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, CancellationToken.None); + + mockLog.Verify(m => m.ListOwnershipStart(FullyQualifiedNamespace, EventHubName, ConsumerGroup)); + mockLog.Verify(m => m.ListOwnershipComplete(FullyQualifiedNamespace, EventHubName, ConsumerGroup, blobList.Count)); + } + + /// + /// Verifies basic functionality of ListOwnershipAsync and ensures the appropriate events are emitted on failure. + /// + /// + [Test] + public void ListOwnershipLogsErrorOnException() + { + var ex = new RequestFailedException(0, "foo", BlobErrorCode.ContainerNotFound.ToString(), null); + var target = new BlobsCheckpointStore(new MockBlobContainerClient(getBlobsAsyncException: ex), + DefaultRetryPolicy); + var mockLog = new Mock(); + target.Logger = mockLog.Object; + + Assert.That(async () => await target.ListOwnershipAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, CancellationToken.None), Throws.InstanceOf()); + mockLog.Verify(m => m.ListOwnershipError(FullyQualifiedNamespace, EventHubName, ConsumerGroup, ex.Message)); + } + + /// + /// Verifies basic functionality of ClaimOwnershipAsync and ensures the appropriate logging. + /// + /// + [Test] + public async Task ClaimOwnershipLogsStartAndComplete() + { + var partitionOwnership = new List + { + new EventProcessorPartitionOwnership + { + FullyQualifiedNamespace = FullyQualifiedNamespace, + EventHubName = EventHubName, + ConsumerGroup = ConsumerGroup, + OwnerIdentifier = OwnershipIdentifier, + PartitionId = PartitionId, + LastModifiedTime = DateTime.UtcNow + } + }; + + var mockBlobContainerClient = new MockBlobContainerClient().AddBlobClient($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/ownership/1", _ => { }); + var target = new BlobsCheckpointStore(mockBlobContainerClient, DefaultRetryPolicy); + + var mockLog = new Mock(); + target.Logger = mockLog.Object; + + var result = await target.ClaimOwnershipAsync(partitionOwnership, CancellationToken.None); + mockLog.Verify(m => m.ClaimOwnershipStart(PartitionId, FullyQualifiedNamespace, EventHubName, ConsumerGroup, OwnershipIdentifier)); + mockLog.Verify(m => m.ClaimOwnershipComplete(PartitionId, FullyQualifiedNamespace, EventHubName, ConsumerGroup, OwnershipIdentifier)); + } + + /// + /// Verifies basic functionality of ClaimOwnershipAsync and ensures the appropriate logging. + /// + /// + [Test] + public void ClaimOwnershipLogsErrors() + { + var partitionOwnership = new List + { + new EventProcessorPartitionOwnership + { + FullyQualifiedNamespace = FullyQualifiedNamespace, + EventHubName = EventHubName, + ConsumerGroup = ConsumerGroup, + OwnerIdentifier = OwnershipIdentifier, + PartitionId = PartitionId, + LastModifiedTime = DateTime.UtcNow + } + }; + + var expectedException = new DllNotFoundException("BOOM!"); + var mockLog = new Mock(); + var mockContainerClient = new MockBlobContainerClient().AddBlobClient($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/ownership/1", client => client.UploadBlobException = expectedException); + + var target = new BlobsCheckpointStore(mockContainerClient, DefaultRetryPolicy); + target.Logger = mockLog.Object; + + Assert.That(async () => await target.ClaimOwnershipAsync(partitionOwnership, CancellationToken.None), Throws.Exception.EqualTo(expectedException)); + mockLog.Verify(m => m.ClaimOwnershipError(PartitionId, FullyQualifiedNamespace, EventHubName, ConsumerGroup, OwnershipIdentifier, expectedException.Message)); + } + + /// + /// Verifies basic functionality of ClaimOwnershipAsync and ensures the appropriate events are emitted on success. + /// + /// + [Test] + public async Task ClaimOwnershipForNewPartitionLogsOwnershipClaimed() + { + var partitionOwnership = new List + { + new EventProcessorPartitionOwnership + { + FullyQualifiedNamespace = FullyQualifiedNamespace, + EventHubName = EventHubName, + ConsumerGroup = ConsumerGroup, + OwnerIdentifier = OwnershipIdentifier, + PartitionId = PartitionId, + LastModifiedTime = DateTime.UtcNow + } + }; + + var mockBlobContainerClient = new MockBlobContainerClient().AddBlobClient($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/ownership/1", _ => { }); + var target = new BlobsCheckpointStore(mockBlobContainerClient, DefaultRetryPolicy); + var mockLog = new Mock(); + target.Logger = mockLog.Object; + + var result = await target.ClaimOwnershipAsync(partitionOwnership, CancellationToken.None); + mockLog.Verify(m => m.OwnershipClaimed(PartitionId, FullyQualifiedNamespace, EventHubName, ConsumerGroup, OwnershipIdentifier)); + } + + /// + /// Verifies basic functionality of ClaimOwnershipAsync and ensures the appropriate events are emitted on success. + /// + /// + [Test] + public async Task ClaimOwnershipForExistingPartitionLogsOwnershipClaimed() + { + var blobInfo = BlobsModelFactory.BlobInfo(new ETag($@"""{MatchingEtag}"""), DateTime.UtcNow); + + var partitionOwnership = new List + { + new EventProcessorPartitionOwnership + { + FullyQualifiedNamespace = FullyQualifiedNamespace, + EventHubName = EventHubName, + ConsumerGroup = ConsumerGroup, + OwnerIdentifier = OwnershipIdentifier, + PartitionId = PartitionId, + LastModifiedTime = DateTime.UtcNow, + Version = MatchingEtag + } + }; + + var mockContainerClient = new MockBlobContainerClient().AddBlobClient($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/ownership/1", client => client.BlobInfo = blobInfo); + var target = new BlobsCheckpointStore(mockContainerClient, DefaultRetryPolicy); + var mockLog = new Mock(); + target.Logger = mockLog.Object; + + var result = await target.ClaimOwnershipAsync(partitionOwnership, CancellationToken.None); + mockLog.Verify(m => m.OwnershipClaimed(PartitionId, FullyQualifiedNamespace, EventHubName, ConsumerGroup, OwnershipIdentifier)); + } + + /// + /// Verifies basic functionality of ClaimOwnershipAsync and ensures the appropriate events are emitted on success. + /// + /// + [Test] + public async Task ClaimOwnershipForExistingPartitionWithWrongEtagLogsOwnershipNotClaimable() + { + var blobInfo = BlobsModelFactory.BlobInfo(new ETag($@"""{WrongEtag}"""), DateTime.UtcNow); + + var partitionOwnership = new List + { + new EventProcessorPartitionOwnership + { + FullyQualifiedNamespace = FullyQualifiedNamespace, + EventHubName = EventHubName, + ConsumerGroup = ConsumerGroup, + OwnerIdentifier = OwnershipIdentifier, + PartitionId = PartitionId, + LastModifiedTime = DateTime.UtcNow, + Version = MatchingEtag + } + }; + + var mockContainerClient = new MockBlobContainerClient().AddBlobClient($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/ownership/1", client => client.BlobInfo = blobInfo); + var target = new BlobsCheckpointStore(mockContainerClient, DefaultRetryPolicy); + var mockLog = new Mock(); + target.Logger = mockLog.Object; + + var result = await target.ClaimOwnershipAsync(partitionOwnership, CancellationToken.None); + mockLog.Verify(m => m.OwnershipNotClaimable(PartitionId, FullyQualifiedNamespace, EventHubName, ConsumerGroup, OwnershipIdentifier, It.Is(e => e.Contains(BlobErrorCode.ConditionNotMet.ToString())))); + } + + /// + /// Verifies basic functionality of ClaimOwnershipAsync and ensures the appropriate events are emitted on failure. + /// + /// + [Test] + public void ClaimOwnershipForMissingPartitionLogsClaimOwnershipError() + { + var partitionOwnership = new List + { + new EventProcessorPartitionOwnership + { + FullyQualifiedNamespace = FullyQualifiedNamespace, + EventHubName = EventHubName, + ConsumerGroup = ConsumerGroup, + OwnerIdentifier = OwnershipIdentifier, + PartitionId = PartitionId, + LastModifiedTime = DateTime.UtcNow, + Version = MatchingEtag + } + }; + + var mockBlobContainerClient = new MockBlobContainerClient().AddBlobClient($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/ownership/1", _ => { }); + var target = new BlobsCheckpointStore(mockBlobContainerClient, DefaultRetryPolicy); + + var mockLog = new Mock(); + target.Logger = mockLog.Object; + + Assert.That(async () => await target.ClaimOwnershipAsync(partitionOwnership, CancellationToken.None), Throws.InstanceOf()); + mockLog.Verify(m => m.ClaimOwnershipError(PartitionId, FullyQualifiedNamespace, EventHubName, ConsumerGroup, OwnershipIdentifier, It.Is(e => e.Contains(BlobErrorCode.BlobNotFound.ToString())))); + } + + /// + /// Verifies basic functionality of ListCheckpointsAsync and ensures the appropriate events are emitted on success. + /// + /// + [Test] + public async Task ListCheckpointsLogsStartAndComplete() + { + var blobList = new List + { + BlobsModelFactory.BlobItem($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/checkpoint/{Guid.NewGuid().ToString()}", + false, + BlobsModelFactory.BlobItemProperties(true, lastModified: DateTime.UtcNow, eTag: new ETag(MatchingEtag)), + "snapshot", + new Dictionary + { + {BlobMetadataKey.OwnerIdentifier, Guid.NewGuid().ToString()}, + {BlobMetadataKey.Offset, "0"} + }) + }; + + var target = new BlobsCheckpointStore(new MockBlobContainerClient() { Blobs = blobList }, DefaultRetryPolicy); + + var mockLog = new Mock(); + target.Logger = mockLog.Object; + + await target.ListCheckpointsAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, CancellationToken.None); + + mockLog.Verify(m => m.ListCheckpointsStart(FullyQualifiedNamespace, EventHubName, ConsumerGroup)); + mockLog.Verify(m => m.ListCheckpointsComplete(FullyQualifiedNamespace, EventHubName, ConsumerGroup, blobList.Count)); + } + + /// + /// Verifies basic functionality of ListCheckpointsAsync and ensures the appropriate events are emitted when errors occur. + /// + /// + [Test] + public void ListCheckpointsLogsErrors() + { + var expectedException = new DllNotFoundException("BOOM!"); + var mockLog = new Mock(); + var mockContainerClient = new MockBlobContainerClient() { GetBlobsAsyncException = expectedException }; + + var target = new BlobsCheckpointStore(mockContainerClient, DefaultRetryPolicy); + target.Logger = mockLog.Object; + + Assert.That(async () => await target.ListCheckpointsAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, CancellationToken.None), Throws.Exception.EqualTo(expectedException)); + mockLog.Verify(m => m.ListCheckpointsError(FullyQualifiedNamespace, EventHubName, ConsumerGroup, expectedException.Message)); + } + + /// + /// Verifies basic functionality of ListCheckpointsAsync and ensures the appropriate events are emitted on success. + /// + /// + [Test] + public async Task ListCheckpointsLogsInvalidCheckpoint() + { + var partitionId = Guid.NewGuid().ToString(); + + var blobList = new List + { + BlobsModelFactory.BlobItem($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/checkpoint/{partitionId}", + false, + BlobsModelFactory.BlobItemProperties(true, lastModified: DateTime.UtcNow, eTag: new ETag(MatchingEtag)), + "snapshot", + new Dictionary {{BlobMetadataKey.OwnerIdentifier, Guid.NewGuid().ToString()}}) + }; + + var target = new BlobsCheckpointStore(new MockBlobContainerClient() { Blobs = blobList }, DefaultRetryPolicy); + + var mockLog = new Mock(); + target.Logger = mockLog.Object; + + await target.ListCheckpointsAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, CancellationToken.None); + mockLog.Verify(m => m.InvalidCheckpointFound(partitionId, FullyQualifiedNamespace, EventHubName, ConsumerGroup)); + } + + /// + /// Verifies basic functionality of ListCheckpointsAsync and ensures the appropriate events are emitted on failure. + /// + /// + [Test] + public void ListCheckpointsForMissingPartitionLogsListCheckpointsError() + { + var ex = new RequestFailedException(0, "foo", BlobErrorCode.ContainerNotFound.ToString(), null); + var target = new BlobsCheckpointStore(new MockBlobContainerClient(getBlobsAsyncException: ex), DefaultRetryPolicy); + + var mockLog = new Mock(); + target.Logger = mockLog.Object; + + Assert.That(async () => await target.ListCheckpointsAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, CancellationToken.None), Throws.InstanceOf()); + mockLog.Verify(m => m.ListCheckpointsError(FullyQualifiedNamespace, EventHubName, ConsumerGroup, ex.Message)); + } + + /// + /// Verifies basic functionality of UpdateCheckpointAsync and ensures the appropriate events are emitted on success. + /// + /// + [Test] + public async Task UpdateCheckpointLogsStartAndCompleteWhenTheBlobExists() + { + var checkpoint = new EventProcessorCheckpoint + { + FullyQualifiedNamespace = FullyQualifiedNamespace, + EventHubName = EventHubName, + ConsumerGroup = ConsumerGroup, + PartitionId = PartitionId, + }; + + var blobInfo = BlobsModelFactory.BlobInfo(new ETag($@"""{MatchingEtag}"""), DateTime.UtcNow); + + var blobList = new List + { + BlobsModelFactory.BlobItem($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/ownership/{Guid.NewGuid().ToString()}", + false, + BlobsModelFactory.BlobItemProperties(true, lastModified: DateTime.UtcNow, eTag: new ETag(MatchingEtag)), + "snapshot", + new Dictionary {{BlobMetadataKey.OwnerIdentifier, Guid.NewGuid().ToString()}}) + }; + + var mockContainerClient = new MockBlobContainerClient() { Blobs = blobList }; + + mockContainerClient.AddBlobClient($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/checkpoint/1", client => + { + client.BlobInfo = blobInfo; + client.UploadBlobException = new Exception("Upload should not be called"); + }); + + var target = new BlobsCheckpointStore(mockContainerClient, DefaultRetryPolicy); + + var mockLog = new Mock(); + target.Logger = mockLog.Object; + + await target.UpdateCheckpointAsync(checkpoint, new EventData(Array.Empty()), CancellationToken.None); + mockLog.Verify(log => log.UpdateCheckpointStart(checkpoint.PartitionId, checkpoint.FullyQualifiedNamespace, checkpoint.EventHubName, checkpoint.ConsumerGroup)); + mockLog.Verify(log => log.UpdateCheckpointComplete(checkpoint.PartitionId, checkpoint.FullyQualifiedNamespace, checkpoint.EventHubName, checkpoint.ConsumerGroup)); + } + + /// + /// Verifies basic functionality of UpdateCheckpointAsync and ensures the appropriate events are emitted on success. + /// + /// + [Test] + public async Task UpdateCheckpointLogsStartAndCompleteWhenTheBlobDoesNotExist() + { + var checkpoint = new EventProcessorCheckpoint + { + FullyQualifiedNamespace = FullyQualifiedNamespace, + EventHubName = EventHubName, + ConsumerGroup = ConsumerGroup, + PartitionId = PartitionId, + }; + + var blobList = new List + { + BlobsModelFactory.BlobItem($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/ownership/{Guid.NewGuid().ToString()}", + false, + BlobsModelFactory.BlobItemProperties(true, lastModified: DateTime.UtcNow, eTag: new ETag(MatchingEtag)), + "snapshot", + new Dictionary {{BlobMetadataKey.OwnerIdentifier, Guid.NewGuid().ToString()}}) + }; + var mockBlobContainerClient = new MockBlobContainerClient() { Blobs = blobList }; + mockBlobContainerClient.AddBlobClient($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/checkpoint/1", _ => { }); + + var target = new BlobsCheckpointStore(mockBlobContainerClient, DefaultRetryPolicy); + + var mockLog = new Mock(); + target.Logger = mockLog.Object; + + await target.UpdateCheckpointAsync(checkpoint, new EventData(Array.Empty()), CancellationToken.None); + mockLog.Verify(log => log.UpdateCheckpointStart(checkpoint.PartitionId, checkpoint.FullyQualifiedNamespace, checkpoint.EventHubName, checkpoint.ConsumerGroup)); + mockLog.Verify(log => log.UpdateCheckpointComplete(checkpoint.PartitionId, checkpoint.FullyQualifiedNamespace, checkpoint.EventHubName, checkpoint.ConsumerGroup)); + } + + /// + /// Verifies basic functionality of UpdateCheckpointAsync and ensures the appropriate logs are written + /// when exceptions occur. + /// + /// + [Test] + public void UpdateCheckpointLogsErrorsWhenTheBlobExists() + { + var checkpoint = new EventProcessorCheckpoint + { + FullyQualifiedNamespace = FullyQualifiedNamespace, + EventHubName = EventHubName, + ConsumerGroup = ConsumerGroup, + PartitionId = PartitionId, + }; + + var expectedException = new DllNotFoundException("BOOM!"); + var mockLog = new Mock(); + + var mockContainerClient = new MockBlobContainerClient().AddBlobClient($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/checkpoint/1", client => + { + client.BlobClientSetMetadataException = expectedException; + client.UploadBlobException = new Exception("Upload should not be called"); + }); + + var target = new BlobsCheckpointStore(mockContainerClient, DefaultRetryPolicy); + target.Logger = mockLog.Object; + + Assert.That(async () => await target.UpdateCheckpointAsync(checkpoint, new EventData(Array.Empty()), CancellationToken.None), Throws.Exception.EqualTo(expectedException)); + mockLog.Verify(log => log.UpdateCheckpointError(checkpoint.PartitionId, checkpoint.FullyQualifiedNamespace, checkpoint.EventHubName, checkpoint.ConsumerGroup, expectedException.Message)); + } + + /// + /// Verifies basic functionality of UpdateCheckpointAsync and ensures the appropriate logs are written + /// when exceptions occur. + /// + /// + [Test] + public void UpdateCheckpointLogsErrorsWhenTheBlobDoesNotExist() + { + var checkpoint = new EventProcessorCheckpoint + { + FullyQualifiedNamespace = FullyQualifiedNamespace, + EventHubName = EventHubName, + ConsumerGroup = ConsumerGroup, + PartitionId = PartitionId, + }; + + var expectedException = new DllNotFoundException("BOOM!"); + var mockLog = new Mock(); + + var mockContainerClient = new MockBlobContainerClient().AddBlobClient($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/checkpoint/1", client => + { + client.UploadBlobException = expectedException; + }); + + var target = new BlobsCheckpointStore(mockContainerClient, DefaultRetryPolicy); + target.Logger = mockLog.Object; + + Assert.That(async () => await target.UpdateCheckpointAsync(checkpoint, new EventData(Array.Empty()), CancellationToken.None), Throws.Exception.EqualTo(expectedException)); + mockLog.Verify(log => log.UpdateCheckpointError(checkpoint.PartitionId, checkpoint.FullyQualifiedNamespace, checkpoint.EventHubName, checkpoint.ConsumerGroup, expectedException.Message)); + } + + /// + /// Verifies basic functionality of UpdateCheckpointAsync and ensures the appropriate events are emitted on failure. + /// + /// + [Test] + public void UpdateCheckpointForMissingContainerLogsCheckpointUpdateError() + { + var checkpoint = new EventProcessorCheckpoint + { + FullyQualifiedNamespace = FullyQualifiedNamespace, + EventHubName = EventHubName, + ConsumerGroup = ConsumerGroup, + PartitionId = PartitionId + }; + + var ex = new RequestFailedException(404, BlobErrorCode.ContainerNotFound.ToString(), BlobErrorCode.ContainerNotFound.ToString(), null); + var mockBlobContainerClient = new MockBlobContainerClient().AddBlobClient($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/checkpoint/1", client => client.UploadBlobException = ex); + var target = new BlobsCheckpointStore(mockBlobContainerClient, DefaultRetryPolicy); + + var mockLog = new Mock(); + target.Logger = mockLog.Object; + + Assert.That(async () => await target.UpdateCheckpointAsync(checkpoint, new EventData(Array.Empty()), CancellationToken.None), Throws.InstanceOf()); + mockLog.Verify(m => m.UpdateCheckpointError(PartitionId, FullyQualifiedNamespace, EventHubName, ConsumerGroup, ex.Message)); + } + + /// + /// Verifies basic functionality of GetCheckpointAsync and ensures the appropriate events are emitted on success. + /// + /// + [Test] + public async Task GetCheckpointLogsStartAndComplete() + { + var blobList = new List + { + BlobsModelFactory.BlobItem($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/checkpoint/0", + false, + BlobsModelFactory.BlobItemProperties(true, lastModified: DateTime.UtcNow, eTag: new ETag(MatchingEtag)), + "snapshot", + new Dictionary + { + {BlobMetadataKey.OwnerIdentifier, Guid.NewGuid().ToString()}, + {BlobMetadataKey.Offset, "0"} + }) + }; + + var target = new BlobsCheckpointStore(new MockBlobContainerClient() { Blobs = blobList }, DefaultRetryPolicy); + + var mockLog = new Mock(); + target.Logger = mockLog.Object; + + await target.GetCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, "0", CancellationToken.None); + + mockLog.Verify(m => m.GetCheckpointStart(FullyQualifiedNamespace, EventHubName, ConsumerGroup, "0")); + mockLog.Verify(m => m.GetCheckpointComplete(FullyQualifiedNamespace, EventHubName, ConsumerGroup, "0")); + } + + /// + /// Verifies basic functionality of GetCheckpointAsync and ensures the appropriate events are emitted when errors occur. + /// + /// + [Test] + public void GetCheckpointLogsErrors() + { + var expectedException = new DllNotFoundException("BOOM!"); + var mockContainerClient = new MockBlobContainerClient() { GetBlobsAsyncException = expectedException }; + var target = new BlobsCheckpointStore(mockContainerClient, DefaultRetryPolicy); + + mockContainerClient.AddBlobClient($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/checkpoint/0", client => + { + client.GetPropertiesException = expectedException; + }); + + var mockLog = new Mock(); + target.Logger = mockLog.Object; + + Assert.That(async () => await target.GetCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, "0", CancellationToken.None), Throws.Exception.EqualTo(expectedException)); + mockLog.Verify(m => m.GetCheckpointError(FullyQualifiedNamespace, EventHubName, ConsumerGroup, "0", expectedException.Message)); + } + + /// + /// Verifies basic functionality of GetCheckpointAsync and ensures the appropriate events are emitted on success. + /// + /// + [Test] + public async Task GetCheckpointLogsInvalidCheckpoint() + { + var blobList = new List + { + BlobsModelFactory.BlobItem($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/checkpoint/0", + false, + BlobsModelFactory.BlobItemProperties(true, lastModified: DateTime.UtcNow, eTag: new ETag(MatchingEtag)), + "snapshot", + new Dictionary {{BlobMetadataKey.OwnerIdentifier, Guid.NewGuid().ToString()}}) + }; + var target = new BlobsCheckpointStore(new MockBlobContainerClient() { Blobs = blobList }, DefaultRetryPolicy); + + var mockLog = new Mock(); + target.Logger = mockLog.Object; + + await target.ListCheckpointsAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, CancellationToken.None); + mockLog.Verify(m => m.InvalidCheckpointFound("0", FullyQualifiedNamespace, EventHubName, ConsumerGroup)); + } + + private class MockBlobContainerClient : BlobContainerClient + { + public override Uri Uri { get; } + public override string AccountName { get; } + public override string Name { get; } + internal IEnumerable Blobs; + internal Exception GetBlobsAsyncException; + internal Dictionary BlobClients = new (); + + public MockBlobContainerClient(string accountName = "blobAccount", + string containerName = "container", + Exception getBlobsAsyncException = null) + { + GetBlobsAsyncException = getBlobsAsyncException; + Blobs = Enumerable.Empty(); + AccountName = accountName; + Name = containerName; + Uri = new Uri("https://foo"); + } + public override AsyncPageable GetBlobsAsync(BlobTraits traits = BlobTraits.None, BlobStates states = BlobStates.None, string prefix = null, CancellationToken cancellationToken = default) + { + if (GetBlobsAsyncException != null) + { + throw GetBlobsAsyncException; + } + + return new MockAsyncPageable(Blobs.Where(b => prefix == null || b.Name.StartsWith(prefix, StringComparison.Ordinal))); + } + + public override BlobClient GetBlobClient(string blobName) + { + if (BlobClients.TryGetValue(blobName, out var client)) + { + return client; + } + + var blob = Blobs.SingleOrDefault(c => c.Name == blobName); + if (blob != null) + { + return new MockBlobClient(blobName) + { + Properties = BlobsModelFactory.BlobProperties(metadata: blob.Metadata) + }; + } + + return new MockBlobClient(blobName); + } + + internal MockBlobContainerClient AddBlobClient(string name, Action configure) + { + var client = new MockBlobClient(name); + configure(client); + BlobClients[name] = client; + return this; + } + } + + private class MockBlobClient : BlobClient + { + public override string Name { get; } + + internal BlobInfo BlobInfo; + internal BlobProperties Properties; + internal Exception UploadBlobException; + internal Exception BlobClientSetMetadataException; + internal DllNotFoundException GetPropertiesException; + public MockBlobClient(string blobName) + { + Name = blobName; + } + + public override Task> SetMetadataAsync(IDictionary metadata, BlobRequestConditions conditions = null, CancellationToken cancellationToken = default) + { + if (BlobClientSetMetadataException != null) + { + throw BlobClientSetMetadataException; + } + + if (BlobInfo == null) + { + throw new RequestFailedException(404, BlobErrorCode.BlobNotFound.ToString(), BlobErrorCode.BlobNotFound.ToString(), default); + } + + if ((conditions == null) || (BlobInfo.ETag.Equals($@"""{conditions.IfMatch}"""))) + { + return Task.FromResult(Response.FromValue(BlobInfo, Mock.Of())); + } + + throw new RequestFailedException(412, BlobErrorCode.ConditionNotMet.ToString(), BlobErrorCode.ConditionNotMet.ToString(), default); + } + + public override Task> UploadAsync(Stream content, BlobHttpHeaders httpHeaders = null, IDictionary metadata = null, BlobRequestConditions conditions = null, IProgress progressHandler = null, AccessTier? accessTier = null, StorageTransferOptions transferOptions = default, CancellationToken cancellationToken = default) + { + if (UploadBlobException != null) + { + throw UploadBlobException; + } + + if (BlobInfo != null) + { + throw new RequestFailedException(409, BlobErrorCode.BlobAlreadyExists.ToString(), BlobErrorCode.BlobAlreadyExists.ToString(), default); + } + + return Task.FromResult( + Response.FromValue( + BlobsModelFactory.BlobContentInfo(new ETag("etag"), DateTime.UtcNow, new byte[] { }, string.Empty, 0L), + Mock.Of())); + } + + public override Task DownloadToAsync(Stream destination, CancellationToken cancellationToken) => Task.FromResult(Mock.Of()); + + public override Task> GetPropertiesAsync(BlobRequestConditions conditions = null, CancellationToken cancellationToken = default) + { + if (GetPropertiesException != null) + { + throw GetPropertiesException; + } + + if (Properties == null) + { + throw new RequestFailedException(404, BlobErrorCode.BlobNotFound.ToString(), BlobErrorCode.BlobNotFound.ToString(), default); + } + + return Task.FromResult(Response.FromValue(Properties, Mock.Of())); + } + } + + private class MockAsyncPageable : AsyncPageable + { + private readonly IEnumerable Items; + + internal MockAsyncPageable(IEnumerable items) + { + Items = items; + } + public override IAsyncEnumerable> AsPages(string continuationToken = null, int? pageSizeHint = null) + { + return CratePageResponse(Items); + } + + internal async IAsyncEnumerable> CratePageResponse

(IEnumerable

value) + { + await Task.Delay(0); + yield return new MockPage

(value); + } + } + + private class MockPage : Page + { + private readonly IReadOnlyList InnerValues; + public override IReadOnlyList Values => InnerValues; + + public override string ContinuationToken => throw new NotImplementedException(); + + public override Response GetRawResponse() => throw new NotImplementedException(); + + public MockPage(IEnumerable items) + { + InnerValues = items.ToList().AsReadOnly(); + } + } + } +} diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Azure.Messaging.EventHubs.Shared.BlobStorageTesting.projitems b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Azure.Messaging.EventHubs.Shared.BlobStorageTesting.projitems new file mode 100644 index 0000000000000..68c34b2a8372d --- /dev/null +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Azure.Messaging.EventHubs.Shared.BlobStorageTesting.projitems @@ -0,0 +1,16 @@ + + + + $(MSBuildAllProjects);$(MSBuildThisFileFullPath) + true + b9a2bfb3-5636-45b8-9e94-f429ebf3fc1d + + + + Azure.Messaging.EventHubs.Tests + + + + + + diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Azure.Messaging.EventHubs.Shared.shproj b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Azure.Messaging.EventHubs.Shared.shproj index d4fd17e1eb695..0edece3895a2d 100755 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Azure.Messaging.EventHubs.Shared.shproj +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Azure.Messaging.EventHubs.Shared.shproj @@ -11,9 +11,11 @@ + + + - diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Infrastructure/StorageScope.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/BlobStorageTesting/StorageScope.cs similarity index 93% rename from sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Infrastructure/StorageScope.cs rename to sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/BlobStorageTesting/StorageScope.cs index ae1a02f47f213..fbb6e5c036544 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Infrastructure/StorageScope.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/BlobStorageTesting/StorageScope.cs @@ -5,13 +5,12 @@ using System.Runtime.CompilerServices; using System.Text.RegularExpressions; using System.Threading.Tasks; -using Azure.Messaging.EventHubs.Tests; using Microsoft.Azure.Management.ResourceManager; using Microsoft.Azure.Management.Storage; using Microsoft.Azure.Management.Storage.Models; using Microsoft.Rest; -namespace Azure.Messaging.EventHubs.Processor.Tests +namespace Azure.Messaging.EventHubs.Tests { ///

/// Provides a dynamically created Azure blob container instance which exists only in the context @@ -32,7 +31,7 @@ public sealed class StorageScope : IAsyncDisposable private static readonly Uri AzureResourceManagerUri = new Uri(EventHubsTestEnvironment.Instance.ResourceManagerUrl); /// Serves as a sentinel flag to denote when the instance has been disposed. - private bool _disposed = false; + private volatile bool _disposed = false; /// /// The name of the blob storage container that was created. @@ -64,7 +63,7 @@ public async ValueTask DisposeAsync() var resourceGroup = EventHubsTestEnvironment.Instance.ResourceGroup; var storageAccount = StorageTestEnvironment.Instance.StorageAccountName; - var token = await ResourceManager.AquireManagementTokenAsync().ConfigureAwait(false); + var token = await ResourceManager.AcquireManagementTokenAsync().ConfigureAwait(false); var client = new StorageManagementClient(AzureResourceManagerUri, new TokenCredentials(token)) { SubscriptionId = EventHubsTestEnvironment.Instance.SubscriptionId }; try @@ -103,7 +102,7 @@ public static async Task CreateAsync([CallerMemberName] string cal var resourceGroup = EventHubsTestEnvironment.Instance.ResourceGroup; var storageAccount = StorageTestEnvironment.Instance.StorageAccountName; - var token = await ResourceManager.AquireManagementTokenAsync().ConfigureAwait(false); + var token = await ResourceManager.AcquireManagementTokenAsync().ConfigureAwait(false); string CreateName() => $"{ Guid.NewGuid().ToString("D").Substring(0, 13) }-{ caller }"; @@ -125,7 +124,7 @@ public static async Task CreateAsync([CallerMemberName] string cal { var subscription = EventHubsTestEnvironment.Instance.SubscriptionId; var resourceGroup = EventHubsTestEnvironment.Instance.ResourceGroup; - var token = await ResourceManager.AquireManagementTokenAsync().ConfigureAwait(false); + var token = await ResourceManager.AcquireManagementTokenAsync().ConfigureAwait(false); static string CreateName() => $"neteventhubs{ Guid.NewGuid().ToString("N").Substring(0, 12) }"; @@ -152,7 +151,7 @@ public static async Task DeleteStorageAccountAsync(string accountName) { var subscription = EventHubsTestEnvironment.Instance.SubscriptionId; var resourceGroup = EventHubsTestEnvironment.Instance.ResourceGroup; - var token = await ResourceManager.AquireManagementTokenAsync().ConfigureAwait(false); + var token = await ResourceManager.AcquireManagementTokenAsync().ConfigureAwait(false); using (var client = new StorageManagementClient(AzureResourceManagerUri, new TokenCredentials(token)) { SubscriptionId = subscription }) { diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Infrastructure/StorageTestEnvironment.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/BlobStorageTesting/StorageTestEnvironment.cs similarity index 99% rename from sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Infrastructure/StorageTestEnvironment.cs rename to sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/BlobStorageTesting/StorageTestEnvironment.cs index 8b3bbc6d42dc6..d5c712df4d946 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Infrastructure/StorageTestEnvironment.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/BlobStorageTesting/StorageTestEnvironment.cs @@ -6,7 +6,7 @@ using System.Threading.Tasks; using Azure.Core.TestFramework; -namespace Azure.Messaging.EventHubs.Processor.Tests +namespace Azure.Messaging.EventHubs.Tests { /// /// Represents the ambient environment for Azure storage resource in which the test suite is diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Testing/EventHubScope.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Testing/EventHubScope.cs index d1e2d70a93c45..9f21e81c3e43b 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Testing/EventHubScope.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Testing/EventHubScope.cs @@ -30,7 +30,7 @@ public sealed class EventHubScope : IAsyncDisposable private static readonly Uri AzureResourceManagerUri = new Uri(EventHubsTestEnvironment.Instance.ResourceManagerUrl); /// Serves as a sentinel flag to denote when the instance has been disposed. - private bool _disposed = false; + private volatile bool _disposed = false; /// /// The name of the Event Hub that was created. @@ -88,7 +88,7 @@ public async ValueTask DisposeAsync() var resourceGroup = EventHubsTestEnvironment.Instance.ResourceGroup; var eventHubNamespace = EventHubsTestEnvironment.Instance.EventHubsNamespace; - var token = await ResourceManager.AquireManagementTokenAsync().ConfigureAwait(false); + var token = await ResourceManager.AcquireManagementTokenAsync().ConfigureAwait(false); var client = new EventHubManagementClient(AzureResourceManagerUri, new TokenCredentials(token)) { SubscriptionId = EventHubsTestEnvironment.Instance.SubscriptionId }; try @@ -147,7 +147,7 @@ public static Task CreateAsync(int partitionCount, { var subscription = EventHubsTestEnvironment.Instance.SubscriptionId; var resourceGroup = EventHubsTestEnvironment.Instance.ResourceGroup; - var token = await ResourceManager.AquireManagementTokenAsync().ConfigureAwait(false); + var token = await ResourceManager.AcquireManagementTokenAsync().ConfigureAwait(false); string CreateName() => $"net-eventhubs-{ Guid.NewGuid().ToString("D") }"; @@ -174,7 +174,7 @@ public static async Task DeleteNamespaceAsync(string namespaceName) { var subscription = EventHubsTestEnvironment.Instance.SubscriptionId; var resourceGroup = EventHubsTestEnvironment.Instance.ResourceGroup; - var token = await ResourceManager.AquireManagementTokenAsync().ConfigureAwait(false); + var token = await ResourceManager.AcquireManagementTokenAsync().ConfigureAwait(false); using (var client = new EventHubManagementClient(AzureResourceManagerUri, new TokenCredentials(token)) { SubscriptionId = subscription }) { @@ -218,7 +218,7 @@ private static Task BuildScope(int partitionCount, /// private static async Task BuildScopeFromExistingEventHub() { - var token = await ResourceManager.AquireManagementTokenAsync().ConfigureAwait(false); + var token = await ResourceManager.AcquireManagementTokenAsync().ConfigureAwait(false); using (var client = new EventHubManagementClient(AzureResourceManagerUri, new TokenCredentials(token)) { SubscriptionId = EventHubsTestEnvironment.Instance.SubscriptionId }) { @@ -253,7 +253,7 @@ private static async Task BuildScopeWithNewEventHub(int partition var groups = (consumerGroups ?? Enumerable.Empty()).ToList(); var resourceGroup = EventHubsTestEnvironment.Instance.ResourceGroup; var eventHubNamespace = EventHubsTestEnvironment.Instance.EventHubsNamespace; - var token = await ResourceManager.AquireManagementTokenAsync().ConfigureAwait(false); + var token = await ResourceManager.AcquireManagementTokenAsync().ConfigureAwait(false); string CreateName() => $"{ Guid.NewGuid().ToString("D").Substring(0, 13) }-{ caller }"; diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Testing/LiveResourceManager.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Testing/LiveResourceManager.cs index d0580a678d106..e59fcc2decd22 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Testing/LiveResourceManager.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Testing/LiveResourceManager.cs @@ -75,7 +75,7 @@ public async Task QueryResourceGroupLocationAsync(string accessToken, /// /// The token to use for management operations against the Event Hubs Live test namespace. /// - public async Task AquireManagementTokenAsync() + public async Task AcquireManagementTokenAsync() { var token = s_managementToken; var authority = new Uri(new Uri(EventHubsTestEnvironment.Instance.AuthorityHostUrl), EventHubsTestEnvironment.Instance.TenantId).ToString(); diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/tests/Azure.Messaging.EventHubs.Shared.Tests.csproj b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/tests/Azure.Messaging.EventHubs.Shared.Tests.csproj index 9f2ccf0b4050b..924ff6324126f 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/tests/Azure.Messaging.EventHubs.Shared.Tests.csproj +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/tests/Azure.Messaging.EventHubs.Shared.Tests.csproj @@ -12,10 +12,16 @@ + + + + + + - + @@ -29,17 +35,13 @@ + + - - - - - - - - + + diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/CheckpointStore/BlobsCheckpointStoreLiveTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/tests/BlobCheckpointStore/BlobsCheckpointStoreLiveTests.cs similarity index 75% rename from sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/CheckpointStore/BlobsCheckpointStoreLiveTests.cs rename to sdk/eventhub/Azure.Messaging.EventHubs.Shared/tests/BlobCheckpointStore/BlobsCheckpointStoreLiveTests.cs index 166b059ca8704..43aa5a7161a02 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/CheckpointStore/BlobsCheckpointStoreLiveTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/tests/BlobCheckpointStore/BlobsCheckpointStoreLiveTests.cs @@ -9,11 +9,11 @@ using Azure.Messaging.EventHubs.Consumer; using Azure.Messaging.EventHubs.Core; using Azure.Messaging.EventHubs.Primitives; -using Azure.Messaging.EventHubs.Tests; +using Azure.Messaging.EventHubs.Processor; using Azure.Storage.Blobs; using NUnit.Framework; -namespace Azure.Messaging.EventHubs.Processor.Tests +namespace Azure.Messaging.EventHubs.Tests { /// /// The suite of live tests for the @@ -30,7 +30,10 @@ namespace Azure.Messaging.EventHubs.Processor.Tests [Category(TestCategory.DisallowVisualStudioLiveUnitTesting)] public class BlobsCheckpointStoreLiveTests { - /// The default retry policy to use for the test cases in this class. + /// + /// The default retry policy to use for the test cases in this class. + /// + /// private EventHubsRetryPolicy DefaultRetryPolicy { get; } = new BasicRetryPolicy(new EventHubsRetryOptions()); /// @@ -45,7 +48,6 @@ public async Task BlobStorageManagerCanListOwnership() { var storageConnectionString = StorageTestEnvironment.Instance.StorageConnectionString; var containerClient = new BlobContainerClient(storageConnectionString, storageScope.ContainerName); - var checkpointStore = new BlobsCheckpointStore(containerClient, DefaultRetryPolicy); Assert.That(async () => await checkpointStore.ListOwnershipAsync("namespace", "eventHubName", "consumerGroup", default), Throws.Nothing); @@ -64,7 +66,6 @@ public async Task BlobStorageManagerCanListCheckpoints() { var storageConnectionString = StorageTestEnvironment.Instance.StorageConnectionString; var containerClient = new BlobContainerClient(storageConnectionString, storageScope.ContainerName); - var checkpointStore = new BlobsCheckpointStore(containerClient, DefaultRetryPolicy); Assert.That(async () => await checkpointStore.ListCheckpointsAsync("namespace", "eventHubName", "consumerGroup", default), Throws.Nothing); @@ -85,8 +86,8 @@ public async Task BlobStorageManagerCanClaimOwnership(string version) { var storageConnectionString = StorageTestEnvironment.Instance.StorageConnectionString; var containerClient = new BlobContainerClient(storageConnectionString, storageScope.ContainerName); - var checkpointStore = new BlobsCheckpointStore(containerClient, DefaultRetryPolicy); + var ownershipList = new List { // Null version and non-null version hit different paths of the code, calling different methods that connect @@ -119,8 +120,8 @@ public async Task BlobStorageManagerCanUpdateCheckpoint() { var storageConnectionString = StorageTestEnvironment.Instance.StorageConnectionString; var containerClient = new BlobContainerClient(storageConnectionString, storageScope.ContainerName); - var checkpointStore = new BlobsCheckpointStore(containerClient, DefaultRetryPolicy); + var ownershipList = new List { // Make sure the ownership exists beforehand so we hit all storage SDK calls in the checkpoint store. @@ -166,9 +167,8 @@ public async Task ListOwnershipAsyncReturnsEmptyIEnumerableWhenThereAreNoOwnersh { var storageConnectionString = StorageTestEnvironment.Instance.StorageConnectionString; var containerClient = new BlobContainerClient(storageConnectionString, storageScope.ContainerName); - var checkpointStore = new BlobsCheckpointStore(containerClient, DefaultRetryPolicy); - IEnumerable ownership = await checkpointStore.ListOwnershipAsync("namespace", "eventHubName", "consumerGroup", default); + var ownership = await checkpointStore.ListOwnershipAsync("namespace", "eventHubName", "consumerGroup", default); Assert.That(ownership, Is.Not.Null.And.Empty); } @@ -186,9 +186,8 @@ public async Task ListCheckpointAsyncReturnsEmptyIEnumerableWhenThereAreNoCheckp { var storageConnectionString = StorageTestEnvironment.Instance.StorageConnectionString; var containerClient = new BlobContainerClient(storageConnectionString, storageScope.ContainerName); - var checkpointStore = new BlobsCheckpointStore(containerClient, DefaultRetryPolicy); - IEnumerable checkpoints = await checkpointStore.ListCheckpointsAsync("namespace", "eventHubName", "consumerGroup", default); + var checkpoints = await checkpointStore.ListCheckpointsAsync("namespace", "eventHubName", "consumerGroup", default); Assert.That(checkpoints, Is.Not.Null.And.Empty); } @@ -206,24 +205,22 @@ public async Task FirstOwnershipClaimSucceeds() { var storageConnectionString = StorageTestEnvironment.Instance.StorageConnectionString; var containerClient = new BlobContainerClient(storageConnectionString, storageScope.ContainerName); - var checkpointStore = new BlobsCheckpointStore(containerClient, DefaultRetryPolicy); var ownershipList = new List(); - var ownership = - new EventProcessorPartitionOwnership - { - FullyQualifiedNamespace = "namespace", - EventHubName = "eventHubName", - ConsumerGroup = "consumerGroup", - OwnerIdentifier = "ownerIdentifier", - PartitionId = "partitionId" - }; - ownershipList.Add(ownership); + var ownership = new EventProcessorPartitionOwnership + { + FullyQualifiedNamespace = "namespace", + EventHubName = "eventHubName", + ConsumerGroup = "consumerGroup", + OwnerIdentifier = "ownerIdentifier", + PartitionId = "partitionId" + }; + ownershipList.Add(ownership); await checkpointStore.ClaimOwnershipAsync(ownershipList, default); - IEnumerable storedOwnershipList = await checkpointStore.ListOwnershipAsync("namespace", "eventHubName", "consumerGroup", default); + var storedOwnershipList = await checkpointStore.ListOwnershipAsync("namespace", "eventHubName", "consumerGroup", default); Assert.That(storedOwnershipList, Is.Not.Null); Assert.That(storedOwnershipList.Count, Is.EqualTo(1)); @@ -245,24 +242,22 @@ public async Task CheckReturnedEtagContainsSingleQuotes() var storageConnectionString = StorageTestEnvironment.Instance.StorageConnectionString; var containerClient = new BlobContainerClient(storageConnectionString, storageScope.ContainerName); - var checkpointStore = new BlobsCheckpointStore(containerClient, DefaultRetryPolicy); var ownershipList = new List(); - var ownership = - new EventProcessorPartitionOwnership - { - FullyQualifiedNamespace = "namespace", - EventHubName = "eventHubName", - ConsumerGroup = "consumerGroup", - OwnerIdentifier = "ownerIdentifier", - PartitionId = "partitionId" - }; - ownershipList.Add(ownership); + var ownership = new EventProcessorPartitionOwnership + { + FullyQualifiedNamespace = "namespace", + EventHubName = "eventHubName", + ConsumerGroup = "consumerGroup", + OwnerIdentifier = "ownerIdentifier", + PartitionId = "partitionId" + }; - IEnumerable claimedOwnership = await checkpointStore.ClaimOwnershipAsync(ownershipList, default); - IEnumerable storedOwnershipList = await checkpointStore.ListOwnershipAsync("namespace", "eventHubName", "consumerGroup", default); + ownershipList.Add(ownership); + var claimedOwnership = await checkpointStore.ClaimOwnershipAsync(ownershipList, default); + var storedOwnershipList = await checkpointStore.ListOwnershipAsync("namespace", "eventHubName", "consumerGroup", default); var claimedOwnershipMatch = s_doubleQuotesExpression.Match(claimedOwnership.First().Version); var storedOwnershipListMatch = s_doubleQuotesExpression.Match(storedOwnershipList.First().Version); @@ -283,26 +278,23 @@ public async Task OwnershipClaimSetsLastModifiedTimeAndVersion() { var storageConnectionString = StorageTestEnvironment.Instance.StorageConnectionString; var containerClient = new BlobContainerClient(storageConnectionString, storageScope.ContainerName); - var checkpointStore = new BlobsCheckpointStore(containerClient, DefaultRetryPolicy); var ownershipList = new List(); - var ownership = - new EventProcessorPartitionOwnership - { - FullyQualifiedNamespace = "namespace", - EventHubName = "eventHubName", - ConsumerGroup = "consumerGroup", - OwnerIdentifier = "ownerIdentifier", - PartitionId = "partitionId" - }; - ownershipList.Add(ownership); + var ownership = new EventProcessorPartitionOwnership + { + FullyQualifiedNamespace = "namespace", + EventHubName = "eventHubName", + ConsumerGroup = "consumerGroup", + OwnerIdentifier = "ownerIdentifier", + PartitionId = "partitionId" + }; + ownershipList.Add(ownership); await checkpointStore.ClaimOwnershipAsync(ownershipList, default); Assert.That(ownership.LastModifiedTime, Is.Not.EqualTo(default(DateTimeOffset))); Assert.That(ownership.LastModifiedTime, Is.GreaterThan(DateTimeOffset.UtcNow.Subtract(TimeSpan.FromSeconds(5)))); - Assert.That(ownership.Version, Is.Not.Null); } } @@ -321,41 +313,31 @@ public async Task OwnershipClaimFailsWhenVersionIsInvalid(string version) { var storageConnectionString = StorageTestEnvironment.Instance.StorageConnectionString; var containerClient = new BlobContainerClient(storageConnectionString, storageScope.ContainerName); - var checkpointStore = new BlobsCheckpointStore(containerClient, DefaultRetryPolicy); - var ownershipList = new List(); - var firstOwnership = - new EventProcessorPartitionOwnership - { - FullyQualifiedNamespace = "namespace", - EventHubName = "eventHubName", - ConsumerGroup = "consumerGroup", - OwnerIdentifier = "ownerIdentifier", - PartitionId = "partitionId" - }; - - ownershipList.Add(firstOwnership); - - await checkpointStore.ClaimOwnershipAsync(ownershipList, default); - - ownershipList.Clear(); - var secondOwnership = - new EventProcessorPartitionOwnership - { - FullyQualifiedNamespace = "namespace", - EventHubName = "eventHubName", - ConsumerGroup = "consumerGroup", - OwnerIdentifier = "ownerIdentifier", - PartitionId = "partitionId", - Version = version - }; + var firstOwnership = new EventProcessorPartitionOwnership + { + FullyQualifiedNamespace = "namespace", + EventHubName = "eventHubName", + ConsumerGroup = "consumerGroup", + OwnerIdentifier = "ownerIdentifier", + PartitionId = "partitionId" + }; - ownershipList.Add(secondOwnership); + await checkpointStore.ClaimOwnershipAsync(new[] { firstOwnership }, default); - await checkpointStore.ClaimOwnershipAsync(ownershipList, default); + var secondOwnership = new EventProcessorPartitionOwnership + { + FullyQualifiedNamespace = "namespace", + EventHubName = "eventHubName", + ConsumerGroup = "consumerGroup", + OwnerIdentifier = "ownerIdentifier", + PartitionId = "partitionId", + Version = version + }; - IEnumerable storedOwnershipList = await checkpointStore.ListOwnershipAsync("namespace", "eventHubName", "consumerGroup", default); + await checkpointStore.ClaimOwnershipAsync(new[] { secondOwnership }, default); + var storedOwnershipList = await checkpointStore.ListOwnershipAsync("namespace", "eventHubName", "consumerGroup", default); Assert.That(storedOwnershipList, Is.Not.Null); Assert.That(storedOwnershipList.Count, Is.EqualTo(1)); @@ -375,26 +357,20 @@ public async Task OwnershipClaimFailsWhenVersionExistsAndOwnershipDoesNotExist() { var storageConnectionString = StorageTestEnvironment.Instance.StorageConnectionString; var containerClient = new BlobContainerClient(storageConnectionString, storageScope.ContainerName); - var checkpointStore = new BlobsCheckpointStore(containerClient, DefaultRetryPolicy); - var ownershipList = new List(); - - var eTaggyOwnership = - new EventProcessorPartitionOwnership - { - FullyQualifiedNamespace = "namespace", - EventHubName = "eventHubName", - ConsumerGroup = "consumerGroup", - OwnerIdentifier = "ownerIdentifier", - PartitionId = "partitionId", - Version = "ETag" - }; - ownershipList.Add(eTaggyOwnership); - - await checkpointStore.ClaimOwnershipAsync(ownershipList, default); + var eTaggyOwnership = new EventProcessorPartitionOwnership + { + FullyQualifiedNamespace = "namespace", + EventHubName = "eventHubName", + ConsumerGroup = "consumerGroup", + OwnerIdentifier = "ownerIdentifier", + PartitionId = "partitionId", + Version = "ETag" + }; - IEnumerable storedOwnershipList = await checkpointStore.ListOwnershipAsync("namespace", "eventHubName", "consumerGroup", default); + await checkpointStore.ClaimOwnershipAsync(new[] { eTaggyOwnership }, default); + var storedOwnershipList = await checkpointStore.ListOwnershipAsync("namespace", "eventHubName", "consumerGroup", default); Assert.That(storedOwnershipList, Is.Not.Null.And.Empty); } @@ -412,45 +388,33 @@ public async Task OwnershipClaimSucceedsWhenVersionIsValid() { var storageConnectionString = StorageTestEnvironment.Instance.StorageConnectionString; var containerClient = new BlobContainerClient(storageConnectionString, storageScope.ContainerName); - var checkpointStore = new BlobsCheckpointStore(containerClient, DefaultRetryPolicy); - var ownershipList = new List(); - var firstOwnership = - new EventProcessorPartitionOwnership - { - FullyQualifiedNamespace = "namespace", - EventHubName = "eventHubName", - ConsumerGroup = "consumerGroup", - OwnerIdentifier = "ownerIdentifier", - PartitionId = "partitionId" - }; - - ownershipList.Add(firstOwnership); - - await checkpointStore.ClaimOwnershipAsync(ownershipList, default); - - // Version must have been set by the checkpoint store. - - var version = firstOwnership.Version; - ownershipList.Clear(); + var firstOwnership = new EventProcessorPartitionOwnership + { + FullyQualifiedNamespace = "namespace", + EventHubName = "eventHubName", + ConsumerGroup = "consumerGroup", + OwnerIdentifier = "ownerIdentifier", + PartitionId = "partitionId" + }; - var secondOwnership = - new EventProcessorPartitionOwnership - { - FullyQualifiedNamespace = "namespace", - EventHubName = "eventHubName", - ConsumerGroup = "consumerGroup", - OwnerIdentifier = "ownerIdentifier", - PartitionId = "partitionId", - Version = version - }; + await checkpointStore.ClaimOwnershipAsync(new[] { firstOwnership }, default); - ownershipList.Add(secondOwnership); + // The first ownership's version should have been set by the checkpoint store. - await checkpointStore.ClaimOwnershipAsync(ownershipList, default); + var secondOwnership = new EventProcessorPartitionOwnership + { + FullyQualifiedNamespace = "namespace", + EventHubName = "eventHubName", + ConsumerGroup = "consumerGroup", + OwnerIdentifier = "ownerIdentifier", + PartitionId = "partitionId", + Version = firstOwnership.Version + }; - IEnumerable storedOwnershipList = await checkpointStore.ListOwnershipAsync("namespace", "eventHubName", "consumerGroup", default); + await checkpointStore.ClaimOwnershipAsync(new[] { secondOwnership }, default); + var storedOwnershipList = await checkpointStore.ListOwnershipAsync("namespace", "eventHubName", "consumerGroup", default); Assert.That(storedOwnershipList, Is.Not.Null); Assert.That(storedOwnershipList.Count, Is.EqualTo(1)); @@ -470,12 +434,11 @@ public async Task ClaimOwnershipAsyncCanClaimMultipleOwnership() { var storageConnectionString = StorageTestEnvironment.Instance.StorageConnectionString; var containerClient = new BlobContainerClient(storageConnectionString, storageScope.ContainerName); - var checkpointStore = new BlobsCheckpointStore(containerClient, DefaultRetryPolicy); var ownershipList = new List(); var ownershipCount = 5; - for (int i = 0; i < ownershipCount; i++) + for (var i = 0; i < ownershipCount; i++) { ownershipList.Add( new EventProcessorPartitionOwnership @@ -489,8 +452,7 @@ public async Task ClaimOwnershipAsyncCanClaimMultipleOwnership() } await checkpointStore.ClaimOwnershipAsync(ownershipList, default); - - IEnumerable storedOwnershipList = await checkpointStore.ListOwnershipAsync("namespace", "eventHubName", "consumerGroup", default); + var storedOwnershipList = await checkpointStore.ListOwnershipAsync("namespace", "eventHubName", "consumerGroup", default); Assert.That(storedOwnershipList, Is.Not.Null); Assert.That(storedOwnershipList.Count, Is.EqualTo(ownershipCount)); @@ -517,7 +479,6 @@ public async Task ClaimOwnershipAsyncReturnsOnlyTheSuccessfullyClaimedOwnership( { var storageConnectionString = StorageTestEnvironment.Instance.StorageConnectionString; var containerClient = new BlobContainerClient(storageConnectionString, storageScope.ContainerName); - var checkpointStore = new BlobsCheckpointStore(containerClient, DefaultRetryPolicy); var ownershipList = new List(); var ownershipCount = 5; @@ -540,7 +501,6 @@ public async Task ClaimOwnershipAsyncReturnsOnlyTheSuccessfullyClaimedOwnership( // The versions must have been set by the checkpoint store. var versions = ownershipList.Select(ownership => ownership.Version).ToList(); - ownershipList.Clear(); // Use a valid eTag when 'i' is odd. This way, we can expect 'ownershipCount / 2' successful @@ -562,8 +522,8 @@ public async Task ClaimOwnershipAsyncReturnsOnlyTheSuccessfullyClaimedOwnership( }); } - IEnumerable claimedOwnershipList = await checkpointStore.ClaimOwnershipAsync(ownershipList, default); - IEnumerable expectedOwnership = ownershipList.Where(ownership => int.Parse(ownership.PartitionId) % 2 == 1); + var claimedOwnershipList = await checkpointStore.ClaimOwnershipAsync(ownershipList, default); + var expectedOwnership = ownershipList.Where(ownership => int.Parse(ownership.PartitionId) % 2 == 1); Assert.That(claimedOwnershipList, Is.Not.Null); Assert.That(claimedOwnershipList.Count, Is.EqualTo(expectedClaimedCount)); @@ -590,45 +550,33 @@ public async Task OwnershipClaimDoesNotInterfereWithOtherConsumerGroups() { var storageConnectionString = StorageTestEnvironment.Instance.StorageConnectionString; var containerClient = new BlobContainerClient(storageConnectionString, storageScope.ContainerName); - var checkpointStore = new BlobsCheckpointStore(containerClient, DefaultRetryPolicy); - var ownershipList = new List(); - var firstOwnership = - new EventProcessorPartitionOwnership - { - FullyQualifiedNamespace = "namespace", - EventHubName = "eventHubName", - ConsumerGroup = "consumerGroup1", - OwnerIdentifier = "ownerIdentifier", - PartitionId = "partitionId" - }; - - ownershipList.Add(firstOwnership); - - await checkpointStore.ClaimOwnershipAsync(ownershipList, default); - // Version must have been set by the checkpoint store. + var firstOwnership = new EventProcessorPartitionOwnership + { + FullyQualifiedNamespace = "namespace", + EventHubName = "eventHubName", + ConsumerGroup = "consumerGroup1", + OwnerIdentifier = "ownerIdentifier", + PartitionId = "partitionId" + }; - var version = firstOwnership.Version; + await checkpointStore.ClaimOwnershipAsync(new[] { firstOwnership }, default); - ownershipList.Clear(); + // The first ownership's version should have been set by the checkpoint store. - var secondOwnership = - new EventProcessorPartitionOwnership - { - FullyQualifiedNamespace = "namespace", - EventHubName = "eventHubName", - ConsumerGroup = "consumerGroup2", - OwnerIdentifier = "ownerIdentifier", - PartitionId = "partitionId", - Version = version - }; - - ownershipList.Add(secondOwnership); - - Assert.That(async () => await checkpointStore.ClaimOwnershipAsync(ownershipList, default), Throws.InstanceOf()); + var secondOwnership = new EventProcessorPartitionOwnership + { + FullyQualifiedNamespace = "namespace", + EventHubName = "eventHubName", + ConsumerGroup = "consumerGroup2", + OwnerIdentifier = "ownerIdentifier", + PartitionId = "partitionId", + Version = firstOwnership.Version + }; - IEnumerable storedOwnershipList = await checkpointStore.ListOwnershipAsync("namespace", "eventHubName", "consumerGroup1", default); + Assert.That(async () => await checkpointStore.ClaimOwnershipAsync(new[] { secondOwnership }, default), Throws.InstanceOf()); + var storedOwnershipList = await checkpointStore.ListOwnershipAsync("namespace", "eventHubName", "consumerGroup1", default); Assert.That(storedOwnershipList, Is.Not.Null); Assert.That(storedOwnershipList.Count, Is.EqualTo(1)); @@ -648,45 +596,33 @@ public async Task OwnershipClaimDoesNotInterfereWithOtherEventHubs() { var storageConnectionString = StorageTestEnvironment.Instance.StorageConnectionString; var containerClient = new BlobContainerClient(storageConnectionString, storageScope.ContainerName); - var checkpointStore = new BlobsCheckpointStore(containerClient, DefaultRetryPolicy); - var ownershipList = new List(); - var firstOwnership = - new EventProcessorPartitionOwnership - { - FullyQualifiedNamespace = "namespace", - EventHubName = "eventHubName1", - ConsumerGroup = "consumerGroup", - OwnerIdentifier = "ownerIdentifier", - PartitionId = "partitionId" - }; - - ownershipList.Add(firstOwnership); - await checkpointStore.ClaimOwnershipAsync(ownershipList, default); - - // Version must have been set by the checkpoint store. + var firstOwnership = new EventProcessorPartitionOwnership + { + FullyQualifiedNamespace = "namespace", + EventHubName = "eventHubName1", + ConsumerGroup = "consumerGroup", + OwnerIdentifier = "ownerIdentifier", + PartitionId = "partitionId" + }; - var version = firstOwnership.Version; + await checkpointStore.ClaimOwnershipAsync(new[] { firstOwnership }, default); - ownershipList.Clear(); + // The first ownership's version should have been set by the checkpoint store. - var secondOwnership = - new EventProcessorPartitionOwnership - { - FullyQualifiedNamespace = "namespace", - EventHubName = "eventHubName2", - ConsumerGroup = "consumerGroup", - OwnerIdentifier = "ownerIdentifier", - PartitionId = "partitionId", - Version = version - }; - - ownershipList.Add(secondOwnership); - - Assert.That(async () => await checkpointStore.ClaimOwnershipAsync(ownershipList, default), Throws.InstanceOf()); + var secondOwnership = new EventProcessorPartitionOwnership + { + FullyQualifiedNamespace = "namespace", + EventHubName = "eventHubName2", + ConsumerGroup = "consumerGroup", + OwnerIdentifier = "ownerIdentifier", + PartitionId = "partitionId", + Version = firstOwnership.Version + }; - IEnumerable storedOwnershipList = await checkpointStore.ListOwnershipAsync("namespace", "eventHubName1", "consumerGroup", default); + Assert.That(async () => await checkpointStore.ClaimOwnershipAsync(new[] { secondOwnership }, default), Throws.InstanceOf()); + var storedOwnershipList = await checkpointStore.ListOwnershipAsync("namespace", "eventHubName1", "consumerGroup", default); Assert.That(storedOwnershipList, Is.Not.Null); Assert.That(storedOwnershipList.Count, Is.EqualTo(1)); @@ -706,44 +642,32 @@ public async Task OwnershipClaimDoesNotInterfereWithOtherNamespaces() { var storageConnectionString = StorageTestEnvironment.Instance.StorageConnectionString; var containerClient = new BlobContainerClient(storageConnectionString, storageScope.ContainerName); - var checkpointStore = new BlobsCheckpointStore(containerClient, DefaultRetryPolicy); - var ownershipList = new List(); - var firstOwnership = - new EventProcessorPartitionOwnership - { - FullyQualifiedNamespace = "namespace1", - EventHubName = "eventHubName", - ConsumerGroup = "consumerGroup", - OwnerIdentifier = "ownerIdentifier", - PartitionId = "partitionId" - }; - ownershipList.Add(firstOwnership); - - await checkpointStore.ClaimOwnershipAsync(ownershipList, default); - - // Version must have been set by the checkpoint store. + var firstOwnership = new EventProcessorPartitionOwnership + { + FullyQualifiedNamespace = "namespace1", + EventHubName = "eventHubName", + ConsumerGroup = "consumerGroup", + OwnerIdentifier = "ownerIdentifier", + PartitionId = "partitionId" + }; - var version = firstOwnership.Version; + await checkpointStore.ClaimOwnershipAsync(new[] { firstOwnership }, default); - ownershipList.Clear(); + // The first ownership's version should have been set by the checkpoint store. - var secondOwnership = - new EventProcessorPartitionOwnership - { - FullyQualifiedNamespace = "namespace2", - EventHubName = "eventHubName", - ConsumerGroup = "consumerGroup", - OwnerIdentifier = "ownerIdentifier", - PartitionId = "partitionId", - Version = version - }; - - ownershipList.Add(secondOwnership); - - Assert.That(async () => await checkpointStore.ClaimOwnershipAsync(ownershipList, default), Throws.InstanceOf()); + var secondOwnership = new EventProcessorPartitionOwnership + { + FullyQualifiedNamespace = "namespace2", + EventHubName = "eventHubName", + ConsumerGroup = "consumerGroup", + OwnerIdentifier = "ownerIdentifier", + PartitionId = "partitionId", + Version = firstOwnership.Version + }; + Assert.That(async () => await checkpointStore.ClaimOwnershipAsync(new[] { secondOwnership }, default), Throws.InstanceOf()); var storedOwnershipList = await checkpointStore.ListOwnershipAsync("namespace1", "eventHubName", "consumerGroup", default); Assert.That(storedOwnershipList, Is.Not.Null); @@ -764,7 +688,6 @@ public async Task ListOwnershipFailsWhenContainerDoesNotExist() { var storageConnectionString = StorageTestEnvironment.Instance.StorageConnectionString; var containerClient = new BlobContainerClient(storageConnectionString, $"test-container-{Guid.NewGuid()}"); - var checkpointStore = new BlobsCheckpointStore(containerClient, DefaultRetryPolicy); Assert.That(async () => await checkpointStore.ListOwnershipAsync("namespace", "eventHubName", "consumerGroup", default), Throws.InstanceOf()); @@ -783,7 +706,6 @@ public async Task ListCheckpointsFailsWhenContainerDoesNotExist() { var storageConnectionString = StorageTestEnvironment.Instance.StorageConnectionString; var containerClient = new BlobContainerClient(storageConnectionString, $"test-container-{Guid.NewGuid()}"); - var checkpointStore = new BlobsCheckpointStore(containerClient, DefaultRetryPolicy); Assert.That(async () => await checkpointStore.ListCheckpointsAsync("namespace", "eventHubName", "consumerGroup", default), Throws.InstanceOf()); @@ -802,7 +724,6 @@ public async Task CheckpointUpdateFailsWhenContainerDoesNotExist() { var storageConnectionString = StorageTestEnvironment.Instance.StorageConnectionString; var containerClient = new BlobContainerClient(storageConnectionString, $"test-container-{Guid.NewGuid()}"); - var checkpointStore = new BlobsCheckpointStore(containerClient, DefaultRetryPolicy); var checkpoint = new EventProcessorCheckpoint @@ -1004,8 +925,8 @@ await checkpointStore.UpdateCheckpointAsync(new EventProcessorCheckpoint PartitionId = "partitionId" }, mockEvent, default); - IEnumerable storedCheckpointsList1 = await checkpointStore.ListCheckpointsAsync("namespace", "eventHubName", "consumerGroup1", default); - IEnumerable storedCheckpointsList2 = await checkpointStore.ListCheckpointsAsync("namespace", "eventHubName", "consumerGroup2", default); + var storedCheckpointsList1 = await checkpointStore.ListCheckpointsAsync("namespace", "eventHubName", "consumerGroup1", default); + var storedCheckpointsList2 = await checkpointStore.ListCheckpointsAsync("namespace", "eventHubName", "consumerGroup2", default); Assert.That(storedCheckpointsList1, Is.Not.Null); Assert.That(storedCheckpointsList1.Count, Is.EqualTo(1)); @@ -1050,8 +971,8 @@ await checkpointStore.UpdateCheckpointAsync(new EventProcessorCheckpoint PartitionId = "partitionId" }, mockEvent, default); - IEnumerable storedCheckpointsList1 = await checkpointStore.ListCheckpointsAsync("namespace", "eventHubName1", "consumerGroup", default); - IEnumerable storedCheckpointsList2 = await checkpointStore.ListCheckpointsAsync("namespace", "eventHubName2", "consumerGroup", default); + var storedCheckpointsList1 = await checkpointStore.ListCheckpointsAsync("namespace", "eventHubName1", "consumerGroup", default); + var storedCheckpointsList2 = await checkpointStore.ListCheckpointsAsync("namespace", "eventHubName2", "consumerGroup", default); Assert.That(storedCheckpointsList1, Is.Not.Null); Assert.That(storedCheckpointsList1.Count, Is.EqualTo(1)); @@ -1096,8 +1017,8 @@ await checkpointStore.UpdateCheckpointAsync(new EventProcessorCheckpoint PartitionId = "partitionId" }, mockEvent, default); - IEnumerable storedCheckpointsList1 = await checkpointStore.ListCheckpointsAsync("namespace1", "eventHubName", "consumerGroup", default); - IEnumerable storedCheckpointsList2 = await checkpointStore.ListCheckpointsAsync("namespace2", "eventHubName", "consumerGroup", default); + var storedCheckpointsList1 = await checkpointStore.ListCheckpointsAsync("namespace1", "eventHubName", "consumerGroup", default); + var storedCheckpointsList2 = await checkpointStore.ListCheckpointsAsync("namespace2", "eventHubName", "consumerGroup", default); Assert.That(storedCheckpointsList1, Is.Not.Null); Assert.That(storedCheckpointsList1.Count, Is.EqualTo(1)); @@ -1142,13 +1063,13 @@ await checkpointStore.UpdateCheckpointAsync(new EventProcessorCheckpoint PartitionId = "partitionId2" }, mockEvent, default); - IEnumerable storedCheckpointsList = await checkpointStore.ListCheckpointsAsync("namespace", "eventHubName", "consumerGroup", default); + var storedCheckpointsList = await checkpointStore.ListCheckpointsAsync("namespace", "eventHubName", "consumerGroup", default); Assert.That(storedCheckpointsList, Is.Not.Null); Assert.That(storedCheckpointsList.Count, Is.EqualTo(2)); - EventProcessorCheckpoint storedCheckpoint1 = storedCheckpointsList.First(checkpoint => checkpoint.PartitionId == "partitionId1"); - EventProcessorCheckpoint storedCheckpoint2 = storedCheckpointsList.First(checkpoint => checkpoint.PartitionId == "partitionId2"); + var storedCheckpoint1 = storedCheckpointsList.First(checkpoint => checkpoint.PartitionId == "partitionId1"); + var storedCheckpoint2 = storedCheckpointsList.First(checkpoint => checkpoint.PartitionId == "partitionId2"); Assert.That(storedCheckpoint1, Is.Not.Null); Assert.That(storedCheckpoint2, Is.Not.Null); diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/CheckpointStore/BlobsCheckpointStoreTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/tests/BlobCheckpointStore/BlobsCheckpointStoreTests.cs similarity index 89% rename from sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/CheckpointStore/BlobsCheckpointStoreTests.cs rename to sdk/eventhub/Azure.Messaging.EventHubs.Shared/tests/BlobCheckpointStore/BlobsCheckpointStoreTests.cs index 7f70beee1e508..64deeb0b286aa 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/CheckpointStore/BlobsCheckpointStoreTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/tests/BlobCheckpointStore/BlobsCheckpointStoreTests.cs @@ -11,21 +11,21 @@ using Azure.Messaging.EventHubs.Consumer; using Azure.Messaging.EventHubs.Core; using Azure.Messaging.EventHubs.Primitives; -using Azure.Messaging.EventHubs.Processor.Diagnostics; -using Azure.Messaging.EventHubs.Tests; +using Azure.Messaging.EventHubs.Processor; using Azure.Storage; using Azure.Storage.Blobs; using Azure.Storage.Blobs.Models; using Moq; using NUnit.Framework; -namespace Azure.Messaging.EventHubs.Processor.Tests +namespace Azure.Messaging.EventHubs.Tests { /// /// The suite of tests for the /// class. /// /// + [TestFixture] public class BlobsCheckpointStoreTests { private const string FullyQualifiedNamespace = "FqNs"; @@ -37,6 +37,8 @@ public class BlobsCheckpointStoreTests private const string MatchingEtag = "etag"; private const string WrongEtag = "wrongEtag"; private const string PartitionId = "1"; + + private readonly EventHubsRetryPolicy DefaultRetryPolicy = new BasicRetryPolicy(new EventHubsRetryOptions()); private readonly string OwnershipIdentifier = Guid.NewGuid().ToString(); /// @@ -89,12 +91,13 @@ public async Task ListOwnershipLogsStartAndComplete() "snapshot", new Dictionary {{BlobMetadataKey.OwnerIdentifier, Guid.NewGuid().ToString()}}) }; - var target = new BlobsCheckpointStore(new MockBlobContainerClient() { Blobs = blobList }, - new BasicRetryPolicy(new EventHubsRetryOptions())); - var mockLog = new Mock(); + + var target = new BlobsCheckpointStore(new MockBlobContainerClient() { Blobs = blobList }, DefaultRetryPolicy); + + var mockLog = new Mock(); target.Logger = mockLog.Object; - await target.ListOwnershipAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, new CancellationToken()); + await target.ListOwnershipAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, CancellationToken.None); mockLog.Verify(m => m.ListOwnershipStart(FullyQualifiedNamespace, EventHubName, ConsumerGroup)); mockLog.Verify(m => m.ListOwnershipComplete(FullyQualifiedNamespace, EventHubName, ConsumerGroup, blobList.Count)); @@ -108,13 +111,12 @@ public async Task ListOwnershipLogsStartAndComplete() public void ListOwnershipLogsErrorOnException() { var ex = new RequestFailedException(0, "foo", BlobErrorCode.ContainerNotFound.ToString(), null); + var target = new BlobsCheckpointStore(new MockBlobContainerClient(getBlobsAsyncException: ex), DefaultRetryPolicy); - var target = new BlobsCheckpointStore(new MockBlobContainerClient(getBlobsAsyncException: ex), - new BasicRetryPolicy(new EventHubsRetryOptions())); - var mockLog = new Mock(); + var mockLog = new Mock(); target.Logger = mockLog.Object; - Assert.That(async () => await target.ListOwnershipAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, new CancellationToken()), Throws.InstanceOf()); + Assert.That(async () => await target.ListOwnershipAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, CancellationToken.None), Throws.InstanceOf()); mockLog.Verify(m => m.ListOwnershipError(FullyQualifiedNamespace, EventHubName, ConsumerGroup, ex.Message)); } @@ -138,15 +140,14 @@ public async Task ClaimOwnershipLogsStartAndComplete() } }; - var mockBlobContainerClient = new MockBlobContainerClient().AddBlobClient("fqns/name/group/ownership/1", _ => { }); - var target = new BlobsCheckpointStore(mockBlobContainerClient, - new BasicRetryPolicy(new EventHubsRetryOptions())); - var mockLog = new Mock(); + var mockBlobContainerClient = new MockBlobContainerClient().AddBlobClient($"{FullyQualifiedNamespaceLowercase}/{EventHubNameLowercase}/{ConsumerGroupLowercase}/ownership/1", _ => { }); + var target = new BlobsCheckpointStore(mockBlobContainerClient, DefaultRetryPolicy); + var mockLog = new Mock(); target.Logger = mockLog.Object; - var result = (await target.ClaimOwnershipAsync(partitionOwnership, new CancellationToken())).ToList(); - + var result = (await target.ClaimOwnershipAsync(partitionOwnership, CancellationToken.None)).ToList(); CollectionAssert.AreEquivalent(partitionOwnership, result); + mockLog.Verify(m => m.ClaimOwnershipStart(PartitionId, FullyQualifiedNamespace, EventHubName, ConsumerGroup, OwnershipIdentifier)); mockLog.Verify(m => m.ClaimOwnershipComplete(PartitionId, FullyQualifiedNamespace, EventHubName, ConsumerGroup, OwnershipIdentifier)); } @@ -172,13 +173,13 @@ public void ClaimOwnershipLogsErrors() }; var expectedException = new DllNotFoundException("BOOM!"); - var mockLog = new Mock(); - var mockContainerClient = new MockBlobContainerClient().AddBlobClient("fqns/name/group/ownership/1", client => client.UploadBlobException = expectedException); - var target = new BlobsCheckpointStore(mockContainerClient, new BasicRetryPolicy(new EventHubsRetryOptions())); + var mockLog = new Mock(); + var mockContainerClient = new MockBlobContainerClient().AddBlobClient($"{FullyQualifiedNamespaceLowercase}/{EventHubNameLowercase}/{ConsumerGroupLowercase}/ownership/1", client => client.UploadBlobException = expectedException); + var target = new BlobsCheckpointStore(mockContainerClient, DefaultRetryPolicy); target.Logger = mockLog.Object; - Assert.That(async () => await target.ClaimOwnershipAsync(partitionOwnership, new CancellationToken()), Throws.Exception.EqualTo(expectedException)); + Assert.That(async () => await target.ClaimOwnershipAsync(partitionOwnership, CancellationToken.None), Throws.Exception.EqualTo(expectedException)); mockLog.Verify(m => m.ClaimOwnershipError(PartitionId, FullyQualifiedNamespace, EventHubName, ConsumerGroup, OwnershipIdentifier, expectedException.Message)); } @@ -202,15 +203,15 @@ public async Task ClaimOwnershipForNewPartitionLogsOwnershipClaimed() } }; - var mockBlobContainerClient = new MockBlobContainerClient().AddBlobClient("fqns/name/group/ownership/1", _ => { }); - var target = new BlobsCheckpointStore(mockBlobContainerClient, - new BasicRetryPolicy(new EventHubsRetryOptions())); - var mockLog = new Mock(); - target.Logger = mockLog.Object; + var mockBlobContainerClient = new MockBlobContainerClient().AddBlobClient($"{FullyQualifiedNamespaceLowercase}/{EventHubNameLowercase}/{ConsumerGroupLowercase}/ownership/1", _ => { }); + var target = new BlobsCheckpointStore(mockBlobContainerClient, DefaultRetryPolicy); - var result = (await target.ClaimOwnershipAsync(partitionOwnership, new CancellationToken())).ToList(); + var mockLog = new Mock(); + target.Logger = mockLog.Object; + var result = (await target.ClaimOwnershipAsync(partitionOwnership, CancellationToken.None)).ToList(); CollectionAssert.AreEquivalent(partitionOwnership, result); + mockLog.Verify(m => m.OwnershipClaimed(PartitionId, FullyQualifiedNamespace, EventHubName, ConsumerGroup, OwnershipIdentifier)); } @@ -237,15 +238,15 @@ public async Task ClaimOwnershipForExistingPartitionLogsOwnershipClaimed() } }; - var mockContainerClient = new MockBlobContainerClient().AddBlobClient("fqns/name/group/ownership/1", client => client.BlobInfo = blobInfo); - var target = new BlobsCheckpointStore(mockContainerClient, - new BasicRetryPolicy(new EventHubsRetryOptions())); - var mockLog = new Mock(); - target.Logger = mockLog.Object; + var mockContainerClient = new MockBlobContainerClient().AddBlobClient($"{FullyQualifiedNamespaceLowercase}/{EventHubNameLowercase}/{ConsumerGroupLowercase}/ownership/1", client => client.BlobInfo = blobInfo); + var target = new BlobsCheckpointStore(mockContainerClient, DefaultRetryPolicy); - var result = (await target.ClaimOwnershipAsync(partitionOwnership, new CancellationToken())).ToList(); + var mockLog = new Mock(); + target.Logger = mockLog.Object; + var result = (await target.ClaimOwnershipAsync(partitionOwnership, CancellationToken.None)).ToList(); CollectionAssert.AreEquivalent(partitionOwnership, result); + mockLog.Verify(m => m.OwnershipClaimed(PartitionId, FullyQualifiedNamespace, EventHubName, ConsumerGroup, OwnershipIdentifier)); } @@ -272,15 +273,15 @@ public async Task ClaimOwnershipForExistingPartitionWithWrongEtagLogsOwnershipNo } }; - var mockContainerClient = new MockBlobContainerClient().AddBlobClient("fqns/name/group/ownership/1", client => client.BlobInfo = blobInfo); - var target = new BlobsCheckpointStore(mockContainerClient, - new BasicRetryPolicy(new EventHubsRetryOptions())); - var mockLog = new Mock(); - target.Logger = mockLog.Object; + var mockContainerClient = new MockBlobContainerClient().AddBlobClient($"{FullyQualifiedNamespaceLowercase}/{EventHubNameLowercase}/{ConsumerGroupLowercase}/ownership/1", client => client.BlobInfo = blobInfo); + var target = new BlobsCheckpointStore(mockContainerClient, DefaultRetryPolicy); - var result = (await target.ClaimOwnershipAsync(partitionOwnership, new CancellationToken())).ToList(); + var mockLog = new Mock(); + target.Logger = mockLog.Object; + var result = (await target.ClaimOwnershipAsync(partitionOwnership, CancellationToken.None)).ToList(); CollectionAssert.IsEmpty(result); + mockLog.Verify(m => m.OwnershipNotClaimable(PartitionId, FullyQualifiedNamespace, EventHubName, ConsumerGroup, OwnershipIdentifier, It.Is(e => e.Contains(BlobErrorCode.ConditionNotMet.ToString())))); } @@ -289,7 +290,7 @@ public async Task ClaimOwnershipForExistingPartitionWithWrongEtagLogsOwnershipNo /// /// [Test] - public void ClaimOwnershipForMissingPartitionThrowsAndLogsOwnershipNotClaimable() + public void ClaimOwnershipForMissingPartitionThrowsAndLogsClaimOwnershipError() { var partitionOwnership = new List { @@ -305,14 +306,14 @@ public void ClaimOwnershipForMissingPartitionThrowsAndLogsOwnershipNotClaimable( } }; - var mockBlobContainerClient = new MockBlobContainerClient().AddBlobClient("fqns/name/group/ownership/1", _ => { }); - var target = new BlobsCheckpointStore(mockBlobContainerClient, - new BasicRetryPolicy(new EventHubsRetryOptions())); + var mockBlobContainerClient = new MockBlobContainerClient().AddBlobClient($"{FullyQualifiedNamespaceLowercase}/{EventHubNameLowercase}/{ConsumerGroupLowercase}/ownership/1", _ => { }); + var target = new BlobsCheckpointStore(mockBlobContainerClient, DefaultRetryPolicy); - var mockLog = new Mock(); + var mockLog = new Mock(); target.Logger = mockLog.Object; - Assert.That(async () => await target.ClaimOwnershipAsync(partitionOwnership, new CancellationToken()), Throws.InstanceOf()); + Assert.That(async () => await target.ClaimOwnershipAsync(partitionOwnership, CancellationToken.None), Throws.InstanceOf()); + mockLog.Verify(m => m.ClaimOwnershipError(PartitionId, FullyQualifiedNamespace, EventHubName, ConsumerGroup, OwnershipIdentifier, It.Is(e => e.Contains(BlobErrorCode.BlobNotFound.ToString())))); } /// @@ -334,12 +335,13 @@ public async Task ListCheckpointsLogsStartAndComplete() {BlobMetadataKey.Offset, "0"} }) }; - var target = new BlobsCheckpointStore(new MockBlobContainerClient() { Blobs = blobList }, - new BasicRetryPolicy(new EventHubsRetryOptions())); - var mockLog = new Mock(); + + var target = new BlobsCheckpointStore(new MockBlobContainerClient() { Blobs = blobList }, DefaultRetryPolicy); + + var mockLog = new Mock(); target.Logger = mockLog.Object; - await target.ListCheckpointsAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, new CancellationToken()); + await target.ListCheckpointsAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, CancellationToken.None); mockLog.Verify(m => m.ListCheckpointsStart(FullyQualifiedNamespace, EventHubName, ConsumerGroup)); mockLog.Verify(m => m.ListCheckpointsComplete(FullyQualifiedNamespace, EventHubName, ConsumerGroup, blobList.Count)); @@ -368,8 +370,8 @@ public async Task ListCheckpointsUsesOffsetAsTheStartingPositionWhenPresent() {BlobMetadataKey.SequenceNumber, "7777"} }) }; - var target = new BlobsCheckpointStore(new MockBlobContainerClient() { Blobs = blobList }, new BasicRetryPolicy(new EventHubsRetryOptions())); - var checkpoints = await target.ListCheckpointsAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, new CancellationToken()); + var target = new BlobsCheckpointStore(new MockBlobContainerClient() { Blobs = blobList }, DefaultRetryPolicy); + var checkpoints = await target.ListCheckpointsAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, CancellationToken.None); Assert.That(checkpoints, Is.Not.Null, "A set of checkpoints should have been returned."); Assert.That(checkpoints.Single().StartingPosition, Is.EqualTo(expectedStartingPosition)); @@ -397,8 +399,9 @@ public async Task ListCheckpointsUsesSequenceNumberAsTheStartingPositionWhenNoOf {BlobMetadataKey.SequenceNumber, expectedSequence.ToString()} }) }; - var target = new BlobsCheckpointStore(new MockBlobContainerClient() { Blobs = blobList }, new BasicRetryPolicy(new EventHubsRetryOptions())); - var checkpoints = await target.ListCheckpointsAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, new CancellationToken()); + + var target = new BlobsCheckpointStore(new MockBlobContainerClient() { Blobs = blobList }, DefaultRetryPolicy); + var checkpoints = await target.ListCheckpointsAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, CancellationToken.None); Assert.That(checkpoints, Is.Not.Null, "A set of checkpoints should have been returned."); Assert.That(checkpoints.Single().StartingPosition, Is.EqualTo(expectedStartingPosition)); @@ -425,12 +428,12 @@ public async Task ListCheckpointsConsidersDataInvalidWithNoOffsetOrSequenceNumbe }) }; - var mockLogger = new Mock(); - var target = new BlobsCheckpointStore(new MockBlobContainerClient() { Blobs = blobList }, new BasicRetryPolicy(new EventHubsRetryOptions())); + var mockLogger = new Mock(); + var target = new BlobsCheckpointStore(new MockBlobContainerClient() { Blobs = blobList }, DefaultRetryPolicy); target.Logger = mockLogger.Object; - var checkpoints = await target.ListCheckpointsAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, new CancellationToken()); + var checkpoints = await target.ListCheckpointsAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, CancellationToken.None); Assert.That(checkpoints, Is.Not.Null, "A set of checkpoints should have been returned."); Assert.That(checkpoints.Any(), Is.False, "No valid checkpoints should exist."); @@ -458,6 +461,7 @@ public async Task ListCheckpointsPreferredNewCheckpointOverLegacy() {BlobMetadataKey.SequenceNumber, "960182"}, {BlobMetadataKey.Offset, "14"} }), + BlobsModelFactory.BlobItem($"{FullyQualifiedNamespaceLowercase}/{EventHubNameLowercase}/{ConsumerGroupLowercase}/{partitionId}", false, BlobsModelFactory.BlobItemProperties(true, lastModified: DateTime.UtcNow, eTag: new ETag(MatchingEtag)), @@ -465,6 +469,7 @@ public async Task ListCheckpointsPreferredNewCheckpointOverLegacy() }; var containerClient = new MockBlobContainerClient() { Blobs = blobList }; + containerClient.AddBlobClient($"{FullyQualifiedNamespaceLowercase}/{EventHubNameLowercase}/{ConsumerGroupLowercase}/{partitionId}", client => { client.Content = Encoding.UTF8.GetBytes("{" + @@ -474,11 +479,11 @@ public async Task ListCheckpointsPreferredNewCheckpointOverLegacy() "\"Epoch\":386," + "\"Offset\":\"13\"," + "\"SequenceNumber\":960180" + - "}"); + "}"); }); - var target = new BlobsCheckpointStore(containerClient, new BasicRetryPolicy(new EventHubsRetryOptions()), initializeWithLegacyCheckpoints: true); - var checkpoints = await target.ListCheckpointsAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, new CancellationToken()); + var target = new BlobsCheckpointStore(containerClient, DefaultRetryPolicy, initializeWithLegacyCheckpoints: true); + var checkpoints = await target.ListCheckpointsAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, CancellationToken.None); Assert.That(checkpoints, Has.One.Items, "A single checkpoint should have been returned."); Assert.That(checkpoints.Single().StartingPosition, Is.EqualTo(EventPosition.FromOffset(14, false))); @@ -492,8 +497,9 @@ public async Task ListCheckpointsPreferredNewCheckpointOverLegacy() [Test] public async Task ListCheckpointsMergesNewAndLegacyCheckpoints() { - string partitionId1 = Guid.NewGuid().ToString(); - string partitionId2 = Guid.NewGuid().ToString(); + var partitionId1 = Guid.NewGuid().ToString(); + var partitionId2 = Guid.NewGuid().ToString(); + var blobList = new List { BlobsModelFactory.BlobItem($"{FullyQualifiedNamespaceLowercase}/{EventHubNameLowercase}/{ConsumerGroupLowercase}/checkpoint/{partitionId1}", @@ -506,6 +512,7 @@ public async Task ListCheckpointsMergesNewAndLegacyCheckpoints() {BlobMetadataKey.SequenceNumber, "960182"}, {BlobMetadataKey.Offset, "14"} }), + BlobsModelFactory.BlobItem($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/{partitionId2}", false, BlobsModelFactory.BlobItemProperties(true, lastModified: DateTime.UtcNow, eTag: new ETag(MatchingEtag)), @@ -522,11 +529,11 @@ public async Task ListCheckpointsMergesNewAndLegacyCheckpoints() "\"Epoch\":386," + "\"Offset\":\"13\"," + "\"SequenceNumber\":960180" + - "}"); + "}"); }); - var target = new BlobsCheckpointStore(containerClient, new BasicRetryPolicy(new EventHubsRetryOptions()), initializeWithLegacyCheckpoints: true); - var checkpoints = (await target.ListCheckpointsAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, new CancellationToken())).ToArray(); + var target = new BlobsCheckpointStore(containerClient, DefaultRetryPolicy, initializeWithLegacyCheckpoints: true); + var checkpoints = (await target.ListCheckpointsAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, CancellationToken.None)).ToArray(); Assert.That(checkpoints, Has.Exactly(2).Items, "Two checkpoints should have been returned."); Assert.That(checkpoints[0].StartingPosition, Is.EqualTo(EventPosition.FromOffset(14, false))); @@ -563,8 +570,8 @@ public async Task ListCheckpointsUsesOffsetAsTheStartingPositionWhenPresentInLeg "}"); }); - var target = new BlobsCheckpointStore(containerClient, new BasicRetryPolicy(new EventHubsRetryOptions()), initializeWithLegacyCheckpoints: true); - var checkpoints = await target.ListCheckpointsAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, new CancellationToken()); + var target = new BlobsCheckpointStore(containerClient, DefaultRetryPolicy, initializeWithLegacyCheckpoints: true); + var checkpoints = await target.ListCheckpointsAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, CancellationToken.None); Assert.That(checkpoints, Is.Not.Null, "A set of checkpoints should have been returned."); Assert.That(checkpoints.Single().StartingPosition, Is.EqualTo(EventPosition.FromOffset(13, false))); @@ -590,20 +597,21 @@ public async Task ListCheckpointSkipsCheckpointsWhenOffsetIsNullOrEmptyInLegacyC }; var containerClient = new MockBlobContainerClient() { Blobs = blobList }; + containerClient.AddBlobClient($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/0", client => { client.Content = Encoding.UTF8.GetBytes("{" + - "\"Offset\":" + offset + "," + - "\"SequenceNumber\":" + sequenceNumber + "," + - "\"PartitionId\":\"8\"," + - "\"Owner\":\"cc397fe0-6771-4eaa-a8df-1997efeb3c87\"," + - "\"Token\":\"ab00a395-4c39-4939-89d5-10c04b4553af\"," + - "\"Epoch\":3" + + "\"Offset\":" + offset + "," + + "\"SequenceNumber\":" + sequenceNumber + "," + + "\"PartitionId\":\"8\"," + + "\"Owner\":\"cc397fe0-6771-4eaa-a8df-1997efeb3c87\"," + + "\"Token\":\"ab00a395-4c39-4939-89d5-10c04b4553af\"," + + "\"Epoch\":3" + "}"); }); - var target = new BlobsCheckpointStore(containerClient, new BasicRetryPolicy(new EventHubsRetryOptions()), initializeWithLegacyCheckpoints: true); - var checkpoints = await target.ListCheckpointsAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, new CancellationToken()); + var target = new BlobsCheckpointStore(containerClient, DefaultRetryPolicy, initializeWithLegacyCheckpoints: true); + var checkpoints = await target.ListCheckpointsAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, CancellationToken.None); Assert.That(checkpoints, Is.Not.Null, "A set of checkpoints should have been returned."); Assert.That(checkpoints, Is.Empty, "No valid checkpoints should exist."); @@ -625,6 +633,7 @@ public async Task ListCheckpointsConsidersDataInvalidWithNoOffsetOrSequenceNumbe }; var containerClient = new MockBlobContainerClient() { Blobs = blobList }; + containerClient.AddBlobClient($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/0", client => { client.Content = Encoding.UTF8.GetBytes("{" + @@ -634,12 +643,12 @@ public async Task ListCheckpointsConsidersDataInvalidWithNoOffsetOrSequenceNumbe "\"Epoch\":386}"); }); - var mockLogger = new Mock(); - var target = new BlobsCheckpointStore(containerClient, new BasicRetryPolicy(new EventHubsRetryOptions()), initializeWithLegacyCheckpoints: true); + var mockLogger = new Mock(); + var target = new BlobsCheckpointStore(containerClient, DefaultRetryPolicy, initializeWithLegacyCheckpoints: true); target.Logger = mockLogger.Object; - var checkpoints = await target.ListCheckpointsAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, new CancellationToken()); + var checkpoints = await target.ListCheckpointsAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, CancellationToken.None); Assert.That(checkpoints, Is.Not.Null, "A set of checkpoints should have been returned."); Assert.That(checkpoints.Any(), Is.False, "No valid checkpoints should exist."); @@ -671,12 +680,12 @@ public async Task ListCheckpointsConsidersDataInvalidWithLegacyCheckpointBlobCon client.Content = Encoding.UTF8.GetBytes(json); }); - var mockLogger = new Mock(); - var target = new BlobsCheckpointStore(containerClient, new BasicRetryPolicy(new EventHubsRetryOptions()), initializeWithLegacyCheckpoints: true); + var mockLogger = new Mock(); + var target = new BlobsCheckpointStore(containerClient, DefaultRetryPolicy, initializeWithLegacyCheckpoints: true); target.Logger = mockLogger.Object; - var checkpoints = await target.ListCheckpointsAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, new CancellationToken()); + var checkpoints = await target.ListCheckpointsAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, CancellationToken.None); Assert.That(checkpoints, Is.Not.Null, "A set of checkpoints should have been returned."); Assert.That(checkpoints.Any(), Is.False, "No valid checkpoints should exist."); @@ -692,13 +701,13 @@ public async Task ListCheckpointsConsidersDataInvalidWithLegacyCheckpointBlobCon public void ListCheckpointsLogsErrors() { var expectedException = new DllNotFoundException("BOOM!"); - var mockLog = new Mock(); + var mockLog = new Mock(); var mockContainerClient = new MockBlobContainerClient() { GetBlobsAsyncException = expectedException }; - var target = new BlobsCheckpointStore(mockContainerClient, new BasicRetryPolicy(new EventHubsRetryOptions())); + var target = new BlobsCheckpointStore(mockContainerClient, DefaultRetryPolicy); target.Logger = mockLog.Object; - Assert.That(async () => await target.ListCheckpointsAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, new CancellationToken()), Throws.Exception.EqualTo(expectedException)); + Assert.That(async () => await target.ListCheckpointsAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, CancellationToken.None), Throws.Exception.EqualTo(expectedException)); mockLog.Verify(m => m.ListCheckpointsError(FullyQualifiedNamespace, EventHubName, ConsumerGroup, expectedException.Message)); } @@ -719,12 +728,12 @@ public async Task ListCheckpointsLogsInvalidCheckpoint() "snapshot", new Dictionary {{BlobMetadataKey.OwnerIdentifier, Guid.NewGuid().ToString()}}) }; - var target = new BlobsCheckpointStore(new MockBlobContainerClient() { Blobs = blobList }, - new BasicRetryPolicy(new EventHubsRetryOptions())); - var mockLog = new Mock(); + + var target = new BlobsCheckpointStore(new MockBlobContainerClient() { Blobs = blobList }, DefaultRetryPolicy); + var mockLog = new Mock(); target.Logger = mockLog.Object; - await target.ListCheckpointsAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, new CancellationToken()); + await target.ListCheckpointsAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, CancellationToken.None); mockLog.Verify(m => m.InvalidCheckpointFound(partitionId, FullyQualifiedNamespace, EventHubName, ConsumerGroup)); } @@ -733,16 +742,15 @@ public async Task ListCheckpointsLogsInvalidCheckpoint() /// /// [Test] - public void ListCheckpointsForMissingPartitionThrowsAndLogsOwnershipNotClaimable() + public void ListCheckpointsForMissingPartitionThrowsAndLogsListCheckpointsError() { var ex = new RequestFailedException(0, "foo", BlobErrorCode.ContainerNotFound.ToString(), null); - var target = new BlobsCheckpointStore(new MockBlobContainerClient(getBlobsAsyncException: ex), - new BasicRetryPolicy(new EventHubsRetryOptions())); - var mockLog = new Mock(); + DefaultRetryPolicy); + var mockLog = new Mock(); target.Logger = mockLog.Object; - Assert.That(async () => await target.ListCheckpointsAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, new CancellationToken()), Throws.InstanceOf()); + Assert.That(async () => await target.ListCheckpointsAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, CancellationToken.None), Throws.InstanceOf()); mockLog.Verify(m => m.ListCheckpointsError(FullyQualifiedNamespace, EventHubName, ConsumerGroup, ex.Message)); } @@ -773,18 +781,19 @@ public async Task UpdateCheckpointLogsStartAndCompleteWhenTheBlobExists() }; var mockContainerClient = new MockBlobContainerClient() { Blobs = blobList }; - mockContainerClient.AddBlobClient("fqns/name/group/checkpoint/1", client => + + mockContainerClient.AddBlobClient($"{FullyQualifiedNamespaceLowercase}/{EventHubNameLowercase}/{ConsumerGroupLowercase}/checkpoint/1", client => { client.BlobInfo = blobInfo; client.UploadBlobException = new Exception("Upload should not be called"); }); - var target = new BlobsCheckpointStore(mockContainerClient, - new BasicRetryPolicy(new EventHubsRetryOptions())); - var mockLog = new Mock(); + var target = new BlobsCheckpointStore(mockContainerClient, DefaultRetryPolicy); + + var mockLog = new Mock(); target.Logger = mockLog.Object; - await target.UpdateCheckpointAsync(checkpoint, new EventData(Array.Empty()), new CancellationToken()); + await target.UpdateCheckpointAsync(checkpoint, new EventData(Array.Empty()), CancellationToken.None); mockLog.Verify(log => log.UpdateCheckpointStart(checkpoint.PartitionId, checkpoint.FullyQualifiedNamespace, checkpoint.EventHubName, checkpoint.ConsumerGroup)); mockLog.Verify(log => log.UpdateCheckpointComplete(checkpoint.PartitionId, checkpoint.FullyQualifiedNamespace, checkpoint.EventHubName, checkpoint.ConsumerGroup)); } @@ -812,14 +821,16 @@ public async Task UpdateCheckpointLogsStartAndCompleteWhenTheBlobDoesNotExist() "snapshot", new Dictionary {{BlobMetadataKey.OwnerIdentifier, Guid.NewGuid().ToString()}}) }; + var mockBlobContainerClient = new MockBlobContainerClient() { Blobs = blobList }; - mockBlobContainerClient.AddBlobClient("fqns/name/group/checkpoint/1", _ => { }); - var target = new BlobsCheckpointStore(mockBlobContainerClient, - new BasicRetryPolicy(new EventHubsRetryOptions())); - var mockLog = new Mock(); + mockBlobContainerClient.AddBlobClient($"{FullyQualifiedNamespaceLowercase}/{EventHubNameLowercase}/{ConsumerGroupLowercase}/checkpoint/1", _ => { }); + + var target = new BlobsCheckpointStore(mockBlobContainerClient, DefaultRetryPolicy); + + var mockLog = new Mock(); target.Logger = mockLog.Object; - await target.UpdateCheckpointAsync(checkpoint, new EventData(Array.Empty()), new CancellationToken()); + await target.UpdateCheckpointAsync(checkpoint, new EventData(Array.Empty()), CancellationToken.None); mockLog.Verify(log => log.UpdateCheckpointStart(checkpoint.PartitionId, checkpoint.FullyQualifiedNamespace, checkpoint.EventHubName, checkpoint.ConsumerGroup)); mockLog.Verify(log => log.UpdateCheckpointComplete(checkpoint.PartitionId, checkpoint.FullyQualifiedNamespace, checkpoint.EventHubName, checkpoint.ConsumerGroup)); } @@ -841,19 +852,18 @@ public void UpdateCheckpointLogsErrorsWhenTheBlobExists() }; var expectedException = new DllNotFoundException("BOOM!"); - var mockLog = new Mock(); + var mockLog = new Mock(); - var mockContainerClient = new MockBlobContainerClient().AddBlobClient("fqns/name/group/checkpoint/1", client => + var mockContainerClient = new MockBlobContainerClient().AddBlobClient($"{FullyQualifiedNamespaceLowercase}/{EventHubNameLowercase}/{ConsumerGroupLowercase}/checkpoint/1", client => { client.BlobClientSetMetadataException = expectedException; client.UploadBlobException = new Exception("Upload should not be called"); }); - var target = new BlobsCheckpointStore(mockContainerClient, new BasicRetryPolicy(new EventHubsRetryOptions())); - + var target = new BlobsCheckpointStore(mockContainerClient, DefaultRetryPolicy); target.Logger = mockLog.Object; - Assert.That(async () => await target.UpdateCheckpointAsync(checkpoint, new EventData(Array.Empty()), new CancellationToken()), Throws.Exception.EqualTo(expectedException)); + Assert.That(async () => await target.UpdateCheckpointAsync(checkpoint, new EventData(Array.Empty()), CancellationToken.None), Throws.Exception.EqualTo(expectedException)); mockLog.Verify(log => log.UpdateCheckpointError(checkpoint.PartitionId, checkpoint.FullyQualifiedNamespace, checkpoint.EventHubName, checkpoint.ConsumerGroup, expectedException.Message)); } @@ -874,16 +884,17 @@ public void UpdateCheckpointLogsErrorsWhenTheBlobDoesNotExist() }; var expectedException = new DllNotFoundException("BOOM!"); - var mockLog = new Mock(); - var mockContainerClient = new MockBlobContainerClient().AddBlobClient("fqns/name/group/checkpoint/1", client => + var mockLog = new Mock(); + + var mockContainerClient = new MockBlobContainerClient().AddBlobClient($"{FullyQualifiedNamespaceLowercase}/{EventHubNameLowercase}/{ConsumerGroupLowercase}/checkpoint/1", client => { client.UploadBlobException = expectedException; }); - var target = new BlobsCheckpointStore(mockContainerClient, new BasicRetryPolicy(new EventHubsRetryOptions())); + var target = new BlobsCheckpointStore(mockContainerClient, DefaultRetryPolicy); target.Logger = mockLog.Object; - Assert.That(async () => await target.UpdateCheckpointAsync(checkpoint, new EventData(Array.Empty()), new CancellationToken()), Throws.Exception.EqualTo(expectedException)); + Assert.That(async () => await target.UpdateCheckpointAsync(checkpoint, new EventData(Array.Empty()), CancellationToken.None), Throws.Exception.EqualTo(expectedException)); mockLog.Verify(log => log.UpdateCheckpointError(checkpoint.PartitionId, checkpoint.FullyQualifiedNamespace, checkpoint.EventHubName, checkpoint.ConsumerGroup, expectedException.Message)); } @@ -903,14 +914,13 @@ public void UpdateCheckpointForMissingContainerThrowsAndLogsCheckpointUpdateErro }; var ex = new RequestFailedException(404, BlobErrorCode.ContainerNotFound.ToString(), BlobErrorCode.ContainerNotFound.ToString(), null); - var mockBlobContainerClient = new MockBlobContainerClient().AddBlobClient("fqns/name/group/checkpoint/1", client => client.UploadBlobException = ex); - var target = new BlobsCheckpointStore(mockBlobContainerClient, - new BasicRetryPolicy(new EventHubsRetryOptions())); - var mockLog = new Mock(); - target.Logger = mockLog.Object; + var mockBlobContainerClient = new MockBlobContainerClient().AddBlobClient($"{FullyQualifiedNamespaceLowercase}/{EventHubNameLowercase}/{ConsumerGroupLowercase}/checkpoint/1", client => client.UploadBlobException = ex); + var target = new BlobsCheckpointStore(mockBlobContainerClient, DefaultRetryPolicy); - Assert.That(async () => await target.UpdateCheckpointAsync(checkpoint, new EventData(Array.Empty()), new CancellationToken()), Throws.InstanceOf()); + var mockLog = new Mock(); + target.Logger = mockLog.Object; + Assert.That(async () => await target.UpdateCheckpointAsync(checkpoint, new EventData(Array.Empty()), CancellationToken.None), Throws.InstanceOf()); mockLog.Verify(m => m.UpdateCheckpointError(PartitionId, FullyQualifiedNamespace, EventHubName, ConsumerGroup, ex.Message)); } @@ -927,7 +937,7 @@ public void ListOwnershipAsyncSurfacesNonRetriableExceptions(Exception exception var serviceCalls = 0; var mockContainerClient = new MockBlobContainerClient(); - var checkpointStore = new BlobsCheckpointStore(mockContainerClient, new BasicRetryPolicy(new EventHubsRetryOptions())); + var checkpointStore = new BlobsCheckpointStore(mockContainerClient, DefaultRetryPolicy); mockContainerClient.GetBlobsAsyncCallback = (traits, states, prefix, token) => { @@ -955,7 +965,7 @@ public void ListOwnershipAsyncSurfacesNonRetriableExceptions(Exception exception public async Task ListOwnershipAsyncDelegatesTheCancellationToken() { var mockContainerClient = new MockBlobContainerClient(); - var checkpointStore = new BlobsCheckpointStore(mockContainerClient, new BasicRetryPolicy(new EventHubsRetryOptions())); + var checkpointStore = new BlobsCheckpointStore(mockContainerClient, DefaultRetryPolicy); using var cancellationSource = new CancellationTokenSource(); var stateBeforeCancellation = default(bool?); @@ -1024,7 +1034,8 @@ public void ClaimOwnershipAsyncSurfacesNonRetriableExceptionsWhenVersionIsNull(E throw exception; }; }); - var checkpointStore = new BlobsCheckpointStore(mockContainerClient, new BasicRetryPolicy(new EventHubsRetryOptions())); + + var checkpointStore = new BlobsCheckpointStore(mockContainerClient, DefaultRetryPolicy); // To ensure that the test does not hang for the duration, set a timeout to force completion // after a shorter period of time. @@ -1066,7 +1077,8 @@ public void ClaimOwnershipAsyncSurfacesNonRetriableExceptionsWhenVersionIsNotNul throw exception; }; }); - var checkpointStore = new BlobsCheckpointStore(mockContainerClient, new BasicRetryPolicy(new EventHubsRetryOptions())); + + var checkpointStore = new BlobsCheckpointStore(mockContainerClient, DefaultRetryPolicy); // To ensure that the test does not hang for the duration, set a timeout to force completion // after a shorter period of time. @@ -1134,8 +1146,8 @@ public async Task ClaimOwnershipAsyncDelegatesTheCancellationToken(string versio }; } }); - var checkpointStore = new BlobsCheckpointStore(mockContainerClient, new BasicRetryPolicy(new EventHubsRetryOptions())); + var checkpointStore = new BlobsCheckpointStore(mockContainerClient, DefaultRetryPolicy); await checkpointStore.ClaimOwnershipAsync(new List() { ownership }, cancellationSource.Token); Assert.That(stateBeforeCancellation.HasValue, Is.True, "State before cancellation should have been captured."); @@ -1171,9 +1183,8 @@ public void ListCheckpointsAsyncSurfacesNonRetriableExceptions(Exception excepti { var expectedServiceCalls = 1; var serviceCalls = 0; - var mockContainerClient = new MockBlobContainerClient(); - var checkpointStore = new BlobsCheckpointStore(mockContainerClient, new BasicRetryPolicy(new EventHubsRetryOptions())); + var checkpointStore = new BlobsCheckpointStore(mockContainerClient, DefaultRetryPolicy); mockContainerClient.GetBlobsAsyncCallback = (traits, states, prefix, token) => { @@ -1201,7 +1212,7 @@ public void ListCheckpointsAsyncSurfacesNonRetriableExceptions(Exception excepti public async Task ListCheckpointsAsyncDelegatesTheCancellationToken() { var mockContainerClient = new MockBlobContainerClient(); - var checkpointStore = new BlobsCheckpointStore(mockContainerClient, new BasicRetryPolicy(new EventHubsRetryOptions())); + var checkpointStore = new BlobsCheckpointStore(mockContainerClient, DefaultRetryPolicy); using var cancellationSource = new CancellationTokenSource(); var stateBeforeCancellation = default(bool?); @@ -1252,8 +1263,8 @@ public void UpdateCheckpointAsyncSurfacesNonRetriableExceptionsWhenTheBlobExists { var expectedServiceCalls = 1; var serviceCalls = 0; - var blobInfo = BlobsModelFactory.BlobInfo(new ETag($@"""{MatchingEtag}"""), DateTime.UtcNow); + var mockContainerClient = new MockBlobContainerClient().AddBlobClient("ns/eh/cg/checkpoint/pid", client => { client.BlobInfo = blobInfo; @@ -1264,7 +1275,8 @@ public void UpdateCheckpointAsyncSurfacesNonRetriableExceptionsWhenTheBlobExists throw exception; }; }); - var checkpointStore = new BlobsCheckpointStore(mockContainerClient, new BasicRetryPolicy(new EventHubsRetryOptions())); + + var checkpointStore = new BlobsCheckpointStore(mockContainerClient, DefaultRetryPolicy); var checkpoint = new EventProcessorCheckpoint { @@ -1313,7 +1325,8 @@ public void UpdateCheckpointAsyncSurfacesNonRetriableExceptionsWhenTheBlobDoesNo throw exception; }; }); - var checkpointStore = new BlobsCheckpointStore(mockContainerClient, new BasicRetryPolicy(new EventHubsRetryOptions())); + + var checkpointStore = new BlobsCheckpointStore(mockContainerClient, DefaultRetryPolicy); // To ensure that the test does not hang for the duration, set a timeout to force completion // after a shorter period of time. @@ -1354,7 +1367,8 @@ public async Task UpdateCheckpointAsyncDelegatesTheCancellationTokenWhenTheBlobE } }; }); - var checkpointStore = new BlobsCheckpointStore(mockContainerClient, new BasicRetryPolicy(new EventHubsRetryOptions())); + + var checkpointStore = new BlobsCheckpointStore(mockContainerClient, DefaultRetryPolicy); var checkpoint = new EventProcessorCheckpoint { @@ -1405,8 +1419,7 @@ public async Task UpdateCheckpointAsyncDelegatesTheCancellationTokenWhenTheBlobD }; }); - var checkpointStore = new BlobsCheckpointStore(mockContainerClient, new BasicRetryPolicy(new EventHubsRetryOptions())); - + var checkpointStore = new BlobsCheckpointStore(mockContainerClient, DefaultRetryPolicy); await checkpointStore.UpdateCheckpointAsync(checkpoint, new EventData(Array.Empty()), cancellationSource.Token); Assert.That(stateBeforeCancellation.HasValue, Is.True, "State before cancellation should have been captured."); @@ -1458,12 +1471,13 @@ public async Task GetCheckpointLogsStartAndComplete() {BlobMetadataKey.Offset, "0"} }) }; - var target = new BlobsCheckpointStore(new MockBlobContainerClient() { Blobs = blobList }, - new BasicRetryPolicy(new EventHubsRetryOptions())); - var mockLog = new Mock(); + + var target = new BlobsCheckpointStore(new MockBlobContainerClient() { Blobs = blobList }, DefaultRetryPolicy); + + var mockLog = new Mock(); target.Logger = mockLog.Object; - await target.GetCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, "0", new CancellationToken()); + await target.GetCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, "0", CancellationToken.None); mockLog.Verify(m => m.GetCheckpointStart(FullyQualifiedNamespace, EventHubName, ConsumerGroup, "0")); mockLog.Verify(m => m.GetCheckpointComplete(FullyQualifiedNamespace, EventHubName, ConsumerGroup, "0")); @@ -1492,8 +1506,9 @@ public async Task GetCheckpointUsesOffsetAsTheStartingPositionWhenPresent() {BlobMetadataKey.SequenceNumber, "7777"} }) }; - var target = new BlobsCheckpointStore(new MockBlobContainerClient() { Blobs = blobList }, new BasicRetryPolicy(new EventHubsRetryOptions())); - var checkpoint = await target.GetCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, "0", new CancellationToken()); + + var target = new BlobsCheckpointStore(new MockBlobContainerClient() { Blobs = blobList }, DefaultRetryPolicy); + var checkpoint = await target.GetCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, "0", CancellationToken.None); Assert.That(checkpoint, Is.Not.Null, "A set of checkpoint should have been returned."); Assert.That(checkpoint.StartingPosition, Is.EqualTo(expectedStartingPosition)); @@ -1521,8 +1536,9 @@ public async Task GetCheckpointUsesSequenceNumberAsTheStartingPositionWhenNoOffs {BlobMetadataKey.SequenceNumber, expectedSequence.ToString()} }) }; - var target = new BlobsCheckpointStore(new MockBlobContainerClient() { Blobs = blobList }, new BasicRetryPolicy(new EventHubsRetryOptions())); - var checkpoint = await target.GetCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, "0", new CancellationToken()); + + var target = new BlobsCheckpointStore(new MockBlobContainerClient() { Blobs = blobList }, DefaultRetryPolicy); + var checkpoint = await target.GetCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, "0", CancellationToken.None); Assert.That(checkpoint, Is.Not.Null, "A set of checkpoints should have been returned."); Assert.That(checkpoint.StartingPosition, Is.EqualTo(expectedStartingPosition)); @@ -1547,15 +1563,14 @@ public async Task GetCheckpointConsidersDataInvalidWithNoOffsetOrSequenceNumber( }) }; - var mockLogger = new Mock(); - var target = new BlobsCheckpointStore(new MockBlobContainerClient() { Blobs = blobList }, new BasicRetryPolicy(new EventHubsRetryOptions())); + var mockLogger = new Mock(); + var target = new BlobsCheckpointStore(new MockBlobContainerClient() { Blobs = blobList }, DefaultRetryPolicy); target.Logger = mockLogger.Object; - var checkpoint = await target.GetCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, "0", new CancellationToken()); + var checkpoint = await target.GetCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, "0", CancellationToken.None); Assert.That(checkpoint, Is.Null, "No valid checkpoints should exist."); - mockLogger.Verify(log => log.InvalidCheckpointFound("0", FullyQualifiedNamespace, EventHubName, ConsumerGroup)); } @@ -1578,6 +1593,7 @@ public async Task GetCheckpointPreferredNewCheckpointOverLegacy() {BlobMetadataKey.SequenceNumber, "960182"}, {BlobMetadataKey.Offset, "14"} }), + BlobsModelFactory.BlobItem($"{FullyQualifiedNamespaceLowercase}/{EventHubNameLowercase}/{ConsumerGroupLowercase}/0", false, BlobsModelFactory.BlobItemProperties(true, lastModified: DateTime.UtcNow, eTag: new ETag(MatchingEtag)), @@ -1585,6 +1601,7 @@ public async Task GetCheckpointPreferredNewCheckpointOverLegacy() }; var containerClient = new MockBlobContainerClient() { Blobs = blobList }; + containerClient.AddBlobClient($"{FullyQualifiedNamespaceLowercase}/{EventHubNameLowercase}/{ConsumerGroupLowercase}/0", client => { client.Content = Encoding.UTF8.GetBytes("{" + @@ -1597,8 +1614,8 @@ public async Task GetCheckpointPreferredNewCheckpointOverLegacy() "}"); }); - var target = new BlobsCheckpointStore(containerClient, new BasicRetryPolicy(new EventHubsRetryOptions()), initializeWithLegacyCheckpoints: true); - var checkpoint = await target.GetCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, "0", new CancellationToken()); + var target = new BlobsCheckpointStore(containerClient, DefaultRetryPolicy, initializeWithLegacyCheckpoints: true); + var checkpoint = await target.GetCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, "0", CancellationToken.None); Assert.That(checkpoint, Is.Not.Null, "A single checkpoint should have been returned."); Assert.That(checkpoint.StartingPosition, Is.EqualTo(EventPosition.FromOffset(14, false))); @@ -1621,6 +1638,7 @@ public async Task GetCheckpointsUsesOffsetAsTheStartingPositionWhenPresentInLega }; var containerClient = new MockBlobContainerClient() { Blobs = blobList }; + containerClient.AddBlobClient($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/0", client => { client.Content = Encoding.UTF8.GetBytes("{" + @@ -1633,8 +1651,8 @@ public async Task GetCheckpointsUsesOffsetAsTheStartingPositionWhenPresentInLega "}"); }); - var target = new BlobsCheckpointStore(containerClient, new BasicRetryPolicy(new EventHubsRetryOptions()), initializeWithLegacyCheckpoints: true); - var checkpoint = await target.GetCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, "0", new CancellationToken()); + var target = new BlobsCheckpointStore(containerClient, DefaultRetryPolicy, initializeWithLegacyCheckpoints: true); + var checkpoint = await target.GetCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, "0", CancellationToken.None); Assert.That(checkpoint, Is.Not.Null, "A set of checkpoints should have been returned."); Assert.That(checkpoint.StartingPosition, Is.EqualTo(EventPosition.FromOffset(13, false))); @@ -1660,6 +1678,7 @@ public async Task GetCheckpointSkipsCheckpointsWhenOffsetIsNullOrEmptyInLegacyCh }; var containerClient = new MockBlobContainerClient() { Blobs = blobList }; + containerClient.AddBlobClient($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/0", client => { client.Content = Encoding.UTF8.GetBytes("{" + @@ -1672,8 +1691,8 @@ public async Task GetCheckpointSkipsCheckpointsWhenOffsetIsNullOrEmptyInLegacyCh "}"); }); - var target = new BlobsCheckpointStore(containerClient, new BasicRetryPolicy(new EventHubsRetryOptions()), initializeWithLegacyCheckpoints: true); - var checkpoint = await target.GetCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, "0", new CancellationToken()); + var target = new BlobsCheckpointStore(containerClient, DefaultRetryPolicy, initializeWithLegacyCheckpoints: true); + var checkpoint = await target.GetCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, "0", CancellationToken.None); Assert.That(checkpoint, Is.Null, "No valid checkpoints should exist."); } @@ -1694,24 +1713,25 @@ public async Task GetCheckpointConsidersDataInvalidWithNoOffsetOrSequenceNumberL }; var containerClient = new MockBlobContainerClient() { Blobs = blobList }; + containerClient.AddBlobClient($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/0", client => { client.Content = Encoding.UTF8.GetBytes("{" + "\"PartitionId\":\"0\"," + "\"Owner\":\"681d365b-de1b-4288-9733-76294e17daf0\"," + "\"Token\":\"2d0c4276-827d-4ca4-a345-729caeca3b82\"," + - "\"Epoch\":386}"); + "\"Epoch\":386" + + "}"); }); - var mockLogger = new Mock(); - var target = new BlobsCheckpointStore(containerClient, new BasicRetryPolicy(new EventHubsRetryOptions()), initializeWithLegacyCheckpoints: true); + var mockLogger = new Mock(); + var target = new BlobsCheckpointStore(containerClient, DefaultRetryPolicy, initializeWithLegacyCheckpoints: true); target.Logger = mockLogger.Object; - var checkpoint = await target.GetCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, "0", new CancellationToken()); + var checkpoint = await target.GetCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, "0", CancellationToken.None); Assert.That(checkpoint, Is.Null, "No valid checkpoints should exist."); - mockLogger.Verify(log => log.InvalidCheckpointFound("0", FullyQualifiedNamespace, EventHubName, ConsumerGroup)); } @@ -1739,15 +1759,14 @@ public async Task GetCheckpointConsidersDataInvalidWithLegacyCheckpointBlobConta client.Content = Encoding.UTF8.GetBytes(json); }); - var mockLogger = new Mock(); - var target = new BlobsCheckpointStore(containerClient, new BasicRetryPolicy(new EventHubsRetryOptions()), initializeWithLegacyCheckpoints: true); + var mockLogger = new Mock(); + var target = new BlobsCheckpointStore(containerClient, DefaultRetryPolicy, initializeWithLegacyCheckpoints: true); target.Logger = mockLogger.Object; - var checkpoint = await target.GetCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, "0", new CancellationToken()); + var checkpoint = await target.GetCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, "0", CancellationToken.None); Assert.That(checkpoint, Is.Null, "No valid checkpoints should exist."); - mockLogger.Verify(log => log.InvalidCheckpointFound("0", FullyQualifiedNamespace, EventHubName, ConsumerGroup)); } @@ -1759,10 +1778,10 @@ public async Task GetCheckpointConsidersDataInvalidWithLegacyCheckpointBlobConta public void GetCheckpointLogsErrors() { var expectedException = new DllNotFoundException("BOOM!"); - var mockLog = new Mock(); + var mockLog = new Mock(); var mockContainerClient = new MockBlobContainerClient() { GetBlobsAsyncException = expectedException }; - var target = new BlobsCheckpointStore(mockContainerClient, new BasicRetryPolicy(new EventHubsRetryOptions())); + var target = new BlobsCheckpointStore(mockContainerClient, DefaultRetryPolicy); target.Logger = mockLog.Object; mockContainerClient.AddBlobClient($"{FullyQualifiedNamespaceLowercase}/{EventHubNameLowercase}/{ConsumerGroupLowercase}/checkpoint/0", client => @@ -1770,7 +1789,7 @@ public void GetCheckpointLogsErrors() client.GetPropertiesException = expectedException; }); - Assert.That(async () => await target.GetCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, "0", new CancellationToken()), Throws.Exception.EqualTo(expectedException)); + Assert.That(async () => await target.GetCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, "0", CancellationToken.None), Throws.Exception.EqualTo(expectedException)); mockLog.Verify(m => m.GetCheckpointError(FullyQualifiedNamespace, EventHubName, ConsumerGroup, "0", expectedException.Message)); } @@ -1789,12 +1808,13 @@ public async Task GetCheckpointLogsInvalidCheckpoint() "snapshot", new Dictionary {{BlobMetadataKey.OwnerIdentifier, Guid.NewGuid().ToString()}}) }; - var target = new BlobsCheckpointStore(new MockBlobContainerClient() { Blobs = blobList }, - new BasicRetryPolicy(new EventHubsRetryOptions())); - var mockLog = new Mock(); + + var target = new BlobsCheckpointStore(new MockBlobContainerClient() { Blobs = blobList }, DefaultRetryPolicy); + + var mockLog = new Mock(); target.Logger = mockLog.Object; - await target.ListCheckpointsAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, new CancellationToken()); + await target.ListCheckpointsAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, CancellationToken.None); mockLog.Verify(m => m.InvalidCheckpointFound("0", FullyQualifiedNamespace, EventHubName, ConsumerGroup)); } @@ -1806,13 +1826,12 @@ public async Task GetCheckpointLogsInvalidCheckpoint() public async Task GetCheckpointForMissingPartitionReturnsNull() { var ex = new RequestFailedException(0, "foo", BlobErrorCode.ContainerNotFound.ToString(), null); + var target = new BlobsCheckpointStore(new MockBlobContainerClient(getBlobsAsyncException: ex), DefaultRetryPolicy); - var target = new BlobsCheckpointStore(new MockBlobContainerClient(getBlobsAsyncException: ex), - new BasicRetryPolicy(new EventHubsRetryOptions())); - var mockLog = new Mock(); + var mockLog = new Mock(); target.Logger = mockLog.Object; - var checkpoint = await target.GetCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, "0", new CancellationToken()); + var checkpoint = await target.GetCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, "0", CancellationToken.None); Assert.That(checkpoint, Is.Null); } @@ -1894,7 +1913,7 @@ public MockBlobClient(string blobName) Name = blobName; } - public override Task> SetMetadataAsync(IDictionary metadata, BlobRequestConditions conditions = null, CancellationToken cancellationToken = default(CancellationToken)) + public override Task> SetMetadataAsync(IDictionary metadata, BlobRequestConditions conditions = null, CancellationToken cancellationToken = default) { SetMetadataAsyncCallback?.Invoke(metadata, conditions, cancellationToken); @@ -1942,7 +1961,7 @@ public override async Task DownloadToAsync(Stream destination, Cancell return Mock.Of(); } - public override Task> GetPropertiesAsync(BlobRequestConditions conditions = null, CancellationToken cancellationToken = new CancellationToken()) + public override Task> GetPropertiesAsync(BlobRequestConditions conditions = null, CancellationToken cancellationToken = default) { if (GetPropertiesException != null) { diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/tests/Infrastructure/BlobsCheckpointStore.Diagnostics.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/tests/Infrastructure/BlobsCheckpointStore.Diagnostics.cs new file mode 100644 index 0000000000000..dfed39e07c549 --- /dev/null +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/tests/Infrastructure/BlobsCheckpointStore.Diagnostics.cs @@ -0,0 +1,319 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using Azure.Messaging.EventHubs.Tests; +using Moq; + +namespace Azure.Messaging.EventHubs.Processor +{ + /// + /// A storage blob service that keeps track of checkpoints and ownership. + /// + /// + internal sealed partial class BlobsCheckpointStore + { + /// + /// The instance of which can be mocked for testing. + /// + /// + public IBlobEventLogger Logger { get; set; } = Mock.Of(); + + /// + /// Indicates that an attempt to retrieve a list of ownership has completed. + /// + /// + /// The fully qualified Event Hubs namespace the ownership are associated with. This is likely to be similar to {yournamespace}.servicebus.windows.net. + /// The name of the specific Event Hub the ownership are associated with, relative to the Event Hubs namespace that contains it. + /// The name of the consumer group the ownership are associated with. + /// The amount of ownership received from the storage service. + /// + partial void ListOwnershipComplete(string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup, + int ownershipCount) => + Logger.ListOwnershipComplete(fullyQualifiedNamespace, eventHubName, consumerGroup, ownershipCount); + + /// + /// Indicates that an unhandled exception was encountered while retrieving a list of ownership. + /// + /// + /// The fully qualified Event Hubs namespace the ownership are associated with. This is likely to be similar to {yournamespace}.servicebus.windows.net. + /// The name of the specific Event Hub the ownership are associated with, relative to the Event Hubs namespace that contains it. + /// The name of the consumer group the ownership are associated with. + /// The exception that occurred. + /// + partial void ListOwnershipError(string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup, + Exception exception) => + Logger.ListOwnershipError(fullyQualifiedNamespace, eventHubName, consumerGroup, exception.Message); + + /// + /// Indicates that an attempt to retrieve a list of ownership has started. + /// + /// + /// The fully qualified Event Hubs namespace the ownership are associated with. This is likely to be similar to {yournamespace}.servicebus.windows.net. + /// The name of the specific Event Hub the ownership are associated with, relative to the Event Hubs namespace that contains it. + /// The name of the consumer group the ownership are associated with. + /// + partial void ListOwnershipStart(string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup) => + Logger.ListOwnershipStart(fullyQualifiedNamespace, eventHubName, consumerGroup); + + /// + /// Indicates that an attempt to retrieve a list of checkpoints has completed. + /// + /// + /// The fully qualified Event Hubs namespace the checkpoints are associated with. This is likely to be similar to {yournamespace}.servicebus.windows.net. + /// The name of the specific Event Hub the checkpoints are associated with, relative to the Event Hubs namespace that contains it. + /// The name of the consumer group the checkpoints are associated with. + /// The amount of checkpoints received from the storage service. + /// + partial void ListCheckpointsComplete(string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup, + int checkpointCount) => + Logger.ListCheckpointsComplete(fullyQualifiedNamespace, eventHubName, consumerGroup, checkpointCount); + + /// + /// Indicates that an unhandled exception was encountered while retrieving a list of checkpoints. + /// + /// + /// The fully qualified Event Hubs namespace the checkpoints are associated with. This is likely to be similar to {yournamespace}.servicebus.windows.net. + /// The name of the specific Event Hub the checkpoints are associated with, relative to the Event Hubs namespace that contains it. + /// The name of the consumer group the ownership are associated with. + /// The exception that occurred. + /// + partial void ListCheckpointsError(string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup, + Exception exception) => + Logger.ListCheckpointsError(fullyQualifiedNamespace, eventHubName, consumerGroup, exception.Message); + + /// + /// Indicates that invalid checkpoint data was found during an attempt to retrieve a list of checkpoints. + /// + /// + /// The identifier of the partition the data is associated with. + /// The fully qualified Event Hubs namespace the data is associated with. This is likely to be similar to {yournamespace}.servicebus.windows.net. + /// The name of the specific Event Hub the data is associated with, relative to the Event Hubs namespace that contains it. + /// The name of the consumer group the data is associated with. + /// + partial void InvalidCheckpointFound(string partitionId, + string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup) => + Logger.InvalidCheckpointFound(partitionId, fullyQualifiedNamespace, eventHubName, consumerGroup); + + /// + /// Indicates that an attempt to retrieve a list of checkpoints has started. + /// + /// + /// The fully qualified Event Hubs namespace the checkpoints are associated with. This is likely to be similar to {yournamespace}.servicebus.windows.net. + /// The name of the specific Event Hub the checkpoints are associated with, relative to the Event Hubs namespace that contains it. + /// The name of the consumer group the checkpoints are associated with. + /// + partial void ListCheckpointsStart(string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup) => + Logger.ListCheckpointsStart(fullyQualifiedNamespace, eventHubName, consumerGroup); + + /// + /// Indicates that an unhandled exception was encountered while updating a checkpoint. + /// + /// + /// The identifier of the partition being checkpointed. + /// The fully qualified Event Hubs namespace the checkpoint is associated with. This is likely to be similar to {yournamespace}.servicebus.windows.net. + /// The name of the specific Event Hub the checkpoint is associated with, relative to the Event Hubs namespace that contains it. + /// The name of the consumer group the checkpoint is associated with. + /// The exception that occurred. + /// + partial void UpdateCheckpointError(string partitionId, + string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup, + Exception exception) => + Logger.UpdateCheckpointError(partitionId, fullyQualifiedNamespace, eventHubName, consumerGroup, exception.Message); + + /// + /// Indicates that an attempt to update a checkpoint has completed. + /// + /// + /// The identifier of the partition being checkpointed. + /// The fully qualified Event Hubs namespace the checkpoint is associated with. This is likely to be similar to {yournamespace}.servicebus.windows.net. + /// The name of the specific Event Hub the checkpoint is associated with, relative to the Event Hubs namespace that contains it. + /// The name of the consumer group the checkpoint is associated with. + /// + partial void UpdateCheckpointComplete(string partitionId, + string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup) => + Logger.UpdateCheckpointComplete(partitionId, fullyQualifiedNamespace, eventHubName, consumerGroup); + + /// + /// Indicates that an attempt to create/update a checkpoint has started. + /// + /// + /// The identifier of the partition being checkpointed. + /// The fully qualified Event Hubs namespace the checkpoint is associated with. This is likely to be similar to {yournamespace}.servicebus.windows.net. + /// The name of the specific Event Hub the checkpoint is associated with, relative to the Event Hubs namespace that contains it. + /// The name of the consumer group the checkpoint is associated with. + /// + partial void UpdateCheckpointStart(string partitionId, + string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup) => + Logger.UpdateCheckpointStart(partitionId, fullyQualifiedNamespace, eventHubName, consumerGroup); + + /// + /// Indicates that an attempt to retrieve claim partition ownership has completed. + /// + /// + /// The identifier of the partition being claimed. + /// The fully qualified Event Hubs namespace the ownership is associated with. This is likely to be similar to {yournamespace}.servicebus.windows.net. + /// The name of the specific Event Hub the ownership is associated with, relative to the Event Hubs namespace that contains it. + /// The name of the consumer group the ownership is associated with. + /// The identifier of the processor that attempted to claim the ownership for. + /// + partial void ClaimOwnershipComplete(string partitionId, + string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup, + string ownerIdentifier) => + Logger.ClaimOwnershipComplete(partitionId, fullyQualifiedNamespace, eventHubName, consumerGroup, ownerIdentifier); + + /// + /// Indicates that an exception was encountered while attempting to retrieve claim partition ownership. + /// + /// + /// The identifier of the partition being claimed. + /// The fully qualified Event Hubs namespace the ownership is associated with. This is likely to be similar to {yournamespace}.servicebus.windows.net. + /// The name of the specific Event Hub the ownership is associated with, relative to the Event Hubs namespace that contains it. + /// The name of the consumer group the ownership is associated with. + /// The identifier of the processor that attempted to claim the ownership for. + /// The exception that occurred. + /// + partial void ClaimOwnershipError(string partitionId, + string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup, + string ownerIdentifier, + Exception exception) => + Logger.ClaimOwnershipError(partitionId, fullyQualifiedNamespace, eventHubName, consumerGroup, ownerIdentifier, exception.Message); + + /// + /// Indicates that ownership was unable to be claimed. + /// + /// + /// The identifier of the partition being claimed. + /// The fully qualified Event Hubs namespace the ownership is associated with. This is likely to be similar to {yournamespace}.servicebus.windows.net. + /// The name of the specific Event Hub the ownership is associated with, relative to the Event Hubs namespace that contains it. + /// The name of the consumer group the ownership is associated with. + /// The identifier of the processor that attempted to claim the ownership for. + /// The message for the failure. + /// + partial void OwnershipNotClaimable(string partitionId, + string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup, + string ownerIdentifier, + string message) => + Logger.OwnershipNotClaimable(partitionId, fullyQualifiedNamespace, eventHubName, consumerGroup, ownerIdentifier, message); + + /// + /// Indicates that ownership was successfully claimed. + /// + /// + /// The identifier of the partition being claimed. + /// The fully qualified Event Hubs namespace the ownership is associated with. This is likely to be similar to {yournamespace}.servicebus.windows.net. + /// The name of the specific Event Hub the ownership is associated with, relative to the Event Hubs namespace that contains it. + /// The name of the consumer group the ownership is associated with. + /// The identifier of the processor that attempted to claim the ownership for. + /// + partial void OwnershipClaimed(string partitionId, + string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup, + string ownerIdentifier) => + Logger.OwnershipClaimed(partitionId, fullyQualifiedNamespace, eventHubName, consumerGroup, ownerIdentifier); + + /// + /// Indicates that an attempt to claim a partition ownership has started. + /// + /// + /// The identifier of the partition being claimed. + /// The fully qualified Event Hubs namespace the ownership is associated with. This is likely to be similar to {yournamespace}.servicebus.windows.net. + /// The name of the specific Event Hub the ownership is associated with, relative to the Event Hubs namespace that contains it. + /// The name of the consumer group the ownership is associated with. + /// The identifier of the processor that attempted to claim the ownership for. + /// + partial void ClaimOwnershipStart(string partitionId, + string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup, + string ownerIdentifier) => + Logger.ClaimOwnershipStart(partitionId, fullyQualifiedNamespace, eventHubName, consumerGroup, ownerIdentifier); + + /// + /// Indicates that a was created. + /// + /// + /// The type name for the checkpoint store. + /// The Storage account name corresponding to the associated container client. + /// The name of the associated container client. + /// + partial void BlobsCheckpointStoreCreated(string typeName, + string accountName, + string containerName) => + Logger.BlobsCheckpointStoreCreated(typeName, accountName, containerName); + + /// + /// Indicates that an attempt to retrieve a checkpoint has started. + /// + /// + /// The fully qualified Event Hubs namespace the checkpoint are associated with. This is likely to be similar to {yournamespace}.servicebus.windows.net. + /// The name of the specific Event Hub the checkpoint is associated with, relative to the Event Hubs namespace that contains it. + /// The name of the consumer group the checkpoint is associated with. + /// The partition id the specific checkpoint is associated with. + /// + partial void GetCheckpointStart(string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup, + string partitionId) => + Logger.GetCheckpointStart(fullyQualifiedNamespace, eventHubName, consumerGroup, partitionId); + + /// + /// Indicates that an attempt to retrieve a checkpoint has completed. + /// + /// + /// The fully qualified Event Hubs namespace the checkpoint are associated with. This is likely to be similar to {yournamespace}.servicebus.windows.net. + /// The name of the specific Event Hub the checkpoint is associated with, relative to the Event Hubs namespace that contains it. + /// The name of the consumer group the checkpoint is associated with. + /// The partition id the specific checkpoint is associated with. + /// + partial void GetCheckpointComplete(string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup, + string partitionId) => + Logger.GetCheckpointComplete(fullyQualifiedNamespace, eventHubName, consumerGroup, partitionId); + + /// + /// Indicates that an unhandled exception was encountered while retrieving a checkpoint. + /// + /// + /// The fully qualified Event Hubs namespace the checkpoint are associated with. This is likely to be similar to {yournamespace}.servicebus.windows.net. + /// The name of the specific Event Hub the checkpoint is associated with, relative to the Event Hubs namespace that contains it. + /// The name of the consumer group the checkpoint is associated with. + /// The partition id the specific checkpoint is associated with. + /// The message for the exception that occurred. + /// + partial void GetCheckpointError(string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup, + string partitionId, + Exception exception) => + Logger.GetCheckpointError(fullyQualifiedNamespace, eventHubName, consumerGroup, partitionId, exception.Message); + } +} diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/tests/Infrastructure/IBlobEventLogger.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/tests/Infrastructure/IBlobEventLogger.cs new file mode 100644 index 0000000000000..52c6281db7d43 --- /dev/null +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/tests/Infrastructure/IBlobEventLogger.cs @@ -0,0 +1,293 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; + +namespace Azure.Messaging.EventHubs.Tests +{ + /// + /// The contract for logging + /// operations. + /// + /// + public interface IBlobEventLogger + { + /// + /// Indicates that an attempt to retrieve a list of ownership has completed. + /// + /// + /// The fully qualified Event Hubs namespace the ownership are associated with. This is likely to be similar to {yournamespace}.servicebus.windows.net. + /// The name of the specific Event Hub the ownership are associated with, relative to the Event Hubs namespace that contains it. + /// The name of the consumer group the ownership are associated with. + /// The amount of ownership received from the storage service. + /// + void ListOwnershipComplete(string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup, + int ownershipCount); + + /// + /// Indicates that an unhandled exception was encountered while retrieving a list of ownership. + /// + /// + /// The fully qualified Event Hubs namespace the ownership are associated with. This is likely to be similar to {yournamespace}.servicebus.windows.net. + /// The name of the specific Event Hub the ownership are associated with, relative to the Event Hubs namespace that contains it. + /// The name of the consumer group the ownership are associated with. + /// The exception that occurred. + /// + void ListOwnershipError(string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup, + string exception); + + /// + /// Indicates that an attempt to retrieve a list of ownership has started. + /// + /// + /// The fully qualified Event Hubs namespace the ownership are associated with. This is likely to be similar to {yournamespace}.servicebus.windows.net. + /// The name of the specific Event Hub the ownership are associated with, relative to the Event Hubs namespace that contains it. + /// The name of the consumer group the ownership are associated with. + /// + void ListOwnershipStart(string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup); + + /// + /// Indicates that an attempt to retrieve a list of checkpoints has completed. + /// + /// + /// The fully qualified Event Hubs namespace the checkpoints are associated with. This is likely to be similar to {yournamespace}.servicebus.windows.net. + /// The name of the specific Event Hub the checkpoints are associated with, relative to the Event Hubs namespace that contains it. + /// The name of the consumer group the checkpoints are associated with. + /// The amount of checkpoints received from the storage service. + /// + void ListCheckpointsComplete(string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup, + int checkpointCount); + + /// + /// Indicates that an unhandled exception was encountered while retrieving a list of checkpoints. + /// + /// + /// The fully qualified Event Hubs namespace the checkpoints are associated with. This is likely to be similar to {yournamespace}.servicebus.windows.net. + /// The name of the specific Event Hub the checkpoints are associated with, relative to the Event Hubs namespace that contains it. + /// The name of the consumer group the ownership are associated with. + /// The exception that occurred. + /// + void ListCheckpointsError(string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup, + string exception); + + /// + /// Indicates that invalid checkpoint data was found during an attempt to retrieve a list of checkpoints. + /// + /// + /// The identifier of the partition the data is associated with. + /// The fully qualified Event Hubs namespace the data is associated with. This is likely to be similar to {yournamespace}.servicebus.windows.net. + /// The name of the specific Event Hub the data is associated with, relative to the Event Hubs namespace that contains it. + /// The name of the consumer group the data is associated with. + /// + void InvalidCheckpointFound(string partitionId, + string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup); + + /// + /// Indicates that an attempt to retrieve a list of checkpoints has started. + /// + /// + /// The fully qualified Event Hubs namespace the checkpoints are associated with. This is likely to be similar to {yournamespace}.servicebus.windows.net. + /// The name of the specific Event Hub the checkpoints are associated with, relative to the Event Hubs namespace that contains it. + /// The name of the consumer group the checkpoints are associated with. + /// + void ListCheckpointsStart(string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup); + + /// + /// Indicates that an unhandled exception was encountered while updating a checkpoint. + /// + /// + /// The identifier of the partition being checkpointed. + /// The fully qualified Event Hubs namespace the checkpoint is associated with. This is likely to be similar to {yournamespace}.servicebus.windows.net. + /// The name of the specific Event Hub the checkpoint is associated with, relative to the Event Hubs namespace that contains it. + /// The name of the consumer group the checkpoint is associated with. + /// The exception that occurred. + /// + void UpdateCheckpointError(string partitionId, + string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup, + string exception); + + /// + /// Indicates that an attempt to update a checkpoint has completed. + /// + /// + /// The identifier of the partition being checkpointed. + /// The fully qualified Event Hubs namespace the checkpoint is associated with. This is likely to be similar to {yournamespace}.servicebus.windows.net. + /// The name of the specific Event Hub the checkpoint is associated with, relative to the Event Hubs namespace that contains it. + /// The name of the consumer group the checkpoint is associated with. + /// + void UpdateCheckpointComplete(string partitionId, + string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup); + + /// + /// Indicates that an attempt to create/update a checkpoint has started. + /// + /// + /// The identifier of the partition being checkpointed. + /// The fully qualified Event Hubs namespace the checkpoint is associated with. This is likely to be similar to {yournamespace}.servicebus.windows.net. + /// The name of the specific Event Hub the checkpoint is associated with, relative to the Event Hubs namespace that contains it. + /// The name of the consumer group the checkpoint is associated with. + /// + void UpdateCheckpointStart(string partitionId, + string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup); + + /// + /// Indicates that an attempt to retrieve claim partition ownership has completed. + /// + /// + /// The identifier of the partition being claimed. + /// The fully qualified Event Hubs namespace the ownership is associated with. This is likely to be similar to {yournamespace}.servicebus.windows.net. + /// The name of the specific Event Hub the ownership is associated with, relative to the Event Hubs namespace that contains it. + /// The name of the consumer group the ownership is associated with. + /// The identifier of the processor that attempted to claim the ownership for. + /// + void ClaimOwnershipComplete(string partitionId, + string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup, + string ownerIdentifier); + + /// + /// Indicates that an exception was encountered while attempting to retrieve claim partition ownership. + /// + /// + /// The identifier of the partition being claimed. + /// The fully qualified Event Hubs namespace the ownership is associated with. This is likely to be similar to {yournamespace}.servicebus.windows.net. + /// The name of the specific Event Hub the ownership is associated with, relative to the Event Hubs namespace that contains it. + /// The name of the consumer group the ownership is associated with. + /// The identifier of the processor that attempted to claim the ownership for. + /// The exception that occurred. + /// + void ClaimOwnershipError(string partitionId, + string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup, + string ownerIdentifier, + string exception); + + /// + /// Indicates that ownership was unable to be claimed. + /// + /// + /// The identifier of the partition being claimed. + /// The fully qualified Event Hubs namespace the ownership is associated with. This is likely to be similar to {yournamespace}.servicebus.windows.net. + /// The name of the specific Event Hub the ownership is associated with, relative to the Event Hubs namespace that contains it. + /// The name of the consumer group the ownership is associated with. + /// The identifier of the processor that attempted to claim the ownership for. + /// The message for the failure. + /// + void OwnershipNotClaimable(string partitionId, + string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup, + string ownerIdentifier, + string message); + + /// + /// Indicates that ownership was successfully claimed. + /// + /// + /// The identifier of the partition being claimed. + /// The fully qualified Event Hubs namespace the ownership is associated with. This is likely to be similar to {yournamespace}.servicebus.windows.net. + /// The name of the specific Event Hub the ownership is associated with, relative to the Event Hubs namespace that contains it. + /// The name of the consumer group the ownership is associated with. + /// The identifier of the processor that attempted to claim the ownership for. + /// + void OwnershipClaimed(string partitionId, + string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup, + string ownerIdentifier); + + /// + /// Indicates that an attempt to claim a partition ownership has started. + /// + /// + /// The identifier of the partition being claimed. + /// The fully qualified Event Hubs namespace the ownership is associated with. This is likely to be similar to {yournamespace}.servicebus.windows.net. + /// The name of the specific Event Hub the ownership is associated with, relative to the Event Hubs namespace that contains it. + /// The name of the consumer group the ownership is associated with. + /// The identifier of the processor that attempted to claim the ownership for. + /// + void ClaimOwnershipStart(string partitionId, + string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup, + string ownerIdentifier); + + /// + /// Indicates that a was created. + /// + /// + /// The type name for the checkpoint store. + /// The Storage account name corresponding to the associated container client. + /// The name of the associated container client. + /// + void BlobsCheckpointStoreCreated(string typeName, + string accountName, + string containerName); + + /// + /// Indicates that an attempt to retrieve a checkpoint has started. + /// + /// + /// The fully qualified Event Hubs namespace the checkpoint are associated with. This is likely to be similar to {yournamespace}.servicebus.windows.net. + /// The name of the specific Event Hub the checkpoint is associated with, relative to the Event Hubs namespace that contains it. + /// The name of the consumer group the checkpoint is associated with. + /// The partition id the specific checkpoint is associated with. + /// + void GetCheckpointStart(string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup, + string partitionId); + + /// + /// Indicates that an attempt to retrieve a checkpoint has completed. + /// + /// + /// The fully qualified Event Hubs namespace the checkpoint are associated with. This is likely to be similar to {yournamespace}.servicebus.windows.net. + /// The name of the specific Event Hub the checkpoint is associated with, relative to the Event Hubs namespace that contains it. + /// The name of the consumer group the checkpoint is associated with. + /// The partition id the specific checkpoint is associated with. + /// + void GetCheckpointComplete(string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup, + string partitionId); + + /// + /// Indicates that an unhandled exception was encountered while retrieving a checkpoint. + /// + /// + /// The fully qualified Event Hubs namespace the checkpoint are associated with. This is likely to be similar to {yournamespace}.servicebus.windows.net. + /// The name of the specific Event Hub the checkpoint is associated with, relative to the Event Hubs namespace that contains it. + /// The name of the consumer group the checkpoint is associated with. + /// The partition id the specific checkpoint is associated with. + /// The message for the exception that occurred. + /// + void GetCheckpointError(string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup, + string partitionId, + string exception); + } +} diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Infrastructure/PartitionOwnershipExtensions.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/tests/Infrastructure/PartitionOwnershipExtensions.cs similarity index 97% rename from sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Infrastructure/PartitionOwnershipExtensions.cs rename to sdk/eventhub/Azure.Messaging.EventHubs.Shared/tests/Infrastructure/PartitionOwnershipExtensions.cs index 9849965243ce1..1d73403554ae6 100755 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Infrastructure/PartitionOwnershipExtensions.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/tests/Infrastructure/PartitionOwnershipExtensions.cs @@ -4,7 +4,7 @@ using System; using Azure.Messaging.EventHubs.Primitives; -namespace Azure.Messaging.EventHubs.Processor.Tests +namespace Azure.Messaging.EventHubs.Tests { /// /// The set of extension methods for the diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Infrastructure/PartitionOwnershipExtensionsTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/tests/Infrastructure/PartitionOwnershipExtensionsTests.cs similarity index 99% rename from sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Infrastructure/PartitionOwnershipExtensionsTests.cs rename to sdk/eventhub/Azure.Messaging.EventHubs.Shared/tests/Infrastructure/PartitionOwnershipExtensionsTests.cs index 2e0060b21bf6b..7ec92882bcc6d 100755 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Infrastructure/PartitionOwnershipExtensionsTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/tests/Infrastructure/PartitionOwnershipExtensionsTests.cs @@ -5,7 +5,7 @@ using Azure.Messaging.EventHubs.Primitives; using NUnit.Framework; -namespace Azure.Messaging.EventHubs.Processor.Tests +namespace Azure.Messaging.EventHubs.Tests { /// /// The suite of tests for the diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/tests/Infrastructure/TestRunFixture.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/tests/Infrastructure/TestRunFixture.cs new file mode 100644 index 0000000000000..50edfcc61934c --- /dev/null +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/tests/Infrastructure/TestRunFixture.cs @@ -0,0 +1,55 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using NUnit.Framework; + +namespace Azure.Messaging.EventHubs.Tests +{ + /// + /// Serves as a fixture for operations that are scoped to the entire + /// test run pass, rather than specific to a given test or test fixture. + /// + /// + [SetUpFixture] + public class TestRunFixture + { + /// + /// Performs the tasks needed to clean up after a test run + /// has completed. + /// + /// + [OneTimeTearDown] + public void Teardown() + { + // Clean-up should not be considered a critical failure that results in a test run failure. Due + // to ARM being temperamental, some management operations may be rejected. Throwing here + // does not help to ensure resource cleanup. + // + // Because resources may be orphaned outside of an observed exception, throwing to raise awareness + // is not sufficient for all scenarios; since an external process is already needed to manage + // orphans, there is no benefit to failing the run; allow the test results to be reported. + + try + { + if (EventHubsTestEnvironment.Instance.ShouldRemoveNamespaceAfterTestRunCompletion) + { + EventHubScope.DeleteNamespaceAsync(EventHubsTestEnvironment.Instance.EventHubsNamespace).GetAwaiter().GetResult(); + } + } + catch + { + } + + try + { + if (StorageTestEnvironment.Instance.ShouldRemoveStorageAccountAfterTestRunCompletion) + { + StorageScope.DeleteStorageAccountAsync(StorageTestEnvironment.Instance.StorageAccountName).GetAwaiter().GetResult(); + } + } + catch + { + } + } + } +} diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Azure.Messaging.EventHubs.csproj b/sdk/eventhub/Azure.Messaging.EventHubs/src/Azure.Messaging.EventHubs.csproj index 34bee49629811..4a6a8a562f822 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Azure.Messaging.EventHubs.csproj +++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Azure.Messaging.EventHubs.csproj @@ -16,11 +16,10 @@ + - - diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Azure.Messaging.EventHubs.Tests.csproj b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Azure.Messaging.EventHubs.Tests.csproj index 3b744368e092a..900f433bbc752 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Azure.Messaging.EventHubs.Tests.csproj +++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Azure.Messaging.EventHubs.Tests.csproj @@ -33,5 +33,4 @@ - diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/Microsoft.Azure.WebJobs.Extensions.EventHubs.Tests.csproj b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/Microsoft.Azure.WebJobs.Extensions.EventHubs.Tests.csproj index 32dedaecda9b1..6a0cc92cf5272 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/Microsoft.Azure.WebJobs.Extensions.EventHubs.Tests.csproj +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/Microsoft.Azure.WebJobs.Extensions.EventHubs.Tests.csproj @@ -24,12 +24,11 @@ + - - diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/WebJobsEventHubTestBase.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/WebJobsEventHubTestBase.cs index bfe42feb81c76..3fb50caa6e176 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/WebJobsEventHubTestBase.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/WebJobsEventHubTestBase.cs @@ -4,7 +4,6 @@ using System; using System.Collections.Generic; using System.Threading.Tasks; -using Azure.Messaging.EventHubs.Processor.Tests; using Azure.Messaging.EventHubs.Tests; using Microsoft.Azure.WebJobs.EventHubs; using Microsoft.Azure.WebJobs.Host.TestCommon;