Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ChangeFeed : Adds ChangeFeedStartFrom to support StartTimes x FeedRanges #1725

Merged
merged 33 commits into from
Aug 11, 2020
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
0d924ee
Revert "Revert "ChangeFeedRequestOptions Refactor (#1332)" (#1684)"
bchong95 Jul 8, 2020
0398750
marking test as ignored as agreed upon
bchong95 Jul 9, 2020
0c8c604
Merge branch 'master' into revert-1684-users/jawilley/contract/change…
sboshra Jul 9, 2020
cc5b1a8
merged
bchong95 Jul 20, 2020
c6dbda7
Merge branch 'revert-1684-users/jawilley/contract/changefeed_revert' …
bchong95 Jul 20, 2020
4dba475
Merge branch 'master' into revert-1684-users/jawilley/contract/change…
bchong95 Jul 23, 2020
3d67d53
wired up changes
bchong95 Jul 23, 2020
f4e1be5
merged
bchong95 Jul 24, 2020
cb697d9
resolved iteration comments
bchong95 Jul 24, 2020
2bbbbd2
merged
bchong95 Jul 25, 2020
e3746f8
Merge branch 'master' into users/brchon/ChangeFeed/StartFromFeedRange
bchong95 Aug 6, 2020
990e8b0
merged
bchong95 Aug 6, 2020
72ecf96
made start from a required field
bchong95 Aug 6, 2020
e024d63
added feedback from API review
bchong95 Aug 7, 2020
9018e52
resolved iteration comments
bchong95 Aug 7, 2020
beb5650
fixed tests
bchong95 Aug 7, 2020
366d0d3
fixed feed range docs
bchong95 Aug 7, 2020
ab91449
removed unreachable code
bchong95 Aug 7, 2020
dd76679
fixed tests
bchong95 Aug 8, 2020
cdfdd1e
fixed tests
bchong95 Aug 10, 2020
223d91d
fixed mocks
bchong95 Aug 10, 2020
d098458
Merge branch 'master' into users/brchon/ChangeFeed/StartFromFeedRange
j82w Aug 10, 2020
860f221
resolved iteration comments
bchong95 Aug 10, 2020
33a803e
Merge branch 'users/brchon/ChangeFeed/StartFromFeedRange' of https://…
bchong95 Aug 10, 2020
3b224b0
fixed build issue
bchong95 Aug 10, 2020
7c5e7ab
fixed test build errors
bchong95 Aug 10, 2020
4ae6f31
more build fixes
bchong95 Aug 10, 2020
a486d30
updated preview API json
bchong95 Aug 10, 2020
412c98e
merged
bchong95 Aug 10, 2020
dc93109
Merge branch 'master' into users/brchon/ChangeFeed/StartFromFeedRange
j82w Aug 10, 2020
c3707f2
Merge branch 'master' into users/brchon/ChangeFeed/StartFromFeedRange
j82w Aug 11, 2020
7b045bd
Merge branch 'master' into users/brchon/ChangeFeed/StartFromFeedRange
j82w Aug 11, 2020
be3bf1f
Merge branch 'master' into users/brchon/ChangeFeed/StartFromFeedRange
j82w Aug 11, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,34 +16,36 @@ public static ChangeFeedPartitionKeyResultSetIteratorCore BuildResultSetIterator
DateTime? startTime,
bool startFromBeginning)
{
ChangeFeedRequestOptions.StartFrom startFrom;
FeedRangeInternal feedRange = new FeedRangePartitionKeyRange(partitionKeyRangeId);

ChangeFeedStartFrom startFrom;
if (continuationToken != null)
{
startFrom = ChangeFeedRequestOptions.StartFrom.CreateFromContinuation(continuationToken);
// For continuation based feed range we need to manufactor a new continuation token that has the partition key range id in it.
startFrom = new ChangeFeedStartFromContinuationAndFeedRange(continuationToken, feedRange);
}
else if (startTime.HasValue)
{
startFrom = ChangeFeedRequestOptions.StartFrom.CreateFromTime(startTime.Value);
startFrom = ChangeFeedStartFrom.Time(startTime.Value, feedRange);
}
else if (startFromBeginning)
{
startFrom = ChangeFeedRequestOptions.StartFrom.CreateFromBeginning();
startFrom = ChangeFeedStartFrom.Beginning(feedRange);
}
else
{
startFrom = ChangeFeedRequestOptions.StartFrom.CreateFromNow();
startFrom = ChangeFeedStartFrom.Now(feedRange);
}

ChangeFeedRequestOptions requestOptions = new ChangeFeedRequestOptions()
{
MaxItemCount = maxItemCount,
FeedRange = new FeedRangePartitionKeyRange(partitionKeyRangeId),
From = startFrom,
PageSizeHint = maxItemCount,
};

return new ChangeFeedPartitionKeyResultSetIteratorCore(
clientContext: container.ClientContext,
container: container,
changeFeedStartFrom: startFrom,
options: requestOptions);
}
}
Expand Down
346 changes: 346 additions & 0 deletions Microsoft.Azure.Cosmos/src/ChangeFeedStartFrom.cs

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions Microsoft.Azure.Cosmos/src/FeedRange/FeedRange.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,7 @@ public static FeedRange FromJsonString(string toStringValue)

