Skip to content

Commit

Permalink
Internal QueryPartitionProvider: Adds Singleton QueryPartitionProvider (
Browse files Browse the repository at this point in the history
#1842)

* Shared QueryPartitionProvider. Original version creates and initializes one provider per query request.

* Reduce number of array allocations

* Allocate on heap for a larger arrays

* Make the call to check if client available

* Make property that returns provider to return the task

* Return ReturnsAsync to touch less files

* DocumentClient unit test for validating QueryPartitionProvider is singleton

Co-authored-by: REDMOND\dmmelnik <dmitri.melnikov@microsoft.com>
Co-authored-by: j82w <j82w@users.noreply.github.com>
  • Loading branch information
3 people authored Sep 12, 2020
1 parent 6de15df commit c05f8c8
Show file tree
Hide file tree
Showing 13 changed files with 80 additions and 77 deletions.
19 changes: 14 additions & 5 deletions Microsoft.Azure.Cosmos/src/DocumentClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ namespace Microsoft.Azure.Cosmos
using Microsoft.Azure.Cosmos.Common;
using Microsoft.Azure.Cosmos.Core.Trace;
using Microsoft.Azure.Cosmos.Query;
using Microsoft.Azure.Cosmos.Query.Core.QueryPlan;
using Microsoft.Azure.Cosmos.Routing;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Client;
Expand Down Expand Up @@ -166,6 +167,7 @@ internal partial class DocumentClient : IDisposable, IAuthorizationTokenProvider

private readonly bool hasAuthKeyResourceToken;
private readonly string authKeyResourceToken = string.Empty;
private AsyncLazy<QueryPartitionProvider> queryPartitionProvider;

private DocumentClientEventSource eventSource;
internal Task initializeTask;
Expand Down Expand Up @@ -797,6 +799,12 @@ internal virtual void Initialize(Uri serviceEndpoint,

DefaultTrace.InitEventListener();

this.queryPartitionProvider = new AsyncLazy<QueryPartitionProvider>(async () =>
{
await this.EnsureValidClientAsync();
return new QueryPartitionProvider(this.accountServiceConfiguration.QueryEngineConfiguration);
}, CancellationToken.None);

#if !(NETSTANDARD15 || NETSTANDARD16)
#if NETSTANDARD20
// GetEntryAssembly returns null when loaded from native netstandard2.0
Expand Down Expand Up @@ -1401,6 +1409,11 @@ public void Dispose()
this.GlobalEndpointManager = null;
}

if (this.queryPartitionProvider.IsValueCreated)
{
this.queryPartitionProvider.Value.Dispose();
}

DefaultTrace.TraceInformation("DocumentClient with id {0} disposed.", this.traceId);
DefaultTrace.Flush();

Expand Down Expand Up @@ -1446,11 +1459,7 @@ public void Dispose()
/// </remarks>
internal Action<IQueryable> OnExecuteScalarQueryCallback { get; set; }

internal virtual async Task<IDictionary<string, object>> GetQueryEngineConfigurationAsync()
{
await this.EnsureValidClientAsync();
return this.accountServiceConfiguration.QueryEngineConfiguration;
}
internal virtual Task<QueryPartitionProvider> QueryPartitionProvider => this.queryPartitionProvider.Value;

internal virtual async Task<ConsistencyLevel> GetDefaultConsistencyLevelAsync()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,21 +166,33 @@ internal TryCatch<PartitionedQueryExecutionInfoInternal> TryGetPartitionedQueryE
string queryText = JsonConvert.SerializeObject(querySpec);

List<string> paths = new List<string>(partitionKeyDefinition.Paths);
List<IReadOnlyList<string>> pathPartsList = new List<IReadOnlyList<string>>(paths.Count);
uint[] partsLengths = new uint[paths.Count];
int allPartsLength = 0;

List<string[]> pathParts = new List<string[]>();
paths.ForEach(path =>
{
pathParts.Add(PathParser.GetPathParts(path));
});
for (int i = 0; i < paths.Count; i++)
{
IReadOnlyList<string> pathParts = PathParser.GetPathParts(paths[i]);
partsLengths[i] = (uint)pathParts.Count;
pathPartsList.Add(pathParts);
allPartsLength += pathParts.Count;
}

string[] allParts = pathParts.SelectMany(parts => parts).ToArray();
uint[] partsLengths = pathParts.Select(parts => (uint)parts.Length).ToArray();
string[] allParts = new string[allPartsLength];
int allPartsIndex = 0;
foreach (IReadOnlyList<string> pathParts in pathPartsList)
{
foreach (string part in pathParts)
{
allParts[allPartsIndex++] = part;
}
}

PartitionKind partitionKind = partitionKeyDefinition.Kind;

this.Initialize();

byte[] buffer = new byte[InitialBufferSize];
Span<byte> buffer = stackalloc byte[QueryPartitionProvider.InitialBufferSize];
uint errorCode;
uint serializedQueryExecutionInfoResultLength;

Expand All @@ -205,7 +217,11 @@ internal TryCatch<PartitionedQueryExecutionInfoInternal> TryGetPartitionedQueryE

if (errorCode == DISP_E_BUFFERTOOSMALL)
{
buffer = new byte[serializedQueryExecutionInfoResultLength];
// Allocate on stack for smaller arrays, otherwise use heap.
buffer = serializedQueryExecutionInfoResultLength < 4096
? stackalloc byte[(int)serializedQueryExecutionInfoResultLength]
: new byte[serializedQueryExecutionInfoResultLength];

fixed (byte* bytePtr2 = buffer)
{
errorCode = ServiceInteropWrapper.GetPartitionKeyRangesFromQuery(
Expand All @@ -227,7 +243,7 @@ internal TryCatch<PartitionedQueryExecutionInfoInternal> TryGetPartitionedQueryE
}
}

string serializedQueryExecutionInfo = Encoding.UTF8.GetString(buffer, 0, (int)serializedQueryExecutionInfoResultLength);
string serializedQueryExecutionInfo = Encoding.UTF8.GetString(buffer.Slice(0, (int)serializedQueryExecutionInfoResultLength));

Exception exception = Marshal.GetExceptionForHR((int)errorCode);
if (exception != null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,24 +100,22 @@ public async Task<TryCatch<PartitionedQueryExecutionInfo>> TryGetQueryPlanAsync(
return TryCatch<(PartitionedQueryExecutionInfo, bool)>.FromResult((tryGetQueryInfo.Result, neededQueryFeatures == QueryFeatures.None));
}

private async Task<TryCatch<PartitionedQueryExecutionInfo>> TryGetQueryInfoAsync(
private Task<TryCatch<PartitionedQueryExecutionInfo>> TryGetQueryInfoAsync(
SqlQuerySpec sqlQuerySpec,
PartitionKeyDefinition partitionKeyDefinition,
bool hasLogicalPartitionKey,
CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();

TryCatch<PartitionedQueryExecutionInfo> tryGetPartitoinedQueryExecutionInfo = await this.queryClient.TryGetPartitionedQueryExecutionInfoAsync(
return this.queryClient.TryGetPartitionedQueryExecutionInfoAsync(
sqlQuerySpec: sqlQuerySpec,
partitionKeyDefinition: partitionKeyDefinition,
requireFormattableOrderByQuery: true,
isContinuationExpected: false,
allowNonValueAggregateQuery: true,
hasLogicalPartitionKey: hasLogicalPartitionKey,
cancellationToken: cancellationToken);

return tryGetPartitoinedQueryExecutionInfo;
}

private static class QueryPlanSupportChecker
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ private async Task<Tuple<DocumentFeedResponse<CosmosElement>, string>> ExecuteOn
// Get the routing map provider
CollectionCache collectionCache = await this.Client.GetCollectionCacheAsync();
ContainerProperties collection = await collectionCache.ResolveCollectionAsync(request, CancellationToken.None);
QueryPartitionProvider queryPartitionProvider = await this.Client.GetQueryPartitionProviderAsync(cancellationToken);
QueryPartitionProvider queryPartitionProvider = await this.Client.GetQueryPartitionProviderAsync();
IRoutingMapProvider routingMapProvider = await this.Client.GetRoutingMapProviderAsync();

// Figure out what partition you are going to based on the range from the continuation token
Expand Down Expand Up @@ -306,7 +306,6 @@ private static bool ServiceInteropAvailable()
}

providedRanges = PartitionRoutingHelper.GetProvidedPartitionKeyRanges(
(errorMessage) => new BadRequestException(errorMessage),
this.QuerySpec,
enableCrossPartitionQuery,
false,
Expand Down
24 changes: 2 additions & 22 deletions Microsoft.Azure.Cosmos/src/Query/v2Query/DocumentQueryClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ namespace Microsoft.Azure.Cosmos.Query
internal sealed class DocumentQueryClient : IDocumentQueryClient
{
private readonly DocumentClient innerClient;
private readonly SemaphoreSlim semaphore;
private QueryPartitionProvider queryPartitionProvider;

public DocumentQueryClient(DocumentClient innerClient)
{
Expand All @@ -28,16 +26,11 @@ public DocumentQueryClient(DocumentClient innerClient)
}

this.innerClient = innerClient;
this.semaphore = new SemaphoreSlim(1, 1);
}

public void Dispose()
{
this.innerClient.Dispose();
if (this.queryPartitionProvider != null)
{
this.queryPartitionProvider.Dispose();
}
}

QueryCompatibilityMode IDocumentQueryClient.QueryCompatibilityMode
Expand Down Expand Up @@ -92,22 +85,9 @@ async Task<IRoutingMapProvider> IDocumentQueryClient.GetRoutingMapProviderAsync(
return await this.innerClient.GetPartitionKeyRangeCacheAsync();
}

public async Task<QueryPartitionProvider> GetQueryPartitionProviderAsync(CancellationToken cancellationToken)
public Task<QueryPartitionProvider> GetQueryPartitionProviderAsync()
{
if (this.queryPartitionProvider == null)
{
await this.semaphore.WaitAsync(cancellationToken);

if (this.queryPartitionProvider == null)
{
cancellationToken.ThrowIfCancellationRequested();
this.queryPartitionProvider = new QueryPartitionProvider(await this.innerClient.GetQueryEngineConfigurationAsync());
}

this.semaphore.Release();
}

return this.queryPartitionProvider;
return this.innerClient.QueryPartitionProvider;
}

public Task<DocumentServiceResponse> ExecuteQueryAsync(DocumentServiceRequest request, IDocumentClientRetryPolicy retryPolicyInstance, CancellationToken cancellationToken)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ public async Task<PartitionedQueryExecutionInfo> GetPartitionedQueryExecutionInf
cancellationToken.ThrowIfCancellationRequested();
// $ISSUE-felixfan-2016-07-13: We should probably get PartitionedQueryExecutionInfo from Gateway in GatewayMode

QueryPartitionProvider queryPartitionProvider = await this.Client.GetQueryPartitionProviderAsync(cancellationToken);
QueryPartitionProvider queryPartitionProvider = await this.Client.GetQueryPartitionProviderAsync();
TryCatch<PartitionedQueryExecutionInfo> tryGetPartitionedQueryExecutionInfo = queryPartitionProvider.TryGetPartitionedQueryExecutionInfo(
this.QuerySpec,
partitionKeyDefinition,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ internal interface IDocumentQueryClient : IDisposable

Task<IRoutingMapProvider> GetRoutingMapProviderAsync();

Task<QueryPartitionProvider> GetQueryPartitionProviderAsync(CancellationToken cancellationToken);
Task<QueryPartitionProvider> GetQueryPartitionProviderAsync();

Task<DocumentServiceResponse> ExecuteQueryAsync(DocumentServiceRequest request, IDocumentClientRetryPolicy retryPolicyInstance, CancellationToken cancellationToken);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ internal class CosmosQueryClientCore : CosmosQueryClient
private readonly ContainerInternal cosmosContainerCore;
private readonly DocumentClient documentClient;
private readonly SemaphoreSlim semaphore;
private QueryPartitionProvider queryPartitionProvider;

public CosmosQueryClientCore(
CosmosClientContext clientContext,
Expand Down Expand Up @@ -88,26 +87,7 @@ public override async Task<TryCatch<PartitionedQueryExecutionInfo>> TryGetPartit
bool hasLogicalPartitionKey,
CancellationToken cancellationToken)
{
if (this.queryPartitionProvider == null)
{
try
{
await this.semaphore.WaitAsync(cancellationToken);

if (this.queryPartitionProvider == null)
{
cancellationToken.ThrowIfCancellationRequested();
IDictionary<string, object> queryConfiguration = await this.documentClient.GetQueryEngineConfigurationAsync();
this.queryPartitionProvider = new QueryPartitionProvider(queryConfiguration);
}
}
finally
{
this.semaphore.Release();
}
}

return this.queryPartitionProvider.TryGetPartitionedQueryExecutionInfo(
return (await this.documentClient.QueryPartitionProvider).TryGetPartitionedQueryExecutionInfo(
sqlQuerySpec,
partitionKeyDefinition,
requireFormattableOrderByQuery,
Expand Down
11 changes: 6 additions & 5 deletions Microsoft.Azure.Cosmos/src/Routing/DocumentAnalyzer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Microsoft.Azure.Cosmos.Routing
{
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using Microsoft.Azure.Documents;
Expand Down Expand Up @@ -39,10 +40,10 @@ public static PartitionKeyInternal ExtractPartitionKeyValue(Document document, P
return PartitionKeyInternal.FromObjectArray(
partitionKeyDefinition.Paths.Select(path =>
{
string[] parts = PathParser.GetPathParts(path);
Debug.Assert(parts.Length >= 1, "Partition key component definition path is invalid.");
IReadOnlyList<string> parts = PathParser.GetPathParts(path);
Debug.Assert(parts.Count >= 1, "Partition key component definition path is invalid.");
return document.GetValueByPath<object>(parts, Undefined.Value);
return document.GetValueByPath<object>(parts.ToArray(), Undefined.Value);
}).ToArray(),
false);
}
Expand Down Expand Up @@ -73,8 +74,8 @@ internal static PartitionKeyInternal ExtractPartitionKeyValue<T>(T data, Partiti
return PartitionKeyInternal.FromObjectArray(
partitionKeyDefinition.Paths.Select(path =>
{
string[] parts = PathParser.GetPathParts(path);
Debug.Assert(parts.Length >= 1, "Partition key component definition path is invalid.");
IReadOnlyList<string> parts = PathParser.GetPathParts(path);
Debug.Assert(parts.Count >= 1, "Partition key component definition path is invalid.");
JToken token = convertToJToken(data);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ namespace Microsoft.Azure.Cosmos.Routing
internal class PartitionRoutingHelper
{
public static IReadOnlyList<Range<string>> GetProvidedPartitionKeyRanges(
Func<string, Exception> createBadRequestException,
SqlQuerySpec querySpec,
bool enableCrossPartitionQuery,
bool parallelizeCrossPartitionQuery,
Expand Down
4 changes: 2 additions & 2 deletions Microsoft.Azure.Cosmos/src/Routing/PathParser.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ internal sealed class PathParser
/// </remarks>
/// <param name="path">A path string</param>
/// <returns>An array of parts of path</returns>
public static string[] GetPathParts(string path)
public static IReadOnlyList<string> GetPathParts(string path)
{
List<string> tokens = new List<string>();
int currentIndex = 0;
Expand All @@ -45,7 +45,7 @@ public static string[] GetPathParts(string path)
}
}

return tokens.ToArray();
return tokens;
}

private static string GetEscapedToken(string path, ref int currentIndex)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests
using System.Net;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Internal;
using Microsoft.Azure.Cosmos.Linq;
using Microsoft.Azure.Cosmos.Query.Core.QueryPlan;
using Microsoft.Azure.Cosmos.Utils;
using Microsoft.Azure.Documents;
using Microsoft.VisualStudio.TestTools.UnitTesting;
Expand Down Expand Up @@ -79,6 +81,21 @@ public void RetryExceedingMaxTimeLimit()
Assert.IsTrue(throttled);
}

[TestMethod]
public async Task QueryPartitionProviderSingletonTestAsync()
{
DocumentClient client = new DocumentClient(
new Uri(ConfigurationManager.AppSettings["GatewayEndpoint"]),
ConfigurationManager.AppSettings["MasterKey"],
(HttpMessageHandler)null,
new ConnectionPolicy());

Task<QueryPartitionProvider> queryPartitionProviderTaskOne = client.QueryPartitionProvider;
Task<QueryPartitionProvider> queryPartitionProviderTaskTwo = client.QueryPartitionProvider;
Assert.AreSame(queryPartitionProviderTaskOne, queryPartitionProviderTaskTwo, "QueryPartitionProvider property is not a singleton");
Assert.AreSame(await queryPartitionProviderTaskOne, await queryPartitionProviderTaskTwo, "QueryPartitionProvider property is not a singleton");
}

private void TestRetryOnThrottled(int? numberOfRetries)
{
Mock<IStoreModel> mockStoreModel = new Mock<IStoreModel>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ namespace Microsoft.Azure.Cosmos.Tests
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;
using Microsoft.Azure.Cosmos.Common;
using Microsoft.Azure.Cosmos.Query.Core.QueryPlan;
using Microsoft.Azure.Cosmos.Routing;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Collections;
Expand Down Expand Up @@ -126,10 +127,13 @@ internal override Task<PartitionKeyRangeCache> GetPartitionKeyRangeCacheAsync()
return Task.FromResult(this.partitionKeyRangeCache.Object);
}

internal override Task<IDictionary<string, object>> GetQueryEngineConfigurationAsync()
internal override Task<QueryPartitionProvider> QueryPartitionProvider
{
Dictionary<string, object> queryEngineConfiguration = JsonConvert.DeserializeObject< Dictionary<string, object>>("{\"maxSqlQueryInputLength\":262144,\"maxJoinsPerSqlQuery\":5,\"maxLogicalAndPerSqlQuery\":500,\"maxLogicalOrPerSqlQuery\":500,\"maxUdfRefPerSqlQuery\":10,\"maxInExpressionItemsCount\":16000,\"queryMaxInMemorySortDocumentCount\":500,\"maxQueryRequestTimeoutFraction\":0.9,\"sqlAllowNonFiniteNumbers\":false,\"sqlAllowAggregateFunctions\":true,\"sqlAllowSubQuery\":true,\"sqlAllowScalarSubQuery\":true,\"allowNewKeywords\":true,\"sqlAllowLike\":false,\"sqlAllowGroupByClause\":false,\"maxSpatialQueryCells\":12,\"spatialMaxGeometryPointCount\":256,\"sqlAllowTop\":true,\"enableSpatialIndexing\":true}");
return Task.FromResult((IDictionary<string, object>)queryEngineConfiguration);
get
{
return Task.FromResult(new QueryPartitionProvider(
JsonConvert.DeserializeObject<Dictionary<string, object>>("{\"maxSqlQueryInputLength\":262144,\"maxJoinsPerSqlQuery\":5,\"maxLogicalAndPerSqlQuery\":500,\"maxLogicalOrPerSqlQuery\":500,\"maxUdfRefPerSqlQuery\":10,\"maxInExpressionItemsCount\":16000,\"queryMaxInMemorySortDocumentCount\":500,\"maxQueryRequestTimeoutFraction\":0.9,\"sqlAllowNonFiniteNumbers\":false,\"sqlAllowAggregateFunctions\":true,\"sqlAllowSubQuery\":true,\"sqlAllowScalarSubQuery\":true,\"allowNewKeywords\":true,\"sqlAllowLike\":false,\"sqlAllowGroupByClause\":false,\"maxSpatialQueryCells\":12,\"spatialMaxGeometryPointCount\":256,\"sqlAllowTop\":true,\"enableSpatialIndexing\":true}")));
}
}

ValueTask<(string token, string payload)> IAuthorizationTokenProvider.GetUserAuthorizationAsync(
Expand Down

0 comments on commit c05f8c8

Please sign in to comment.