Skip to content

Commit

Permalink
Internal Pagination: Adds Merge Proofing (#2084)
Browse files Browse the repository at this point in the history
* new algorithm

* refreshed unit tests

* refactored

* fixed inmemorycontainer get overlapping ranges

* fixed merge bug for getting the target range

* updated split and merge tests

* fixed order by in memory container continuation token

* resolved some iteration comments

* typo

Co-authored-by: Samer Boshra <sboshra@microsoft.com>
  • Loading branch information
bchong95 and sboshra committed Jan 11, 2021
1 parent 0e338b7 commit bd3f1f1
Show file tree
Hide file tree
Showing 15 changed files with 1,271 additions and 1,019 deletions.
22 changes: 22 additions & 0 deletions Microsoft.Azure.Cosmos/src/FeedRange/FeedRanges/FeedRangeEpk.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,5 +80,27 @@ public override TResult Accept<TResult>(IFeedRangeTransformer<TResult> transform
{
return transformer.Visit(this);
}

public override bool Equals(object obj)
{
return this.Equals(obj as FeedRangeEpk);
}

public bool Equals(FeedRangeEpk other)
{
return (other != null)
&& this.Range.Min.Equals(other.Range.Min)
&& this.Range.Max.Equals(other.Range.Max)
&& this.Range.IsMinInclusive.Equals(other.Range.IsMinInclusive)
&& this.Range.IsMaxInclusive.Equals(other.Range.IsMaxInclusive);
}

public override int GetHashCode()
{
return this.Range.Min.GetHashCode()
^ this.Range.Max.GetHashCode()
^ this.Range.IsMinInclusive.GetHashCode()
^ this.Range.IsMaxInclusive.GetHashCode();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ public async ValueTask<bool> MoveNextAsync(ITrace trace)
if (IsSplitException(exception))
{
// Handle split

List<FeedRangeEpk> childRanges = await this.feedRangeProvider.GetChildRangeAsync(
currentPaginator.Range,
childTrace,
Expand Down Expand Up @@ -181,11 +182,6 @@ public async ValueTask<bool> MoveNextAsync(ITrace trace)
return await this.MoveNextAsync(childTrace);
}

if (IsMergeException(exception))
{
throw new NotImplementedException();
}

// Just enqueue the paginator and the user can decide if they want to retry.
enumerators.Enqueue(currentPaginator);

Expand Down Expand Up @@ -242,12 +238,6 @@ private static bool IsSplitException(Exception exeception)
&& (cosmosException.SubStatusCode == (int)Documents.SubStatusCodes.PartitionKeyRangeGone);
}

private static bool IsMergeException(Exception exception)
{
// TODO: code this out
return false;
}

private interface IQueue<T> : IEnumerable<T>
{
T Peek();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,7 @@ public static TryCatch<IQueryPipelineStage> MonadicCreate(

PartitionMapper.PartitionMapping<OrderByContinuationToken> partitionMapping = monadicGetOrderByContinuationTokenMapping.Result;
IReadOnlyList<CosmosElement> orderByItems = partitionMapping
.TargetPartition
.TargetMapping
.Values
.First()
.OrderByItems
Expand All @@ -639,9 +639,9 @@ public static TryCatch<IQueryPipelineStage> MonadicCreate(
(string leftFilter, string targetFilter, string rightFilter) = OrderByCrossPartitionQueryPipelineStage.GetFormattedFilters(columnAndItems);
List<(IReadOnlyDictionary<FeedRangeEpk, OrderByContinuationToken>, string)> tokenMappingAndFilters = new List<(IReadOnlyDictionary<FeedRangeEpk, OrderByContinuationToken>, string)>()
{
{ (partitionMapping.PartitionsLeftOfTarget, leftFilter) },
{ (partitionMapping.TargetPartition, targetFilter) },
{ (partitionMapping.PartitionsRightOfTarget, rightFilter) },
{ (partitionMapping.MappingLeftOfTarget, leftFilter) },
{ (partitionMapping.TargetMapping, targetFilter) },
{ (partitionMapping.MappingRightOfTarget, rightFilter) },
};

enumeratorsAndTokens = new List<(OrderByQueryPartitionRangePageAsyncEnumerator, OrderByContinuationToken)>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,8 @@ private static TryCatch<CrossFeedRangeState<QueryState>> MonadicExtractState(
List<IReadOnlyDictionary<FeedRangeEpk, ParallelContinuationToken>> rangesToInitialize = new List<IReadOnlyDictionary<FeedRangeEpk, ParallelContinuationToken>>()
{
// Skip all the partitions left of the target range, since they have already been drained fully.
partitionMapping.TargetPartition,
partitionMapping.PartitionsRightOfTarget,
partitionMapping.TargetMapping,
partitionMapping.MappingRightOfTarget,
};

foreach (IReadOnlyDictionary<FeedRangeEpk, ParallelContinuationToken> rangeToInitalize in rangesToInitialize)
Expand Down
Loading

0 comments on commit bd3f1f1

Please sign in to comment.