Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cross Partition Execution Context Refactor #1260

Merged
merged 39 commits into from
Mar 12, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
a001801
realized that this will be a lot of work
bchong95 Jan 8, 2020
f70d18a
merged
bchong95 Jan 15, 2020
f8359f2
wired up continuation token on the write path
bchong95 Jan 15, 2020
ab6bcc1
wiring continuation new continuation token in the read path
bchong95 Jan 17, 2020
a5fa91d
updated distinct and group by
bchong95 Jan 18, 2020
b50ae6d
drafted out code
bchong95 Jan 21, 2020
02e95ca
merged
bchong95 Jan 22, 2020
689fe8f
merged
bchong95 Jan 25, 2020
81418d0
wired in serialize state
bchong95 Jan 25, 2020
658d3f7
merged
bchong95 Jan 29, 2020
f4e689e
fixed test cases
bchong95 Jan 29, 2020
2ced726
merged
bchong95 Jan 29, 2020
3e470e0
I give up
bchong95 Jan 29, 2020
d5ff581
can't do it
bchong95 Jan 30, 2020
7ebd158
added min max continuation token
bchong95 Feb 13, 2020
230193c
merged
bchong95 Feb 13, 2020
20000cb
removed TryGetContinuationToken
bchong95 Feb 13, 2020
e76d053
fixed random bugs
bchong95 Feb 14, 2020
d9455b1
fixed more bugs
bchong95 Feb 14, 2020
ca3d0cf
about to gut out RequestContinuationToken and just wire through Cosmo…
bchong95 Feb 27, 2020
fb031f3
made input a cosmos element
bchong95 Feb 28, 2020
8d8279f
returning continuation token instead
bchong95 Feb 28, 2020
1e31ac2
merged
bchong95 Feb 28, 2020
4fe4933
updated tests
bchong95 Feb 28, 2020
abec934
resolved iteration comments
bchong95 Mar 2, 2020
144364f
broke up parallel execution context into different files based on stages
bchong95 Mar 2, 2020
26e2fc5
refactored parrallel resume code
bchong95 Mar 3, 2020
3f65c71
broke order by into separate files
bchong95 Mar 3, 2020
f0d2874
refactored code
bchong95 Mar 4, 2020
c52f60d
fixed bugs
bchong95 Mar 4, 2020
a3b2d5f
merged
bchong95 Mar 5, 2020
c894ddc
resolved iteration comments
bchong95 Mar 6, 2020
6494d65
resolved iteration comments
bchong95 Mar 9, 2020
69d0985
Update Microsoft.Azure.Cosmos/src/Query/Core/ExecutionContext/OrderBy…
bchong95 Mar 11, 2020
78339f0
added tests and fixed off by one error
bchong95 Mar 12, 2020
6fcac1a
Merge branch 'master' into users/brchon/Query/ExecutionContextFactory…
bchong95 Mar 12, 2020
a1faaa4
Merge branch 'users/brchon/Query/ExecutionContextFactoryRefactor' of …
bchong95 Mar 12, 2020
2737293
fixed typo
bchong95 Mar 12, 2020
8c85cae
merged
bchong95 Mar 12, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
merged
  • Loading branch information
bchong95 committed Mar 12, 2020
commit 8c85cae976947180cc0d93b0af009a4d7bb3ad5a
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ private async Task<PartitionKeyRangeBatchExecutionResult> ExecuteAsync(
PartitionKeyRangeServerBatchRequest serverRequest,
CancellationToken cancellationToken)
{
CosmosDiagnosticsContext diagnosticsContext = CosmosDiagnosticsContext.Create();
CosmosDiagnosticsContext diagnosticsContext = new CosmosDiagnosticsContextCore();
CosmosDiagnosticScope limiterScope = diagnosticsContext.CreateScope("BatchAsyncContainerExecutor.Limiter");
SemaphoreSlim limiter = this.GetOrAddLimiterForPartitionKeyRange(serverRequest.PartitionKeyRangeId);
using (await limiter.UsingWaitAsync(cancellationToken))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ internal ResponseMessage ToResponseMessage()
requestMessage: null,
headers: headers,
cosmosException: null,
diagnostics: this.DiagnosticsContext ?? CosmosDiagnosticsContext.Create())
diagnostics: this.DiagnosticsContext ?? new CosmosDiagnosticsContextCore())
{
Content = this.ResourceStream
};
Expand Down
16 changes: 13 additions & 3 deletions Microsoft.Azure.Cosmos/src/ClientRetryPolicy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@ public ClientRetryPolicy(
this.enableEndpointDiscovery = enableEndpointDiscovery;
this.sessionTokenRetryCount = 0;
this.canUseMultipleWriteLocations = false;

this.sharedStatistics = new CosmosClientSideRequestStatistics();
}

