Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Internal QueryPartitionProvider: Adds Singleton QueryPartitionProvider #1842

Merged
merged 10 commits into from
Sep 12, 2020
15 changes: 10 additions & 5 deletions Microsoft.Azure.Cosmos/src/DocumentClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ namespace Microsoft.Azure.Cosmos
using Microsoft.Azure.Cosmos.Common;
using Microsoft.Azure.Cosmos.Core.Trace;
using Microsoft.Azure.Cosmos.Query;
using Microsoft.Azure.Cosmos.Query.Core.QueryPlan;
using Microsoft.Azure.Cosmos.Routing;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Client;
Expand Down Expand Up @@ -166,6 +167,7 @@ internal partial class DocumentClient : IDisposable, IAuthorizationTokenProvider

private readonly bool hasAuthKeyResourceToken;
private readonly string authKeyResourceToken = string.Empty;
private Lazy<QueryPartitionProvider> queryPartitionProvider;
DmitriMelnikov marked this conversation as resolved.
Show resolved Hide resolved

private DocumentClientEventSource eventSource;
internal Task initializeTask;
Expand Down Expand Up @@ -797,6 +799,8 @@ internal virtual void Initialize(Uri serviceEndpoint,

DefaultTrace.InitEventListener();

this.queryPartitionProvider = new Lazy<QueryPartitionProvider>(() => new QueryPartitionProvider(this.accountServiceConfiguration.QueryEngineConfiguration));

#if !(NETSTANDARD15 || NETSTANDARD16)
#if NETSTANDARD20
// GetEntryAssembly returns null when loaded from native netstandard2.0
Expand Down Expand Up @@ -1401,6 +1405,11 @@ public void Dispose()
this.GlobalEndpointManager = null;
}

if (this.queryPartitionProvider.IsValueCreated)
{
this.queryPartitionProvider.Value.Dispose();
}

DefaultTrace.TraceInformation("DocumentClient with id {0} disposed.", this.traceId);
DefaultTrace.Flush();

Expand Down Expand Up @@ -1446,11 +1455,7 @@ public void Dispose()
/// </remarks>
internal Action<IQueryable> OnExecuteScalarQueryCallback { get; set; }

internal virtual async Task<IDictionary<string, object>> GetQueryEngineConfigurationAsync()
{
await this.EnsureValidClientAsync();
return this.accountServiceConfiguration.QueryEngineConfiguration;
}
internal virtual QueryPartitionProvider QueryPartitionProvider => this.queryPartitionProvider.Value;
DmitriMelnikov marked this conversation as resolved.
Show resolved Hide resolved