return parsedRange;
}

public static FeedRange CreateFromPartitionKey(PartitionKey partitionKey) => new FeedRangePartitionKey(partitionKey);
bchong95 marked this conversation as resolved.
Show resolved Hide resolved
bchong95 marked this conversation as resolved.
Show resolved Hide resolved
}
}
255 changes: 10 additions & 245 deletions Microsoft.Azure.Cosmos/src/RequestOptions/ChangeFeedRequestOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,41 +19,29 @@ namespace Microsoft.Azure.Cosmos
#endif
sealed class ChangeFeedRequestOptions : RequestOptions
{
private int? maxItemCount;
private int? pageSizeHint;

/// <summary>
/// Gets or sets the maximum number of items to be returned in the enumeration operation in the Azure Cosmos DB service.
/// </summary>
/// <value>
/// The maximum number of items to be returned in the enumeration operation.
/// </value>
public int? MaxItemCount
/// </value>
/// <remarks>This is just a hint to the server which can return less items per page.</remarks>
public int? PageSizeHint
{
get => this.maxItemCount;
get => this.pageSizeHint;
set
{
if (value.HasValue && (value.Value <= 0))
{
throw new ArgumentOutOfRangeException($"{nameof(this.MaxItemCount)} must be a positive value.");
throw new ArgumentOutOfRangeException($"{nameof(this.PageSizeHint)} must be a positive value.");
}

this.maxItemCount = value;
this.pageSizeHint = value;
}
}

/// <summary>
/// Gets or sets where the ChangeFeed operation should start from. If not set then the ChangeFeed operation will start from now.
/// </summary>
/// <remarks>
/// Only applies in the case where no FeedToken is provided or the FeedToken was never used in a previous iterator.
/// </remarks>
public StartFrom From { get; set; } = StartFromNow.Singleton;

/// <summary>
/// Gets or set which ranges to execute the ChangeFeed operation on.
/// </summary>
public FeedRange FeedRange { get; set; } = FeedRangeEpk.FullRange;

/// <summary>
/// Fill the CosmosRequestMessage headers with the set properties
/// </summary>
Expand All @@ -64,25 +52,11 @@ internal override void PopulateRequestOptions(RequestMessage request)

base.PopulateRequestOptions(request);

PopulateStartFromRequestOptionVisitor visitor = new PopulateStartFromRequestOptionVisitor(request);
if (this.From == null)
{
throw new InvalidOperationException($"{nameof(ChangeFeedRequestOptions)}.{nameof(ChangeFeedRequestOptions.StartFrom)} needs to be set to a value.");
}

this.From.Accept(visitor);

if (this.MaxItemCount.HasValue)
if (this.PageSizeHint.HasValue)
{
request.Headers.Add(
HttpConstants.HttpHeaders.PageSize,
this.MaxItemCount.Value.ToString(CultureInfo.InvariantCulture));
}

if (this.FeedRange != null)
{
FeedRangeRequestMessagePopulatorVisitor feedRangeVisitor = new FeedRangeRequestMessagePopulatorVisitor(request);
((FeedRangeInternal)this.FeedRange).Accept(feedRangeVisitor);
this.PageSizeHint.Value.ToString(CultureInfo.InvariantCulture));
}