/// <summary>
Expand Down Expand Up @@ -126,7 +124,19 @@ public void OnBeforeSendRequest(DocumentServiceRequest request)
this.isReadRequest = request.IsReadOnlyRequest;
this.canUseMultipleWriteLocations = this.globalEndpointManager.CanUseMultipleWriteLocations(request);

request.RequestContext.ClientRequestStatistics = this.sharedStatistics;
if (request.RequestContext.ClientRequestStatistics == null)
{
if (this.sharedStatistics == null)
{
this.sharedStatistics = new CosmosClientSideRequestStatistics();
}

request.RequestContext.ClientRequestStatistics = this.sharedStatistics;
}
else
{
this.sharedStatistics = request.RequestContext.ClientRequestStatistics;
}

// clear previous location-based routing directive
request.RequestContext.ClearRouteToLocation();
Expand Down
24 changes: 17 additions & 7 deletions Microsoft.Azure.Cosmos/src/CosmosClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -493,21 +493,31 @@ public virtual Task<DatabaseResponse> CreateDatabaseIfNotExistsAsync(
// Doing a Read before Create will give us better latency for existing databases
DatabaseProperties databaseProperties = this.PrepareDatabaseProperties(id);
Database database = this.GetDatabase(id);
ResponseMessage response = await database.ReadStreamAsync(requestOptions: requestOptions, cancellationToken: cancellationToken);
if (response.StatusCode != HttpStatusCode.NotFound)
ResponseMessage readResponse = await database.ReadStreamAsync(
requestOptions: requestOptions,
cancellationToken: cancellationToken);

if (readResponse.StatusCode != HttpStatusCode.NotFound)
{
return await this.ClientContext.ResponseFactory.CreateDatabaseResponseAsync(database, Task.FromResult(response));
return await this.ClientContext.ResponseFactory.CreateDatabaseResponseAsync(database, Task.FromResult(readResponse));
}

response = await this.CreateDatabaseStreamAsync(databaseProperties, throughput, requestOptions, cancellationToken);
if (response.StatusCode != HttpStatusCode.Conflict)
ResponseMessage createResponse = await this.CreateDatabaseStreamAsync(databaseProperties, throughput, requestOptions, cancellationToken);

// Merge the diagnostics with the first read request.
createResponse.DiagnosticsContext.AddDiagnosticsInternal(readResponse.DiagnosticsContext);
if (createResponse.StatusCode != HttpStatusCode.Conflict)
{
return await this.ClientContext.ResponseFactory.CreateDatabaseResponseAsync(this.GetDatabase(databaseProperties.Id), Task.FromResult(response));
return await this.ClientContext.ResponseFactory.CreateDatabaseResponseAsync(this.GetDatabase(databaseProperties.Id), Task.FromResult(createResponse));
}

// This second Read is to handle the race condition when 2 or more threads have Read the database and only one succeeds with Create
// so for the remaining ones we should do a Read instead of throwing Conflict exception
return await database.ReadAsync(cancellationToken: cancellationToken);
ResponseMessage readResponseAfterConflict = await database.ReadStreamAsync(
requestOptions: requestOptions,
cancellationToken: cancellationToken);
readResponseAfterConflict.DiagnosticsContext.AddDiagnosticsInternal(readResponse.DiagnosticsContext);
return await this.ClientContext.ResponseFactory.CreateDatabaseResponseAsync(this.GetDatabase(databaseProperties.Id), Task.FromResult(readResponseAfterConflict));
});
}

Expand Down
11 changes: 11 additions & 0 deletions Microsoft.Azure.Cosmos/src/CosmosElements/CosmosElement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,17 @@ public static CosmosElement Parse(string json)

return cosmosElement;
}

