Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Internal QueryPartitionProvider: Adds Singleton QueryPartitionProvider #1842

Merged
merged 10 commits into from
Sep 12, 2020
19 changes: 14 additions & 5 deletions Microsoft.Azure.Cosmos/src/DocumentClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ namespace Microsoft.Azure.Cosmos
using Microsoft.Azure.Cosmos.Common;
using Microsoft.Azure.Cosmos.Core.Trace;
using Microsoft.Azure.Cosmos.Query;
using Microsoft.Azure.Cosmos.Query.Core.QueryPlan;
using Microsoft.Azure.Cosmos.Routing;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Client;
Expand Down Expand Up @@ -166,6 +167,7 @@ internal partial class DocumentClient : IDisposable, IAuthorizationTokenProvider

private readonly bool hasAuthKeyResourceToken;
private readonly string authKeyResourceToken = string.Empty;
private AsyncLazy<QueryPartitionProvider> queryPartitionProvider;

private DocumentClientEventSource eventSource;
internal Task initializeTask;
Expand Down Expand Up @@ -797,6 +799,12 @@ internal virtual void Initialize(Uri serviceEndpoint,

DefaultTrace.InitEventListener();

this.queryPartitionProvider = new AsyncLazy<QueryPartitionProvider>(async () =>
{
await this.EnsureValidClientAsync();
return new QueryPartitionProvider(this.accountServiceConfiguration.QueryEngineConfiguration);
}, CancellationToken.None);

#if !(NETSTANDARD15 || NETSTANDARD16)
#if NETSTANDARD20
// GetEntryAssembly returns null when loaded from native netstandard2.0
Expand Down Expand Up @@ -1401,6 +1409,11 @@ public void Dispose()
this.GlobalEndpointManager = null;
}

if (this.queryPartitionProvider.IsValueCreated)
{
this.queryPartitionProvider.Value.Dispose();
}

DefaultTrace.TraceInformation("DocumentClient with id {0} disposed.", this.traceId);
DefaultTrace.Flush();

Expand Down Expand Up @@ -1446,11 +1459,7 @@ public void Dispose()
/// </remarks>
internal Action<IQueryable> OnExecuteScalarQueryCallback { get; set; }

internal virtual async Task<IDictionary<string, object>> GetQueryEngineConfigurationAsync()
{
await this.EnsureValidClientAsync();
return this.accountServiceConfiguration.QueryEngineConfiguration;
}
internal virtual Task<QueryPartitionProvider> QueryPartitionProvider => this.queryPartitionProvider.Value;

internal virtual async Task<ConsistencyLevel> GetDefaultConsistencyLevelAsync()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,21 +166,33 @@ internal TryCatch<PartitionedQueryExecutionInfoInternal> TryGetPartitionedQueryE
string queryText = JsonConvert.SerializeObject(querySpec);

List<string> paths = new List<string>(partitionKeyDefinition.Paths);
List<IReadOnlyList<string>> pathPartsList = new List<IReadOnlyList<string>>(paths.Count);
uint[] partsLengths = new uint[paths.Count];
DmitriMelnikov marked this conversation as resolved.
Show resolved Hide resolved
int allPartsLength = 0;

List<string[]> pathParts = new List<string[]>();
paths.ForEach(path =>
{
pathParts.Add(PathParser.GetPathParts(path));
});
for (int i = 0; i < paths.Count; i++)
{
IReadOnlyList<string> pathParts = PathParser.GetPathParts(paths[i]);
partsLengths[i] = (uint)pathParts.Count;
pathPartsList.Add(pathParts);
allPartsLength += pathParts.Count;
}

string[] allParts = pathParts.SelectMany(parts => parts).ToArray();
uint[] partsLengths = pathParts.Select(parts => (uint)parts.Length).ToArray();
string[] allParts = new string[allPartsLength];
int allPartsIndex = 0;
foreach (IReadOnlyList<string> pathParts in pathPartsList)
{
foreach (string part in pathParts)
{
allParts[allPartsIndex++] = part;
}
}

