Skip to content

Commit

Permalink
Query: Adds EnableOptimisticDirectExecution flag to QueryRequestOptio…
Browse files Browse the repository at this point in the history
…ns (#3664)

* Added new flag to QueryRequestOptions to allow customers to use Ode pipeline

* Updated comments in QueryRequestOptions.cs

* Renamed enabledOde to enableOde

* Removed default setting for EnableOptimisticDirectExecution
  • Loading branch information
akotalwar authored Jan 26, 2023
1 parent c29abfe commit 2608d4a
Show file tree
Hide file tree
Showing 7 changed files with 31 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,7 @@ private static TryCatch<IQueryPipelineStage> TryCreateSpecializedDocumentQueryEx
inputParameters.ExecutionEnvironment,
inputParameters.ReturnResultsInDeterministicOrder,
inputParameters.ForcePassthrough,
inputParameters.EnableOptimisticDirectExecution,
inputParameters.TestInjections);
}

Expand Down Expand Up @@ -752,7 +753,7 @@ private static Documents.PartitionKeyDefinition GetPartitionKeyDefinition(InputP
ContainerQueryProperties containerQueryProperties,
ITrace trace)
{
if (inputParameters.TestInjections == null || !inputParameters.TestInjections.EnableOptimisticDirectExecution) return null;
if (!inputParameters.EnableOptimisticDirectExecution) return null;

// case 1: Is query going to a single partition
bool hasPartitionKey = inputParameters.PartitionKey.HasValue
Expand Down Expand Up @@ -824,6 +825,7 @@ public InputParameters(
ExecutionEnvironment? executionEnvironment,
bool? returnResultsInDeterministicOrder,
bool forcePassthrough,
bool enableOptimisticDirectExecution,
TestInjections testInjections)
{
this.SqlQuerySpec = sqlQuerySpec ?? throw new ArgumentNullException(nameof(sqlQuerySpec));
Expand Down Expand Up @@ -857,6 +859,7 @@ public InputParameters(
this.ExecutionEnvironment = executionEnvironment.GetValueOrDefault(InputParameters.DefaultExecutionEnvironment);
this.ReturnResultsInDeterministicOrder = returnResultsInDeterministicOrder.GetValueOrDefault(InputParameters.DefaultReturnResultsInDeterministicOrder);
this.ForcePassthrough = forcePassthrough;
this.EnableOptimisticDirectExecution = enableOptimisticDirectExecution;
this.TestInjections = testInjections;
}

Expand All @@ -873,6 +876,7 @@ public InputParameters(
public bool ReturnResultsInDeterministicOrder { get; }
public TestInjections TestInjections { get; }
public bool ForcePassthrough { get; }
public bool EnableOptimisticDirectExecution { get; }

public InputParameters WithContinuationToken(CosmosElement token)
{
Expand All @@ -889,6 +893,7 @@ public InputParameters WithContinuationToken(CosmosElement token)
this.ExecutionEnvironment,
this.ReturnResultsInDeterministicOrder,
this.ForcePassthrough,
this.EnableOptimisticDirectExecution,
this.TestInjections);
}
}
Expand Down
5 changes: 1 addition & 4 deletions Microsoft.Azure.Cosmos/src/Query/Core/TestInjections.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,13 @@ public enum PipelineType
OptimisticDirectExecution,
}

public TestInjections(bool simulate429s, bool simulateEmptyPages, bool enableOptimisticDirectExecution = false, ResponseStats responseStats = null)
public TestInjections(bool simulate429s, bool simulateEmptyPages, ResponseStats responseStats = null)
{
this.SimulateThrottles = simulate429s;
this.SimulateEmptyPages = simulateEmptyPages;
this.Stats = responseStats;
this.EnableOptimisticDirectExecution = enableOptimisticDirectExecution;
}

public bool EnableOptimisticDirectExecution { get; }

public bool SimulateThrottles { get; }

public bool SimulateEmptyPages { get; }
Expand Down
1 change: 1 addition & 0 deletions Microsoft.Azure.Cosmos/src/Query/v3Query/QueryIterator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ public static QueryIterator Create(
executionEnvironment: queryRequestOptions.ExecutionEnvironment,
returnResultsInDeterministicOrder: queryRequestOptions.ReturnResultsInDeterministicOrder,
forcePassthrough: forcePassthrough,
enableOptimisticDirectExecution: queryRequestOptions.EnableOptimisticDirectExecution,
testInjections: queryRequestOptions.TestSettings);

