Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cross Partition Execution Context Refactor #1260

Merged
merged 39 commits into from
Mar 12, 2020
Merged
Show file tree
Hide file tree
Changes from 32 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
a001801
realized that this will be a lot of work
bchong95 Jan 8, 2020
f70d18a
merged
bchong95 Jan 15, 2020
f8359f2
wired up continuation token on the write path
bchong95 Jan 15, 2020
ab6bcc1
wiring continuation new continuation token in the read path
bchong95 Jan 17, 2020
a5fa91d
updated distinct and group by
bchong95 Jan 18, 2020
b50ae6d
drafted out code
bchong95 Jan 21, 2020
02e95ca
merged
bchong95 Jan 22, 2020
689fe8f
merged
bchong95 Jan 25, 2020
81418d0
wired in serialize state
bchong95 Jan 25, 2020
658d3f7
merged
bchong95 Jan 29, 2020
f4e689e
fixed test cases
bchong95 Jan 29, 2020
2ced726
merged
bchong95 Jan 29, 2020
3e470e0
I give up
bchong95 Jan 29, 2020
d5ff581
can't do it
bchong95 Jan 30, 2020
7ebd158
added min max continuation token
bchong95 Feb 13, 2020
230193c
merged
bchong95 Feb 13, 2020
20000cb
removed TryGetContinuationToken
bchong95 Feb 13, 2020
e76d053
fixed random bugs
bchong95 Feb 14, 2020
d9455b1
fixed more bugs
bchong95 Feb 14, 2020
ca3d0cf
about to gut out RequestContinuationToken and just wire through Cosmo…
bchong95 Feb 27, 2020
fb031f3
made input a cosmos element
bchong95 Feb 28, 2020
8d8279f
returning continuation token instead
bchong95 Feb 28, 2020
1e31ac2
merged
bchong95 Feb 28, 2020
4fe4933
updated tests
bchong95 Feb 28, 2020
abec934
resolved iteration comments
bchong95 Mar 2, 2020
144364f
broke up parallel execution context into different files based on stages
bchong95 Mar 2, 2020
26e2fc5
refactored parrallel resume code
bchong95 Mar 3, 2020
3f65c71
broke order by into separate files
bchong95 Mar 3, 2020
f0d2874
refactored code
bchong95 Mar 4, 2020
c52f60d
fixed bugs
bchong95 Mar 4, 2020
a3b2d5f
merged
bchong95 Mar 5, 2020
c894ddc
resolved iteration comments
bchong95 Mar 6, 2020
6494d65
resolved iteration comments
bchong95 Mar 9, 2020
69d0985
Update Microsoft.Azure.Cosmos/src/Query/Core/ExecutionContext/OrderBy…
bchong95 Mar 11, 2020
78339f0
added tests and fixed off by one error
bchong95 Mar 12, 2020
6fcac1a
Merge branch 'master' into users/brchon/Query/ExecutionContextFactory…
bchong95 Mar 12, 2020
a1faaa4
Merge branch 'users/brchon/Query/ExecutionContextFactoryRefactor' of …
bchong95 Mar 12, 2020
2737293
fixed typo
bchong95 Mar 12, 2020
8c85cae
merged
bchong95 Mar 12, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@ namespace Microsoft.Azure.Cosmos.Query.Core.ContinuationTokens
using Microsoft.Azure.Cosmos.Query.Core.Exceptions;
using Microsoft.Azure.Cosmos.Query.Core.Monads;
using Microsoft.Azure.Cosmos.Routing;
using Microsoft.Azure.Documents.Routing;
using Newtonsoft.Json;