PartitionKind partitionKind = partitionKeyDefinition.Kind;

this.Initialize();

byte[] buffer = new byte[InitialBufferSize];
Span<byte> buffer = stackalloc byte[QueryPartitionProvider.InitialBufferSize];
uint errorCode;
uint serializedQueryExecutionInfoResultLength;

Expand All @@ -205,7 +217,11 @@ internal TryCatch<PartitionedQueryExecutionInfoInternal> TryGetPartitionedQueryE

if (errorCode == DISP_E_BUFFERTOOSMALL)
{
buffer = new byte[serializedQueryExecutionInfoResultLength];
// Allocate on stack for smaller arrays, otherwise use heap.
buffer = serializedQueryExecutionInfoResultLength < 4096
? stackalloc byte[(int)serializedQueryExecutionInfoResultLength]
: new byte[serializedQueryExecutionInfoResultLength];

fixed (byte* bytePtr2 = buffer)
{
errorCode = ServiceInteropWrapper.GetPartitionKeyRangesFromQuery(
Expand All @@ -227,7 +243,7 @@ internal TryCatch<PartitionedQueryExecutionInfoInternal> TryGetPartitionedQueryE
}
}

string serializedQueryExecutionInfo = Encoding.UTF8.GetString(buffer, 0, (int)serializedQueryExecutionInfoResultLength);
string serializedQueryExecutionInfo = Encoding.UTF8.GetString(buffer.Slice(0, (int)serializedQueryExecutionInfoResultLength));
DmitriMelnikov marked this conversation as resolved.
Show resolved Hide resolved

Exception exception = Marshal.GetExceptionForHR((int)errorCode);
if (exception != null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,24 +100,22 @@ public async Task<TryCatch<PartitionedQueryExecutionInfo>> TryGetQueryPlanAsync(
return TryCatch<(PartitionedQueryExecutionInfo, bool)>.FromResult((tryGetQueryInfo.Result, neededQueryFeatures == QueryFeatures.None));
}

private async Task<TryCatch<PartitionedQueryExecutionInfo>> TryGetQueryInfoAsync(
private Task<TryCatch<PartitionedQueryExecutionInfo>> TryGetQueryInfoAsync(
SqlQuerySpec sqlQuerySpec,
PartitionKeyDefinition partitionKeyDefinition,
bool hasLogicalPartitionKey,
CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();

TryCatch<PartitionedQueryExecutionInfo> tryGetPartitoinedQueryExecutionInfo = await this.queryClient.TryGetPartitionedQueryExecutionInfoAsync(
return this.queryClient.TryGetPartitionedQueryExecutionInfoAsync(
sqlQuerySpec: sqlQuerySpec,
partitionKeyDefinition: partitionKeyDefinition,
requireFormattableOrderByQuery: true,
isContinuationExpected: false,
allowNonValueAggregateQuery: true,
hasLogicalPartitionKey: hasLogicalPartitionKey,
cancellationToken: cancellationToken);

return tryGetPartitoinedQueryExecutionInfo;
}

private static class QueryPlanSupportChecker
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ private async Task<Tuple<DocumentFeedResponse<CosmosElement>, string>> ExecuteOn
// Get the routing map provider
CollectionCache collectionCache = await this.Client.GetCollectionCacheAsync();
ContainerProperties collection = await collectionCache.ResolveCollectionAsync(request, CancellationToken.None);
QueryPartitionProvider queryPartitionProvider = await this.Client.GetQueryPartitionProviderAsync(cancellationToken);
QueryPartitionProvider queryPartitionProvider = await this.Client.GetQueryPartitionProviderAsync();
IRoutingMapProvider routingMapProvider = await this.Client.GetRoutingMapProviderAsync();

// Figure out what partition you are going to based on the range from the continuation token
Expand Down Expand Up @@ -306,7 +306,6 @@ private static bool ServiceInteropAvailable()
}

providedRanges = PartitionRoutingHelper.GetProvidedPartitionKeyRanges(
(errorMessage) => new BadRequestException(errorMessage),
this.QuerySpec,
enableCrossPartitionQuery,
false,
Expand Down
24 changes: 2 additions & 22 deletions Microsoft.Azure.Cosmos/src/Query/v2Query/DocumentQueryClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ namespace Microsoft.Azure.Cosmos.Query
internal sealed class DocumentQueryClient : IDocumentQueryClient
{
private readonly DocumentClient innerClient;
private readonly SemaphoreSlim semaphore;
private QueryPartitionProvider queryPartitionProvider;

public DocumentQueryClient(DocumentClient innerClient)
{
Expand All @@ -28,16 +26,11 @@ public DocumentQueryClient(DocumentClient innerClient)
}

this.innerClient = innerClient;
this.semaphore = new SemaphoreSlim(1, 1);
}

public void Dispose()
{
this.innerClient.Dispose();
if (this.queryPartitionProvider != null)
{
this.queryPartitionProvider.Dispose();
}
}

QueryCompatibilityMode IDocumentQueryClient.QueryCompatibilityMode
Expand Down Expand Up @@ -92,22 +85,9 @@ async Task<IRoutingMapProvider> IDocumentQueryClient.GetRoutingMapProviderAsync(
return await this.innerClient.GetPartitionKeyRangeCacheAsync();
}

public async Task<QueryPartitionProvider> GetQueryPartitionProviderAsync(CancellationToken cancellationToken)
public Task<QueryPartitionProvider> GetQueryPartitionProviderAsync()
{
if (this.queryPartitionProvider == null)
{
await this.semaphore.WaitAsync(cancellationToken);

if (this.queryPartitionProvider == null)
{
cancellationToken.ThrowIfCancellationRequested();
this.queryPartitionProvider = new QueryPartitionProvider(await this.innerClient.GetQueryEngineConfigurationAsync());
}

this.semaphore.Release();
}

return this.queryPartitionProvider;
return this.innerClient.QueryPartitionProvider;
}

public Task<DocumentServiceResponse> ExecuteQueryAsync(DocumentServiceRequest request, IDocumentClientRetryPolicy retryPolicyInstance, CancellationToken cancellationToken)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ public async Task<PartitionedQueryExecutionInfo> GetPartitionedQueryExecutionInf
cancellationToken.ThrowIfCancellationRequested();
// $ISSUE-felixfan-2016-07-13: We should probably get PartitionedQueryExecutionInfo from Gateway in GatewayMode

QueryPartitionProvider queryPartitionProvider = await this.Client.GetQueryPartitionProviderAsync(cancellationToken);
QueryPartitionProvider queryPartitionProvider = await this.Client.GetQueryPartitionProviderAsync();
TryCatch<PartitionedQueryExecutionInfo> tryGetPartitionedQueryExecutionInfo = queryPartitionProvider.TryGetPartitionedQueryExecutionInfo(
this.QuerySpec,
partitionKeyDefinition,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ internal interface IDocumentQueryClient : IDisposable

Task<IRoutingMapProvider> GetRoutingMapProviderAsync();

Task<QueryPartitionProvider> GetQueryPartitionProviderAsync(CancellationToken cancellationToken);
Task<QueryPartitionProvider> GetQueryPartitionProviderAsync();

Task<DocumentServiceResponse> ExecuteQueryAsync(DocumentServiceRequest request, IDocumentClientRetryPolicy retryPolicyInstance, CancellationToken cancellationToken);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ internal class CosmosQueryClientCore : CosmosQueryClient
private readonly ContainerInternal cosmosContainerCore;
private readonly DocumentClient documentClient;
private readonly SemaphoreSlim semaphore;
private QueryPartitionProvider queryPartitionProvider;

public CosmosQueryClientCore(
CosmosClientContext clientContext,
Expand Down Expand Up @@ -88,26 +87,7 @@ public override async Task<TryCatch<PartitionedQueryExecutionInfo>> TryGetPartit
bool hasLogicalPartitionKey,
CancellationToken cancellationToken)
{
if (this.queryPartitionProvider == null)
{
try
{
await this.semaphore.WaitAsync(cancellationToken);

if (this.queryPartitionProvider == null)
{
cancellationToken.ThrowIfCancellationRequested();
IDictionary<string, object> queryConfiguration = await this.documentClient.GetQueryEngineConfigurationAsync();
this.queryPartitionProvider = new QueryPartitionProvider(queryConfiguration);
}
}
finally
{
this.semaphore.Release();
}
}

return this.queryPartitionProvider.TryGetPartitionedQueryExecutionInfo(
return (await this.documentClient.QueryPartitionProvider).TryGetPartitionedQueryExecutionInfo(
sqlQuerySpec,
partitionKeyDefinition,
requireFormattableOrderByQuery,
Expand Down
11 changes: 6 additions & 5 deletions Microsoft.Azure.Cosmos/src/Routing/DocumentAnalyzer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Microsoft.Azure.Cosmos.Routing
{
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using Microsoft.Azure.Documents;
Expand Down Expand Up @@ -39,10 +40,10 @@ public static PartitionKeyInternal ExtractPartitionKeyValue(Document document, P
return PartitionKeyInternal.FromObjectArray(
partitionKeyDefinition.Paths.Select(path =>
{
string[] parts = PathParser.GetPathParts(path);
Debug.Assert(parts.Length >= 1, "Partition key component definition path is invalid.");
IReadOnlyList<string> parts = PathParser.GetPathParts(path);
Debug.Assert(parts.Count >= 1, "Partition key component definition path is invalid.");

return document.GetValueByPath<object>(parts, Undefined.Value);
return document.GetValueByPath<object>(parts.ToArray(), Undefined.Value);
}).ToArray(),
false);
}
Expand Down Expand Up @@ -73,8 +74,8 @@ internal static PartitionKeyInternal ExtractPartitionKeyValue<T>(T data, Partiti
return PartitionKeyInternal.FromObjectArray(
partitionKeyDefinition.Paths.Select(path =>
{
string[] parts = PathParser.GetPathParts(path);
Debug.Assert(parts.Length >= 1, "Partition key component definition path is invalid.");
IReadOnlyList<string> parts = PathParser.GetPathParts(path);
Debug.Assert(parts.Count >= 1, "Partition key component definition path is invalid.");

JToken token = convertToJToken(data);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ namespace Microsoft.Azure.Cosmos.Routing
internal class PartitionRoutingHelper
{
public static IReadOnlyList<Range<string>> GetProvidedPartitionKeyRanges(
Func<string, Exception> createBadRequestException,
SqlQuerySpec querySpec,
bool enableCrossPartitionQuery,
bool parallelizeCrossPartitionQuery,
Expand Down
4 changes: 2 additions & 2 deletions Microsoft.Azure.Cosmos/src/Routing/PathParser.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ internal sealed class PathParser
/// </remarks>
/// <param name="path">A path string</param>
/// <returns>An array of parts of path</returns>
public static string[] GetPathParts(string path)
public static IReadOnlyList<string> GetPathParts(string path)
{
List<string> tokens = new List<string>();
int currentIndex = 0;
Expand All @@ -45,7 +45,7 @@ public static string[] GetPathParts(string path)
}
}

return tokens.ToArray();
return tokens;
}

private static string GetEscapedToken(string path, ref int currentIndex)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests
using System.Net;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Internal;
using Microsoft.Azure.Cosmos.Linq;
using Microsoft.Azure.Cosmos.Query.Core.QueryPlan;
using Microsoft.Azure.Cosmos.Utils;
using Microsoft.Azure.Documents;
using Microsoft.VisualStudio.TestTools.UnitTesting;
Expand Down Expand Up @@ -79,6 +81,21 @@ public void RetryExceedingMaxTimeLimit()
Assert.IsTrue(throttled);
}

[TestMethod]
public async Task QueryPartitionProviderSingletonTestAsync()
{
DocumentClient client = new DocumentClient(
new Uri(ConfigurationManager.AppSettings["GatewayEndpoint"]),
ConfigurationManager.AppSettings["MasterKey"],
(HttpMessageHandler)null,
new ConnectionPolicy());

Task<QueryPartitionProvider> queryPartitionProviderTaskOne = client.QueryPartitionProvider;
Task<QueryPartitionProvider> queryPartitionProviderTaskTwo = client.QueryPartitionProvider;
Assert.AreSame(queryPartitionProviderTaskOne, queryPartitionProviderTaskTwo, "QueryPartitionProvider property is not a singleton");
Assert.AreSame(await queryPartitionProviderTaskOne, await queryPartitionProviderTaskTwo, "QueryPartitionProvider property is not a singleton");
}

private void TestRetryOnThrottled(int? numberOfRetries)
{
Mock<IStoreModel> mockStoreModel = new Mock<IStoreModel>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ namespace Microsoft.Azure.Cosmos.Tests
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;
using Microsoft.Azure.Cosmos.Common;
using Microsoft.Azure.Cosmos.Query.Core.QueryPlan;
using Microsoft.Azure.Cosmos.Routing;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Collections;
Expand Down Expand Up @@ -126,10 +127,13 @@ internal override Task<PartitionKeyRangeCache> GetPartitionKeyRangeCacheAsync()
return Task.FromResult(this.partitionKeyRangeCache.Object);
}

internal override Task<IDictionary<string, object>> GetQueryEngineConfigurationAsync()
internal override Task<QueryPartitionProvider> QueryPartitionProvider
DmitriMelnikov marked this conversation as resolved.
Show resolved Hide resolved
{
Dictionary<string, object> queryEngineConfiguration = JsonConvert.DeserializeObject< Dictionary<string, object>>("{\"maxSqlQueryInputLength\":262144,\"maxJoinsPerSqlQuery\":5,\"maxLogicalAndPerSqlQuery\":500,\"maxLogicalOrPerSqlQuery\":500,\"maxUdfRefPerSqlQuery\":10,\"maxInExpressionItemsCount\":16000,\"queryMaxInMemorySortDocumentCount\":500,\"maxQueryRequestTimeoutFraction\":0.9,\"sqlAllowNonFiniteNumbers\":false,\"sqlAllowAggregateFunctions\":true,\"sqlAllowSubQuery\":true,\"sqlAllowScalarSubQuery\":true,\"allowNewKeywords\":true,\"sqlAllowLike\":false,\"sqlAllowGroupByClause\":false,\"maxSpatialQueryCells\":12,\"spatialMaxGeometryPointCount\":256,\"sqlAllowTop\":true,\"enableSpatialIndexing\":true}");
return Task.FromResult((IDictionary<string, object>)queryEngineConfiguration);
get
{
return Task.FromResult(new QueryPartitionProvider(
JsonConvert.DeserializeObject<Dictionary<string, object>>("{\"maxSqlQueryInputLength\":262144,\"maxJoinsPerSqlQuery\":5,\"maxLogicalAndPerSqlQuery\":500,\"maxLogicalOrPerSqlQuery\":500,\"maxUdfRefPerSqlQuery\":10,\"maxInExpressionItemsCount\":16000,\"queryMaxInMemorySortDocumentCount\":500,\"maxQueryRequestTimeoutFraction\":0.9,\"sqlAllowNonFiniteNumbers\":false,\"sqlAllowAggregateFunctions\":true,\"sqlAllowSubQuery\":true,\"sqlAllowScalarSubQuery\":true,\"allowNewKeywords\":true,\"sqlAllowLike\":false,\"sqlAllowGroupByClause\":false,\"maxSpatialQueryCells\":12,\"spatialMaxGeometryPointCount\":256,\"sqlAllowTop\":true,\"enableSpatialIndexing\":true}")));
}
}

ValueTask<(string token, string payload)> IAuthorizationTokenProvider.GetUserAuthorizationAsync(
Expand Down