public static TCosmosElement Parse<TCosmosElement>(string json)
where TCosmosElement : CosmosElement
{
if (!CosmosElement.TryParse(json, out TCosmosElement cosmosElement))
{
throw new ArgumentException($"Failed to parse json: {json}.");
}

return cosmosElement;
}
}
#if INTERNAL
#pragma warning restore SA1600 // Elements should be documented
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
namespace Microsoft.Azure.Cosmos.CosmosElements
{
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using Microsoft.Azure.Cosmos.Json;
Expand All @@ -21,7 +22,7 @@ private class LazyCosmosObject : CosmosObject
{
private readonly IJsonNavigator jsonNavigator;
private readonly IJsonNavigatorNode jsonNavigatorNode;
private readonly Dictionary<string, CosmosElement> cachedElements;
private readonly ConcurrentDictionary<string, CosmosElement> cachedElements;
private readonly Lazy<int> lazyCount;

public LazyCosmosObject(IJsonNavigator jsonNavigator, IJsonNavigatorNode jsonNavigatorNode)
Expand All @@ -44,7 +45,7 @@ public LazyCosmosObject(IJsonNavigator jsonNavigator, IJsonNavigatorNode jsonNav

this.jsonNavigator = jsonNavigator;
this.jsonNavigatorNode = jsonNavigatorNode;
this.cachedElements = new Dictionary<string, CosmosElement>();
this.cachedElements = new ConcurrentDictionary<string, CosmosElement>();
this.lazyCount = new Lazy<int>(() => this.jsonNavigator.GetObjectPropertyCount(this.jsonNavigatorNode));
}

Expand Down Expand Up @@ -92,31 +93,27 @@ public override IEnumerator<KeyValuePair<string, CosmosElement>> GetEnumerator()

public override bool TryGetValue(string key, out CosmosElement value)
{
value = default;
bool gotValue;
if (this.cachedElements.TryGetValue(
key,
out CosmosElement cosmosElemet))
{
value = cosmosElemet;
gotValue = true;
return true;
}
else if (this.jsonNavigator.TryGetObjectProperty(

if (this.jsonNavigator.TryGetObjectProperty(
this.jsonNavigatorNode,
key,
out ObjectProperty objectProperty))
{
value = CosmosElement.Dispatch(this.jsonNavigator, objectProperty.ValueNode);
gotValue = true;
this.cachedElements[key] = value;
}
else
{
value = null;
gotValue = false;

return true;
}

return gotValue;
value = default;
return false;
}

public override void WriteTo(IJsonWriter jsonWriter)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------
namespace Microsoft.Azure.Cosmos.Diagnostics
{
using System;

internal sealed class AddressResolutionStatistics : CosmosDiagnosticsInternal
{
public AddressResolutionStatistics(
DateTime startTime,
DateTime endTime,
string targetEndpoint)
{
this.StartTime = startTime;
this.EndTime = endTime;
this.TargetEndpoint = targetEndpoint ?? throw new ArgumentNullException(nameof(startTime));
}

public DateTime StartTime { get; }
public DateTime? EndTime { get; set; }
public string TargetEndpoint { get; }

public override void Accept(CosmosDiagnosticsInternalVisitor visitor)
{
visitor.Visit(this);
}

public override TResult Accept<TResult>(CosmosDiagnosticsInternalVisitor<TResult> visitor)
{
return visitor.Visit(this);
}
}
}
20 changes: 18 additions & 2 deletions Microsoft.Azure.Cosmos/src/Diagnostics/BackendMetricsExtractor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ namespace Microsoft.Azure.Cosmos.Diagnostics
internal sealed class BackendMetricsExtractor : CosmosDiagnosticsInternalVisitor<(ParseFailureReason, BackendMetrics)>
{
public static readonly BackendMetricsExtractor Singleton = new BackendMetricsExtractor();
private static readonly (ParseFailureReason, BackendMetrics) MetricsNotFound = (ParseFailureReason.MetricsNotFound, default);

private BackendMetricsExtractor()
{
Expand All @@ -21,7 +22,7 @@ private BackendMetricsExtractor()

public override (ParseFailureReason, BackendMetrics) Visit(PointOperationStatistics pointOperationStatistics)
{
return (ParseFailureReason.MetricsNotFound, default);
return BackendMetricsExtractor.MetricsNotFound;
}

public override (ParseFailureReason, BackendMetrics) Visit(CosmosDiagnosticsContext cosmosDiagnosticsContext)
Expand Down Expand Up @@ -61,7 +62,7 @@ public override (ParseFailureReason, BackendMetrics) Visit(CosmosDiagnosticsCont

public override (ParseFailureReason, BackendMetrics) Visit(CosmosDiagnosticScope cosmosDiagnosticScope)
{
return (ParseFailureReason.MetricsNotFound, default);
return BackendMetricsExtractor.MetricsNotFound;
}

public override (ParseFailureReason, BackendMetrics) Visit(QueryPageDiagnostics queryPageDiagnostics)
Expand All @@ -74,6 +75,21 @@ public override (ParseFailureReason, BackendMetrics) Visit(QueryPageDiagnostics
return (ParseFailureReason.None, backendMetrics);
}

public override (ParseFailureReason, BackendMetrics) Visit(AddressResolutionStatistics addressResolutionStatistics)
{
return BackendMetricsExtractor.MetricsNotFound;
}

public override (ParseFailureReason, BackendMetrics) Visit(StoreResponseStatistics storeResponseStatistics)
{
return BackendMetricsExtractor.MetricsNotFound;
}

public override (ParseFailureReason, BackendMetrics) Visit(CosmosClientSideRequestStatistics clientSideRequestStatistics)
{
return BackendMetricsExtractor.MetricsNotFound;
}

public enum ParseFailureReason
{
None,
Expand Down
Loading
You are viewing a condensed version of this merge commit. You can view the full changes here.