Skip to content

Commit

Permalink
Porting changes from #888
Browse files Browse the repository at this point in the history
  • Loading branch information
ealsur committed Mar 22, 2021
1 parent 35d1744 commit 2b9a221
Show file tree
Hide file tree
Showing 30 changed files with 576 additions and 363 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed
using Microsoft.Azure.Cosmos.Core.Trace;
using Microsoft.Azure.Cosmos.Tracing;

internal sealed class ChangeFeedProcessorCore<T> : ChangeFeedProcessor
internal sealed class ChangeFeedProcessorCore : ChangeFeedProcessor
{
private readonly ChangeFeedObserverFactory<T> observerFactory;
private readonly ChangeFeedObserverFactory observerFactory;
private ContainerInternal leaseContainer;
private string instanceName;
private ContainerInternal monitoredContainer;
Expand All @@ -28,7 +28,7 @@ internal sealed class ChangeFeedProcessorCore<T> : ChangeFeedProcessor
private DocumentServiceLeaseStoreManager documentServiceLeaseStoreManager;
private bool initialized = false;

public ChangeFeedProcessorCore(ChangeFeedObserverFactory<T> observerFactory)
public ChangeFeedProcessorCore(ChangeFeedObserverFactory observerFactory)
{
this.observerFactory = observerFactory ?? throw new ArgumentNullException(nameof(observerFactory));
}
Expand Down Expand Up @@ -97,7 +97,7 @@ private PartitionManager BuildPartitionManager(
string containerRid,
Routing.PartitionKeyRangeCache partitionKeyRangeCache)
{
CheckpointerObserverFactory<T> factory = new CheckpointerObserverFactory<T>(this.observerFactory, this.changeFeedProcessorOptions.CheckpointFrequency);
CheckpointerObserverFactory factory = new CheckpointerObserverFactory(this.observerFactory, this.changeFeedProcessorOptions.CheckpointFrequency);
PartitionSynchronizerCore synchronizer = new PartitionSynchronizerCore(
this.monitoredContainer,
this.documentServiceLeaseStoreManager.LeaseContainer,
Expand All @@ -106,10 +106,10 @@ private PartitionManager BuildPartitionManager(
partitionKeyRangeCache,
containerRid);
BootstrapperCore bootstrapper = new BootstrapperCore(synchronizer, this.documentServiceLeaseStoreManager.LeaseStore, BootstrapperCore.DefaultLockTime, BootstrapperCore.DefaultSleepTime);
PartitionSupervisorFactoryCore<T> partitionSuperviserFactory = new PartitionSupervisorFactoryCore<T>(
PartitionSupervisorFactoryCore partitionSuperviserFactory = new PartitionSupervisorFactoryCore(
factory,
this.documentServiceLeaseStoreManager.LeaseManager,
new FeedProcessorFactoryCore<T>(this.monitoredContainer, this.changeFeedProcessorOptions, this.documentServiceLeaseStoreManager.LeaseCheckpointer, this.monitoredContainer.ClientContext.SerializerCore),
new FeedProcessorFactoryCore(this.monitoredContainer, this.changeFeedProcessorOptions, this.documentServiceLeaseStoreManager.LeaseCheckpointer),
this.changeFeedLeaseOptions);

EqualPartitionsBalancingStrategy loadBalancingStrategy = new EqualPartitionsBalancingStrategy(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.Configuration
using System;

/// <summary>
/// Options to control various aspects of partition distribution happening within <see cref="ChangeFeedProcessorCore{T}"/> instance.
/// Options to control various aspects of partition distribution happening within <see cref="ChangeFeedProcessorCore"/> instance.
/// </summary>
internal class ChangeFeedLeaseOptions
{
Expand All @@ -24,7 +24,7 @@ public ChangeFeedLeaseOptions()
}

/// <summary>
/// Gets or sets renew interval for all leases currently held by <see cref="ChangeFeedProcessorCore{T}"/> instance.
/// Gets or sets renew interval for all leases currently held by <see cref="ChangeFeedProcessorCore"/> instance.
/// </summary>
public TimeSpan LeaseRenewInterval { get; set; }

Expand All @@ -35,12 +35,12 @@ public ChangeFeedLeaseOptions()

/// <summary>
/// Gets or sets the interval for which the lease is taken. If the lease is not renewed within this
/// interval, it will cause it to expire and ownership of the lease will move to another <see cref="ChangeFeedProcessorCore{T}"/> instance.
/// interval, it will cause it to expire and ownership of the lease will move to another <see cref="ChangeFeedProcessorCore"/> instance.
/// </summary>
public TimeSpan LeaseExpirationInterval { get; set; }

/// <summary>
/// Gets or sets a prefix to be used as part of the lease id. This can be used to support multiple instances of <see cref="ChangeFeedProcessorCore{T}"/>
/// Gets or sets a prefix to be used as part of the lease id. This can be used to support multiple instances of <see cref="ChangeFeedProcessorCore"/>
/// instances pointing at the same feed while using the same auxiliary collection.
/// </summary>
public string LeasePrefix { get; set; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.Configuration
using Microsoft.Azure.Cosmos;

/// <summary>
/// Options to control various aspects of partition distribution happening within <see cref="ChangeFeedProcessorCore{T}"/> instance.
/// Options to control various aspects of partition distribution happening within <see cref="ChangeFeedProcessorCore"/> instance.
/// </summary>
internal class ChangeFeedProcessorOptions
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

namespace Microsoft.Azure.Cosmos.ChangeFeed.Configuration
{
using System;
using Microsoft.Azure.Cosmos.ChangeFeed.FeedManagement;

/// <summary>
Expand All @@ -18,15 +17,5 @@ internal class CheckpointFrequency
/// Client code needs to explicitly checkpoint via <see cref="PartitionCheckpointer"/>
/// </summary>
public bool ExplicitCheckpoint { get; set; }

/// <summary>
/// Gets or sets the value that specifies to checkpoint every specified number of docs.
/// </summary>
public int? ProcessedDocumentCount { get; set; }

/// <summary>
/// Gets or sets the value that specifies to checkpoint every specified time interval.
/// </summary>
public TimeSpan? TimeInterval { get; set; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,16 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.FeedManagement
using Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement;
using Microsoft.Azure.Cosmos.ChangeFeed.Utils;

internal sealed class PartitionSupervisorCore<T> : PartitionSupervisor
internal sealed class PartitionSupervisorCore : PartitionSupervisor
{
private readonly DocumentServiceLease lease;
private readonly ChangeFeedObserver<T> observer;
private readonly ChangeFeedObserver observer;
private readonly FeedProcessor processor;
private readonly LeaseRenewer renewer;
private readonly CancellationTokenSource renewerCancellation = new CancellationTokenSource();
private CancellationTokenSource processorCancellation;

public PartitionSupervisorCore(DocumentServiceLease lease, ChangeFeedObserver<T> observer, FeedProcessor processor, LeaseRenewer renewer)
public PartitionSupervisorCore(DocumentServiceLease lease, ChangeFeedObserver observer, FeedProcessor processor, LeaseRenewer renewer)
{
this.lease = lease;
this.observer = observer;
Expand All @@ -31,7 +31,7 @@ public PartitionSupervisorCore(DocumentServiceLease lease, ChangeFeedObserver<T>

public override async Task RunAsync(CancellationToken shutdownToken)
{
ChangeFeedObserverContextCore<T> context = new ChangeFeedObserverContextCore<T>(this.lease.CurrentLeaseToken);
ChangeFeedObserverContextCore context = new ChangeFeedObserverContextCore(this.lease.CurrentLeaseToken);
await this.observer.OpenAsync(context).ConfigureAwait(false);

this.processorCancellation = CancellationTokenSource.CreateLinkedTokenSource(shutdownToken);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,17 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.FeedManagement
using Microsoft.Azure.Cosmos.ChangeFeed.FeedProcessing;
using Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement;

internal sealed class PartitionSupervisorFactoryCore<T> : PartitionSupervisorFactory
internal sealed class PartitionSupervisorFactoryCore : PartitionSupervisorFactory
{
private readonly ChangeFeedObserverFactory<T> observerFactory;
private readonly ChangeFeedObserverFactory observerFactory;
private readonly DocumentServiceLeaseManager leaseManager;
private readonly ChangeFeedLeaseOptions changeFeedLeaseOptions;
private readonly FeedProcessorFactory<T> partitionProcessorFactory;
private readonly FeedProcessorFactory partitionProcessorFactory;

public PartitionSupervisorFactoryCore(
ChangeFeedObserverFactory<T> observerFactory,
ChangeFeedObserverFactory observerFactory,
DocumentServiceLeaseManager leaseManager,
FeedProcessorFactory<T> partitionProcessorFactory,
FeedProcessorFactory partitionProcessorFactory,
ChangeFeedLeaseOptions options)
{
this.observerFactory = observerFactory ?? throw new ArgumentNullException(nameof(observerFactory));
Expand All @@ -35,11 +35,11 @@ public override PartitionSupervisor Create(DocumentServiceLease lease)
throw new ArgumentNullException(nameof(lease));
}

ChangeFeedObserver<T> changeFeedObserver = this.observerFactory.CreateObserver();
ChangeFeedObserver changeFeedObserver = this.observerFactory.CreateObserver();
FeedProcessor processor = this.partitionProcessorFactory.Create(lease, changeFeedObserver);
LeaseRenewerCore renewer = new LeaseRenewerCore(lease, this.leaseManager, this.changeFeedLeaseOptions.LeaseRenewInterval);

return new PartitionSupervisorCore<T>(lease, changeFeedObserver, processor, renewer);
return new PartitionSupervisorCore(lease, changeFeedObserver, processor, renewer);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,31 +5,21 @@
namespace Microsoft.Azure.Cosmos.ChangeFeed.FeedProcessing
{
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.ChangeFeed.Configuration;

internal sealed class AutoCheckpointer<T> : ChangeFeedObserver<T>
internal sealed class AutoCheckpointer : ChangeFeedObserver
{
private readonly CheckpointFrequency checkpointFrequency;
private readonly ChangeFeedObserver<T> observer;
private int processedDocCount;
private DateTime lastCheckpointTime = DateTime.UtcNow;
private readonly ChangeFeedObserver observer;

public AutoCheckpointer(CheckpointFrequency checkpointFrequency, ChangeFeedObserver<T> observer)
public AutoCheckpointer(ChangeFeedObserver observer)
{
if (checkpointFrequency == null)
{
throw new ArgumentNullException(nameof(checkpointFrequency));
}

if (observer == null)
{
throw new ArgumentNullException(nameof(observer));
}

this.checkpointFrequency = checkpointFrequency;
this.observer = observer;
}

Expand All @@ -43,38 +33,14 @@ public override Task CloseAsync(ChangeFeedObserverContext context, ChangeFeedObs
return this.observer.CloseAsync(context, reason);
}

public override async Task ProcessChangesAsync(ChangeFeedObserverContext context, IReadOnlyCollection<T> docs, CancellationToken cancellationToken)
{
await this.observer.ProcessChangesAsync(context, docs, cancellationToken).ConfigureAwait(false);
this.processedDocCount += docs.Count;

if (this.IsCheckpointNeeded())
{
await context.CheckpointAsync().ConfigureAwait(false);
this.processedDocCount = 0;
this.lastCheckpointTime = DateTime.UtcNow;
}
}

private bool IsCheckpointNeeded()
public override async Task ProcessChangesAsync(
ChangeFeedObserverContext context,
Stream stream,
CancellationToken cancellationToken)
{
if (!this.checkpointFrequency.ProcessedDocumentCount.HasValue && !this.checkpointFrequency.TimeInterval.HasValue)
{
return true;
}

if (this.processedDocCount >= this.checkpointFrequency.ProcessedDocumentCount)
{
return true;
}

TimeSpan delta = DateTime.UtcNow - this.lastCheckpointTime;
if (delta >= this.checkpointFrequency.TimeInterval)
{
return true;
}
await this.observer.ProcessChangesAsync(context, stream, cancellationToken).ConfigureAwait(false);

return false;
await context.CheckpointAsync().ConfigureAwait(false);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@

namespace Microsoft.Azure.Cosmos.ChangeFeed.FeedProcessing
{
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

/// <summary>
/// This interface is used to deliver change events to document feed observers.
/// </summary>
internal abstract class ChangeFeedObserver<T>
internal abstract class ChangeFeedObserver
{
/// <summary>
/// This is called when change feed observer is opened.
Expand All @@ -32,9 +32,9 @@ internal abstract class ChangeFeedObserver<T>
/// This is called when document changes are available on change feed.
/// </summary>
/// <param name="context">The context specifying partition for this change event, etc.</param>
/// <param name="docs">The documents changed.</param>
/// <param name="stream">The document streams that contain the change feed events.</param>
/// <param name="cancellationToken">Token to signal that the partition processing is going to finish.</param>
/// <returns>A Task to allow asynchronous execution.</returns>
public abstract Task ProcessChangesAsync(ChangeFeedObserverContext context, IReadOnlyCollection<T> docs, CancellationToken cancellationToken);
public abstract Task ProcessChangesAsync(ChangeFeedObserverContext context, Stream stream, CancellationToken cancellationToken);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,16 @@

namespace Microsoft.Azure.Cosmos.ChangeFeed.FeedProcessing
{
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using static Microsoft.Azure.Cosmos.Container;

internal sealed class ChangeFeedObserverBase<T> : ChangeFeedObserver<T>
internal sealed class ChangeFeedObserverBase : ChangeFeedObserver
{
private readonly ChangesHandler<T> onChanges;
private readonly ChangesStreamHandler onChanges;

public ChangeFeedObserverBase(ChangesHandler<T> onChanges)
public ChangeFeedObserverBase(ChangesStreamHandler onChanges)
{
this.onChanges = onChanges;
}
Expand All @@ -28,9 +28,9 @@ public override Task OpenAsync(ChangeFeedObserverContext context)
return Task.CompletedTask;
}

public override Task ProcessChangesAsync(ChangeFeedObserverContext context, IReadOnlyCollection<T> docs, CancellationToken cancellationToken)
public override Task ProcessChangesAsync(ChangeFeedObserverContext context, Stream stream, CancellationToken cancellationToken)
{
return this.onChanges(docs, cancellationToken);
return this.onChanges(stream, cancellationToken);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
namespace Microsoft.Azure.Cosmos.ChangeFeed.FeedProcessing
{
/// <summary>
/// The reason for the <see cref="ChangeFeedObserver{T}"/> to close.
/// The reason for the <see cref="ChangeFeedObserver"/> to close.
/// </summary>
internal enum ChangeFeedObserverCloseReason
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.FeedProcessing
using System.Threading.Tasks;

/// <summary>
/// Represents the context passed to <see cref="ChangeFeedObserver{T}"/> events.
/// Represents the context passed to <see cref="ChangeFeedObserver"/> events.
/// </summary>
internal abstract class ChangeFeedObserverContext
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.FeedProcessing
using Microsoft.Azure.Cosmos.ChangeFeed.FeedManagement;

/// <summary>
/// The context passed to <see cref="ChangeFeedObserver{T}"/> events.
/// The context passed to <see cref="ChangeFeedObserver"/> events.
/// </summary>
internal sealed class ChangeFeedObserverContextCore<T> : ChangeFeedObserverContext
internal sealed class ChangeFeedObserverContextCore : ChangeFeedObserverContext
{
private readonly PartitionCheckpointer checkpointer;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@
namespace Microsoft.Azure.Cosmos.ChangeFeed.FeedProcessing
{
/// <summary>
/// Factory class used to create instance(s) of <see cref="ChangeFeedObserver{T}"/>.
/// Factory class used to create instance(s) of <see cref="ChangeFeedObserver"/>.
/// </summary>
internal abstract class ChangeFeedObserverFactory<T>
internal abstract class ChangeFeedObserverFactory
{
/// <summary>
/// Creates an instance of a <see cref="ChangeFeedObserver{T}"/>.
/// Creates an instance of a <see cref="ChangeFeedObserver"/>.
/// </summary>
/// <returns>An instance of a <see cref="ChangeFeedObserver{T}"/>.</returns>
public abstract ChangeFeedObserver<T> CreateObserver();
/// <returns>An instance of a <see cref="ChangeFeedObserver"/>.</returns>
public abstract ChangeFeedObserver CreateObserver();
}
}
Loading

0 comments on commit 2b9a221

Please sign in to comment.