Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Internal] Query: Adds Split Support for Ode #3572

Merged
merged 41 commits into from
Jan 20, 2023
Merged
Changes from 1 commit
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
1dbfc8a
Added tests to test different aspects of merge/split support with Opt…
akotalwar Oct 19, 2022
2fc0b89
Added gone exception simulation tests.
akotalwar Oct 26, 2022
36d18bb
Added new tests and improved test infra
akotalwar Nov 4, 2022
924b24a
Removed ParalleContEvocation test. Fixed comments
akotalwar Nov 9, 2022
095a44d
Removed CreateParallelCrossPartitionPipelineStateAsync() as it is not…
akotalwar Nov 9, 2022
9115fcf
Removed while loop in CreateDocumentContainerAsync()
akotalwar Nov 10, 2022
c054909
Fixed comments.
akotalwar Nov 11, 2022
5807da9
Merge branch 'master' into users/akotalwar/TestsForMergeSplitSupport
akotalwar Nov 11, 2022
bfb4110
Updated ExecuteGoneExceptionOnODEPipeline()
akotalwar Nov 12, 2022
ea0aac4
Added type Assert for ExecuteGoneExceptionOnODEPipeline()
akotalwar Nov 12, 2022
b886f40
Replaced try-catch with if statement in MoveNextAsync()
akotalwar Nov 17, 2022
04389e0
Added delegate to access TryCreateCoreContextAsync()
akotalwar Nov 30, 2022
4af2036
Added check to confirm Ode pipeline is not called in fallback plan
akotalwar Nov 30, 2022
1257ad3
Updated method name from OptimisticDirectExecutionContext() to TryCre…
akotalwar Nov 30, 2022
bd4c195
Using delegate instead of Func<>.
akotalwar Dec 1, 2022
661b4f8
Ode fallback plan always calls Specialized pipeline
akotalwar Dec 2, 2022
039fbe8
Using ServiceInterop/Gateway to get QueryPlan for Specialized Pipeline
akotalwar Dec 7, 2022
60392ad
Added new test to check handling of failing fallback pipeline
akotalwar Dec 18, 2022
f9fbd8a
Code cleanup
akotalwar Dec 19, 2022
c230837
Added logic for handling non ODE continuation tokens
akotalwar Dec 21, 2022
c66be0f
Moved delegate away from member variables
akotalwar Dec 21, 2022
758c07f
Added tests for Merge case
akotalwar Dec 22, 2022
d442b99
Updated method names
akotalwar Dec 28, 2022
d1dcd7e
Added checks for tryCatch
akotalwar Dec 30, 2022
294d173
Updated SetCancellationToken() to use Try
akotalwar Jan 3, 2023
0621090
Updated TryUnwrapContinuationToken()
akotalwar Jan 4, 2023
87c9916
Removed changes in FlakyDocumentContainer.cs
akotalwar Jan 6, 2023
c4b0546
Removed unused imports
akotalwar Jan 6, 2023
3de111c
Updated comments
akotalwar Jan 6, 2023
7543ec7
Fixed comments and cleaned up test code
akotalwar Jan 9, 2023
134dcc8
Added CosmosElement null check in TryUnwrapContinuationToken()
akotalwar Jan 9, 2023
9d37c12
Removed FlakyDocumentContainer.cs from pull request
akotalwar Jan 9, 2023
a009e53
Removed unused imports
akotalwar Jan 9, 2023
400d4f5
Updated TryUnwrapContinuationToken()
akotalwar Jan 9, 2023
6179112
Update MoveNextAsync() call in OptimisticDirectExecutionQueryBaseline…
akotalwar Jan 10, 2023
c94db62
Made MergeTestUtil.IsFailedFallbackPipelineTest a readonly property
akotalwar Jan 11, 2023
2416a89
Added IsPartitionSplitException() overload to take CosmosElement
akotalwar Jan 13, 2023
bc8fa42
Resolved git conflicts
akotalwar Jan 13, 2023
c129afe
Fixed bug regarding syntax error queries
akotalwar Jan 18, 2023
a68dd2c
Merge branch 'master' into users/akotalwar/SplitSupport
akotalwar Jan 18, 2023
0d5390e
Merge branch 'master' into users/akotalwar/SplitSupport
akotalwar Jan 19, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Added gone exception simulation tests.
  • Loading branch information
akotalwar committed Oct 26, 2022
commit 2fc0b89e55ddcbebc59a9277f75ed500fd1558ef
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ public async Task TestPipelineSingleContinuationToken()
[TestMethod]
public async Task TestPipelineForBackendDocumentsOnSinglePartitionAsync()
{
int numItems = 10;
int numItems = 100;
string query = "SELECT VALUE COUNT(1) FROM c";
DocumentContainer inMemoryCollection = await CreateDocumentContainerAsync(numItems, multiPartition: false);
IQueryPipelineStage queryPipelineStage = await CreateOptimisticDirectExecutionPipelineStateAsync(inMemoryCollection, query, continuationToken: null);
Expand All @@ -160,7 +160,7 @@ public async Task TestPipelineForBackendDocumentsOnSinglePartitionAsync()
documentCountInSinglePartition += Int32.Parse(tryGetPage.Result.Documents[0].ToString());
}

Assert.AreEqual(documentCountInSinglePartition, 10);
Assert.AreEqual(documentCountInSinglePartition, 100);
}

// test checks that the pipeline can take a query to the backend and returns its associated document(s) + continuation token.
Expand Down Expand Up @@ -199,9 +199,58 @@ public async Task TestPipelineForContinuationTokenOnSinglePartitionAsync()

