From 2b9a2213d66456b17113941de1a47fe0d04bd1c8 Mon Sep 17 00:00:00 2001 From: Matias Quaranta Date: Mon, 22 Mar 2021 10:56:28 -0700 Subject: [PATCH] Porting changes from https://github.com/Azure/azure-cosmos-dotnet-v3/pull/888 --- .../ChangeFeedProcessorCore.cs | 12 +- .../Configuration/ChangeFeedLeaseOptions.cs | 8 +- .../ChangeFeedProcessorOptions.cs | 2 +- .../Configuration/CheckpointFrequency.cs | 11 - .../FeedManagement/PartitionSupervisorCore.cs | 8 +- .../PartitionSupervisorFactoryCore.cs | 14 +- .../FeedProcessing/AutoCheckpointer.cs | 54 +--- .../FeedProcessing/ChangeFeedObserver.cs | 8 +- .../FeedProcessing/ChangeFeedObserverBase.cs | 12 +- .../ChangeFeedObserverCloseReason.cs | 2 +- .../ChangeFeedObserverContext.cs | 2 +- .../ChangeFeedObserverContextCore.cs | 4 +- .../ChangeFeedObserverFactory.cs | 10 +- .../ChangeFeedObserverFactoryCore.cs | 61 ++++- .../CheckpointerObserverFactory.cs | 34 +-- .../FeedProcessing/FeedProcessor.cs | 4 +- .../FeedProcessing/FeedProcessorCore.cs | 44 +-- .../FeedProcessing/FeedProcessorFactory.cs | 4 +- .../FeedProcessorFactoryCore.cs | 11 +- ...tionWrappingChangeFeedObserverDecorator.cs | 14 +- .../src/Resource/Container/Container.cs | 20 ++ .../Resource/Container/ContainerCore.Items.cs | 27 +- .../Resource/Container/ContainerInlineCore.cs | 7 + .../ChangeFeed/DynamicStreamTests.cs | 254 ++++++++++++++++++ .../ChangeFeed/AutoCheckPointTests.cs | 96 ++----- .../ChangeFeedProcessorCoreTests.cs | 32 +-- .../ChangeFeed/FeedProcessorCoreTests.cs | 75 ++---- ...rappingChangeFeedObserverDecoratorTests.cs | 58 ++-- .../ChangeFeed/PartitionControllerTests.cs | 25 +- .../ChangeFeed/PartitionSupervisorTests.cs | 26 +- 30 files changed, 576 insertions(+), 363 deletions(-) create mode 100644 Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ChangeFeed/DynamicStreamTests.cs diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorCore.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorCore.cs index a4f5ee5aec..70ebadcd8f 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorCore.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorCore.cs @@ -16,9 +16,9 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed using Microsoft.Azure.Cosmos.Core.Trace; using Microsoft.Azure.Cosmos.Tracing; - internal sealed class ChangeFeedProcessorCore : ChangeFeedProcessor + internal sealed class ChangeFeedProcessorCore : ChangeFeedProcessor { - private readonly ChangeFeedObserverFactory observerFactory; + private readonly ChangeFeedObserverFactory observerFactory; private ContainerInternal leaseContainer; private string instanceName; private ContainerInternal monitoredContainer; @@ -28,7 +28,7 @@ internal sealed class ChangeFeedProcessorCore : ChangeFeedProcessor private DocumentServiceLeaseStoreManager documentServiceLeaseStoreManager; private bool initialized = false; - public ChangeFeedProcessorCore(ChangeFeedObserverFactory observerFactory) + public ChangeFeedProcessorCore(ChangeFeedObserverFactory observerFactory) { this.observerFactory = observerFactory ?? throw new ArgumentNullException(nameof(observerFactory)); } @@ -97,7 +97,7 @@ private PartitionManager BuildPartitionManager( string containerRid, Routing.PartitionKeyRangeCache partitionKeyRangeCache) { - CheckpointerObserverFactory factory = new CheckpointerObserverFactory(this.observerFactory, this.changeFeedProcessorOptions.CheckpointFrequency); + CheckpointerObserverFactory factory = new CheckpointerObserverFactory(this.observerFactory, this.changeFeedProcessorOptions.CheckpointFrequency); PartitionSynchronizerCore synchronizer = new PartitionSynchronizerCore( this.monitoredContainer, this.documentServiceLeaseStoreManager.LeaseContainer, @@ -106,10 +106,10 @@ private PartitionManager BuildPartitionManager( partitionKeyRangeCache, containerRid); BootstrapperCore bootstrapper = new BootstrapperCore(synchronizer, this.documentServiceLeaseStoreManager.LeaseStore, BootstrapperCore.DefaultLockTime, BootstrapperCore.DefaultSleepTime); - PartitionSupervisorFactoryCore partitionSuperviserFactory = new PartitionSupervisorFactoryCore( + PartitionSupervisorFactoryCore partitionSuperviserFactory = new PartitionSupervisorFactoryCore( factory, this.documentServiceLeaseStoreManager.LeaseManager, - new FeedProcessorFactoryCore(this.monitoredContainer, this.changeFeedProcessorOptions, this.documentServiceLeaseStoreManager.LeaseCheckpointer, this.monitoredContainer.ClientContext.SerializerCore), + new FeedProcessorFactoryCore(this.monitoredContainer, this.changeFeedProcessorOptions, this.documentServiceLeaseStoreManager.LeaseCheckpointer), this.changeFeedLeaseOptions); EqualPartitionsBalancingStrategy loadBalancingStrategy = new EqualPartitionsBalancingStrategy( diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/Configuration/ChangeFeedLeaseOptions.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/Configuration/ChangeFeedLeaseOptions.cs index 3b3a01fbdf..e72e00efb9 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/Configuration/ChangeFeedLeaseOptions.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/Configuration/ChangeFeedLeaseOptions.cs @@ -7,7 +7,7 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.Configuration using System; /// - /// Options to control various aspects of partition distribution happening within instance. + /// Options to control various aspects of partition distribution happening within instance. /// internal class ChangeFeedLeaseOptions { @@ -24,7 +24,7 @@ public ChangeFeedLeaseOptions() } /// - /// Gets or sets renew interval for all leases currently held by instance. + /// Gets or sets renew interval for all leases currently held by instance. /// public TimeSpan LeaseRenewInterval { get; set; } @@ -35,12 +35,12 @@ public ChangeFeedLeaseOptions() /// /// Gets or sets the interval for which the lease is taken. If the lease is not renewed within this - /// interval, it will cause it to expire and ownership of the lease will move to another instance. + /// interval, it will cause it to expire and ownership of the lease will move to another instance. /// public TimeSpan LeaseExpirationInterval { get; set; } /// - /// Gets or sets a prefix to be used as part of the lease id. This can be used to support multiple instances of + /// Gets or sets a prefix to be used as part of the lease id. This can be used to support multiple instances of /// instances pointing at the same feed while using the same auxiliary collection. /// public string LeasePrefix { get; set; } diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/Configuration/ChangeFeedProcessorOptions.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/Configuration/ChangeFeedProcessorOptions.cs index d8cf77ad64..9864cbc20a 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/Configuration/ChangeFeedProcessorOptions.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/Configuration/ChangeFeedProcessorOptions.cs @@ -8,7 +8,7 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.Configuration using Microsoft.Azure.Cosmos; /// - /// Options to control various aspects of partition distribution happening within instance. + /// Options to control various aspects of partition distribution happening within instance. /// internal class ChangeFeedProcessorOptions { diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/Configuration/CheckpointFrequency.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/Configuration/CheckpointFrequency.cs index 327598a2f1..9e155dce03 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/Configuration/CheckpointFrequency.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/Configuration/CheckpointFrequency.cs @@ -4,7 +4,6 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.Configuration { - using System; using Microsoft.Azure.Cosmos.ChangeFeed.FeedManagement; /// @@ -18,15 +17,5 @@ internal class CheckpointFrequency /// Client code needs to explicitly checkpoint via /// public bool ExplicitCheckpoint { get; set; } - - /// - /// Gets or sets the value that specifies to checkpoint every specified number of docs. - /// - public int? ProcessedDocumentCount { get; set; } - - /// - /// Gets or sets the value that specifies to checkpoint every specified time interval. - /// - public TimeSpan? TimeInterval { get; set; } } } \ No newline at end of file diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedManagement/PartitionSupervisorCore.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedManagement/PartitionSupervisorCore.cs index baf778aa9b..160bcc083f 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedManagement/PartitionSupervisorCore.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedManagement/PartitionSupervisorCore.cs @@ -12,16 +12,16 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.FeedManagement using Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement; using Microsoft.Azure.Cosmos.ChangeFeed.Utils; - internal sealed class PartitionSupervisorCore : PartitionSupervisor + internal sealed class PartitionSupervisorCore : PartitionSupervisor { private readonly DocumentServiceLease lease; - private readonly ChangeFeedObserver observer; + private readonly ChangeFeedObserver observer; private readonly FeedProcessor processor; private readonly LeaseRenewer renewer; private readonly CancellationTokenSource renewerCancellation = new CancellationTokenSource(); private CancellationTokenSource processorCancellation; - public PartitionSupervisorCore(DocumentServiceLease lease, ChangeFeedObserver observer, FeedProcessor processor, LeaseRenewer renewer) + public PartitionSupervisorCore(DocumentServiceLease lease, ChangeFeedObserver observer, FeedProcessor processor, LeaseRenewer renewer) { this.lease = lease; this.observer = observer; @@ -31,7 +31,7 @@ public PartitionSupervisorCore(DocumentServiceLease lease, ChangeFeedObserver public override async Task RunAsync(CancellationToken shutdownToken) { - ChangeFeedObserverContextCore context = new ChangeFeedObserverContextCore(this.lease.CurrentLeaseToken); + ChangeFeedObserverContextCore context = new ChangeFeedObserverContextCore(this.lease.CurrentLeaseToken); await this.observer.OpenAsync(context).ConfigureAwait(false); this.processorCancellation = CancellationTokenSource.CreateLinkedTokenSource(shutdownToken); diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedManagement/PartitionSupervisorFactoryCore.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedManagement/PartitionSupervisorFactoryCore.cs index 7e8f02fac3..4b11dd42ef 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedManagement/PartitionSupervisorFactoryCore.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedManagement/PartitionSupervisorFactoryCore.cs @@ -9,17 +9,17 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.FeedManagement using Microsoft.Azure.Cosmos.ChangeFeed.FeedProcessing; using Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement; - internal sealed class PartitionSupervisorFactoryCore : PartitionSupervisorFactory + internal sealed class PartitionSupervisorFactoryCore : PartitionSupervisorFactory { - private readonly ChangeFeedObserverFactory observerFactory; + private readonly ChangeFeedObserverFactory observerFactory; private readonly DocumentServiceLeaseManager leaseManager; private readonly ChangeFeedLeaseOptions changeFeedLeaseOptions; - private readonly FeedProcessorFactory partitionProcessorFactory; + private readonly FeedProcessorFactory partitionProcessorFactory; public PartitionSupervisorFactoryCore( - ChangeFeedObserverFactory observerFactory, + ChangeFeedObserverFactory observerFactory, DocumentServiceLeaseManager leaseManager, - FeedProcessorFactory partitionProcessorFactory, + FeedProcessorFactory partitionProcessorFactory, ChangeFeedLeaseOptions options) { this.observerFactory = observerFactory ?? throw new ArgumentNullException(nameof(observerFactory)); @@ -35,11 +35,11 @@ public override PartitionSupervisor Create(DocumentServiceLease lease) throw new ArgumentNullException(nameof(lease)); } - ChangeFeedObserver changeFeedObserver = this.observerFactory.CreateObserver(); + ChangeFeedObserver changeFeedObserver = this.observerFactory.CreateObserver(); FeedProcessor processor = this.partitionProcessorFactory.Create(lease, changeFeedObserver); LeaseRenewerCore renewer = new LeaseRenewerCore(lease, this.leaseManager, this.changeFeedLeaseOptions.LeaseRenewInterval); - return new PartitionSupervisorCore(lease, changeFeedObserver, processor, renewer); + return new PartitionSupervisorCore(lease, changeFeedObserver, processor, renewer); } } } \ No newline at end of file diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedProcessing/AutoCheckpointer.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedProcessing/AutoCheckpointer.cs index 616990aaad..a4c1664612 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedProcessing/AutoCheckpointer.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedProcessing/AutoCheckpointer.cs @@ -5,31 +5,21 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.FeedProcessing { using System; - using System.Collections.Generic; + using System.IO; using System.Threading; using System.Threading.Tasks; - using Microsoft.Azure.Cosmos.ChangeFeed.Configuration; - internal sealed class AutoCheckpointer : ChangeFeedObserver + internal sealed class AutoCheckpointer : ChangeFeedObserver { - private readonly CheckpointFrequency checkpointFrequency; - private readonly ChangeFeedObserver observer; - private int processedDocCount; - private DateTime lastCheckpointTime = DateTime.UtcNow; + private readonly ChangeFeedObserver observer; - public AutoCheckpointer(CheckpointFrequency checkpointFrequency, ChangeFeedObserver observer) + public AutoCheckpointer(ChangeFeedObserver observer) { - if (checkpointFrequency == null) - { - throw new ArgumentNullException(nameof(checkpointFrequency)); - } - if (observer == null) { throw new ArgumentNullException(nameof(observer)); } - this.checkpointFrequency = checkpointFrequency; this.observer = observer; } @@ -43,38 +33,14 @@ public override Task CloseAsync(ChangeFeedObserverContext context, ChangeFeedObs return this.observer.CloseAsync(context, reason); } - public override async Task ProcessChangesAsync(ChangeFeedObserverContext context, IReadOnlyCollection docs, CancellationToken cancellationToken) - { - await this.observer.ProcessChangesAsync(context, docs, cancellationToken).ConfigureAwait(false); - this.processedDocCount += docs.Count; - - if (this.IsCheckpointNeeded()) - { - await context.CheckpointAsync().ConfigureAwait(false); - this.processedDocCount = 0; - this.lastCheckpointTime = DateTime.UtcNow; - } - } - - private bool IsCheckpointNeeded() + public override async Task ProcessChangesAsync( + ChangeFeedObserverContext context, + Stream stream, + CancellationToken cancellationToken) { - if (!this.checkpointFrequency.ProcessedDocumentCount.HasValue && !this.checkpointFrequency.TimeInterval.HasValue) - { - return true; - } - - if (this.processedDocCount >= this.checkpointFrequency.ProcessedDocumentCount) - { - return true; - } - - TimeSpan delta = DateTime.UtcNow - this.lastCheckpointTime; - if (delta >= this.checkpointFrequency.TimeInterval) - { - return true; - } + await this.observer.ProcessChangesAsync(context, stream, cancellationToken).ConfigureAwait(false); - return false; + await context.CheckpointAsync().ConfigureAwait(false); } } } \ No newline at end of file diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedProcessing/ChangeFeedObserver.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedProcessing/ChangeFeedObserver.cs index 94aa0fe9a5..c856b00a9e 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedProcessing/ChangeFeedObserver.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedProcessing/ChangeFeedObserver.cs @@ -4,14 +4,14 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.FeedProcessing { - using System.Collections.Generic; + using System.IO; using System.Threading; using System.Threading.Tasks; /// /// This interface is used to deliver change events to document feed observers. /// - internal abstract class ChangeFeedObserver + internal abstract class ChangeFeedObserver { /// /// This is called when change feed observer is opened. @@ -32,9 +32,9 @@ internal abstract class ChangeFeedObserver /// This is called when document changes are available on change feed. /// /// The context specifying partition for this change event, etc. - /// The documents changed. + /// The document streams that contain the change feed events. /// Token to signal that the partition processing is going to finish. /// A Task to allow asynchronous execution. - public abstract Task ProcessChangesAsync(ChangeFeedObserverContext context, IReadOnlyCollection docs, CancellationToken cancellationToken); + public abstract Task ProcessChangesAsync(ChangeFeedObserverContext context, Stream stream, CancellationToken cancellationToken); } } \ No newline at end of file diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedProcessing/ChangeFeedObserverBase.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedProcessing/ChangeFeedObserverBase.cs index 1dcbc98372..4316a47ac1 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedProcessing/ChangeFeedObserverBase.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedProcessing/ChangeFeedObserverBase.cs @@ -4,16 +4,16 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.FeedProcessing { - using System.Collections.Generic; + using System.IO; using System.Threading; using System.Threading.Tasks; using static Microsoft.Azure.Cosmos.Container; - internal sealed class ChangeFeedObserverBase : ChangeFeedObserver + internal sealed class ChangeFeedObserverBase : ChangeFeedObserver { - private readonly ChangesHandler onChanges; + private readonly ChangesStreamHandler onChanges; - public ChangeFeedObserverBase(ChangesHandler onChanges) + public ChangeFeedObserverBase(ChangesStreamHandler onChanges) { this.onChanges = onChanges; } @@ -28,9 +28,9 @@ public override Task OpenAsync(ChangeFeedObserverContext context) return Task.CompletedTask; } - public override Task ProcessChangesAsync(ChangeFeedObserverContext context, IReadOnlyCollection docs, CancellationToken cancellationToken) + public override Task ProcessChangesAsync(ChangeFeedObserverContext context, Stream stream, CancellationToken cancellationToken) { - return this.onChanges(docs, cancellationToken); + return this.onChanges(stream, cancellationToken); } } } diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedProcessing/ChangeFeedObserverCloseReason.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedProcessing/ChangeFeedObserverCloseReason.cs index 5405643646..64b0af28ba 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedProcessing/ChangeFeedObserverCloseReason.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedProcessing/ChangeFeedObserverCloseReason.cs @@ -5,7 +5,7 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.FeedProcessing { /// - /// The reason for the to close. + /// The reason for the to close. /// internal enum ChangeFeedObserverCloseReason { diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedProcessing/ChangeFeedObserverContext.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedProcessing/ChangeFeedObserverContext.cs index 6091aa81b9..54f9bd7947 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedProcessing/ChangeFeedObserverContext.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedProcessing/ChangeFeedObserverContext.cs @@ -7,7 +7,7 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.FeedProcessing using System.Threading.Tasks; /// - /// Represents the context passed to events. + /// Represents the context passed to events. /// internal abstract class ChangeFeedObserverContext { diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedProcessing/ChangeFeedObserverContextCore.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedProcessing/ChangeFeedObserverContextCore.cs index 62c56c6a9f..b271b81dd2 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedProcessing/ChangeFeedObserverContextCore.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedProcessing/ChangeFeedObserverContextCore.cs @@ -9,9 +9,9 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.FeedProcessing using Microsoft.Azure.Cosmos.ChangeFeed.FeedManagement; /// - /// The context passed to events. + /// The context passed to events. /// - internal sealed class ChangeFeedObserverContextCore : ChangeFeedObserverContext + internal sealed class ChangeFeedObserverContextCore : ChangeFeedObserverContext { private readonly PartitionCheckpointer checkpointer; diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedProcessing/ChangeFeedObserverFactory.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedProcessing/ChangeFeedObserverFactory.cs index 885c99e67e..5f385a3dbf 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedProcessing/ChangeFeedObserverFactory.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedProcessing/ChangeFeedObserverFactory.cs @@ -5,14 +5,14 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.FeedProcessing { /// - /// Factory class used to create instance(s) of . + /// Factory class used to create instance(s) of . /// - internal abstract class ChangeFeedObserverFactory + internal abstract class ChangeFeedObserverFactory { /// - /// Creates an instance of a . + /// Creates an instance of a . /// - /// An instance of a . - public abstract ChangeFeedObserver CreateObserver(); + /// An instance of a . + public abstract ChangeFeedObserver CreateObserver(); } } \ No newline at end of file diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedProcessing/ChangeFeedObserverFactoryCore.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedProcessing/ChangeFeedObserverFactoryCore.cs index 36d2c161ea..210eb87f83 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedProcessing/ChangeFeedObserverFactoryCore.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedProcessing/ChangeFeedObserverFactoryCore.cs @@ -4,20 +4,71 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.FeedProcessing { + using System; + using System.Collections.Generic; + using System.IO; + using System.Linq; + using System.Threading; + using System.Threading.Tasks; + using Microsoft.Azure.Cosmos.ChangeFeed.Exceptions; using static Microsoft.Azure.Cosmos.Container; - internal sealed class ChangeFeedObserverFactoryCore : ChangeFeedObserverFactory + internal sealed class ChangeFeedObserverFactoryCore : ChangeFeedObserverFactory + { + private readonly ChangesStreamHandler onChanges; + + public ChangeFeedObserverFactoryCore(ChangesStreamHandler onChanges) + { + this.onChanges = onChanges ?? throw new ArgumentNullException(nameof(onChanges)); + } + + public override ChangeFeedObserver CreateObserver() + { + return new ChangeFeedObserverBase(this.onChanges); + } + } + + internal sealed class ChangeFeedObserverFactoryCore : ChangeFeedObserverFactory { private readonly ChangesHandler onChanges; + private readonly CosmosSerializerCore serializerCore; - public ChangeFeedObserverFactoryCore(ChangesHandler onChanges) + public ChangeFeedObserverFactoryCore( + ChangesHandler onChanges, + CosmosSerializerCore serializerCore) { - this.onChanges = onChanges; + this.onChanges = onChanges ?? throw new ArgumentNullException(nameof(onChanges)); + this.serializerCore = serializerCore ?? throw new ArgumentNullException(nameof(serializerCore)); } - public override ChangeFeedObserver CreateObserver() + public override ChangeFeedObserver CreateObserver() { - return new ChangeFeedObserverBase(this.onChanges); + return new ChangeFeedObserverBase(this.ChangesStreamHandlerAsync); + } + + private Task ChangesStreamHandlerAsync( + Stream changes, + CancellationToken cancellationToken) + { + IReadOnlyCollection asFeedResponse; + try + { + asFeedResponse = CosmosFeedResponseSerializer.FromFeedResponseStream( + this.serializerCore, + changes); + + if (!asFeedResponse.Any()) + { + return Task.CompletedTask; + } + } + catch (Exception serializationException) + { + // Error using custom serializer to parse stream + throw new ObserverException(serializationException); + } + + return this.onChanges(asFeedResponse, cancellationToken); } } } \ No newline at end of file diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedProcessing/CheckpointerObserverFactory.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedProcessing/CheckpointerObserverFactory.cs index 2d93d3d7ff..e813a9c110 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedProcessing/CheckpointerObserverFactory.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedProcessing/CheckpointerObserverFactory.cs @@ -8,47 +8,37 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.FeedProcessing using Microsoft.Azure.Cosmos.ChangeFeed.Configuration; /// - /// Factory class used to create instance(s) of . + /// Factory class used to create instance(s) of . /// - internal sealed class CheckpointerObserverFactory : ChangeFeedObserverFactory + internal sealed class CheckpointerObserverFactory : ChangeFeedObserverFactory { - private readonly ChangeFeedObserverFactory observerFactory; + private readonly ChangeFeedObserverFactory observerFactory; private readonly CheckpointFrequency checkpointFrequency; /// - /// Initializes a new instance of the class. + /// Initializes a new instance of the class. /// /// Instance of Observer Factory /// Defined - public CheckpointerObserverFactory(ChangeFeedObserverFactory observerFactory, CheckpointFrequency checkpointFrequency) + public CheckpointerObserverFactory(ChangeFeedObserverFactory observerFactory, CheckpointFrequency checkpointFrequency) { - if (observerFactory == null) - { - throw new ArgumentNullException(nameof(observerFactory)); - } - - if (checkpointFrequency == null) - { - throw new ArgumentNullException(nameof(checkpointFrequency)); - } - - this.observerFactory = observerFactory; - this.checkpointFrequency = checkpointFrequency; + this.observerFactory = observerFactory ?? throw new ArgumentNullException(nameof(observerFactory)); + this.checkpointFrequency = checkpointFrequency ?? throw new ArgumentNullException(nameof(checkpointFrequency)); } /// - /// Creates a new instance of . + /// Creates a new instance of . /// - /// Created instance of . - public override ChangeFeedObserver CreateObserver() + /// Created instance of . + public override ChangeFeedObserver CreateObserver() { - ChangeFeedObserver observer = new ObserverExceptionWrappingChangeFeedObserverDecorator(this.observerFactory.CreateObserver()); + ChangeFeedObserver observer = new ObserverExceptionWrappingChangeFeedObserverDecorator(this.observerFactory.CreateObserver()); if (this.checkpointFrequency.ExplicitCheckpoint) { return observer; } - return new AutoCheckpointer(this.checkpointFrequency, observer); + return new AutoCheckpointer(observer); } } } \ No newline at end of file diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedProcessing/FeedProcessor.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedProcessing/FeedProcessor.cs index 8bd36f37ed..55bf42a68c 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedProcessing/FeedProcessor.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedProcessing/FeedProcessor.cs @@ -9,11 +9,11 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.FeedProcessing /// /// Provides an API to run continuous processing on a single processing unit of some resource. - /// Created by after some lease is acquired by the current host. + /// Created by after some lease is acquired by the current host. /// Processing can perform the following tasks in a loop: /// 1. Read some data from the resource feed. /// 2. Handle possible problems with the read. - /// 3. Pass the obtained data to an observer by calling with the context . + /// 3. Pass the obtained data to an observer by calling with the context . /// internal abstract class FeedProcessor { diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedProcessing/FeedProcessorCore.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedProcessing/FeedProcessorCore.cs index d1cb69b006..3e3e2f4aa7 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedProcessing/FeedProcessorCore.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedProcessing/FeedProcessorCore.cs @@ -17,26 +17,23 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.FeedProcessing using Microsoft.Azure.Cosmos.ChangeFeed.FeedManagement; using Microsoft.Azure.Cosmos.Core.Trace; - internal sealed class FeedProcessorCore : FeedProcessor + internal sealed class FeedProcessorCore : FeedProcessor { private readonly ProcessorOptions options; private readonly PartitionCheckpointer checkpointer; - private readonly ChangeFeedObserver observer; + private readonly ChangeFeedObserver observer; private readonly FeedIterator resultSetIterator; - private readonly CosmosSerializerCore serializerCore; public FeedProcessorCore( - ChangeFeedObserver observer, + ChangeFeedObserver observer, FeedIterator resultSetIterator, ProcessorOptions options, - PartitionCheckpointer checkpointer, - CosmosSerializerCore serializerCore) + PartitionCheckpointer checkpointer) { - this.observer = observer; - this.options = options; - this.checkpointer = checkpointer; - this.resultSetIterator = resultSetIterator; - this.serializerCore = serializerCore; + this.observer = observer ?? throw new ArgumentNullException(nameof(observer)); + this.options = options ?? throw new ArgumentNullException(nameof(options)); + this.checkpointer = checkpointer ?? throw new ArgumentNullException(nameof(checkpointer)); + this.resultSetIterator = resultSetIterator ?? throw new ArgumentNullException(nameof(resultSetIterator)); } public override async Task RunAsync(CancellationToken cancellationToken) @@ -116,29 +113,8 @@ private void HandleFailedRequest( private Task DispatchChangesAsync(ResponseMessage response, CancellationToken cancellationToken) { - ChangeFeedObserverContext context = new ChangeFeedObserverContextCore(this.options.LeaseToken, response, this.checkpointer); - IEnumerable asFeedResponse; - try - { - asFeedResponse = CosmosFeedResponseSerializer.FromFeedResponseStream( - this.serializerCore, - response.Content); - } - catch (Exception serializationException) - { - // Error using custom serializer to parse stream - throw new ObserverException(serializationException); - } - - // When StartFromBeginning is used, the first request returns OK but no content - if (!asFeedResponse.Any()) - { - return Task.CompletedTask; - } - - List asReadOnlyList = new List(asFeedResponse); - - return this.observer.ProcessChangesAsync(context, asReadOnlyList, cancellationToken); + ChangeFeedObserverContext context = new ChangeFeedObserverContextCore(this.options.LeaseToken, response, this.checkpointer); + return this.observer.ProcessChangesAsync(context, response.Content, cancellationToken); } } } \ No newline at end of file diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedProcessing/FeedProcessorFactory.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedProcessing/FeedProcessorFactory.cs index 635198b99e..bd25314cf8 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedProcessing/FeedProcessorFactory.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedProcessing/FeedProcessorFactory.cs @@ -9,7 +9,7 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.FeedProcessing /// /// Factory class used to create instance(s) of . /// - internal abstract class FeedProcessorFactory + internal abstract class FeedProcessorFactory { /// /// Creates an instance of a . @@ -17,6 +17,6 @@ internal abstract class FeedProcessorFactory /// Lease to be used for feed processing /// Observer to be used /// An instance of a . - public abstract FeedProcessor Create(DocumentServiceLease lease, ChangeFeedObserver observer); + public abstract FeedProcessor Create(DocumentServiceLease lease, ChangeFeedObserver observer); } } diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedProcessing/FeedProcessorFactoryCore.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedProcessing/FeedProcessorFactoryCore.cs index fa695406d0..2434e70135 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedProcessing/FeedProcessorFactoryCore.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedProcessing/FeedProcessorFactoryCore.cs @@ -10,26 +10,23 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.FeedProcessing using Microsoft.Azure.Cosmos.ChangeFeed.FeedManagement; using Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement; - internal class FeedProcessorFactoryCore : FeedProcessorFactory + internal class FeedProcessorFactoryCore : FeedProcessorFactory { private readonly ContainerInternal container; private readonly ChangeFeedProcessorOptions changeFeedProcessorOptions; private readonly DocumentServiceLeaseCheckpointer leaseCheckpointer; - private readonly CosmosSerializerCore serializerCore; public FeedProcessorFactoryCore( ContainerInternal container, ChangeFeedProcessorOptions changeFeedProcessorOptions, - DocumentServiceLeaseCheckpointer leaseCheckpointer, - CosmosSerializerCore serializerCore) + DocumentServiceLeaseCheckpointer leaseCheckpointer) { this.container = container ?? throw new ArgumentNullException(nameof(container)); this.changeFeedProcessorOptions = changeFeedProcessorOptions ?? throw new ArgumentNullException(nameof(changeFeedProcessorOptions)); this.leaseCheckpointer = leaseCheckpointer ?? throw new ArgumentNullException(nameof(leaseCheckpointer)); - this.serializerCore = serializerCore ?? throw new ArgumentNullException(nameof(serializerCore)); } - public override FeedProcessor Create(DocumentServiceLease lease, ChangeFeedObserver observer) + public override FeedProcessor Create(DocumentServiceLease lease, ChangeFeedObserver observer) { if (observer == null) throw new ArgumentNullException(nameof(observer)); if (lease == null) throw new ArgumentNullException(nameof(lease)); @@ -56,7 +53,7 @@ public override FeedProcessor Create(DocumentServiceLease lease, ChangeFeedObser startTime: options.StartTime, startFromBeginning: options.StartFromBeginning); - return new FeedProcessorCore(observer, iterator, options, checkpointer, this.serializerCore); + return new FeedProcessorCore(observer, iterator, options, checkpointer); } } } diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedProcessing/ObserverExceptionWrappingChangeFeedObserverDecorator.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedProcessing/ObserverExceptionWrappingChangeFeedObserverDecorator.cs index 26af72dc9e..3a452e2942 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedProcessing/ObserverExceptionWrappingChangeFeedObserverDecorator.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedProcessing/ObserverExceptionWrappingChangeFeedObserverDecorator.cs @@ -5,19 +5,19 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.FeedProcessing { using System; - using System.Collections.Generic; + using System.IO; using System.Threading; using System.Threading.Tasks; using Microsoft.Azure.Cosmos.ChangeFeed.Exceptions; using Microsoft.Azure.Cosmos.Core.Trace; - internal sealed class ObserverExceptionWrappingChangeFeedObserverDecorator : ChangeFeedObserver + internal sealed class ObserverExceptionWrappingChangeFeedObserverDecorator : ChangeFeedObserver { - private ChangeFeedObserver changeFeedObserver; + private readonly ChangeFeedObserver changeFeedObserver; - public ObserverExceptionWrappingChangeFeedObserverDecorator(ChangeFeedObserver changeFeedObserver) + public ObserverExceptionWrappingChangeFeedObserverDecorator(ChangeFeedObserver changeFeedObserver) { - this.changeFeedObserver = changeFeedObserver; + this.changeFeedObserver = changeFeedObserver ?? throw new ArgumentNullException(nameof(changeFeedObserver)); } public override async Task CloseAsync(ChangeFeedObserverContext context, ChangeFeedObserverCloseReason reason) @@ -48,11 +48,11 @@ public override async Task OpenAsync(ChangeFeedObserverContext context) } } - public override async Task ProcessChangesAsync(ChangeFeedObserverContext context, IReadOnlyCollection docs, CancellationToken cancellationToken) + public override async Task ProcessChangesAsync(ChangeFeedObserverContext context, Stream stream, CancellationToken cancellationToken) { try { - await this.changeFeedObserver.ProcessChangesAsync(context, docs, cancellationToken).ConfigureAwait(false); + await this.changeFeedObserver.ProcessChangesAsync(context, stream, cancellationToken).ConfigureAwait(false); } catch (Exception userException) { diff --git a/Microsoft.Azure.Cosmos/src/Resource/Container/Container.cs b/Microsoft.Azure.Cosmos/src/Resource/Container/Container.cs index 2b4f414e85..61456fe9ed 100644 --- a/Microsoft.Azure.Cosmos/src/Resource/Container/Container.cs +++ b/Microsoft.Azure.Cosmos/src/Resource/Container/Container.cs @@ -1118,6 +1118,16 @@ public delegate Task ChangesHandler( IReadOnlyCollection changes, CancellationToken cancellationToken); + /// + /// Delegate to receive the changes within a execution. + /// + /// The changes that happened. + /// A cancellation token representing the current cancellation status of the instance. + /// A representing the asynchronous operation that is going to be done with the changes. + public delegate Task ChangesStreamHandler( + Stream changes, + CancellationToken cancellationToken); + /// /// Delegate to receive the estimation of pending changes to be read by the associated instance. /// @@ -1139,6 +1149,16 @@ public abstract ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilder( string processorName, ChangesHandler onChangesDelegate); + /// + /// Initializes a for change feed processing. + /// + /// A name that identifies the Processor and the particular work it will do. + /// Delegate to receive changes. + /// An instance of + public abstract ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilder( + string processorName, + ChangesStreamHandler onChangesDelegate); + /// /// Initializes a for change feed monitoring. /// diff --git a/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerCore.Items.cs b/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerCore.Items.cs index 00eff3a68f..b73a5ac4c0 100644 --- a/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerCore.Items.cs +++ b/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerCore.Items.cs @@ -506,8 +506,31 @@ public override ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilder( throw new ArgumentNullException(nameof(onChangesDelegate)); } - ChangeFeedObserverFactoryCore observerFactory = new ChangeFeedObserverFactoryCore(onChangesDelegate); - ChangeFeedProcessorCore changeFeedProcessor = new ChangeFeedProcessorCore(observerFactory); + ChangeFeedObserverFactoryCore observerFactory = new ChangeFeedObserverFactoryCore(onChangesDelegate, this.ClientContext.SerializerCore); + ChangeFeedProcessorCore changeFeedProcessor = new ChangeFeedProcessorCore(observerFactory); + return new ChangeFeedProcessorBuilder( + processorName: processorName, + container: this, + changeFeedProcessor: changeFeedProcessor, + applyBuilderConfiguration: changeFeedProcessor.ApplyBuildConfiguration); + } + + public override ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilder( + string processorName, + ChangesStreamHandler onChangesDelegate) + { + if (processorName == null) + { + throw new ArgumentNullException(nameof(processorName)); + } + + if (onChangesDelegate == null) + { + throw new ArgumentNullException(nameof(onChangesDelegate)); + } + + ChangeFeedObserverFactoryCore observerFactory = new ChangeFeedObserverFactoryCore(onChangesDelegate); + ChangeFeedProcessorCore changeFeedProcessor = new ChangeFeedProcessorCore(observerFactory); return new ChangeFeedProcessorBuilder( processorName: processorName, container: this, diff --git a/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerInlineCore.cs b/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerInlineCore.cs index 7a0b35d8da..97363def6c 100644 --- a/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerInlineCore.cs +++ b/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerInlineCore.cs @@ -373,6 +373,13 @@ public override ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilder( return base.GetChangeFeedProcessorBuilder(processorName, onChangesDelegate); } + public override ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilder( + string processorName, + ChangesStreamHandler onChangesDelegate) + { + return base.GetChangeFeedProcessorBuilder(processorName, onChangesDelegate); + } + public override ChangeFeedProcessorBuilder GetChangeFeedEstimatorBuilder(string processorName, ChangesEstimationHandler estimationDelegate, TimeSpan? estimationPeriod = null) diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ChangeFeed/DynamicStreamTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ChangeFeed/DynamicStreamTests.cs new file mode 100644 index 0000000000..f5081b6e36 --- /dev/null +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ChangeFeed/DynamicStreamTests.cs @@ -0,0 +1,254 @@ +//------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. +//------------------------------------------------------------ + +namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests.ChangeFeed +{ + using System; + using System.Collections.Generic; + using System.IO; + using System.Linq; + using System.Threading; + using System.Threading.Tasks; + using Microsoft.Azure.Cosmos.Scripts; + using Microsoft.VisualStudio.TestTools.UnitTesting; + using Newtonsoft.Json; + using Newtonsoft.Json.Linq; + + [TestClass] + [TestCategory("ChangeFeed")] + public class DynamicStreamTests : BaseChangeFeedClientHelper + { + private readonly CosmosSerializerCore serializerCore = new CosmosSerializerCore(); + + [TestInitialize] + public async Task TestInitialize() + { + await base.ChangeFeedTestInit(); + + string PartitionKey = "/pk"; + ContainerResponse response = await this.database.CreateContainerAsync( + new ContainerProperties(id: Guid.NewGuid().ToString(), partitionKeyPath: PartitionKey), + throughput: 10000, + cancellationToken: this.cancellationToken); + this.Container = response; + } + + [TestCleanup] + public async Task Cleanup() + { + await base.TestCleanup(); + } + + [TestMethod] + public async Task TestWithRunningProcessor() + { + int partitionKey = 0; + ManualResetEvent allDocsProcessed = new ManualResetEvent(false); + + int processedDocCount = 0; + string accumulator = string.Empty; + ChangeFeedProcessor processor = this.Container + .GetChangeFeedProcessorBuilder("test", (Stream stream, CancellationToken token) => + { + IEnumerable asEnumerable = CosmosFeedResponseSerializer.FromFeedResponseStream(this.serializerCore, stream); + + processedDocCount += asEnumerable.Count(); + foreach (JObject doc in asEnumerable) + { + accumulator += doc["id"].ToString() + "."; + } + + if (processedDocCount == 10) + { + allDocsProcessed.Set(); + } + + return Task.CompletedTask; + }) + .WithInstanceName("random") + .WithLeaseContainer(this.LeaseContainer).Build(); + + // Start the processor, insert 1 document to generate a checkpoint + await processor.StartAsync(); + await Task.Delay(BaseChangeFeedClientHelper.ChangeFeedSetupTime); + foreach (int id in Enumerable.Range(0, 10)) + { + await this.Container.CreateItemAsync(new { id = id.ToString(), pk = partitionKey }); + } + + bool isStartOk = allDocsProcessed.WaitOne(10 * BaseChangeFeedClientHelper.ChangeFeedSetupTime); + await processor.StopAsync(); + Assert.IsTrue(isStartOk, "Timed out waiting for docs to process"); + Assert.AreEqual("0.1.2.3.4.5.6.7.8.9.", accumulator); + } + + [TestMethod] + public async Task TestWithFixedLeaseContainer() + { + await NonPartitionedContainerHelper.CreateNonPartitionedContainer( + this.database, + "fixedLeases"); + + Container fixedLeasesContainer = this.cosmosClient.GetContainer(this.database.Id, "fixedLeases"); + + try + { + + int partitionKey = 0; + ManualResetEvent allDocsProcessed = new ManualResetEvent(false); + + int processedDocCount = 0; + string accumulator = string.Empty; + ChangeFeedProcessor processor = this.Container + .GetChangeFeedProcessorBuilder("test", (Stream stream, CancellationToken token) => + { + IEnumerable asEnumerable = CosmosFeedResponseSerializer.FromFeedResponseStream(this.serializerCore, stream); + + processedDocCount += asEnumerable.Count(); + foreach (JObject doc in asEnumerable) + { + accumulator += doc["id"].ToString() + "."; + } + + if (processedDocCount == 10) + { + allDocsProcessed.Set(); + } + + return Task.CompletedTask; + }) + .WithInstanceName("random") + .WithLeaseContainer(fixedLeasesContainer).Build(); + + // Start the processor, insert 1 document to generate a checkpoint + await processor.StartAsync(); + await Task.Delay(BaseChangeFeedClientHelper.ChangeFeedSetupTime); + foreach (int id in Enumerable.Range(0, 10)) + { + await this.Container.CreateItemAsync(new { id = id.ToString(), pk = partitionKey }); + } + + bool isStartOk = allDocsProcessed.WaitOne(10 * BaseChangeFeedClientHelper.ChangeFeedSetupTime); + await processor.StopAsync(); + Assert.IsTrue(isStartOk, "Timed out waiting for docs to process"); + Assert.AreEqual("0.1.2.3.4.5.6.7.8.9.", accumulator); + } + finally + { + await fixedLeasesContainer.DeleteContainerAsync(); + } + } + + [TestMethod] + public async Task TestReducePageSizeScenario() + { + int partitionKey = 0; + // Create some docs to make sure that one separate response is returned for 1st execute of query before retries. + // These are to make sure continuation token is passed along during retries. + string sprocId = "createTwoDocs"; + string sprocBody = @"function(startIndex) { for (var i = 0; i < 2; ++i) __.createDocument( + __.getSelfLink(), + { id: 'doc' + (i + startIndex).toString(), value: 'y'.repeat(1500000), pk:0 }, + err => { if (err) throw err;} + );}"; + + Scripts scripts = this.Container.Scripts; + + StoredProcedureResponse storedProcedureResponse = + await scripts.CreateStoredProcedureAsync(new StoredProcedureProperties(sprocId, sprocBody)); + + ManualResetEvent allDocsProcessed = new ManualResetEvent(false); + + int processedDocCount = 0; + string accumulator = string.Empty; + ChangeFeedProcessor processor = this.Container + .GetChangeFeedProcessorBuilder("test", (Stream stream, CancellationToken token) => + { + IEnumerable asEnumerable = CosmosFeedResponseSerializer.FromFeedResponseStream(this.serializerCore, stream); + + processedDocCount += asEnumerable.Count(); + foreach (JObject doc in asEnumerable) + { + accumulator += doc["id"].ToString() + "."; + } + + if (processedDocCount == 5) + { + allDocsProcessed.Set(); + } + + return Task.CompletedTask; + }) + .WithStartFromBeginning() + .WithInstanceName("random") + .WithMaxItems(6) + .WithLeaseContainer(this.LeaseContainer).Build(); + + // Generate the payload + await scripts.ExecuteStoredProcedureAsync( + sprocId, + new PartitionKey(partitionKey), + new dynamic[] { 0 }); + + // Create 3 docs each 1.5MB. All 3 do not fit into MAX_RESPONSE_SIZE (4 MB). 2nd and 3rd are in same transaction. + string content = string.Format("{{\"id\": \"doc2\", \"value\": \"{0}\", \"pk\": 0}}", new string('x', 1500000)); + await this.Container.CreateItemAsync(JsonConvert.DeserializeObject(content), new PartitionKey(partitionKey)); + + await scripts.ExecuteStoredProcedureAsync(sprocId, new PartitionKey(partitionKey), new dynamic[] { 3 }); + + await processor.StartAsync(); + // Letting processor initialize and pickup changes + bool isStartOk = allDocsProcessed.WaitOne(10 * BaseChangeFeedClientHelper.ChangeFeedSetupTime); + await processor.StopAsync(); + Assert.IsTrue(isStartOk, "Timed out waiting for docs to process"); + Assert.AreEqual("doc0.doc1.doc2.doc3.doc4.", accumulator); + } + + [TestMethod] + public async Task TestWithStartTime_Beginning() + { + int partitionKey = 0; + + ManualResetEvent allDocsProcessed = new ManualResetEvent(false); + + int processedDocCount = 0; + string accumulator = string.Empty; + ChangeFeedProcessor processor = this.Container + .GetChangeFeedProcessorBuilder("test", (Stream stream, CancellationToken token) => + { + IEnumerable asEnumerable = CosmosFeedResponseSerializer.FromFeedResponseStream(this.serializerCore, stream); + + processedDocCount += asEnumerable.Count(); + foreach (JObject doc in asEnumerable) + { + accumulator += doc["id"].ToString() + "."; + } + + if (processedDocCount == 5) + { + allDocsProcessed.Set(); + } + + return Task.CompletedTask; + }) + .WithStartTime(DateTime.MinValue.ToUniversalTime()) + .WithInstanceName("random") + .WithLeaseContainer(this.LeaseContainer).Build(); + + await processor.StartAsync(); + await Task.Delay(BaseChangeFeedClientHelper.ChangeFeedSetupTime); + + foreach (int id in Enumerable.Range(0, 5)) + { + await this.Container.CreateItemAsync(new { id = $"doc{id}", pk = partitionKey }); + } + + // Letting processor initialize and pickup changes + bool isStartOk = allDocsProcessed.WaitOne(10 * BaseChangeFeedClientHelper.ChangeFeedSetupTime); + await processor.StopAsync(); + Assert.IsTrue(isStartOk, "Timed out waiting for docs to process"); + Assert.AreEqual("doc0.doc1.doc2.doc3.doc4.", accumulator); + } + } +} \ No newline at end of file diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/AutoCheckPointTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/AutoCheckPointTests.cs index 54c93819f9..c401eb4e96 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/AutoCheckPointTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/AutoCheckPointTests.cs @@ -4,17 +4,12 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.Tests { - using System; - using System.Collections.Generic; - using System.Diagnostics; + using System.IO; using System.Threading; using System.Threading.Tasks; - using Microsoft.Azure.Cosmos.ChangeFeed.Bootstrapping; - using Microsoft.Azure.Cosmos.ChangeFeed.Configuration; using Microsoft.Azure.Cosmos.ChangeFeed.Exceptions; using Microsoft.Azure.Cosmos.ChangeFeed.FeedManagement; using Microsoft.Azure.Cosmos.ChangeFeed.FeedProcessing; - using Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement; using Microsoft.VisualStudio.TestTools.UnitTesting; using Moq; @@ -22,111 +17,64 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.Tests [TestCategory("ChangeFeed")] public class AutoCheckPointTests { - private readonly ChangeFeedObserver changeFeedObserver; + private readonly ChangeFeedObserver changeFeedObserver; private readonly ChangeFeedObserverContext observerContext; - private readonly CheckpointFrequency checkpointFrequency; - private readonly AutoCheckpointer sut; - private readonly IReadOnlyList documents; + private readonly AutoCheckpointer sut; + private readonly Stream stream; private readonly PartitionCheckpointer partitionCheckpointer; public AutoCheckPointTests() { - changeFeedObserver = Mock.Of>(); - partitionCheckpointer = Mock.Of(); - Mock.Get(partitionCheckpointer) + this.changeFeedObserver = Mock.Of(); + this.partitionCheckpointer = Mock.Of(); + Mock.Get(this.partitionCheckpointer) .Setup(checkPointer => checkPointer.CheckpointPartitionAsync(It.IsAny())) .Returns(Task.CompletedTask); - checkpointFrequency = new CheckpointFrequency(); - sut = new AutoCheckpointer(checkpointFrequency, changeFeedObserver); + this.sut = new AutoCheckpointer(this.changeFeedObserver); - documents = Mock.Of>(); + this.stream = Mock.Of(); - observerContext = Mock.Of(); - Mock.Get(observerContext) + this.observerContext = Mock.Of(); + Mock.Get(this.observerContext) .Setup(context => context.CheckpointAsync()) - .Returns(partitionCheckpointer.CheckpointPartitionAsync("token")); + .Returns(this.partitionCheckpointer.CheckpointPartitionAsync("token")); } [TestMethod] public async Task OpenAsync_WhenCalled_ShouldOpenObserver() { - await sut.OpenAsync(observerContext); + await this.sut.OpenAsync(this.observerContext); - Mock.Get(changeFeedObserver) - .Verify(observer => observer.OpenAsync(observerContext), Times.Once); + Mock.Get(this.changeFeedObserver) + .Verify(observer => observer.OpenAsync(this.observerContext), Times.Once); } [TestMethod] public async Task CloseAsync_WhenCalled_ShouldCloseObserver() { - await sut.CloseAsync(observerContext, ChangeFeedObserverCloseReason.ResourceGone); + await this.sut.CloseAsync(this.observerContext, ChangeFeedObserverCloseReason.ResourceGone); - Mock.Get(changeFeedObserver) - .Verify(observer => observer.CloseAsync(observerContext, ChangeFeedObserverCloseReason.ResourceGone), Times.Once); + Mock.Get(this.changeFeedObserver) + .Verify(observer => observer.CloseAsync(this.observerContext, ChangeFeedObserverCloseReason.ResourceGone), Times.Once); } [TestMethod] public async Task ProcessChanges_WhenCalled_ShouldPassTheBatch() { - await sut.ProcessChangesAsync(observerContext, documents, CancellationToken.None); + await this.sut.ProcessChangesAsync(this.observerContext, this.stream, CancellationToken.None); - Mock.Get(changeFeedObserver) - .Verify(observer => observer.ProcessChangesAsync(observerContext, documents, CancellationToken.None), Times.Once); + Mock.Get(this.changeFeedObserver) + .Verify(observer => observer.ProcessChangesAsync(this.observerContext, this.stream, CancellationToken.None), Times.Once); } [TestMethod] public async Task ProcessChanges_WhenCheckpointThrows_ShouldThrow() { - checkpointFrequency.TimeInterval = TimeSpan.Zero; - ChangeFeedObserverContext observerContext = Mock.Of(); Mock.Get(observerContext).Setup(abs => abs.CheckpointAsync()).Throws(new LeaseLostException()); - await Assert.ThrowsExceptionAsync(() => sut.ProcessChangesAsync(observerContext, documents, CancellationToken.None)); - } - - [TestMethod] - public async Task ProcessChanges_WhenPeriodPass_ShouldCheckpoint() - { - Stopwatch stopwatch = Stopwatch.StartNew(); - checkpointFrequency.TimeInterval = TimeSpan.FromHours(1); - await sut.ProcessChangesAsync(observerContext, documents, CancellationToken.None); - Mock.Get(observerContext) - .Verify(context => context.CheckpointAsync(), Times.Never); - - await Task.Delay(TimeSpan.FromSeconds(1)); - - checkpointFrequency.TimeInterval = stopwatch.Elapsed; - await sut.ProcessChangesAsync(observerContext, documents, CancellationToken.None); - Mock.Get(observerContext) - .Verify(context => context.CheckpointAsync(), Times.Once); - } - - [TestMethod] - public async Task ProcessChanges_WithDocTrigger_ShouldCheckpointWhenAbove() - { - Mock.Get(documents) - .Setup(list => list.Count) - .Returns(1); - - checkpointFrequency.ProcessedDocumentCount = 2; - - await sut.ProcessChangesAsync(observerContext, documents, CancellationToken.None); - Mock.Get(observerContext) - .Verify(context => context.CheckpointAsync(), Times.Never); - - await sut.ProcessChangesAsync(observerContext, documents, CancellationToken.None); - Mock.Get(observerContext) - .Verify(context => context.CheckpointAsync(), Times.Once); - - await sut.ProcessChangesAsync(observerContext, documents, CancellationToken.None); - Mock.Get(observerContext) - .Verify(context => context.CheckpointAsync(), Times.Once); - - await sut.ProcessChangesAsync(observerContext, documents, CancellationToken.None); - Mock.Get(observerContext) - .Verify(context => context.CheckpointAsync(), Times.Exactly(2)); + await Assert.ThrowsExceptionAsync(() => this.sut.ProcessChangesAsync(observerContext, this.stream, CancellationToken.None)); } } } diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/ChangeFeedProcessorCoreTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/ChangeFeedProcessorCoreTests.cs index bf56eb92f2..16a3d173a9 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/ChangeFeedProcessorCoreTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/ChangeFeedProcessorCoreTests.cs @@ -24,7 +24,7 @@ public class ChangeFeedProcessorCoreTests [ExpectedException(typeof(ArgumentNullException))] public void ApplyBuildConfiguration_ValidatesNullStore() { - ChangeFeedProcessorCore processor = ChangeFeedProcessorCoreTests.CreateProcessor(out _, out _); + ChangeFeedProcessorCore processor = ChangeFeedProcessorCoreTests.CreateProcessor(out _, out _); processor.ApplyBuildConfiguration( null, null, @@ -38,7 +38,7 @@ public void ApplyBuildConfiguration_ValidatesNullStore() [ExpectedException(typeof(ArgumentNullException))] public void ApplyBuildConfiguration_ValidatesNullInstance() { - ChangeFeedProcessorCore processor = ChangeFeedProcessorCoreTests.CreateProcessor(out _, out _); + ChangeFeedProcessorCore processor = ChangeFeedProcessorCoreTests.CreateProcessor(out _, out _); processor.ApplyBuildConfiguration( Mock.Of(), null, @@ -52,7 +52,7 @@ public void ApplyBuildConfiguration_ValidatesNullInstance() [ExpectedException(typeof(ArgumentNullException))] public void ApplyBuildConfiguration_ValidatesNullMonitoredContainer() { - ChangeFeedProcessorCore processor = ChangeFeedProcessorCoreTests.CreateProcessor(out _, out _); + ChangeFeedProcessorCore processor = ChangeFeedProcessorCoreTests.CreateProcessor(out _, out _); processor.ApplyBuildConfiguration( Mock.Of(), null, @@ -65,7 +65,7 @@ public void ApplyBuildConfiguration_ValidatesNullMonitoredContainer() [TestMethod] public void ApplyBuildConfiguration_ValidCustomStore() { - ChangeFeedProcessorCore processor = ChangeFeedProcessorCoreTests.CreateProcessor(out _, out _); + ChangeFeedProcessorCore processor = ChangeFeedProcessorCoreTests.CreateProcessor(out _, out _); processor.ApplyBuildConfiguration( Mock.Of(), null, @@ -78,7 +78,7 @@ public void ApplyBuildConfiguration_ValidCustomStore() [TestMethod] public void ApplyBuildConfiguration_ValidContainerStore() { - ChangeFeedProcessorCore processor = ChangeFeedProcessorCoreTests.CreateProcessor(out _, out _); + ChangeFeedProcessorCore processor = ChangeFeedProcessorCoreTests.CreateProcessor(out _, out _); processor.ApplyBuildConfiguration( null, ChangeFeedProcessorCoreTests.GetMockedContainer("leases"), @@ -102,10 +102,10 @@ public async Task StartAsync() leaseStoreManager.Setup(l => l.LeaseManager).Returns(Mock.Of); leaseStoreManager.Setup(l => l.LeaseStore).Returns(leaseStore.Object); leaseStoreManager.Setup(l => l.LeaseCheckpointer).Returns(Mock.Of); - ChangeFeedProcessorCore processor = null; + ChangeFeedProcessorCore processor = null; try { - processor = ChangeFeedProcessorCoreTests.CreateProcessor(out Mock> factory, out Mock> observer); + processor = ChangeFeedProcessorCoreTests.CreateProcessor(out Mock factory, out Mock observer); processor.ApplyBuildConfiguration( leaseStoreManager.Object, null, @@ -152,10 +152,10 @@ public async Task ObserverIsCreated() leaseStoreManager.Setup(l => l.LeaseManager).Returns(Mock.Of); leaseStoreManager.Setup(l => l.LeaseStore).Returns(leaseStore.Object); leaseStoreManager.Setup(l => l.LeaseCheckpointer).Returns(Mock.Of); - ChangeFeedProcessorCore processor = null; + ChangeFeedProcessorCore processor = null; try { - processor = ChangeFeedProcessorCoreTests.CreateProcessor(out Mock> factory, out Mock> observer); + processor = ChangeFeedProcessorCoreTests.CreateProcessor(out Mock factory, out Mock observer); processor.ApplyBuildConfiguration( leaseStoreManager.Object, null, @@ -196,7 +196,7 @@ public async Task StopAsync() leaseStoreManager.Setup(l => l.LeaseManager).Returns(Mock.Of); leaseStoreManager.Setup(l => l.LeaseStore).Returns(leaseStore.Object); leaseStoreManager.Setup(l => l.LeaseCheckpointer).Returns(Mock.Of); - ChangeFeedProcessorCore processor = ChangeFeedProcessorCoreTests.CreateProcessor(out Mock> factory, out Mock> observer); + ChangeFeedProcessorCore processor = ChangeFeedProcessorCoreTests.CreateProcessor(out Mock factory, out Mock observer); processor.ApplyBuildConfiguration( leaseStoreManager.Object, null, @@ -213,15 +213,15 @@ public async Task StopAsync() } - private static ChangeFeedProcessorCore CreateProcessor( - out Mock> factory, - out Mock> observer) + private static ChangeFeedProcessorCore CreateProcessor( + out Mock factory, + out Mock observer) { - factory = new Mock>(); - observer = new Mock>(); + factory = new Mock(); + observer = new Mock(); factory.Setup(f => f.CreateObserver()).Returns(observer.Object); - return new ChangeFeedProcessorCore(factory.Object); + return new ChangeFeedProcessorCore(factory.Object); } public class MyDocument diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/FeedProcessorCoreTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/FeedProcessorCoreTests.cs index 668e64b860..5d362155ef 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/FeedProcessorCoreTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/FeedProcessorCoreTests.cs @@ -13,10 +13,9 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.Tests using Microsoft.Azure.Cosmos.ChangeFeed.Exceptions; using Microsoft.Azure.Cosmos.ChangeFeed.FeedManagement; using Microsoft.Azure.Cosmos.ChangeFeed.FeedProcessing; - using Microsoft.Azure.Cosmos.Query.Core; - using Microsoft.Azure.Cosmos.Tests; using Microsoft.VisualStudio.TestTools.UnitTesting; using Moq; + using static Microsoft.Azure.Cosmos.Container; [TestClass] [TestCategory("ChangeFeed")] @@ -31,23 +30,21 @@ public async Task UsesCustomSerializer() { CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(1000); - Mock> mockObserver = new Mock>(); - mockObserver.Setup(o => o.ProcessChangesAsync( - It.IsAny(), - It.Is>(list => list[0].id.Equals("test")), - It.IsAny())).Returns(Task.CompletedTask); + ChangesHandler handler = (changes, cancelationToken) => + { + IReadOnlyList list = changes as IReadOnlyList; + Assert.IsNotNull(list); + Assert.AreEqual("test", list[0].id); + return Task.CompletedTask; + }; Mock mockCheckpointer = new Mock(); Mock mockIterator = new Mock(); mockIterator.Setup(i => i.ReadNextAsync(It.IsAny())).ReturnsAsync(GetResponse(HttpStatusCode.OK, true)); mockIterator.SetupSequence(i => i.HasMoreResults).Returns(true).Returns(false); CustomSerializer serializer = new CustomSerializer(); - FeedProcessorCore processor = new FeedProcessorCore( - mockObserver.Object, - mockIterator.Object, - FeedProcessorCoreTests.DefaultSettings, - mockCheckpointer.Object, - new CosmosSerializerCore(serializer)); + ChangeFeedObserverFactoryCore factory = new ChangeFeedObserverFactoryCore(handler, new CosmosSerializerCore(serializer)); + FeedProcessorCore processor = new FeedProcessorCore(factory.CreateObserver(), mockIterator.Object, FeedProcessorCoreTests.DefaultSettings, mockCheckpointer.Object); try { @@ -58,13 +55,6 @@ public async Task UsesCustomSerializer() // Expected } - Mock.Get(mockObserver.Object) - .Verify(o => o.ProcessChangesAsync( - It.IsAny(), - It.Is>(list => list[0].id.Equals("test")), - It.IsAny()) - , Times.Once); - Assert.AreEqual(1, serializer.FromStreamCalled, "Should have called FromStream on the custom serializer"); } @@ -73,19 +63,15 @@ public async Task ThrowsOnFailedCustomSerializer() { CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(1000); - Mock> mockObserver = new Mock>(); Mock mockCheckpointer = new Mock(); Mock mockIterator = new Mock(); mockIterator.Setup(i => i.ReadNextAsync(It.IsAny())).ReturnsAsync(GetResponse(HttpStatusCode.OK, true)); mockIterator.SetupSequence(i => i.HasMoreResults).Returns(true).Returns(false); CustomSerializerFails serializer = new CustomSerializerFails(); - FeedProcessorCore processor = new FeedProcessorCore( - mockObserver.Object, - mockIterator.Object, - FeedProcessorCoreTests.DefaultSettings, - mockCheckpointer.Object, - new CosmosSerializerCore(serializer)); + ChangesHandler handler = (changes, cancelationToken) => Task.CompletedTask; + ChangeFeedObserverFactoryCore factory = new ChangeFeedObserverFactoryCore(handler, new CosmosSerializerCore(serializer)); + FeedProcessorCore processor = new FeedProcessorCore(factory.CreateObserver(), mockIterator.Object, FeedProcessorCoreTests.DefaultSettings, mockCheckpointer.Object); ObserverException caughtException = await Assert.ThrowsExceptionAsync(() => processor.RunAsync(cancellationTokenSource.Token)); Assert.IsInstanceOfType(caughtException.InnerException, typeof(CustomException)); @@ -98,19 +84,14 @@ public async Task ThrowOnPartitionSplit(HttpStatusCode statusCode, int subStatus { CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(1000); - Mock> mockObserver = new Mock>(); + Mock mockObserver = new Mock(); Mock mockCheckpointer = new Mock(); Mock mockIterator = new Mock(); mockIterator.Setup(i => i.ReadNextAsync(It.IsAny())) .ReturnsAsync(GetResponse(statusCode, false, subStatusCode)); - FeedProcessorCore processor = new FeedProcessorCore( - mockObserver.Object, - mockIterator.Object, - FeedProcessorCoreTests.DefaultSettings, - mockCheckpointer.Object, - MockCosmosUtil.Serializer); + FeedProcessorCore processor = new FeedProcessorCore(mockObserver.Object, mockIterator.Object, FeedProcessorCoreTests.DefaultSettings, mockCheckpointer.Object); await Assert.ThrowsExceptionAsync(() => processor.RunAsync(cancellationTokenSource.Token)); } @@ -121,19 +102,14 @@ public async Task ThrowOnPartitionGone(HttpStatusCode statusCode, int subStatusC { CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(1000); - Mock> mockObserver = new Mock>(); + Mock mockObserver = new Mock(); Mock mockCheckpointer = new Mock(); Mock mockIterator = new Mock(); mockIterator.Setup(i => i.ReadNextAsync(It.IsAny())) .ReturnsAsync(GetResponse(statusCode, false, subStatusCode)); - FeedProcessorCore processor = new FeedProcessorCore( - mockObserver.Object, - mockIterator.Object, - FeedProcessorCoreTests.DefaultSettings, - mockCheckpointer.Object, - MockCosmosUtil.Serializer); + FeedProcessorCore processor = new FeedProcessorCore(mockObserver.Object, mockIterator.Object, FeedProcessorCoreTests.DefaultSettings, mockCheckpointer.Object); await Assert.ThrowsExceptionAsync(() => processor.RunAsync(cancellationTokenSource.Token)); } @@ -144,19 +120,18 @@ public async Task ThrowOnReadSessionNotAvailable(HttpStatusCode statusCode, int { CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(1000); - Mock> mockObserver = new Mock>(); + Mock mockObserver = new Mock(); Mock mockCheckpointer = new Mock(); Mock mockIterator = new Mock(); mockIterator.Setup(i => i.ReadNextAsync(It.IsAny())) .ReturnsAsync(GetResponse(statusCode, false, subStatusCode)); - FeedProcessorCore processor = new FeedProcessorCore( + FeedProcessorCore processor = new FeedProcessorCore( mockObserver.Object, mockIterator.Object, FeedProcessorCoreTests.DefaultSettings, - mockCheckpointer.Object, - MockCosmosUtil.Serializer); + mockCheckpointer.Object); await Assert.ThrowsExceptionAsync(() => processor.RunAsync(cancellationTokenSource.Token)); } @@ -172,8 +147,10 @@ private static ResponseMessage GetResponse(HttpStatusCode statusCode, bool inclu if (includeItem) { - MyDocument document = new MyDocument(); - document.id = "test"; + MyDocument document = new MyDocument + { + id = "test" + }; message.Content = new CosmosJsonDotNetSerializer().ToStream(new { Documents = new List() { document } }); } @@ -188,7 +165,7 @@ public class MyDocument private class CustomSerializer : CosmosSerializer { - private CosmosSerializer cosmosSerializer = new CosmosJsonDotNetSerializer(); + private readonly CosmosSerializer cosmosSerializer = new CosmosJsonDotNetSerializer(); public int FromStreamCalled = 0; public int ToStreamCalled = 0; @@ -207,7 +184,7 @@ public override Stream ToStream(T input) private class CustomSerializerFails: CosmosSerializer { - private CosmosSerializer cosmosSerializer = new CosmosJsonDotNetSerializer(); + private readonly CosmosSerializer cosmosSerializer = new CosmosJsonDotNetSerializer(); public override T FromStream(Stream stream) { throw new CustomException(); diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/ObserverExceptionWrappingChangeFeedObserverDecoratorTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/ObserverExceptionWrappingChangeFeedObserverDecoratorTests.cs index aea92cbb4b..56b9eac5b7 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/ObserverExceptionWrappingChangeFeedObserverDecoratorTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/ObserverExceptionWrappingChangeFeedObserverDecoratorTests.cs @@ -6,6 +6,7 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.Tests { using System; using System.Collections.Generic; + using System.IO; using System.Linq; using System.Net; using System.Threading; @@ -20,25 +21,28 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.Tests public class ObserverExceptionWrappingChangeFeedObserverDecoratorTests { private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(); - private readonly Mock> observer; + private readonly Mock observer; private readonly ChangeFeedObserverContext changeFeedObserverContext; - private readonly FeedProcessing.ObserverExceptionWrappingChangeFeedObserverDecorator observerWrapper; + private readonly FeedProcessing.ObserverExceptionWrappingChangeFeedObserverDecorator observerWrapper; private readonly IReadOnlyList documents; + private readonly CosmosSerializerCore serializerCore; public ObserverExceptionWrappingChangeFeedObserverDecoratorTests() { - this.observer = new Mock>(); + this.observer = new Mock(); this.changeFeedObserverContext = Mock.Of(); - this.observerWrapper = new FeedProcessing.ObserverExceptionWrappingChangeFeedObserverDecorator(this.observer.Object); + this.observerWrapper = new FeedProcessing.ObserverExceptionWrappingChangeFeedObserverDecorator(this.observer.Object); - var document = new MyDocument(); - documents = new List { document }; + this.serializerCore = new CosmosSerializerCore(); + + MyDocument document = new MyDocument(); + this.documents = new List { document }; } [TestMethod] public async Task OpenAsync_ShouldCallOpenAsync() { - await observerWrapper.OpenAsync(this.changeFeedObserverContext); + await this.observerWrapper.OpenAsync(this.changeFeedObserverContext); Mock.Get(this.observer.Object) .Verify(feedObserver => feedObserver.OpenAsync(It.IsAny()), @@ -48,7 +52,7 @@ public async Task OpenAsync_ShouldCallOpenAsync() [TestMethod] public async Task CloseAsync_ShouldCallCloseAsync() { - await observerWrapper.CloseAsync(this.changeFeedObserverContext, ChangeFeedObserverCloseReason.Shutdown); + await this.observerWrapper.CloseAsync(this.changeFeedObserverContext, ChangeFeedObserverCloseReason.Shutdown); Mock.Get(this.observer.Object) .Verify(feedObserver => feedObserver @@ -60,12 +64,13 @@ public async Task CloseAsync_ShouldCallCloseAsync() [TestMethod] public async Task ProcessChangesAsync_ShouldPassDocumentsToProcessChangesAsync() { - await observerWrapper.ProcessChangesAsync(this.changeFeedObserverContext, this.documents, cancellationTokenSource.Token); + using Stream stream = this.serializerCore.ToStream(this.documents); + await this.observerWrapper.ProcessChangesAsync(this.changeFeedObserverContext, stream, this.cancellationTokenSource.Token); Mock.Get(this.observer.Object) .Verify(feedObserver => feedObserver .ProcessChangesAsync(It.IsAny(), - It.Is>(list => this.documents.SequenceEqual(list)), + It.Is(stream => this.ValidateStream(stream)), It.IsAny() ), Times.Once); @@ -74,14 +79,15 @@ public async Task ProcessChangesAsync_ShouldPassDocumentsToProcessChangesAsync() [TestMethod] public async Task ProcessChangesAsync_ShouldThrow_IfObserverThrows() { + using Stream stream = this.serializerCore.ToStream(this.documents); Mock.Get(this.observer.Object) .SetupSequence(feedObserver => feedObserver - .ProcessChangesAsync(It.IsAny(), It.IsAny>(), It.IsAny())) + .ProcessChangesAsync(It.IsAny(), It.IsAny(), It.IsAny())) .Throws(new Exception()); try { - await observerWrapper.ProcessChangesAsync(this.changeFeedObserverContext, this.documents, cancellationTokenSource.Token); + await this.observerWrapper.ProcessChangesAsync(this.changeFeedObserverContext, stream, this.cancellationTokenSource.Token); Assert.Fail("Should had thrown"); } catch (ObserverException ex) @@ -92,7 +98,7 @@ public async Task ProcessChangesAsync_ShouldThrow_IfObserverThrows() Mock.Get(this.observer.Object) .Verify(feedObserver => feedObserver .ProcessChangesAsync(It.IsAny(), - It.Is>(list => this.documents.SequenceEqual(list)), + It.Is(stream => this.ValidateStream(stream)), It.IsAny() ), Times.Once); @@ -101,14 +107,15 @@ public async Task ProcessChangesAsync_ShouldThrow_IfObserverThrows() [TestMethod] public async Task ProcessChangesAsync_ShouldThrow_IfObserverThrowsDocumentClientException() { + using Stream stream = this.serializerCore.ToStream(this.documents); Mock.Get(this.observer.Object) .SetupSequence(feedObserver => feedObserver - .ProcessChangesAsync(It.IsAny(), It.IsAny>(), It.IsAny())) + .ProcessChangesAsync(It.IsAny(), It.IsAny(), It.IsAny())) .Throws(new Documents.DocumentClientException("Some message", (HttpStatusCode) 429, Documents.SubStatusCodes.Unknown)); try { - await observerWrapper.ProcessChangesAsync(this.changeFeedObserverContext, this.documents, cancellationTokenSource.Token); + await this.observerWrapper.ProcessChangesAsync(this.changeFeedObserverContext, stream, this.cancellationTokenSource.Token); Assert.Fail("Should had thrown"); } catch (ObserverException ex) @@ -119,15 +126,34 @@ public async Task ProcessChangesAsync_ShouldThrow_IfObserverThrowsDocumentClient Mock.Get(this.observer.Object) .Verify(feedObserver => feedObserver .ProcessChangesAsync(It.IsAny(), - It.Is>(list => this.documents.SequenceEqual(list)), + It.Is(stream => this.ValidateStream(stream)), It.IsAny() ), Times.Once); } + private bool ValidateStream(Stream stream) + { + IEnumerable asEnumerable = CosmosFeedResponseSerializer.FromFeedResponseStream(this.serializerCore, stream); + return this.documents.SequenceEqual(asEnumerable, new MyDocument.Comparer()); + } + public class MyDocument { public string id { get; set; } + + public class Comparer : IEqualityComparer + { + public bool Equals(MyDocument x, MyDocument y) + { + return x.id?.Equals(y?.id) ?? y.id == null; + } + + public int GetHashCode(MyDocument obj) + { + return obj.id.GetHashCode(); + } + } } } } diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/PartitionControllerTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/PartitionControllerTests.cs index 7e46703a48..4c34da49b6 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/PartitionControllerTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/PartitionControllerTests.cs @@ -21,7 +21,7 @@ public class PartitionControllerTests private readonly DocumentServiceLeaseManager leaseManager; private readonly FeedProcessor partitionProcessor; private readonly LeaseRenewer leaseRenewer; - private readonly ChangeFeedObserver observer; + private readonly ChangeFeedObserver observer; private readonly PartitionSynchronizer synchronizer; private readonly PartitionController sut; private readonly PartitionSupervisorFactory partitionSupervisorFactory; @@ -35,8 +35,8 @@ public PartitionControllerTests() this.partitionProcessor = MockPartitionProcessor(); this.leaseRenewer = MockRenewer(); - this.observer = MockObserver(); - this.partitionSupervisorFactory = Mock.Of(f => f.Create(this.lease) == new PartitionSupervisorCore(this.lease, this.observer, this.partitionProcessor, this.leaseRenewer)); + this.observer = Mock.Of(); + this.partitionSupervisorFactory = Mock.Of(f => f.Create(this.lease) == new PartitionSupervisorCore(this.lease, this.observer, this.partitionProcessor, this.leaseRenewer)); this.leaseManager = Mock.Of(); Mock.Get(this.leaseManager).Reset(); // Reset implicit/by default setup of properties. @@ -108,7 +108,7 @@ public async Task AddLease_ShouldIgnorePartitionObserving_IfDuplicateLease() FeedProcessor processorDuplicate = MockPartitionProcessor(); Mock.Get(this.partitionSupervisorFactory) .Setup(f => f.Create(this.lease)) - .Returns(new PartitionSupervisorCore(this.lease, this.observer, processorDuplicate, this.leaseRenewer)); + .Returns(new PartitionSupervisorCore(this.lease, this.observer, processorDuplicate, this.leaseRenewer)); await this.sut.AddOrUpdateLeaseAsync(this.lease).ConfigureAwait(false); @@ -162,7 +162,7 @@ public async Task AddLease_ShouldAcquireLease_IfSecondLeaseAdded() Mock.Get(this.partitionSupervisorFactory) .Setup(f => f.Create(lease2)) - .Returns(new PartitionSupervisorCore(lease2, this.observer, MockPartitionProcessor(), this.leaseRenewer)); + .Returns(new PartitionSupervisorCore(lease2, this.observer, MockPartitionProcessor(), this.leaseRenewer)); await this.sut.AddOrUpdateLeaseAsync(this.lease).ConfigureAwait(false); await this.sut.AddOrUpdateLeaseAsync(lease2).ConfigureAwait(false); @@ -182,7 +182,7 @@ public async Task AddLease_ShouldRunObserver_IfSecondAdded() FeedProcessor partitionProcessor2 = MockPartitionProcessor(); Mock.Get(this.partitionSupervisorFactory) .Setup(f => f.Create(lease2)) - .Returns(new PartitionSupervisorCore(lease2, this.observer, partitionProcessor2, this.leaseRenewer)); + .Returns(new PartitionSupervisorCore(lease2, this.observer, partitionProcessor2, this.leaseRenewer)); await this.sut.AddOrUpdateLeaseAsync(this.lease).ConfigureAwait(false); await this.sut.AddOrUpdateLeaseAsync(lease2).ConfigureAwait(false); @@ -214,7 +214,7 @@ public async Task Controller_ShouldReleasesLease_IfObserverExits() Mock.Get(this.partitionSupervisorFactory) .Setup(f => f.Create(this.lease)) - .Returns(new PartitionSupervisorCore(this.lease, this.observer, this.partitionProcessor, this.leaseRenewer)); + .Returns(new PartitionSupervisorCore(this.lease, this.observer, this.partitionProcessor, this.leaseRenewer)); await this.sut.AddOrUpdateLeaseAsync(this.lease).ConfigureAwait(false); await Task.Delay(TimeSpan.FromMilliseconds(100)).ConfigureAwait(false); @@ -311,16 +311,5 @@ private static LeaseRenewer MockRenewer() .Returns(token => Task.Delay(TimeSpan.FromMinutes(1), token)); return mock.Object; } - - private static ChangeFeedObserver MockObserver() - { - Mock> mock = new Mock>(); - return mock.Object; - } - - public class MyDocument - { - public string id { get; set; } - } } } diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/PartitionSupervisorTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/PartitionSupervisorTests.cs index ab69e0fb5d..b775a56228 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/PartitionSupervisorTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/PartitionSupervisorTests.cs @@ -5,7 +5,7 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.Tests { using System; - using System.Collections.Generic; + using System.IO; using System.Threading; using System.Threading.Tasks; using Microsoft.Azure.Cosmos.ChangeFeed.Exceptions; @@ -22,7 +22,7 @@ public class PartitionSupervisorTests : IDisposable private readonly DocumentServiceLease lease; private readonly LeaseRenewer leaseRenewer; private readonly FeedProcessor partitionProcessor; - private readonly ChangeFeedObserver observer; + private readonly ChangeFeedObserver observer; private readonly CancellationTokenSource shutdownToken = new CancellationTokenSource(TimeSpan.FromMinutes(5)); private readonly PartitionSupervisor sut; @@ -35,9 +35,9 @@ public PartitionSupervisorTests() this.leaseRenewer = Mock.Of(); this.partitionProcessor = Mock.Of(); - this.observer = Mock.Of>(); + this.observer = Mock.Of(); - this.sut = new PartitionSupervisorCore(this.lease, this.observer, this.partitionProcessor, this.leaseRenewer); + this.sut = new PartitionSupervisorCore(this.lease, this.observer, this.partitionProcessor, this.leaseRenewer); } [TestMethod] @@ -134,7 +134,7 @@ public async Task RunObserver_ShouldCloseWithObserverError_IfObserverFailed() public async Task RunObserver_ShouldPassPartitionToObserver_WhenExecuted() { Mock.Get(this.observer) - .Setup(feedObserver => feedObserver.ProcessChangesAsync(It.IsAny(), It.IsAny>(), It.IsAny())) + .Setup(feedObserver => feedObserver.ProcessChangesAsync(It.IsAny(), It.IsAny(), It.IsAny())) .Callback(() => this.shutdownToken.Cancel()); await this.sut.RunAsync(this.shutdownToken.Token).ConfigureAwait(false); @@ -146,32 +146,32 @@ public async Task RunObserver_ShouldPassPartitionToObserver_WhenExecuted() [TestMethod] public async Task RunObserver_ResourceGoneCloseReason_IfProcessorFailedWithPartitionNotFoundException() { - Mock.Get(partitionProcessor) + Mock.Get(this.partitionProcessor) .Setup(processor => processor.RunAsync(It.IsAny())) .ThrowsAsync(new FeedNotFoundException("processorException", "12345")); - Exception exception = await Assert.ThrowsExceptionAsync(() => sut.RunAsync(shutdownToken.Token)).ConfigureAwait(false); + Exception exception = await Assert.ThrowsExceptionAsync(() => this.sut.RunAsync(this.shutdownToken.Token)).ConfigureAwait(false); Assert.AreEqual("processorException", exception.Message); - Mock.Get(observer) + Mock.Get(this.observer) .Verify(feedObserver => feedObserver - .CloseAsync(It.Is(context => context.LeaseToken == lease.CurrentLeaseToken), + .CloseAsync(It.Is(context => context.LeaseToken == this.lease.CurrentLeaseToken), ChangeFeedObserverCloseReason.ResourceGone)); } [TestMethod] public async Task RunObserver_ReadSessionNotAvailableCloseReason_IfProcessorFailedWithReadSessionNotAvailableException() { - Mock.Get(partitionProcessor) + Mock.Get(this.partitionProcessor) .Setup(processor => processor.RunAsync(It.IsAny())) .ThrowsAsync(new FeedReadSessionNotAvailableException("processorException", "12345")); - Exception exception = await Assert.ThrowsExceptionAsync(() => sut.RunAsync(shutdownToken.Token)).ConfigureAwait(false); + Exception exception = await Assert.ThrowsExceptionAsync(() => this.sut.RunAsync(this.shutdownToken.Token)).ConfigureAwait(false); Assert.AreEqual("processorException", exception.Message); - Mock.Get(observer) + Mock.Get(this.observer) .Verify(feedObserver => feedObserver - .CloseAsync(It.Is(context => context.LeaseToken == lease.CurrentLeaseToken), + .CloseAsync(It.Is(context => context.LeaseToken == this.lease.CurrentLeaseToken), ChangeFeedObserverCloseReason.ReadSessionNotAvailable)); }