Skip to content

Commit

Permalink
Cross Partition Execution Context Refactor (#1260)
Browse files Browse the repository at this point in the history
* realized that this will be a lot of work

* wired up continuation token on the write path

* wiring the new continuation token into the read path

* updated distinct and group by

* drafted out code

* wired in serialize state

* fixed test cases

* I give up

* can't do it

* added min max continuation token

* removed TryGetContinuationToken

* fixed random bugs

* fixed more bugs

* about to gut out RequestContinuationToken and just wire through CosmosElement

* made input a cosmos element

* returning continuation token instead

* updated tests

* resolved iteration comments

* broke up parallel execution context into different files based on stages

* refactored parallel resume code

* broke order by into separate files

* refactored code

* fixed bugs

* resolved iteration comments

* resolved iteration comments

* Update Microsoft.Azure.Cosmos/src/Query/Core/ExecutionContext/OrderBy/CosmosOrderByItemQueryExecutionContext.Resume.cs

Co-Authored-By: j82w <j82w@users.noreply.github.com>

* added tests and fixed off by one error

* fixed typo

Co-authored-by: j82w <j82w@users.noreply.github.com>
  • Loading branch information
bchong95 and j82w authored Mar 12, 2020
1 parent 4044aa9 commit e3e23a5
Show file tree
Hide file tree
Showing 13 changed files with 1,798 additions and 1,382 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@ namespace Microsoft.Azure.Cosmos.Query.Core.ContinuationTokens
using Microsoft.Azure.Cosmos.Query.Core.Exceptions;
using Microsoft.Azure.Cosmos.Query.Core.Monads;
using Microsoft.Azure.Cosmos.Routing;
using Microsoft.Azure.Documents.Routing;
using Newtonsoft.Json;

/// <summary>
/// A composite continuation token that has both backend continuation token and partition range information.
/// </summary>
internal sealed class CompositeContinuationToken
internal sealed class CompositeContinuationToken : IPartitionedToken
{
private static class PropertyNames
{
Expand All @@ -41,6 +42,9 @@ public Documents.Routing.Range<string> Range
set;
}

[JsonIgnore]
public Range<string> PartitionRange => this.Range;

public object ShallowCopy()
{
return this.MemberwiseClone();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------

namespace Microsoft.Azure.Cosmos.Query.Core.ContinuationTokens
{
/// <summary>
/// Contract for a continuation token that exposes the partition range it carries.
/// </summary>
internal interface IPartitionedToken
{
/// <summary>
/// Gets the partition range associated with this token.
/// </summary>
Documents.Routing.Range<string> PartitionRange { get; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ namespace Microsoft.Azure.Cosmos.Query.Core.ContinuationTokens
using Microsoft.Azure.Cosmos.Query.Core.Exceptions;
using Microsoft.Azure.Cosmos.Query.Core.ExecutionContext.OrderBy;
using Microsoft.Azure.Cosmos.Query.Core.Monads;
using Microsoft.Azure.Documents.Routing;
using Newtonsoft.Json;

/// <summary>
Expand Down Expand Up @@ -53,7 +54,7 @@ namespace Microsoft.Azure.Cosmos.Query.Core.ContinuationTokens
/// {"compositeToken":{"token":"+RID:OpY0AN-mFAACAAAAAAAABA==#RT:1#TRC:1#RTD:qdTAEA==","range":{"min":"05C1D9CD673398","max":"05C1E399CD6732"}},"orderByItems"[{"item":2}],"rid":"OpY0AN-mFAACAAAAAAAABA==","skipCount":0,"filter":"r.key > 1"}
/// ]]>
/// </example>
internal sealed class OrderByContinuationToken
internal sealed class OrderByContinuationToken : IPartitionedToken
{
private static class PropertyNames
{
Expand Down Expand Up @@ -206,6 +207,9 @@ public string Filter
get;
}

[JsonIgnore]
public Range<string> PartitionRange => this.CompositeContinuationToken.Range;

public static CosmosElement ToCosmosElement(OrderByContinuationToken orderByContinuationToken)
{
CosmosElement compositeContinuationToken = CompositeContinuationToken.ToCosmosElement(orderByContinuationToken.CompositeContinuationToken);
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// ------------------------------------------------------------

namespace Microsoft.Azure.Cosmos.Query.Core.ExecutionContext.OrderBy
{
using System;
using System.Collections.Generic;
using System.Linq;
using Microsoft.Azure.Cosmos.CosmosElements;
using Microsoft.Azure.Cosmos.Query.Core.ContinuationTokens;
using Microsoft.Azure.Cosmos.Query.Core.ExecutionContext.ItemProducers;

internal sealed partial class CosmosOrderByItemQueryExecutionContext
{
/// <summary>
/// Gets the serialized continuation token for an order by query;
/// <c>null</c> when there are no active item producers.
/// </summary>
protected override string ContinuationToken
{
    // An order by continuation token has to capture two things:
    //  1) the partition we left off on, and
    //  2) the value we left off on.
    // The way documents are drained adds the following guarantees.
    // Let <x, y> denote that the last item drained was item x from partition y.
    // Then for every partition
    //  * < y we have drained all items <= x
    //  * > y we have drained all items < x
    //  * = y we have drained all items <= x, based on the backend continuation token for y
    // so emitting one token per active item producer records the progress of the whole query.
    get
    {
        IEnumerable<ItemProducer> producers = this.GetActiveItemProducers();
        if (!producers.Any())
        {
            return null;
        }

        IEnumerable<CosmosElement> serializedTokens = producers.Select((producer) =>
        {
            OrderByQueryResult currentResult = new OrderByQueryResult(producer.Current);
            string producerFilter = producer.Filter;
            CompositeContinuationToken compositeToken = new CompositeContinuationToken
            {
                Token = producer.PreviousContinuationToken,
                Range = producer.PartitionKeyRange.ToRange(),
            };

            return OrderByContinuationToken.ToCosmosElement(
                new OrderByContinuationToken(
                    compositeContinuationToken: compositeToken,
                    orderByItems: currentResult.OrderByItems,
                    rid: currentResult.Rid,
                    skipCount: this.ShouldIncrementSkipCount(producer) ? this.skipCount + 1 : 0,
                    filter: producerFilter));
        });

        // Non-ASCII characters in the token are intentionally not escaped here.
        // Callers are responsible for encoding the token before adding it to a header in their service.
        return CosmosArray.Create(serializedTokens).ToString();
    }
}

/// <summary>
/// Builds the continuation token for this order by query as a <see cref="CosmosElement"/>.
/// </summary>
/// <returns>
/// An array holding one order by continuation token per active item producer,
/// or the default value when there are no active item producers.
/// </returns>
public override CosmosElement GetCosmosElementContinuationToken()
{
    IEnumerable<ItemProducer> producers = this.GetActiveItemProducers();
    if (!producers.Any())
    {
        return default;
    }

    List<CosmosElement> tokens = producers
        .Select((producer) =>
        {
            OrderByQueryResult currentResult = new OrderByQueryResult(producer.Current);

            // The range is rebuilt from the producer's partition key range as [min, max).
            CompositeContinuationToken compositeToken = new CompositeContinuationToken()
            {
                Token = producer.PreviousContinuationToken,
                Range = new Documents.Routing.Range<string>(
                    min: producer.PartitionKeyRange.MinInclusive,
                    max: producer.PartitionKeyRange.MaxExclusive,
                    isMinInclusive: true,
                    isMaxInclusive: false)
            };

            OrderByContinuationToken token = new OrderByContinuationToken(
                compositeToken,
                currentResult.OrderByItems,
                currentResult.Rid,
                this.ShouldIncrementSkipCount(producer) ? this.skipCount + 1 : 0,
                producer.Filter);

            return OrderByContinuationToken.ToCosmosElement(token);
        })
        .ToList();

    return CosmosArray.Create(tokens);
}

/// <summary>
/// Equality comparer used to determine if a document producer needs its continuation token returned.
/// Basically just says that the continuation token can be flushed once you stop seeing duplicates.
/// </summary>
private sealed class OrderByEqualityComparer : IEqualityComparer<CosmosElement>
{
    /// <summary>
    /// The order by comparer that decides whether two sets of order by items are equal.
    /// </summary>
    private readonly OrderByItemProducerTreeComparer orderByConsumeComparer;

    /// <summary>
    /// Initializes a new instance of the OrderByEqualityComparer class.
    /// </summary>
    /// <param name="orderByConsumeComparer">The order by consume comparer.</param>
    /// <exception cref="ArgumentNullException">Thrown when <paramref name="orderByConsumeComparer"/> is null.</exception>
    public OrderByEqualityComparer(OrderByItemProducerTreeComparer orderByConsumeComparer)
    {
        // The single-string ArgumentNullException constructor takes the parameter NAME, not a message,
        // so pass nameof(...) to get a correct ParamName and the standard message.
        this.orderByConsumeComparer = orderByConsumeComparer ?? throw new ArgumentNullException(nameof(orderByConsumeComparer));
    }

    /// <summary>
    /// Gets whether two OrderByQueryResult instances are equal.
    /// Only the order by items are compared.
    /// </summary>
    /// <param name="x">The first.</param>
    /// <param name="y">The second.</param>
    /// <returns>Whether the order by items of the two results compare as equal.</returns>
    public bool Equals(CosmosElement x, CosmosElement y)
    {
        OrderByQueryResult orderByQueryResultX = new OrderByQueryResult(x);
        OrderByQueryResult orderByQueryResultY = new OrderByQueryResult(y);
        return this.orderByConsumeComparer.CompareOrderByItems(
            orderByQueryResultX.OrderByItems,
            orderByQueryResultY.OrderByItems) == 0;
    }

    /// <summary>
    /// Gets the hash code for object.
    /// </summary>
    /// <param name="obj">The object to hash.</param>
    /// <returns>Always 0, so equality is decided solely by <see cref="Equals(CosmosElement, CosmosElement)"/>.</returns>
    public int GetHashCode(CosmosElement obj)
    {
        // A constant hash is trivially consistent with Equals: elements that compare equal always hash the same.
        return 0;
    }
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// ------------------------------------------------------------

namespace Microsoft.Azure.Cosmos.Query.Core.ExecutionContext.OrderBy
{
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.CosmosElements;
using Microsoft.Azure.Cosmos.Query.Core.ExecutionContext.ItemProducers;
using Microsoft.Azure.Cosmos.Query.Core.QueryClient;

internal sealed partial class CosmosOrderByItemQueryExecutionContext
{
/// <summary>
/// Drains a page of documents from this context.
/// Items are taken one at a time from the current (highest priority) item producer tree until
/// <paramref name="maxElements"/> items have been collected or the current tree has no more results.
/// </summary>
/// <param name="maxElements">The maximum number of elements.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>
/// A task that when awaited on returns a page of documents.
/// If moving an item producer tree to its next page fails with a failure response, that failure response is returned instead.
/// </returns>
public override async Task<QueryResponseCore> DrainAsync(int maxElements, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();

//// In order to maintain the continuation token for the user we must drain with a few constraints
//// 1) We always drain from the partition, which has the highest priority item first
//// 2) If multiple partitions have the same priority item then we drain from the left most first
//// otherwise we would need to keep track of how many of each item we drained from each partition
//// (just like parallel queries).
//// Visually that looks like the following case where we have three partitions that are numbered and store letters.
//// For teaching purposes I have made each item a tuple of the following form:
//// <item stored in partition, partition number>
//// So that duplicates across partitions are distinct, but duplicates within partitions are indistinguishable.
//// |-------| |-------| |-------|
//// | <a,1> | | <a,2> | | <a,3> |
//// | <a,1> | | <b,2> | | <c,3> |
//// | <a,1> | | <b,2> | | <c,3> |
//// | <d,1> | | <c,2> | | <c,3> |
//// | <d,1> | | <e,2> | | <f,3> |
//// | <e,1> | | <h,2> | | <j,3> |
//// | <f,1> | | <i,2> | | <k,3> |
//// |-------| |-------| |-------|
//// Now the correct drain order in this case is:
//// <a,1>,<a,1>,<a,1>,<a,2>,<a,3>,<b,2>,<b,2>,<c,2>,<c,3>,<c,3>,<c,3>,
//// <d,1>,<d,1>,<e,1>,<e,2>,<f,1>,<f,3>,<h,2>,<i,2>,<j,3>,<k,3>
//// In more mathematical terms
//// 1) <x, y> always comes before <z, y> where x < z
//// 2) <i, j> always comes before <i, k> where j < k

List<CosmosElement> results = new List<CosmosElement>();
while (results.Count < maxElements)
{
// Only drain from the highest priority document producer
// We need to pop and push back the document producer tree, since the priority changes according to the sort order.
ItemProducerTree currentItemProducerTree = this.PopCurrentItemProducerTree();
try
{
if (!currentItemProducerTree.HasMoreResults)
{
// This means there are no more items to drain
break;
}

OrderByQueryResult orderByQueryResult = new OrderByQueryResult(currentItemProducerTree.Current);

// Only add the payload, since other stuff is garbage from the caller's perspective.
results.Add(orderByQueryResult.Payload);

// If we are at the beginning of the page and seeing an rid from the previous page we should increment the skip count
// due to the fact that JOINs can make a document appear multiple times and across continuations, so we don't want to
// surface this more than needed. More information can be found in the continuation token docs.
// NOTE(review): ShouldIncrementSkipCount only returns true when the producer is NOT at the beginning of the page,
// which reads as the opposite of the sentence above - confirm which one is intended.
if (this.ShouldIncrementSkipCount(currentItemProducerTree.CurrentItemProducerTree.Root))
{
++this.skipCount;
}
else
{
this.skipCount = 0;
}

// Remember what was just drained; the rid is what ShouldIncrementSkipCount compares against.
this.previousRid = orderByQueryResult.Rid;
this.previousOrderByItems = orderByQueryResult.OrderByItems;

if (!currentItemProducerTree.TryMoveNextDocumentWithinPage())
{
// The current page is exhausted, so keep fetching pages until one of the exit conditions below is hit
// (presumably this is what skips over empty pages - TODO confirm).
while (true)
{
(bool movedToNextPage, QueryResponseCore? failureResponse) = await currentItemProducerTree.TryMoveNextPageAsync(cancellationToken);
if (!movedToNextPage)
{
if (failureResponse.HasValue)
{
// TODO: We can buffer this failure so that the user can still get the pages we already got.
// Note that the items already collected in "results" are not returned in this case.
return failureResponse.Value;
}

// Could not move to another page and there was no failure to report.
break;
}

if (currentItemProducerTree.IsAtBeginningOfPage)
{
break;
}

if (currentItemProducerTree.TryMoveNextDocumentWithinPage())
{
break;
}
}
}
}
finally
{
// Always put the tree back, including when we break out of the loop or return a failure response.
this.PushCurrentItemProducerTree(currentItemProducerTree);
}
}

// The continuation token is computed after draining, from the state left behind by the loop above.
return QueryResponseCore.CreateSuccess(
result: results,
requestCharge: this.requestChargeTracker.GetAndResetCharge(),
activityId: null,
responseLengthBytes: this.GetAndResetResponseLengthBytes(),
disallowContinuationTokenMessage: null,
continuationToken: this.ContinuationToken,
diagnostics: this.GetAndResetDiagnostics());
}

/// <summary>
/// Decides whether the skip count should be incremented for the document the given producer is positioned on.
/// </summary>
/// <param name="currentItemProducer">The current document producer.</param>
/// <returns>
/// <c>true</c> when the producer is not at the beginning of a page and its current document
/// has the same rid as the previously drained document; otherwise <c>false</c>.
/// </returns>
private bool ShouldIncrementSkipCount(ItemProducer currentItemProducer)
{
    // At the beginning of a page the answer is always no; the current document is not inspected.
    if (currentItemProducer.IsAtBeginningOfPage)
    {
        return false;
    }

    // Otherwise increment only when we are seeing the same rid again.
    string currentRid = new OrderByQueryResult(currentItemProducer.Current).Rid;
    return string.Equals(this.previousRid, currentRid, StringComparison.Ordinal);
}
}
}
Loading

0 comments on commit e3e23a5

Please sign in to comment.