Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Internal] Query: Adds Split Support for Ode #3572

Merged
merged 41 commits into from
Jan 20, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
1dbfc8a
Added tests to test different aspects of merge/split support with Opt…
akotalwar Oct 19, 2022
2fc0b89
Added gone exception simulation tests.
akotalwar Oct 26, 2022
36d18bb
Added new tests and improved test infra
akotalwar Nov 4, 2022
924b24a
Removed ParalleContEvocation test. Fixed comments
akotalwar Nov 9, 2022
095a44d
Removed CreateParallelCrossPartitionPipelineStateAsync() as it is not…
akotalwar Nov 9, 2022
9115fcf
Removed while loop in CreateDocumentContainerAsync()
akotalwar Nov 10, 2022
c054909
Fixed comments.
akotalwar Nov 11, 2022
5807da9
Merge branch 'master' into users/akotalwar/TestsForMergeSplitSupport
akotalwar Nov 11, 2022
bfb4110
Updated ExecuteGoneExceptionOnODEPipeline()
akotalwar Nov 12, 2022
ea0aac4
Added type Assert for ExecuteGoneExceptionOnODEPipeline()
akotalwar Nov 12, 2022
b886f40
Replaced try-catch with if statement in MoveNextAsync()
akotalwar Nov 17, 2022
04389e0
Added delegate to access TryCreateCoreContextAsync()
akotalwar Nov 30, 2022
4af2036
Added check to confirm Ode pipeline is not called in fallback plan
akotalwar Nov 30, 2022
1257ad3
Updated method name from OptimisticDirectExecutionContext() to TryCre…
akotalwar Nov 30, 2022
bd4c195
Using delegate instead of Func<>.
akotalwar Dec 1, 2022
661b4f8
Ode fallback plan always calls Specialized pipeline
akotalwar Dec 2, 2022
039fbe8
Using ServiceInterop/Gateway to get QueryPlan for Specialized Pipeline
akotalwar Dec 7, 2022
60392ad
Added new test to check handling of failing fallback pipeline
akotalwar Dec 18, 2022
f9fbd8a
Code cleanup
akotalwar Dec 19, 2022
c230837
Added logic for handling non ODE continuation tokens
akotalwar Dec 21, 2022
c66be0f
Moved delegate away from member variables
akotalwar Dec 21, 2022
758c07f
Added tests for Merge case
akotalwar Dec 22, 2022
d442b99
Updated method names
akotalwar Dec 28, 2022
d1dcd7e
Added checks for tryCatch
akotalwar Dec 30, 2022
294d173
Updated SetCancellationToken() to use Try
akotalwar Jan 3, 2023
0621090
Updated TryUnwrapContinuationToken()
akotalwar Jan 4, 2023
87c9916
Removed changes in FlakyDocumentContainer.cs
akotalwar Jan 6, 2023
c4b0546
Removed unused imports
akotalwar Jan 6, 2023
3de111c
Updated comments
akotalwar Jan 6, 2023
7543ec7
Fixed comments and cleaned up test code
akotalwar Jan 9, 2023
134dcc8
Added CosmosElement null check in TryUnwrapContinuationToken()
akotalwar Jan 9, 2023
9d37c12
Removed FlakyDocumentContainer.cs from pull request
akotalwar Jan 9, 2023
a009e53
Removed unused imports
akotalwar Jan 9, 2023
400d4f5
Updated TryUnwrapContinuationToken()
akotalwar Jan 9, 2023
6179112
Update MoveNextAsync() call in OptimisticDirectExecutionQueryBaseline…
akotalwar Jan 10, 2023
c94db62
Made MergeTestUtil.IsFailedFallbackPipelineTest a readonly property
akotalwar Jan 11, 2023
2416a89
Added IsPartitionSplitException() overload to take CosmosElement
akotalwar Jan 13, 2023
bc8fa42
Resolved git conflicts
akotalwar Jan 13, 2023
c129afe
Fixed bug regarding syntax error queries
akotalwar Jan 18, 2023
a68dd2c
Merge branch 'master' into users/akotalwar/SplitSupport
akotalwar Jan 18, 2023
0d5390e
Merge branch 'master' into users/akotalwar/SplitSupport
akotalwar Jan 19, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Using ServiceInterop/Gateway to get QueryPlan for Specialized Pipeline
  • Loading branch information
