Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cross Partition Execution Context Refactor #1260

Merged
merged 39 commits into from
Mar 12, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
a001801
realized that this will be a lot of work
bchong95 Jan 8, 2020
f70d18a
merged
bchong95 Jan 15, 2020
f8359f2
wired up continuation token on the write path
bchong95 Jan 15, 2020
ab6bcc1
wiring continuation new continuation token in the read path
bchong95 Jan 17, 2020
a5fa91d
updated distinct and group by
bchong95 Jan 18, 2020
b50ae6d
drafted out code
bchong95 Jan 21, 2020
02e95ca
merged
bchong95 Jan 22, 2020
689fe8f
merged
bchong95 Jan 25, 2020
81418d0
wired in serialize state
bchong95 Jan 25, 2020
658d3f7
merged
bchong95 Jan 29, 2020
f4e689e
fixed test cases
bchong95 Jan 29, 2020
2ced726
merged
bchong95 Jan 29, 2020
3e470e0
I give up
bchong95 Jan 29, 2020
d5ff581
can't do it
bchong95 Jan 30, 2020
7ebd158
added min max continuation token
bchong95 Feb 13, 2020
230193c
merged
bchong95 Feb 13, 2020
20000cb
removed TryGetContinuationToken
bchong95 Feb 13, 2020
e76d053
fixed random bugs
bchong95 Feb 14, 2020
d9455b1
fixed more bugs
bchong95 Feb 14, 2020
ca3d0cf
about to gut out RequestContinuationToken and just wire through Cosmo…
bchong95 Feb 27, 2020
fb031f3
made input a cosmos element
bchong95 Feb 28, 2020
8d8279f
returning continuation token instead
bchong95 Feb 28, 2020
1e31ac2
merged
bchong95 Feb 28, 2020
4fe4933
updated tests
bchong95 Feb 28, 2020
abec934
resolved iteration comments
bchong95 Mar 2, 2020
144364f
broke up parallel execution context into different files based on stages
bchong95 Mar 2, 2020
26e2fc5
refactored parrallel resume code
bchong95 Mar 3, 2020
3f65c71
broke order by into separate files
bchong95 Mar 3, 2020
f0d2874
refactored code
bchong95 Mar 4, 2020
c52f60d
fixed bugs
bchong95 Mar 4, 2020
a3b2d5f
merged
bchong95 Mar 5, 2020
c894ddc
resolved iteration comments
bchong95 Mar 6, 2020
6494d65
resolved iteration comments
bchong95 Mar 9, 2020
69d0985
Update Microsoft.Azure.Cosmos/src/Query/Core/ExecutionContext/OrderBy…
bchong95 Mar 11, 2020
78339f0
added tests and fixed off by one error
bchong95 Mar 12, 2020
6fcac1a
Merge branch 'master' into users/brchon/Query/ExecutionContextFactory…
bchong95 Mar 12, 2020
a1faaa4
Merge branch 'users/brchon/Query/ExecutionContextFactoryRefactor' of …
bchong95 Mar 12, 2020
2737293
fixed typo
bchong95 Mar 12, 2020
8c85cae
merged
bchong95 Mar 12, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
resolved iteration comments
  • Loading branch information
bchong95 committed Mar 2, 2020
commit abec934a836d793d92d484065b8e5dbf4d42c59a
32 changes: 26 additions & 6 deletions Microsoft.Azure.Cosmos/src/CosmosElements/CosmosElement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,34 @@ public override string ToString()
return Utf8StringHelpers.ToString(jsonWriter.GetResult());
}

public override bool Equals(object obj)
{
if (!(obj is CosmosElement cosmosElement))
{
return false;
}

return this.Equals(cosmosElement);
}

public bool Equals(CosmosElement cosmosElement)
{
return CosmosElementEqualityComparer.Value.Equals(this, cosmosElement);
}

public override int GetHashCode()
{
return CosmosElementEqualityComparer.Value.GetHashCode(this);
}

public abstract void WriteTo(IJsonWriter jsonWriter);

public abstract void Accept(ICosmosElementVisitor cosmosElementVisitor);