return new QueryIterator(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,14 @@ public class QueryRequestOptions : RequestOptions
/// </value>
public bool? EnableLowPrecisionOrderBy { get; set; }

/// <summary>
/// Gets or sets the option for customers to opt in for direct (optimistic) execution of the query..
/// </summary>
/// <value>
/// Direct (optimistic) execution offers improved performance for several kinds of queries such as a single partition streaming query.
/// </value>
internal bool EnableOptimisticDirectExecution { get; set; }

/// <summary>
/// Gets or sets the maximum number of items that can be buffered client side during
/// parallel query execution in the Azure Cosmos DB service.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ private static async Task TestPositiveOptimisticDirectExecutionOutput(
QueryRequestOptions feedOptions = new QueryRequestOptions
{
MaxItemCount = -1,
TestSettings = GetTestInjections(simulate429s:false, simulateEmptyPages:false, enableOptimisticDirectExecution:true)
EnableOptimisticDirectExecution = true,
TestSettings = new TestInjections(simulate429s: false, simulateEmptyPages: false, new TestInjections.ResponseStats())
};

// check if pipeline returns empty continuation token
Expand Down Expand Up @@ -172,13 +173,11 @@ private static async Task TestPositiveOptimisticDirectExecutionOutput(
feedOptions = new QueryRequestOptions
{
MaxItemCount = pageSizeOptions[i],
PartitionKey = queryAndResults[j].PartitionKey == null
PartitionKey = queryAndResults[j].PartitionKey == null
? null
: new Cosmos.PartitionKey(queryAndResults[j].PartitionKey),
TestSettings = GetTestInjections(
simulate429s: false,
simulateEmptyPages: false,
enableOptimisticDirectExecution: queryAndResults[j].EnableOptimisticDirectExecution)
EnableOptimisticDirectExecution = queryAndResults[j].EnableOptimisticDirectExecution,
TestSettings = new TestInjections(simulate429s: false, simulateEmptyPages: false, new TestInjections.ResponseStats())
};

List<CosmosElement> items = await RunQueryAsync(
Expand Down Expand Up @@ -208,7 +207,8 @@ private static async Task TestNegativeOptimisticDirectExecutionOutput(
QueryRequestOptions feedOptions = new QueryRequestOptions
{
PartitionKey = new Cosmos.PartitionKey("/value"),
TestSettings = GetTestInjections(simulate429s: false, simulateEmptyPages: false, enableOptimisticDirectExecution: true)
EnableOptimisticDirectExecution = true,
TestSettings = new TestInjections(simulate429s: false, simulateEmptyPages: false, new TestInjections.ResponseStats())
};

// check if bad continuation queries and syntax error queries are handled by pipeline
Expand Down Expand Up @@ -241,15 +241,6 @@ await container.GetItemQueryIterator<Document>(
}
}

private static TestInjections GetTestInjections(bool simulate429s, bool simulateEmptyPages, bool enableOptimisticDirectExecution)
{
return new TestInjections(
simulate429s,
simulateEmptyPages,
enableOptimisticDirectExecution,
new TestInjections.ResponseStats());
}

private struct SinglePartitionWithContinuationsArgs
{
public int NumberOfDocuments;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,7 @@ internal static async Task<List<T>> QueryWithCosmosElementContinuationTokenAsync
Properties = queryRequestOptions.Properties,
IsEffectivePartitionKeyRouting = queryRequestOptions.IsEffectivePartitionKeyRouting,
CosmosElementContinuationToken = queryRequestOptions.CosmosElementContinuationToken,
EnableOptimisticDirectExecution = queryRequestOptions.EnableOptimisticDirectExecution,
TestSettings = queryRequestOptions.TestSettings,
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,11 +148,12 @@ public void NegativeOptimisticDirectExecutionOutput()
// This test confirms that TestInjection.EnableOptimisticDirectExection is set to false from default.
// Check test "TestPipelineForDistributedQueryAsync" to understand why this is done
[TestMethod]
public async Task TestDefaultTestInjectionSettingsAsync()
public async Task TestDefaultQueryRequestOptionsSettings()
{
TestInjections testInjection = new TestInjections(simulate429s: false, simulateEmptyPages: false);

Assert.AreEqual(testInjection.EnableOptimisticDirectExecution, false);
QueryRequestOptions requestOptions = new QueryRequestOptions();

Assert.AreEqual(requestOptions.EnableOptimisticDirectExecution, false);
}

// test checks that the pipeline can take a query to the backend and returns its associated document(s).
Expand Down Expand Up @@ -563,6 +564,7 @@ public override OptimisticDirectExecutionTestOutput ExecuteTest(OptimisticDirect
executionEnvironment: null,
returnResultsInDeterministicOrder: null,
forcePassthrough: false,
enableOptimisticDirectExecution: queryRequestOptions.EnableOptimisticDirectExecution,
testInjections: queryRequestOptions.TestSettings);

string databaseId = "db1234";
Expand All @@ -587,7 +589,8 @@ private static QueryRequestOptions GetQueryRequestOptions(bool enableOptimisticD
{
MaxConcurrency = 0,
MaxItemCount = 10,
TestSettings = new TestInjections(simulate429s: true, simulateEmptyPages: false, enableOptimisticDirectExecution: enableOptimisticDirectExecution, new TestInjections.ResponseStats()),
EnableOptimisticDirectExecution = enableOptimisticDirectExecution,
TestSettings = new TestInjections(simulate429s: true, simulateEmptyPages: false, new TestInjections.ResponseStats()),
Properties = new Dictionary<string, object>()
{
{ HttpConstants.HttpHeaders.EnumerationDirection, ""},
Expand Down

0 comments on commit 2608d4a

Please sign in to comment.