// test to check if pipeline handles a 410 exception properly and returns all the documents.
// it creates a gone exception after the first MoveNexyAsync() call. This allows for the pipeline to return some documents before failing
// TODO: With the addition of the merge/split supprt, this queryPipelineStage should be able to return all documents regardless of a gone exception happening
// TODO: With the addition of the merge/split support, this queryPipelineStage should be able to return all documents regardless of a gone exception happening
[TestMethod]
public async Task TestPipelineForGoneExceptionAsync()
public async Task TestPipelineForGoneExceptionSinglePartitionAsync()
{
int numItems = 100;
string query = "SELECT * FROM c";
List<CosmosElement> documents = new List<CosmosElement>();
string errorMessage = $"Epk Range: Partition does not exist at the given range.";
CosmosException goneException = new CosmosException(
message: errorMessage,
statusCode: System.Net.HttpStatusCode.Gone,
subStatusCode: (int)SubStatusCodes.PartitionKeyRangeGone,
activityId: Guid.NewGuid().ToString(),
requestCharge: default);

int moveNextAsyncCounter = 0;
DocumentContainer inMemoryCollection = await CreateDocumentContainerAsync(
numItems,
multiPartition: false,
failureConfigs: new FlakyDocumentContainer.FailureConfigs(
inject429s: false,
injectEmptyPages: false,
shouldReturnFailure: () => Task.FromResult<Exception>(moveNextAsyncCounter != 1 ? null : goneException)));

IQueryPipelineStage queryPipelineStage = await CreateOptimisticDirectExecutionPipelineStateAsync(inMemoryCollection, query, continuationToken: null);

while (await queryPipelineStage.MoveNextAsync(NoOpTrace.Singleton))
{
moveNextAsyncCounter++;
try
{
TryCatch<QueryPage> tryGetPage = queryPipelineStage.Current;
tryGetPage.ThrowIfFailed();

documents.AddRange(tryGetPage.Result.Documents);
}
catch
{
// check if gone exception is handled properly
Assert.IsTrue(queryPipelineStage.Current.Failed);
Assert.AreEqual(queryPipelineStage.Current.InnerMostException.Message, errorMessage);
Assert.AreEqual(((CosmosException)queryPipelineStage.Current.InnerMostException).StatusCode, System.Net.HttpStatusCode.Gone);
break;
}
}
// Once fallback plan is implemented, this test should be able to return all 100 documents
Assert.AreEqual(documents.Count, 10);
}

// test finds out pipeline response to a gone exception on multiple partitions
[TestMethod]
public async Task TestPipelineForGoneExceptionMultiplePartitionsAsync()
{
int numItems = 100;
string query = "SELECT * FROM c";
Expand All @@ -221,7 +270,7 @@ public async Task TestPipelineForGoneExceptionAsync()
failureConfigs: new FlakyDocumentContainer.FailureConfigs(
inject429s: false,
injectEmptyPages: false,
shouldReturnFailure: ()=> Task.FromResult<Exception>(moveNextAsyncCounter == 0 ? null: goneException)));
shouldReturnFailure: () => Task.FromResult<Exception>(moveNextAsyncCounter != 1 ? null : goneException)));

IQueryPipelineStage queryPipelineStage = await CreateOptimisticDirectExecutionPipelineStateAsync(inMemoryCollection, query, continuationToken: null);

Expand All @@ -244,9 +293,45 @@ public async Task TestPipelineForGoneExceptionAsync()
break;
}
}
// Once fallback plan is implemented, this test should be able to return all 100 documents
Assert.AreEqual(documents.Count, 10);
}

// Start with ODE pipeline and then switch to parallel in a single partition scenario.
// This is to simulate whether a fallback solution would work and provide client with all the required documents
[TestMethod]
public async Task TestPipelineSwitchForContinuationTokenOnSinglePartitionAsync()
{
int numItems = 100;
string query = "SELECT * FROM c";
DocumentContainer inMemoryCollection = await CreateDocumentContainerAsync(numItems, multiPartition: false);
IQueryPipelineStage queryPipelineStage = await CreateOptimisticDirectExecutionPipelineStateAsync(inMemoryCollection, query, continuationToken: null);
List<CosmosElement> documents = new List<CosmosElement>();
int continuationTokenCount = 0;

while (await queryPipelineStage.MoveNextAsync(NoOpTrace.Singleton))
{
TryCatch<QueryPage> tryGetPage = queryPipelineStage.Current;
tryGetPage.ThrowIfFailed();

documents.AddRange(tryGetPage.Result.Documents);

if (tryGetPage.Result.State == null)
{
break;
}
else
{
queryPipelineStage = await CreateParallelCrossPartitionPipelineStateAsync(inMemoryCollection, query, continuationToken: tryGetPage.Result.State.Value);
}

continuationTokenCount++;
}

Assert.AreEqual(continuationTokenCount, 10);
Assert.AreEqual(documents.Count, 100);
}

// Start with ODE pipeline and then switch to parallel in a cross partition scenario.
// This is to simulate whether a fallback solution would work and provide client with all the required documents
[TestMethod]
Expand Down Expand Up @@ -713,8 +798,8 @@ public override Task<List<PartitionKeyRange>> GetTargetPartitionKeyRangesAsync(s
{
return Task.FromResult(new List<PartitionKeyRange>{new PartitionKeyRange()
{
MinInclusive = PartitionKeyHash.V2.Hash("abc").ToString(),
MaxExclusive = PartitionKeyHash.V2.Hash("def").ToString()
MinInclusive = PartitionKeyInternal.MinimumInclusiveEffectivePartitionKey,
MaxExclusive = PartitionKeyInternal.MaximumExclusiveEffectivePartitionKey
}
});
}
Expand Down