public abstract TResult Accept<TResult>(ICosmosElementVisitor<TResult> cosmosElementVisitor);

public abstract TResult Accept<TArg, TResult>(ICosmosElementVisitor<TArg, TResult> cosmosElementVisitor, TArg input);

public static bool TryCreateFromBuffer<TCosmosElement>(ReadOnlyMemory<byte> buffer, out TCosmosElement cosmosElement)
where TCosmosElement : CosmosElement
{
Expand All @@ -59,12 +85,6 @@ public static bool TryCreateFromBuffer<TCosmosElement>(ReadOnlyMemory<byte> buff
return true;
}

public abstract void Accept(ICosmosElementVisitor cosmosElementVisitor);

public abstract TResult Accept<TResult>(ICosmosElementVisitor<TResult> cosmosElementVisitor);

public abstract TResult Accept<TArg, TResult>(ICosmosElementVisitor<TArg, TResult> cosmosElementVisitor, TArg input);

public static CosmosElement CreateFromBuffer(ReadOnlyMemory<byte> buffer)
{
IJsonNavigator jsonNavigator = JsonNavigator.Create(buffer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,23 @@ namespace Microsoft.Azure.Cosmos.Query.Core.ContinuationTokens
/// </summary>
internal sealed class CompositeContinuationToken
{
private const string TokenName = "token";
private const string RangeName = "range";
private static class PropertyNames
{
public const string Token = "token";
public const string Range = "range";

private const string MinName = "min";
private const string MaxName = "max";
public const string Min = "min";
public const string Max = "max";
}

[JsonProperty(TokenName)]
[JsonProperty(PropertyNames.Token)]
public string Token
{
get;
set;
}

[JsonProperty(RangeName)]
[JsonProperty(PropertyNames.Range)]
[JsonConverter(typeof(RangeJsonConverter))]
public Documents.Routing.Range<string> Range
{
Expand All @@ -49,14 +52,14 @@ public static CosmosElement ToCosmosElement(CompositeContinuationToken composite
return CosmosObject.Create(
new Dictionary<string, CosmosElement>()
{
{ CompositeContinuationToken.TokenName, token },
{ CompositeContinuationToken.PropertyNames.Token, token },
{
CompositeContinuationToken.RangeName,
CompositeContinuationToken.PropertyNames.Range,
CosmosObject.Create(
new Dictionary<string, CosmosElement>()
{
{ MinName, CosmosString.Create(compositeContinuationToken.Range.Min) },
{ MaxName, CosmosString.Create(compositeContinuationToken.Range.Max) }
{ PropertyNames.Min, CosmosString.Create(compositeContinuationToken.Range.Min) },
{ PropertyNames.Max, CosmosString.Create(compositeContinuationToken.Range.Max) }
})
},
});
Expand All @@ -70,10 +73,10 @@ public static TryCatch<CompositeContinuationToken> TryCreateFromCosmosElement(Co
new MalformedContinuationTokenException($"{nameof(CompositeContinuationToken)} is not an object: {cosmosElement}"));
}

if (!cosmosObject.TryGetValue(TokenName, out CosmosElement rawToken))
if (!cosmosObject.TryGetValue(PropertyNames.Token, out CosmosElement rawToken))
{
return TryCatch<CompositeContinuationToken>.FromException(
new MalformedContinuationTokenException($"{nameof(CompositeContinuationToken)} is missing field: '{TokenName}': {cosmosElement}"));
new MalformedContinuationTokenException($"{nameof(CompositeContinuationToken)} is missing field: '{PropertyNames.Token}': {cosmosElement}"));
}

string token;
Expand All @@ -86,24 +89,24 @@ public static TryCatch<CompositeContinuationToken> TryCreateFromCosmosElement(Co
token = null;
}

if (!cosmosObject.TryGetValue(RangeName, out CosmosObject rawRange))
if (!cosmosObject.TryGetValue(PropertyNames.Range, out CosmosObject rawRange))
{
return TryCatch<CompositeContinuationToken>.FromException(
new MalformedContinuationTokenException($"{nameof(CompositeContinuationToken)} is missing field: '{RangeName}': {cosmosElement}"));
new MalformedContinuationTokenException($"{nameof(CompositeContinuationToken)} is missing field: '{PropertyNames.Range}': {cosmosElement}"));
}

if (!rawRange.TryGetValue(MinName, out CosmosString rawMin))
if (!rawRange.TryGetValue(PropertyNames.Min, out CosmosString rawMin))
{
return TryCatch<CompositeContinuationToken>.FromException(
new MalformedContinuationTokenException($"{nameof(CompositeContinuationToken)} is missing field: '{MinName}': {cosmosElement}"));
new MalformedContinuationTokenException($"{nameof(CompositeContinuationToken)} is missing field: '{PropertyNames.Min}': {cosmosElement}"));
}

string min = rawMin.Value;

if (!rawRange.TryGetValue(MaxName, out CosmosString rawMax))
if (!rawRange.TryGetValue(PropertyNames.Max, out CosmosString rawMax))
{
return TryCatch<CompositeContinuationToken>.FromException(
new MalformedContinuationTokenException($"{nameof(CompositeContinuationToken)} is missing field: '{MaxName}': {cosmosElement}"));
new MalformedContinuationTokenException($"{nameof(CompositeContinuationToken)} is missing field: '{PropertyNames.Max}': {cosmosElement}"));
}

string max = rawMax.Value;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ namespace Microsoft.Azure.Cosmos.Query.Core.ContinuationTokens
{
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using Microsoft.Azure.Cosmos.Core.Trace;
using Microsoft.Azure.Cosmos.CosmosElements;
using Microsoft.Azure.Cosmos.CosmosElements.Numbers;
using Microsoft.Azure.Cosmos.Query.Core.Exceptions;
Expand Down Expand Up @@ -57,11 +55,14 @@ namespace Microsoft.Azure.Cosmos.Query.Core.ContinuationTokens
/// </example>
internal sealed class OrderByContinuationToken
{
private const string CompositeTokenName = "compositeToken";
private const string OrderByItemsName = "orderByItems";
private const string RidName = "rid";
private const string SkipCountName = "skipCount";
private const string FilterName = "filter";
private static class PropertyNames
{
public const string CompositeToken = "compositeToken";
public const string OrderByItems = "orderByItems";
public const string Rid = "rid";
public const string SkipCount = "skipCount";
public const string Filter = "filter";
}

/// <summary>
/// Initializes a new instance of the OrderByContinuationToken struct.
Expand Down Expand Up @@ -109,7 +110,7 @@ public OrderByContinuationToken(
/// {"compositeToken":{"token":"+RID:OpY0AN-mFAACAAAAAAAABA==#RT:1#TRC:1#RTD:qdTAEA==","range":{"min":"05C1D9CD673398","max":"05C1E399CD6732"}}
/// ]]>
/// </example>
[JsonProperty(CompositeTokenName)]
[JsonProperty(PropertyNames.CompositeToken)]
public CompositeContinuationToken CompositeContinuationToken
{
get;
Expand All @@ -129,7 +130,7 @@ public CompositeContinuationToken CompositeContinuationToken
/// <remarks>
/// Right now, we don't support orderBy by multiple fields, so orderByItems is an array of one element.
/// </remarks>>
[JsonProperty("orderByItems")]
[JsonProperty(PropertyNames.OrderByItems)]
public IReadOnlyList<OrderByItem> OrderByItems
{
get;
Expand All @@ -150,7 +151,7 @@ public IReadOnlyList<OrderByItem> OrderByItems
/// "rid":"OpY0AN-mFAACAAAAAAAABA=="
/// ]]>
/// </example>
[JsonProperty(RidName)]
[JsonProperty(PropertyNames.Rid)]
public string Rid
{
get;
Expand All @@ -176,7 +177,7 @@ public string Rid
/// </para>
/// The skip count keeps track of that information.
/// </summary>
[JsonProperty(SkipCountName)]
[JsonProperty(PropertyNames.SkipCount)]
public int SkipCount
{
get;
Expand All @@ -199,7 +200,7 @@ public int SkipCount
/// ]]>
/// </para>
/// </example>
[JsonProperty(FilterName)]
[JsonProperty(PropertyNames.Filter)]
public string Filter
{
get;
Expand All @@ -221,11 +222,11 @@ public static CosmosElement ToCosmosElement(OrderByContinuationToken orderByCont
CosmosObject cosmosObject = CosmosObject.Create(
new Dictionary<string, CosmosElement>()
{
{ CompositeTokenName, compositeContinuationToken },
{ OrderByItemsName, orderByItems },
{ RidName, CosmosString.Create(orderByContinuationToken.Rid) },
{ SkipCountName, CosmosNumber64.Create(orderByContinuationToken.SkipCount) },
{ FilterName, filter },
{ PropertyNames.CompositeToken, compositeContinuationToken },
{ PropertyNames.OrderByItems, orderByItems },
{ PropertyNames.Rid, CosmosString.Create(orderByContinuationToken.Rid) },
{ PropertyNames.SkipCount, CosmosNumber64.Create(orderByContinuationToken.SkipCount) },
{ PropertyNames.Filter, filter },
});

return cosmosObject;
Expand All @@ -239,10 +240,10 @@ public static TryCatch<OrderByContinuationToken> TryCreateFromCosmosElement(Cosm
new MalformedContinuationTokenException($"{nameof(OrderByContinuationToken)} is not an object: {cosmosElement}"));
}

if (!cosmosObject.TryGetValue(CompositeTokenName, out CosmosElement compositeContinuationTokenElement))
if (!cosmosObject.TryGetValue(PropertyNames.CompositeToken, out CosmosElement compositeContinuationTokenElement))
{
return TryCatch<OrderByContinuationToken>.FromException(
new MalformedContinuationTokenException($"{nameof(OrderByContinuationToken)} is missing field: '{CompositeTokenName}': {cosmosElement}"));
new MalformedContinuationTokenException($"{nameof(OrderByContinuationToken)} is missing field: '{PropertyNames.CompositeToken}': {cosmosElement}"));
}

TryCatch<CompositeContinuationToken> tryCompositeContinuation = CompositeContinuationToken.TryCreateFromCosmosElement(compositeContinuationTokenElement);
Expand All @@ -253,34 +254,34 @@ public static TryCatch<OrderByContinuationToken> TryCreateFromCosmosElement(Cosm

CompositeContinuationToken compositeContinuationToken = tryCompositeContinuation.Result;

if (!cosmosObject.TryGetValue(OrderByItemsName, out CosmosArray orderByItemsRaw))
if (!cosmosObject.TryGetValue(PropertyNames.OrderByItems, out CosmosArray orderByItemsRaw))
{
return TryCatch<OrderByContinuationToken>.FromException(
new MalformedContinuationTokenException($"{nameof(OrderByContinuationToken)} is missing field: '{OrderByItemsName}': {cosmosElement}"));
new MalformedContinuationTokenException($"{nameof(OrderByContinuationToken)} is missing field: '{PropertyNames.OrderByItems}': {cosmosElement}"));
}

List<OrderByItem> orderByItems = orderByItemsRaw.Select(x => OrderByItem.FromCosmosElement(x)).ToList();

if (!cosmosObject.TryGetValue(RidName, out CosmosString ridRaw))
if (!cosmosObject.TryGetValue(PropertyNames.Rid, out CosmosString ridRaw))
{
return TryCatch<OrderByContinuationToken>.FromException(
new MalformedContinuationTokenException($"{nameof(OrderByContinuationToken)} is missing field: '{RidName}': {cosmosElement}"));
new MalformedContinuationTokenException($"{nameof(OrderByContinuationToken)} is missing field: '{PropertyNames.Rid}': {cosmosElement}"));
}

string rid = ridRaw.Value;

if (!cosmosObject.TryGetValue(SkipCountName, out CosmosNumber64 skipCountRaw))
if (!cosmosObject.TryGetValue(PropertyNames.SkipCount, out CosmosNumber64 skipCountRaw))
{
return TryCatch<OrderByContinuationToken>.FromException(
new MalformedContinuationTokenException($"{nameof(OrderByContinuationToken)} is missing field: '{SkipCountName}': {cosmosElement}"));
new MalformedContinuationTokenException($"{nameof(OrderByContinuationToken)} is missing field: '{PropertyNames.SkipCount}': {cosmosElement}"));
}

int skipCount = (int)Number64.ToLong(skipCountRaw.GetValue());

if (!cosmosObject.TryGetValue(FilterName, out CosmosElement filterRaw))
if (!cosmosObject.TryGetValue(PropertyNames.Filter, out CosmosElement filterRaw))
{
return TryCatch<OrderByContinuationToken>.FromException(
new MalformedContinuationTokenException($"{nameof(OrderByContinuationToken)} is missing field: '{FilterName}': {cosmosElement}"));
new MalformedContinuationTokenException($"{nameof(OrderByContinuationToken)} is missing field: '{PropertyNames.Filter}': {cosmosElement}"));
}

string filter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ private ClientAggregateDocumentQueryExecutionComponent(
}

public static async Task<TryCatch<IDocumentQueryExecutionComponent>> TryCreateAsync(
AggregateOperator[] aggregates,
IReadOnlyList<AggregateOperator> aggregates,
IReadOnlyDictionary<string, AggregateOperator?> aliasToAggregateType,
IReadOnlyList<string> orderedAliases,
bool hasSelectValue,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ namespace Microsoft.Azure.Cosmos.Query.Core.ExecutionComponent.Aggregate
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.CosmosElements;
using Microsoft.Azure.Cosmos.Json;
using Microsoft.Azure.Cosmos.Query.Core.ContinuationTokens;
using Microsoft.Azure.Cosmos.Query.Core.Exceptions;
using Microsoft.Azure.Cosmos.Query.Core.ExecutionComponent.Aggregate.Aggregators;
using Microsoft.Azure.Cosmos.Query.Core.Monads;
Expand All @@ -32,7 +30,7 @@ private ComputeAggregateDocumentQueryExecutionComponent(
}

public static async Task<TryCatch<IDocumentQueryExecutionComponent>> TryCreateAsync(
AggregateOperator[] aggregates,
IReadOnlyList<AggregateOperator> aggregates,
IReadOnlyDictionary<string, AggregateOperator?> aliasToAggregateType,
IReadOnlyList<string> orderedAliases,
bool hasSelectValue,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ protected AggregateDocumentQueryExecutionComponent(

public static async Task<TryCatch<IDocumentQueryExecutionComponent>> TryCreateAsync(
ExecutionEnvironment executionEnvironment,
AggregateOperator[] aggregates,
IReadOnlyList<AggregateOperator> aggregates,
IReadOnlyDictionary<string, AggregateOperator?> aliasToAggregateType,
IReadOnlyList<string> orderedAliases,
bool hasSelectValue,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,21 +56,6 @@ public CosmosElement GetResult()
return this.globalAverage.GetAverage();
}

public string GetContinuationToken()
{
return this.globalAverage.ToString();
}

public void SerializeState(IJsonWriter jsonWriter)
{
if (jsonWriter == null)
{
throw new ArgumentNullException(nameof(jsonWriter));
}

this.globalAverage.SerializeState(jsonWriter);
}

public static TryCatch<IAggregator> TryCreate(CosmosElement continuationToken)
{
AverageInfo averageInfo;
Expand Down Expand Up @@ -220,28 +205,7 @@ public CosmosNumber GetAverage()
return null;
}

return CosmosNumber64.Create(this.Sum.Value / this.Count);
}

public void SerializeState(IJsonWriter jsonWriter)
{
if (jsonWriter == null)
{
throw new ArgumentNullException(nameof(jsonWriter));
}

jsonWriter.WriteObjectStart();

if (this.Sum.HasValue)
{
jsonWriter.WriteFieldName(AverageInfo.SumName);
jsonWriter.WriteFloat64Value(this.Sum.Value);
}

jsonWriter.WriteFieldName(AverageInfo.CountName);
jsonWriter.WriteInt64Value(this.Count);

jsonWriter.WriteObjectEnd();
return CosmosNumber64.Create(this.Sum.Value / (double)this.Count);
}

public override string ToString()
Expand Down
Loading