request.Headers.Add(
Expand Down Expand Up @@ -112,220 +86,11 @@ internal override void PopulateRequestOptions(RequestMessage request)
set => throw new NotSupportedException($"{nameof(ChangeFeedRequestOptions)} does not use the {nameof(this.IfNoneMatchEtag)} property.");
}

/// <summary>
/// Base class for where to start a ChangeFeed operation in <see cref="ChangeFeedRequestOptions"/>.
/// </summary>
/// <remarks>Use one of the static constructors to generate a StartFrom option.</remarks>
public abstract class StartFrom
{
/// <summary>
/// Initializes an instance of the <see cref="StartFrom"/> class.
/// </summary>
protected StartFrom()
{
}

internal abstract void Accept(StartFromVisitor visitor);

/// <summary>
/// Creates a <see cref="StartFrom"/> that tells the ChangeFeed operation to start reading changes from this moment onward.
/// </summary>
/// <returns>A <see cref="StartFrom"/> that tells the ChangeFeed operation to start reading changes from this moment onward.</returns>
public static StartFrom CreateFromNow()
{
return StartFromNow.Singleton;
}

/// <summary>
/// Creates a <see cref="StartFrom"/> that tells the ChangeFeed operation to start reading changes from some point in time onward.
/// </summary>
/// <param name="dateTime">The time to start reading from.</param>
/// <returns>A <see cref="StartFrom"/> that tells the ChangeFeed operation to start reading changes from some point in time onward.</returns>
public static StartFrom CreateFromTime(DateTime dateTime)
{
return new StartFromTime(dateTime);
}

/// <summary>
/// Creates a <see cref="StartFrom"/> that tells the ChangeFeed operation to start reading changes from a save point.
/// </summary>
/// <param name="continuation">The continuation to resume from.</param>
/// <returns>A <see cref="StartFrom"/> that tells the ChangeFeed operation to start reading changes from a save point.</returns>
public static StartFrom CreateFromContinuation(string continuation)
{
return new StartFromContinuation(continuation);
}

/// <summary>
/// Creates a <see cref="StartFrom"/> that tells the ChangeFeed operation to start from the beginning of time.
/// </summary>
/// <returns>A <see cref="StartFrom"/> that tells the ChangeFeed operation to start reading changes from the beginning of time.</returns>
public static StartFrom CreateFromBeginning()
{
return StartFromBeginning.Singleton;
}
}

internal abstract class StartFromVisitor
{
public abstract void Visit(StartFromNow startFromNow);
public abstract void Visit(StartFromTime startFromTime);
public abstract void Visit(StartFromContinuation startFromContinuation);
public abstract void Visit(StartFromBeginning startFromBeginning);
}