/// <summary>
/// A composite continuation token that has both backend continuation token and partition range information.
/// </summary>
internal sealed class CompositeContinuationToken
internal sealed class CompositeContinuationToken : IPartitionedToken
bchong95 marked this conversation as resolved.
Show resolved Hide resolved
{
private static class PropertyNames
{
Expand All @@ -41,6 +42,9 @@ public Documents.Routing.Range<string> Range
set;
}

[JsonIgnore]
public Range<string> PartitionRange => this.Range;

public object ShallowCopy()
{
return this.MemberwiseClone();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------

namespace Microsoft.Azure.Cosmos.Query.Core.ContinuationTokens
{
internal interface IPartitionedToken
bchong95 marked this conversation as resolved.
Show resolved Hide resolved
{
Documents.Routing.Range<string> PartitionRange { get; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ namespace Microsoft.Azure.Cosmos.Query.Core.ContinuationTokens
using Microsoft.Azure.Cosmos.Query.Core.Exceptions;
using Microsoft.Azure.Cosmos.Query.Core.ExecutionContext.OrderBy;
using Microsoft.Azure.Cosmos.Query.Core.Monads;
using Microsoft.Azure.Documents.Routing;
using Newtonsoft.Json;

/// <summary>
Expand Down Expand Up @@ -53,7 +54,7 @@ namespace Microsoft.Azure.Cosmos.Query.Core.ContinuationTokens
/// {"compositeToken":{"token":"+RID:OpY0AN-mFAACAAAAAAAABA==#RT:1#TRC:1#RTD:qdTAEA==","range":{"min":"05C1D9CD673398","max":"05C1E399CD6732"}},"orderByItems"[{"item":2}],"rid":"OpY0AN-mFAACAAAAAAAABA==","skipCount":0,"filter":"r.key > 1"}
/// ]]>
/// </example>
internal sealed class OrderByContinuationToken
internal sealed class OrderByContinuationToken : IPartitionedToken
{
private static class PropertyNames
{
Expand Down Expand Up @@ -206,6 +207,9 @@ public string Filter
get;
}

[JsonIgnore]
public Range<string> PartitionRange => this.CompositeContinuationToken.Range;

public static CosmosElement ToCosmosElement(OrderByContinuationToken orderByContinuationToken)
{
CosmosElement compositeContinuationToken = CompositeContinuationToken.ToCosmosElement(orderByContinuationToken.CompositeContinuationToken);
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// ------------------------------------------------------------

namespace Microsoft.Azure.Cosmos.Query.Core.ExecutionContext.OrderBy
{
using System;
using System.Collections.Generic;
using System.Linq;
using Microsoft.Azure.Cosmos.CosmosElements;
using Microsoft.Azure.Cosmos.Query.Core.ContinuationTokens;
using Microsoft.Azure.Cosmos.Query.Core.ExecutionContext.ItemProducers;

internal sealed partial class CosmosOrderByItemQueryExecutionContext
bchong95 marked this conversation as resolved.
Show resolved Hide resolved
{
/// <summary>
/// Gets the continuation token for an order by query.
/// </summary>
protected override string ContinuationToken
bchong95 marked this conversation as resolved.
Show resolved Hide resolved
{
// In general the continuation token for order by queries contains the following information:
// 1) What partition did we leave off on
// 2) What value did we leave off
// Along with the constraints that we get from how we drain the documents:
// Let <x, y> mean that the last item we drained was item x from partition y.
// Then we know that for all partitions
// * < y that we have drained all items <= x
// * > y that we have drained all items < x
// * = y that we have drained all items <= x based on the backend continuation token for y
// With this information we have captured the progress for all partitions in a single continuation token.
get
{
IEnumerable<ItemProducer> activeItemProducers = this.GetActiveItemProducers();
string continuationToken;
if (activeItemProducers.Any())
{
IEnumerable<CosmosElement> orderByContinuationTokens = activeItemProducers.Select((itemProducer) =>
{
OrderByQueryResult orderByQueryResult = new OrderByQueryResult(itemProducer.Current);
string filter = itemProducer.Filter;
OrderByContinuationToken orderByContinuationToken = new OrderByContinuationToken(
new CompositeContinuationToken
{
Token = itemProducer.PreviousContinuationToken,
Range = itemProducer.PartitionKeyRange.ToRange(),
},
orderByQueryResult.OrderByItems,
orderByQueryResult.Rid,
this.ShouldIncrementSkipCount(itemProducer) ? this.skipCount + 1 : 0,
filter);

return OrderByContinuationToken.ToCosmosElement(orderByContinuationToken);
});

continuationToken = CosmosArray.Create(orderByContinuationTokens).ToString();
}
else
{
continuationToken = null;
}

// Note we are no longer escaping non ascii continuation tokens.
// It is the callers job to encode a continuation token before adding it to a header in their service.

return continuationToken;
}
}

public override CosmosElement GetCosmosElementContinuationToken()
{
IEnumerable<ItemProducer> activeItemProducers = this.GetActiveItemProducers();
if (!activeItemProducers.Any())
{
return default;
}

List<CosmosElement> orderByContinuationTokens = new List<CosmosElement>();
foreach (ItemProducer activeItemProducer in activeItemProducers)
{
OrderByQueryResult orderByQueryResult = new OrderByQueryResult(activeItemProducer.Current);
OrderByContinuationToken orderByContinuationToken = new OrderByContinuationToken(
compositeContinuationToken: new CompositeContinuationToken()
{
Token = activeItemProducer.PreviousContinuationToken,
Range = new Documents.Routing.Range<string>(
min: activeItemProducer.PartitionKeyRange.MinInclusive,
max: activeItemProducer.PartitionKeyRange.MaxExclusive,
isMinInclusive: true,
isMaxInclusive: false)
},
orderByItems: orderByQueryResult.OrderByItems,
rid: orderByQueryResult.Rid,
skipCount: this.ShouldIncrementSkipCount(activeItemProducer) ? this.skipCount + 1 : 0,
filter: activeItemProducer.Filter);

CosmosElement cosmosElementToken = OrderByContinuationToken.ToCosmosElement(orderByContinuationToken);
orderByContinuationTokens.Add(cosmosElementToken);
}

return CosmosArray.Create(orderByContinuationTokens);
}

/// <summary>
/// Equality comparer used to determine if a document producer needs it's continuation token returned.
/// Basically just says that the continuation token can be flushed once you stop seeing duplicates.
/// </summary>
private sealed class OrderByEqualityComparer : IEqualityComparer<CosmosElement>
{
/// <summary>
/// The order by comparer.
/// </summary>
private readonly OrderByItemProducerTreeComparer orderByConsumeComparer;

/// <summary>
/// Initializes a new instance of the OrderByEqualityComparer class.
/// </summary>
/// <param name="orderByConsumeComparer">The order by consume comparer.</param>
public OrderByEqualityComparer(OrderByItemProducerTreeComparer orderByConsumeComparer)
{
this.orderByConsumeComparer = orderByConsumeComparer ?? throw new ArgumentNullException($"{nameof(orderByConsumeComparer)} can not be null.");
}

/// <summary>
/// Gets whether two OrderByQueryResult instances are equal.
/// </summary>
/// <param name="x">The first.</param>
/// <param name="y">The second.</param>
/// <returns>Whether two OrderByQueryResult instances are equal.</returns>
public bool Equals(CosmosElement x, CosmosElement y)
{
OrderByQueryResult orderByQueryResultX = new OrderByQueryResult(x);
OrderByQueryResult orderByQueryResultY = new OrderByQueryResult(y);
return this.orderByConsumeComparer.CompareOrderByItems(
orderByQueryResultX.OrderByItems,
orderByQueryResultY.OrderByItems) == 0;
}

/// <summary>
/// Gets the hash code for object.
/// </summary>
/// <param name="obj">The object to hash.</param>
/// <returns>The hash code for the OrderByQueryResult object.</returns>
public int GetHashCode(CosmosElement obj)
{
return 0;
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// ------------------------------------------------------------

namespace Microsoft.Azure.Cosmos.Query.Core.ExecutionContext.OrderBy
{
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.CosmosElements;
using Microsoft.Azure.Cosmos.Query.Core.ExecutionContext.ItemProducers;
using Microsoft.Azure.Cosmos.Query.Core.QueryClient;

internal sealed partial class CosmosOrderByItemQueryExecutionContext
{
/// <summary>
/// Drains a page of documents from this context.
/// </summary>
/// <param name="maxElements">The maximum number of elements.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A task that when awaited on return a page of documents.</returns>
public override async Task<QueryResponseCore> DrainAsync(int maxElements, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();

//// In order to maintain the continuation token for the user we must drain with a few constraints
//// 1) We always drain from the partition, which has the highest priority item first
//// 2) If multiple partitions have the same priority item then we drain from the left most first
//// otherwise we would need to keep track of how many of each item we drained from each partition
//// (just like parallel queries).
//// Visually that look the following case where we have three partitions that are numbered and store letters.
//// For teaching purposes I have made each item a tuple of the following form:
//// <item stored in partition, partition number>
//// So that duplicates across partitions are distinct, but duplicates within partitions are indistinguishable.
//// |-------| |-------| |-------|
//// | <a,1> | | <a,2> | | <a,3> |
//// | <a,1> | | <b,2> | | <c,3> |
//// | <a,1> | | <b,2> | | <c,3> |
//// | <d,1> | | <c,2> | | <c,3> |
//// | <d,1> | | <e,2> | | <f,3> |
//// | <e,1> | | <h,2> | | <j,3> |
//// | <f,1> | | <i,2> | | <k,3> |
//// |-------| |-------| |-------|
//// Now the correct drain order in this case is:
//// <a,1>,<a,1>,<a,1>,<a,2>,<a,3>,<b,2>,<b,2>,<c,2>,<c,3>,<c,3>,<c,3>,
//// <d,1>,<d,1>,<e,1>,<e,2>,<f,1>,<f,3>,<h,2>,<i,2>,<j,3>,<k,3>
//// In more mathematical terms
//// 1) <x, y> always comes before <z, y> where x < z
//// 2) <i, j> always come before <i, k> where j < k

List<CosmosElement> results = new List<CosmosElement>();
while (results.Count < maxElements)
{
// Only drain from the highest priority document producer
// We need to pop and push back the document producer tree, since the priority changes according to the sort order.
ItemProducerTree currentItemProducerTree = this.PopCurrentItemProducerTree();
try
{
if (!currentItemProducerTree.HasMoreResults)
{
// This means there are no more items to drain
break;
}

OrderByQueryResult orderByQueryResult = new OrderByQueryResult(currentItemProducerTree.Current);

// Only add the payload, since other stuff is garbage from the caller's perspective.
results.Add(orderByQueryResult.Payload);

// If we are at the beginning of the page and seeing an rid from the previous page we should increment the skip count
// due to the fact that JOINs can make a document appear multiple times and across continuations, so we don't want to
// surface this more than needed. More information can be found in the continuation token docs.
if (this.ShouldIncrementSkipCount(currentItemProducerTree.CurrentItemProducerTree.Root))
{
++this.skipCount;
}
else
{
this.skipCount = 0;
}

this.previousRid = orderByQueryResult.Rid;
this.previousOrderByItems = orderByQueryResult.OrderByItems;

if (!currentItemProducerTree.TryMoveNextDocumentWithinPage())
{
while (true)
{
(bool movedToNextPage, QueryResponseCore? failureResponse) = await currentItemProducerTree.TryMoveNextPageAsync(cancellationToken);
if (!movedToNextPage)
{
if (failureResponse.HasValue)
{
// TODO: We can buffer this failure so that the user can still get the pages we already got.
return failureResponse.Value;
}

break;
}

if (currentItemProducerTree.IsAtBeginningOfPage)
{
break;
}

if (currentItemProducerTree.TryMoveNextDocumentWithinPage())
{
break;
}
}
}
}
finally
{
this.PushCurrentItemProducerTree(currentItemProducerTree);
}
}

return QueryResponseCore.CreateSuccess(
result: results,
requestCharge: this.requestChargeTracker.GetAndResetCharge(),
activityId: null,
responseLengthBytes: this.GetAndResetResponseLengthBytes(),
disallowContinuationTokenMessage: null,
continuationToken: this.ContinuationToken,
diagnostics: this.GetAndResetDiagnostics());
}

/// <summary>
/// Gets whether or not we should increment the skip count based on the rid of the document.
/// </summary>
/// <param name="currentItemProducer">The current document producer.</param>
/// <returns>Whether or not we should increment the skip count.</returns>
private bool ShouldIncrementSkipCount(ItemProducer currentItemProducer)
{
// If we are not at the beginning of the page and we saw the same rid again.
return !currentItemProducer.IsAtBeginningOfPage &&
string.Equals(
this.previousRid,
new OrderByQueryResult(currentItemProducer.Current).Rid,
StringComparison.Ordinal);
}
}
}
Loading