akotalwar committed Dec 7, 2022
commit 039fbe8d2072ddee59863aa62c040ca0999689a0
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,22 @@ public async Task<TryCatch<T>> TryAsync<T>(
return matchResult;
}

public TryCatch<T> Try<T>(
Func<TResult, TryCatch<T>> onSuccess)
{
TryCatch<T> matchResult;
if (this.Succeeded)
{
matchResult = onSuccess(this.either.FromRight(default));
}
else
{
matchResult = TryCatch<T>.FromException(this.either.FromLeft(default));
}

return matchResult;
}

public TryCatch<TResult> Catch(
Action<Exception> onError)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ namespace Microsoft.Azure.Cosmos.Query.Core.ExecutionContext
using Microsoft.Azure.Cosmos.SqlObjects;
using Microsoft.Azure.Cosmos.SqlObjects.Visitors;
using Microsoft.Azure.Cosmos.Tracing;
using static Microsoft.Azure.Cosmos.Query.Core.ExecutionContext.CosmosQueryExecutionContextFactory;

internal static class CosmosQueryExecutionContextFactory
{
Expand Down Expand Up @@ -138,12 +137,12 @@ private static async Task<TryCatch<IQueryPipelineStage>> TryCreateCoreContextAsy
cosmosQueryContext.ContainerResourceId = containerQueryProperties.ResourceId;

Documents.PartitionKeyRange targetRange = await GetTargetRangeOptimisticDirectExecutionAsync(
inputParameters,
queryPlanFromContinuationToken,
cosmosQueryContext,
containerQueryProperties,
inputParameters,
queryPlanFromContinuationToken,
cosmosQueryContext,
containerQueryProperties,
trace);

if (targetRange != null)
{
// Test code added to confirm the correct pipeline is being utilized
Expand Down Expand Up @@ -229,33 +228,12 @@ private static async Task<TryCatch<IQueryPipelineStage>> TryCreateCoreContextAsy
}
}

if (cosmosQueryContext.QueryClient.ByPassQueryParsing())
{
// For non-Windows platforms(like Linux and OSX) in .NET Core SDK, we cannot use ServiceInterop, so need to bypass in that case.
// We are also now bypassing this for 32 bit host process running even on Windows as there are many 32 bit apps that will not work without this
partitionedQueryExecutionInfo = await QueryPlanRetriever.GetQueryPlanThroughGatewayAsync(
cosmosQueryContext,
inputParameters.SqlQuerySpec,
cosmosQueryContext.ResourceLink,
inputParameters.PartitionKey,
createQueryPipelineTrace,
cancellationToken);
}
else
{
Documents.PartitionKeyDefinition partitionKeyDefinition = GetPartitionKeyDefinition(inputParameters, containerQueryProperties);

partitionedQueryExecutionInfo = await QueryPlanRetriever.GetQueryPlanWithServiceInteropAsync(
cosmosQueryContext.QueryClient,
inputParameters.SqlQuerySpec,
cosmosQueryContext.ResourceTypeEnum,
partitionKeyDefinition,
inputParameters.PartitionKey != null,
containerQueryProperties.GeospatialType,
cosmosQueryContext.UseSystemPrefix,
createQueryPipelineTrace,
cancellationToken);
}
partitionedQueryExecutionInfo = await GetPartitionedQueryExecutionInfoFromGatewayOrServiceInteropAsync(
cosmosQueryContext,
inputParameters,
containerQueryProperties,
createQueryPipelineTrace,
cancellationToken);
}