internal virtual async Task<ConsistencyLevel> GetDefaultConsistencyLevelAsync()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ private static async Task<TryCatch<CosmosQueryExecutionContext>> TryCreateCoreCo
{
Documents.PartitionKeyDefinition partitionKeyDefinition = GetPartitionKeyDefinition(inputParameters, containerQueryProperties);

partitionedQueryExecutionInfo = await QueryPlanRetriever.GetQueryPlanWithServiceInteropAsync(
partitionedQueryExecutionInfo = QueryPlanRetriever.GetQueryPlanWithServiceInterop(
cosmosQueryContext.QueryClient,
inputParameters.SqlQuerySpec,
partitionKeyDefinition,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public abstract Task<ContainerQueryProperties> GetCachedContainerQueryProperties
Documents.Routing.Range<string> range,
bool forceRefresh = false);

public abstract Task<TryCatch<PartitionedQueryExecutionInfo>> TryGetPartitionedQueryExecutionInfoAsync(
public abstract TryCatch<PartitionedQueryExecutionInfo> TryGetPartitionedQueryExecutionInfo(
SqlQuerySpec sqlQuerySpec,
Documents.PartitionKeyDefinition partitionKeyDefinition,
bool requireFormattableOrderByQuery,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public QueryPlanHandler(CosmosQueryClient queryClient)
this.queryClient = queryClient ?? throw new ArgumentNullException($"{nameof(queryClient)}");
}

public async Task<TryCatch<PartitionedQueryExecutionInfo>> TryGetQueryPlanAsync(
public TryCatch<PartitionedQueryExecutionInfo> TryGetQueryPlan(
SqlQuerySpec sqlQuerySpec,
PartitionKeyDefinition partitionKeyDefinition,
QueryFeatures supportedQueryFeatures,
Expand All @@ -41,7 +41,7 @@ public async Task<TryCatch<PartitionedQueryExecutionInfo>> TryGetQueryPlanAsync(
throw new ArgumentNullException($"{nameof(partitionKeyDefinition)}");
}

TryCatch<PartitionedQueryExecutionInfo> tryGetQueryInfo = await this.TryGetQueryInfoAsync(
TryCatch<PartitionedQueryExecutionInfo> tryGetQueryInfo = this.TryGetQueryInfo(
sqlQuerySpec,
partitionKeyDefinition,
hasLogicalPartitionKey,
Expand All @@ -65,7 +65,7 @@ public async Task<TryCatch<PartitionedQueryExecutionInfo>> TryGetQueryPlanAsync(
/// <summary>
/// Used in the compute gateway to support legacy gateways query execution pattern.
/// </summary>
public async Task<TryCatch<(PartitionedQueryExecutionInfo queryPlan, bool supported)>> TryGetQueryInfoAndIfSupportedAsync(
public TryCatch<(PartitionedQueryExecutionInfo queryPlan, bool supported)> TryGetQueryInfoAndIfSupported(
QueryFeatures supportedQueryFeatures,
SqlQuerySpec sqlQuerySpec,
PartitionKeyDefinition partitionKeyDefinition,
Expand All @@ -84,7 +84,7 @@ public async Task<TryCatch<PartitionedQueryExecutionInfo>> TryGetQueryPlanAsync(

cancellationToken.ThrowIfCancellationRequested();

TryCatch<PartitionedQueryExecutionInfo> tryGetQueryInfo = await this.TryGetQueryInfoAsync(
TryCatch<PartitionedQueryExecutionInfo> tryGetQueryInfo = this.TryGetQueryInfo(
sqlQuerySpec,
partitionKeyDefinition,
hasLogicalPartitionKey,
Expand All @@ -100,15 +100,15 @@ public async Task<TryCatch<PartitionedQueryExecutionInfo>> TryGetQueryPlanAsync(
return TryCatch<(PartitionedQueryExecutionInfo, bool)>.FromResult((tryGetQueryInfo.Result, neededQueryFeatures == QueryFeatures.None));
}

private async Task<TryCatch<PartitionedQueryExecutionInfo>> TryGetQueryInfoAsync(
private TryCatch<PartitionedQueryExecutionInfo> TryGetQueryInfo(
SqlQuerySpec sqlQuerySpec,
PartitionKeyDefinition partitionKeyDefinition,
bool hasLogicalPartitionKey,
CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();

TryCatch<PartitionedQueryExecutionInfo> tryGetPartitoinedQueryExecutionInfo = await this.queryClient.TryGetPartitionedQueryExecutionInfoAsync(
TryCatch<PartitionedQueryExecutionInfo> tryGetPartitoinedQueryExecutionInfo = this.queryClient.TryGetPartitionedQueryExecutionInfo(
sqlQuerySpec: sqlQuerySpec,
partitionKeyDefinition: partitionKeyDefinition,
requireFormattableOrderByQuery: true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ internal static class QueryPlanRetriever

private static readonly string SupportedQueryFeaturesString = SupportedQueryFeatures.ToString();

public static async Task<PartitionedQueryExecutionInfo> GetQueryPlanWithServiceInteropAsync(
public static PartitionedQueryExecutionInfo GetQueryPlanWithServiceInterop(
CosmosQueryClient queryClient,
SqlQuerySpec sqlQuerySpec,
PartitionKeyDefinition partitionKeyDefinition,
Expand All @@ -54,7 +54,7 @@ public static async Task<PartitionedQueryExecutionInfo> GetQueryPlanWithServiceI
cancellationToken.ThrowIfCancellationRequested();
QueryPlanHandler queryPlanHandler = new QueryPlanHandler(queryClient);

TryCatch<PartitionedQueryExecutionInfo> tryGetQueryPlan = await queryPlanHandler.TryGetQueryPlanAsync(
TryCatch<PartitionedQueryExecutionInfo> tryGetQueryPlan = queryPlanHandler.TryGetQueryPlan(
sqlQuerySpec,
partitionKeyDefinition,
QueryPlanRetriever.SupportedQueryFeatures,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ private async Task<Tuple<DocumentFeedResponse<CosmosElement>, string>> ExecuteOn
// Get the routing map provider
CollectionCache collectionCache = await this.Client.GetCollectionCacheAsync();
ContainerProperties collection = await collectionCache.ResolveCollectionAsync(request, CancellationToken.None);
QueryPartitionProvider queryPartitionProvider = await this.Client.GetQueryPartitionProviderAsync(cancellationToken);
QueryPartitionProvider queryPartitionProvider = this.Client.GetQueryPartitionProvider();
IRoutingMapProvider routingMapProvider = await this.Client.GetRoutingMapProviderAsync();

// Figure out what partition you are going to based on the range from the continuation token
Expand Down Expand Up @@ -306,7 +306,6 @@ private static bool ServiceInteropAvailable()
}

providedRanges = PartitionRoutingHelper.GetProvidedPartitionKeyRanges(
(errorMessage) => new BadRequestException(errorMessage),
this.QuerySpec,
enableCrossPartitionQuery,
false,
Expand Down
24 changes: 2 additions & 22 deletions Microsoft.Azure.Cosmos/src/Query/v2Query/DocumentQueryClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ namespace Microsoft.Azure.Cosmos.Query
internal sealed class DocumentQueryClient : IDocumentQueryClient
{
private readonly DocumentClient innerClient;
private readonly SemaphoreSlim semaphore;
private QueryPartitionProvider queryPartitionProvider;

public DocumentQueryClient(DocumentClient innerClient)
{
Expand All @@ -28,16 +26,11 @@ public DocumentQueryClient(DocumentClient innerClient)
}

this.innerClient = innerClient;
this.semaphore = new SemaphoreSlim(1, 1);
}

public void Dispose()
{
this.innerClient.Dispose();
if (this.queryPartitionProvider != null)
{
this.queryPartitionProvider.Dispose();
}
}

QueryCompatibilityMode IDocumentQueryClient.QueryCompatibilityMode
Expand Down Expand Up @@ -92,22 +85,9 @@ async Task<IRoutingMapProvider> IDocumentQueryClient.GetRoutingMapProviderAsync(
return await this.innerClient.GetPartitionKeyRangeCacheAsync();
}

public async Task<QueryPartitionProvider> GetQueryPartitionProviderAsync(CancellationToken cancellationToken)
public QueryPartitionProvider GetQueryPartitionProvider()
{
if (this.queryPartitionProvider == null)
{
await this.semaphore.WaitAsync(cancellationToken);

if (this.queryPartitionProvider == null)
{
cancellationToken.ThrowIfCancellationRequested();
this.queryPartitionProvider = new QueryPartitionProvider(await this.innerClient.GetQueryEngineConfigurationAsync());
}

this.semaphore.Release();
}

return this.queryPartitionProvider;
return this.innerClient.QueryPartitionProvider;
}

public Task<DocumentServiceResponse> ExecuteQueryAsync(DocumentServiceRequest request, IDocumentClientRetryPolicy retryPolicyInstance, CancellationToken cancellationToken)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ protected SqlQuerySpec QuerySpec

public Guid CorrelatedActivityId { get; }

public async Task<PartitionedQueryExecutionInfo> GetPartitionedQueryExecutionInfoAsync(
public PartitionedQueryExecutionInfo GetPartitionedQueryExecutionInfo(
PartitionKeyDefinition partitionKeyDefinition,
bool requireFormattableOrderByQuery,
bool isContinuationExpected,
Expand All @@ -163,7 +163,7 @@ public async Task<PartitionedQueryExecutionInfo> GetPartitionedQueryExecutionInf
cancellationToken.ThrowIfCancellationRequested();
// $ISSUE-felixfan-2016-07-13: We should probably get PartitionedQueryExecutionInfo from Gateway in GatewayMode

QueryPartitionProvider queryPartitionProvider = await this.Client.GetQueryPartitionProviderAsync(cancellationToken);
QueryPartitionProvider queryPartitionProvider = this.Client.GetQueryPartitionProvider();
TryCatch<PartitionedQueryExecutionInfo> tryGetPartitionedQueryExecutionInfo = queryPartitionProvider.TryGetPartitionedQueryExecutionInfo(
this.QuerySpec,
partitionKeyDefinition,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public static async Task<IDocumentQueryExecutionContext> CreateDocumentQueryExec
//todo:elasticcollections this may rely on information from collection cache which is outdated
//if collection is deleted/created with same name.
//need to make it not rely on information from collection cache.
PartitionedQueryExecutionInfo partitionedQueryExecutionInfo = await queryExecutionContext.GetPartitionedQueryExecutionInfoAsync(
PartitionedQueryExecutionInfo partitionedQueryExecutionInfo = queryExecutionContext.GetPartitionedQueryExecutionInfo(
partitionKeyDefinition: collection.PartitionKey,
requireFormattableOrderByQuery: true,
isContinuationExpected: isContinuationExpected,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ internal interface IDocumentQueryClient : IDisposable

Task<IRoutingMapProvider> GetRoutingMapProviderAsync();

Task<QueryPartitionProvider> GetQueryPartitionProviderAsync(CancellationToken cancellationToken);
QueryPartitionProvider GetQueryPartitionProvider();

Task<DocumentServiceResponse> ExecuteQueryAsync(DocumentServiceRequest request, IDocumentClientRetryPolicy retryPolicyInstance, CancellationToken cancellationToken);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ internal class CosmosQueryClientCore : CosmosQueryClient
private readonly ContainerInternal cosmosContainerCore;
private readonly DocumentClient documentClient;
private readonly SemaphoreSlim semaphore;
private QueryPartitionProvider queryPartitionProvider;

public CosmosQueryClientCore(
CosmosClientContext clientContext,
Expand Down Expand Up @@ -79,7 +78,7 @@ public override async Task<ContainerQueryProperties> GetCachedContainerQueryProp
containerProperties.PartitionKey);
}

public override async Task<TryCatch<PartitionedQueryExecutionInfo>> TryGetPartitionedQueryExecutionInfoAsync(
public override TryCatch<PartitionedQueryExecutionInfo> TryGetPartitionedQueryExecutionInfo(
SqlQuerySpec sqlQuerySpec,
PartitionKeyDefinition partitionKeyDefinition,
bool requireFormattableOrderByQuery,
Expand All @@ -88,26 +87,7 @@ public override async Task<TryCatch<PartitionedQueryExecutionInfo>> TryGetPartit
bool hasLogicalPartitionKey,
CancellationToken cancellationToken)
{
if (this.queryPartitionProvider == null)
{
try
{
await this.semaphore.WaitAsync(cancellationToken);

if (this.queryPartitionProvider == null)
{
cancellationToken.ThrowIfCancellationRequested();
IDictionary<string, object> queryConfiguration = await this.documentClient.GetQueryEngineConfigurationAsync();
this.queryPartitionProvider = new QueryPartitionProvider(queryConfiguration);
}
}
finally
{
this.semaphore.Release();
}
}

return this.queryPartitionProvider.TryGetPartitionedQueryExecutionInfo(
return this.documentClient.QueryPartitionProvider.TryGetPartitionedQueryExecutionInfo(
sqlQuerySpec,
partitionKeyDefinition,
requireFormattableOrderByQuery,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ public override async Task<TryExecuteQueryResult> TryExecuteQueryAsync(

QueryPlanHandler queryPlanHandler = new QueryPlanHandler(this.queryClient);

TryCatch<(PartitionedQueryExecutionInfo queryPlan, bool supported)> tryGetQueryInfoAndIfSupported = await queryPlanHandler.TryGetQueryInfoAndIfSupportedAsync(
TryCatch<(PartitionedQueryExecutionInfo queryPlan, bool supported)> tryGetQueryInfoAndIfSupported = queryPlanHandler.TryGetQueryInfoAndIfSupported(
supportedQueryFeatures,
queryDefinition.ToSqlQuerySpec(),
partitionKeyDefinition,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ namespace Microsoft.Azure.Cosmos.Routing
internal class PartitionRoutingHelper
{
public static IReadOnlyList<Range<string>> GetProvidedPartitionKeyRanges(
Func<string, Exception> createBadRequestException,
SqlQuerySpec querySpec,
bool enableCrossPartitionQuery,
bool parallelizeCrossPartitionQuery,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,15 +244,15 @@ public async Task TestCosmosQueryPartitionKeyDefinition()
.Setup(x => x.ByPassQueryParsing())
.Returns(false);
client
.Setup(x => x.TryGetPartitionedQueryExecutionInfoAsync(
.Setup(x => x.TryGetPartitionedQueryExecutionInfo(
It.IsAny<SqlQuerySpec>(),
It.IsAny<PartitionKeyDefinition>(),
It.IsAny<bool>(),
It.IsAny<bool>(),
It.IsAny<bool>(),
It.IsAny<bool>(),
It.IsAny<CancellationToken>()))
.ReturnsAsync(TryCatch<PartitionedQueryExecutionInfo>.FromException(
.Returns(TryCatch<PartitionedQueryExecutionInfo>.FromException(
new InvalidOperationException(
exceptionMessage)));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ namespace Microsoft.Azure.Cosmos.Tests
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;
using Microsoft.Azure.Cosmos.Common;
using Microsoft.Azure.Cosmos.Query.Core.QueryPlan;
using Microsoft.Azure.Cosmos.Routing;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Collections;
Expand Down Expand Up @@ -126,10 +127,13 @@ internal override Task<PartitionKeyRangeCache> GetPartitionKeyRangeCacheAsync()
return Task.FromResult(this.partitionKeyRangeCache.Object);
}

internal override Task<IDictionary<string, object>> GetQueryEngineConfigurationAsync()
internal override QueryPartitionProvider QueryPartitionProvider
{
Dictionary<string, object> queryEngineConfiguration = JsonConvert.DeserializeObject< Dictionary<string, object>>("{\"maxSqlQueryInputLength\":262144,\"maxJoinsPerSqlQuery\":5,\"maxLogicalAndPerSqlQuery\":500,\"maxLogicalOrPerSqlQuery\":500,\"maxUdfRefPerSqlQuery\":10,\"maxInExpressionItemsCount\":16000,\"queryMaxInMemorySortDocumentCount\":500,\"maxQueryRequestTimeoutFraction\":0.9,\"sqlAllowNonFiniteNumbers\":false,\"sqlAllowAggregateFunctions\":true,\"sqlAllowSubQuery\":true,\"sqlAllowScalarSubQuery\":true,\"allowNewKeywords\":true,\"sqlAllowLike\":false,\"sqlAllowGroupByClause\":false,\"maxSpatialQueryCells\":12,\"spatialMaxGeometryPointCount\":256,\"sqlAllowTop\":true,\"enableSpatialIndexing\":true}");
return Task.FromResult((IDictionary<string, object>)queryEngineConfiguration);
get
{
return new QueryPartitionProvider(
JsonConvert.DeserializeObject<Dictionary<string, object>>("{\"maxSqlQueryInputLength\":262144,\"maxJoinsPerSqlQuery\":5,\"maxLogicalAndPerSqlQuery\":500,\"maxLogicalOrPerSqlQuery\":500,\"maxUdfRefPerSqlQuery\":10,\"maxInExpressionItemsCount\":16000,\"queryMaxInMemorySortDocumentCount\":500,\"maxQueryRequestTimeoutFraction\":0.9,\"sqlAllowNonFiniteNumbers\":false,\"sqlAllowAggregateFunctions\":true,\"sqlAllowSubQuery\":true,\"sqlAllowScalarSubQuery\":true,\"allowNewKeywords\":true,\"sqlAllowLike\":false,\"sqlAllowGroupByClause\":false,\"maxSpatialQueryCells\":12,\"spatialMaxGeometryPointCount\":256,\"sqlAllowTop\":true,\"enableSpatialIndexing\":true}"));
}
}

ValueTask<(string token, string payload)> IAuthorizationTokenProvider.GetUserAuthorizationAsync(
Expand Down