internal sealed class PopulateStartFromRequestOptionVisitor : StartFromVisitor
{
private const string IfNoneMatchAllHeaderValue = "*";
private static readonly DateTime StartFromBeginningTime = DateTime.MinValue.ToUniversalTime();

private readonly RequestMessage requestMessage;

public PopulateStartFromRequestOptionVisitor(RequestMessage requestMessage)
{
this.requestMessage = requestMessage ?? throw new ArgumentNullException(nameof(requestMessage));
}

public override void Visit(StartFromNow startFromNow)
{
this.requestMessage.Headers.IfNoneMatch = PopulateStartFromRequestOptionVisitor.IfNoneMatchAllHeaderValue;
}

public override void Visit(StartFromTime startFromTime)
{
// Our current public contract for ChangeFeedProcessor uses DateTime.MinValue.ToUniversalTime as beginning.
// We need to add a special case here, otherwise it would send it as normal StartTime.
// The problem is Multi master accounts do not support StartTime header on ReadFeed, and thus,
// it would break multi master Change Feed Processor users using Start From Beginning semantics.
// It's also an optimization, since the backend won't have to binary search for the value.
if (startFromTime.Time != PopulateStartFromRequestOptionVisitor.StartFromBeginningTime)
{
this.requestMessage.Headers.Add(
HttpConstants.HttpHeaders.IfModifiedSince,
startFromTime.Time.ToString("r", CultureInfo.InvariantCulture));
}
}

public override void Visit(StartFromContinuation startFromContinuation)
{
// On REST level, change feed is using IfNoneMatch/ETag instead of continuation
this.requestMessage.Headers.IfNoneMatch = startFromContinuation.Continuation;
}

public override void Visit(StartFromBeginning startFromBeginning)
{
// We don't need to set any headers to start from the beginning
}
}

/// <summary>
/// Derived instance of <see cref="StartFrom"/> that tells the ChangeFeed operation to start reading changes from this moment onward.
/// </summary>
internal sealed class StartFromNow : StartFrom
{
public static readonly StartFromNow Singleton = new StartFromNow();

/// <summary>
/// Intializes an instance of the <see cref="StartFromNow"/> class.
/// </summary>
public StartFromNow()
: base()
{
}

internal override void Accept(StartFromVisitor visitor)
{
visitor.Visit(this);
}
}

/// <summary>
/// Derived instance of <see cref="StartFrom"/> that tells the ChangeFeed operation to start reading changes from some point in time onward.
/// </summary>
internal sealed class StartFromTime : StartFrom
{
/// <summary>
/// Initializes an instance of the <see cref="StartFromTime"/> class.
/// </summary>
/// <param name="time">The time to start reading from.</param>
public StartFromTime(DateTime time)
: base()
{
if (time.Kind != DateTimeKind.Utc)
{
throw new ArgumentOutOfRangeException($"{nameof(time)}.{nameof(DateTime.Kind)} must be {nameof(DateTimeKind)}.{nameof(DateTimeKind.Utc)}");
}

this.Time = time;
}

/// <summary>
/// Gets the time the ChangeFeed operation should start reading from.
/// </summary>
public DateTime Time { get; }

internal override void Accept(StartFromVisitor visitor)
{
visitor.Visit(this);
}
}

/// <summary>
/// Derived instance of <see cref="StartFrom"/> that tells the ChangeFeed operation to start reading changes from a save point.
/// </summary>
internal sealed class StartFromContinuation : StartFrom
{
/// <summary>
/// Initializes an instance of the <see cref="StartFromContinuation"/> class.
/// </summary>
/// <param name="continuation">The continuation to resume from.</param>
public StartFromContinuation(string continuation)
: base()
{
if (string.IsNullOrWhiteSpace(continuation))
{
throw new ArgumentOutOfRangeException($"{nameof(continuation)} must not be null, empty, or whitespace.");
}

this.Continuation = continuation;
}

/// <summary>
/// Gets the continuation to resume from.
/// </summary>
public string Continuation { get; }

internal override void Accept(StartFromVisitor visitor)
{
visitor.Visit(this);
}
}

/// <summary>
/// Derived instance of <see cref="StartFrom"/> that tells the ChangeFeed operation to start reading changes from the beginning of time.
/// </summary>
internal sealed class StartFromBeginning : StartFrom
{
public static readonly StartFromBeginning Singleton = new StartFromBeginning();

public StartFromBeginning()
: base()
{
}

internal override void Accept(StartFromVisitor visitor)
{
visitor.Visit(this);
}
}

internal ChangeFeedRequestOptions Clone()
{
return new ChangeFeedRequestOptions()
{
MaxItemCount = this.maxItemCount,
From = this.From,
FeedRange = this.FeedRange,
PageSizeHint = this.pageSizeHint,
};
}
}
Expand Down
Loading