return await TryCreateFromPartitionedQueryExecutionInfoAsync(
Expand Down Expand Up @@ -407,7 +385,7 @@ private static TryCatch<IQueryPipelineStage> TryCreateOptimisticDirectExecutionC
inputParameters: inputParameters,
targetRange: new FeedRangeEpk(targetRange.ToRange()),
queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: inputParameters.MaxItemCount),
queryPipelineStage: (CosmosElement continuationToken) =>
fallbackQueryPipelineStageFactory: (continuationToken) =>
{
InputParameters updatedInputParameters = new InputParameters(
inputParameters.SqlQuerySpec,
Expand Down Expand Up @@ -447,44 +425,12 @@ public static async Task<TryCatch<IQueryPipelineStage>> TryCreateOdeFallbackPipe
ITrace trace,
CancellationToken cancellationToken)
{
PartitionedQueryExecutionInfo partitionedQueryExecutionInfo;
Documents.PartitionKeyDefinition partitionKeyDefinition = GetPartitionKeyDefinition(inputParameters, containerQueryProperties);

if (partitionKeyDefinition != null)
{
partitionedQueryExecutionInfo = await QueryPlanRetriever.GetQueryPlanWithServiceInteropAsync(
cosmosQueryContext.QueryClient,
inputParameters.SqlQuerySpec,
cosmosQueryContext.ResourceTypeEnum,
partitionKeyDefinition,
inputParameters.PartitionKey != null,
containerQueryProperties.GeospatialType,
cosmosQueryContext.UseSystemPrefix,
trace,
cancellationToken);
}
else
{
partitionedQueryExecutionInfo = new PartitionedQueryExecutionInfo()
{
QueryInfo = new QueryInfo()
{
Aggregates = null,
DistinctType = DistinctQueryType.None,
GroupByAliases = null,
GroupByAliasToAggregateType = null,
GroupByExpressions = null,
HasSelectValue = false,
Limit = null,
Offset = null,
OrderBy = null,
OrderByExpressions = null,
RewrittenQuery = null,
Top = null,
},
QueryRanges = new List<Documents.Routing.Range<string>>(),
};
}
PartitionedQueryExecutionInfo partitionedQueryExecutionInfo = await GetPartitionedQueryExecutionInfoFromGatewayOrServiceInteropAsync(
cosmosQueryContext,
inputParameters,
containerQueryProperties,
trace,
cancellationToken);

List<Documents.PartitionKeyRange> targetRanges = await CosmosQueryExecutionContextFactory.GetTargetPartitionKeyRangesAsync(
cosmosQueryContext.QueryClient,
Expand Down Expand Up @@ -588,6 +534,45 @@ private static TryCatch<IQueryPipelineStage> TryCreateSpecializedDocumentQueryEx
requestCancellationToken: cancellationToken);
}

private static async Task<PartitionedQueryExecutionInfo> GetPartitionedQueryExecutionInfoFromGatewayOrServiceInteropAsync(
CosmosQueryContext cosmosQueryContext,
InputParameters inputParameters,
ContainerQueryProperties containerQueryProperties,
ITrace trace,
CancellationToken cancellationToken)
{
PartitionedQueryExecutionInfo partitionedQueryExecutionInfo;
if (cosmosQueryContext.QueryClient.ByPassQueryParsing())
{
// For non-Windows platforms(like Linux and OSX) in .NET Core SDK, we cannot use ServiceInterop, so need to bypass in that case.
// We are also now bypassing this for 32 bit host process running even on Windows as there are many 32 bit apps that will not work without this
partitionedQueryExecutionInfo = await QueryPlanRetriever.GetQueryPlanThroughGatewayAsync(
cosmosQueryContext,
inputParameters.SqlQuerySpec,
cosmosQueryContext.ResourceLink,
inputParameters.PartitionKey,
trace,
cancellationToken);
}
else
{
Documents.PartitionKeyDefinition partitionKeyDefinition = GetPartitionKeyDefinition(inputParameters, containerQueryProperties);

partitionedQueryExecutionInfo = await QueryPlanRetriever.GetQueryPlanWithServiceInteropAsync(
cosmosQueryContext.QueryClient,
inputParameters.SqlQuerySpec,
cosmosQueryContext.ResourceTypeEnum,
partitionKeyDefinition,
inputParameters.PartitionKey != null,
containerQueryProperties.GeospatialType,
cosmosQueryContext.UseSystemPrefix,
trace,
cancellationToken);
}

return partitionedQueryExecutionInfo;
}

/// <summary>
/// Gets the list of partition key ranges.
/// 1. Check partition key range id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ namespace Microsoft.Azure.Cosmos.Query.Core.Pipeline.OptimisticDirectExecutionQu
{
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -18,41 +17,44 @@ namespace Microsoft.Azure.Cosmos.Query.Core.Pipeline.OptimisticDirectExecutionQu
using Microsoft.Azure.Cosmos.Query.Core.Pipeline.CrossPartition;
using Microsoft.Azure.Cosmos.Query.Core.Pipeline.CrossPartition.Parallel;
using Microsoft.Azure.Cosmos.Query.Core.Pipeline.Pagination;
using Microsoft.Azure.Cosmos.Query.Core.QueryClient;
using Microsoft.Azure.Cosmos.Tracing;
using Microsoft.Azure.Documents;
using static Microsoft.Azure.Cosmos.Query.Core.ExecutionContext.CosmosQueryExecutionContextFactory;
using static Microsoft.Azure.Cosmos.Query.Core.Pipeline.CrossPartition.PartitionMapper;

internal sealed class OptimisticDirectExecutionQueryPipelineStage : IQueryPipelineStage
{
private enum ExecutionState
{
OptimisticDirectExecution,
SpecializedDocumentQueryExecution,
}

private const string optimisticDirectExecutionToken = "OptimisticDirectExecutionToken";
private const string optimisticDirectExecution = "OptimisticDirectExecution";
private const string specializedDocumentQueryExecution = "SpecializedDocumentQueryExecution";
public delegate Task<TryCatch<IQueryPipelineStage>> FallbackQueryPipelineStageFactory(CosmosElement continuationToken);
akotalwar marked this conversation as resolved.
Show resolved Hide resolved
akotalwar marked this conversation as resolved.
Show resolved Hide resolved
private readonly FallbackQueryPipelineStageFactory queryPipelineStageFactory;
akotalwar marked this conversation as resolved.
Show resolved Hide resolved
private TryCatch<IQueryPipelineStage> inner;
akotalwar marked this conversation as resolved.
Show resolved Hide resolved
private CosmosElement continuationToken;
private string queryState;
private OptimisticDirectExecutionQueryPipelineStage(TryCatch<IQueryPipelineStage> queryPipelineStage, FallbackQueryPipelineStageFactory queryPipelineStageFactory, CosmosElement continuationToken)
private ExecutionState executionState;

private OptimisticDirectExecutionQueryPipelineStage(TryCatch<IQueryPipelineStage> inner, FallbackQueryPipelineStageFactory queryPipelineStageFactory, CosmosElement continuationToken)
{
this.inner = queryPipelineStage;
this.inner = inner;
this.queryPipelineStageFactory = queryPipelineStageFactory;
this.continuationToken = continuationToken;
akotalwar marked this conversation as resolved.
Show resolved Hide resolved
this.executionState = ExecutionState.OptimisticDirectExecution;
}

public TryCatch<QueryPage> Current => this.inner.Result.Current;
public TryCatch<QueryPage> Current => this.inner.Try<QueryPage>(pipelineStage => pipelineStage.Current);

public ValueTask DisposeAsync()
{
return this.inner.Result.DisposeAsync();
return this.inner.Try<ValueTask>(pipelineStage => pipelineStage.DisposeAsync()).Result;
}
akotalwar marked this conversation as resolved.
Show resolved Hide resolved

public async ValueTask<bool> MoveNextAsync(ITrace trace)
{
akotalwar marked this conversation as resolved.
Show resolved Hide resolved
akotalwar marked this conversation as resolved.
Show resolved Hide resolved
if (this.inner.Result == null)
if (this.inner.Failed)
{
this.inner = TryCatch<IQueryPipelineStage>.FromException(this.inner.Exception);
return false;
}
akotalwar marked this conversation as resolved.
Show resolved Hide resolved

Expand All @@ -64,44 +66,49 @@ public async ValueTask<bool> MoveNextAsync(ITrace trace)

if (success)
{
this.queryState = optimisticDirectExecution;
this.SaveContinuation(this.Current.Result.State?.Value);
akotalwar marked this conversation as resolved.
Show resolved Hide resolved
}
else if (isGoneException)
else if (isGoneException && this.executionState == ExecutionState.OptimisticDirectExecution)
{
// Only Ode pipeline GoneException should be handled
Debug.Assert(this.queryState == optimisticDirectExecution);
this.inner = await this.queryPipelineStageFactory(TryUnwrapContinuationToken(this.continuationToken).Result);
this.executionState = ExecutionState.SpecializedDocumentQueryExecution;

this.inner = await this.queryPipelineStageFactory(this.UnwrapContinuationToken(this.continuationToken));
this.queryState = specializedDocumentQueryExecution;

// TODO: Failure check for this.inner
bool fallbackPipelineSuccess = await this.inner.Result.MoveNextAsync(trace);
if (this.inner.Failed)
{
akotalwar marked this conversation as resolved.
Show resolved Hide resolved
this.inner = TryCatch<IQueryPipelineStage>.FromException(this.inner.Exception);
return false;
akotalwar marked this conversation as resolved.
Show resolved Hide resolved
}

return fallbackPipelineSuccess;
success = await this.inner.Result.MoveNextAsync(trace);
}
akotalwar marked this conversation as resolved.
Show resolved Hide resolved

return success;
return success;
}

public void SetCancellationToken(CancellationToken cancellationToken)
{
this.inner.Result.SetCancellationToken(cancellationToken);
this.inner.Try<IQueryPipelineStage>((pipelineStage) => pipelineStage = this.inner.Result).Result.SetCancellationToken(cancellationToken);
akotalwar marked this conversation as resolved.
Show resolved Hide resolved
}

public CosmosElement UnwrapContinuationToken(CosmosElement continuationToken)
public static TryCatch<CosmosElement> TryUnwrapContinuationToken(CosmosElement continuationToken)
akotalwar marked this conversation as resolved.
Show resolved Hide resolved
{
((CosmosObject)continuationToken).TryGetValue(optimisticDirectExecutionToken, out CosmosElement specializedContinuationToken);
if (!((CosmosObject)continuationToken).TryGetValue(optimisticDirectExecutionToken, out CosmosElement specializedContinuationToken))
akotalwar marked this conversation as resolved.
Show resolved Hide resolved
{
return TryCatch<CosmosElement>.FromException(
new FormatException(
$"Unable to convert CosmosObject '{optimisticDirectExecutionToken}' to CosmosElement."));
}

CosmosArray cosmosElementFallbackToken = CosmosArray.Create(specializedContinuationToken);
return cosmosElementFallbackToken;
return TryCatch<CosmosElement>.FromResult(cosmosElementFallbackToken);
}

public static TryCatch<IQueryPipelineStage> MonadicCreate(
DocumentContainer documentContainer,
InputParameters inputParameters,
CosmosQueryExecutionContextFactory.InputParameters inputParameters,
FeedRangeEpk targetRange,
QueryPaginationOptions queryPaginationOptions,
FallbackQueryPipelineStageFactory queryPipelineStage,
FallbackQueryPipelineStageFactory fallbackQueryPipelineStageFactory,
CancellationToken cancellationToken)
{
TryCatch<IQueryPipelineStage> pipelineStage = OptimisticDirectExecutionQueryPipelineImpl.MonadicCreate(
Expand All @@ -118,7 +125,7 @@ public static TryCatch<IQueryPipelineStage> MonadicCreate(
return TryCatch<IQueryPipelineStage>.FromException(pipelineStage.Exception);
akotalwar marked this conversation as resolved.
Show resolved Hide resolved
}

OptimisticDirectExecutionQueryPipelineStage odePipelineStageMonadicCreate = new OptimisticDirectExecutionQueryPipelineStage(pipelineStage, queryPipelineStage, inputParameters.InitialUserContinuationToken);
OptimisticDirectExecutionQueryPipelineStage odePipelineStageMonadicCreate = new OptimisticDirectExecutionQueryPipelineStage(pipelineStage, fallbackQueryPipelineStageFactory, inputParameters.InitialUserContinuationToken);

return TryCatch<IQueryPipelineStage>.FromResult(odePipelineStageMonadicCreate);
}
akotalwar marked this conversation as resolved.
Show resolved Hide resolved
Expand Down Expand Up @@ -262,7 +269,7 @@ private static TryCatch<FeedRangeState<QueryState>> MonadicExtractState(
return TryCatch<FeedRangeState<QueryState>>.FromException(tryCreateContinuationToken.Exception);
}

TryCatch<PartitionMapping<OptimisticDirectExecutionContinuationToken>> partitionMappingMonad = PartitionMapper.MonadicGetPartitionMapping(
TryCatch<PartitionMapper.PartitionMapping<OptimisticDirectExecutionContinuationToken>> partitionMappingMonad = PartitionMapper.MonadicGetPartitionMapping(
range,
tryCreateContinuationToken.Result);

Expand All @@ -272,7 +279,7 @@ private static TryCatch<FeedRangeState<QueryState>> MonadicExtractState(
partitionMappingMonad.Exception);
}

PartitionMapping<OptimisticDirectExecutionContinuationToken> partitionMapping = partitionMappingMonad.Result;
PartitionMapper.PartitionMapping<OptimisticDirectExecutionContinuationToken> partitionMapping = partitionMappingMonad.Result;

KeyValuePair<FeedRangeEpk, OptimisticDirectExecutionContinuationToken> kvpRange = new KeyValuePair<FeedRangeEpk, OptimisticDirectExecutionContinuationToken>(
partitionMapping.TargetMapping.Keys.First(),
Expand Down
Loading