Skip to content

Commit

Permalink
[Internal] Query: Fixes issue with distributed query GetItemQueryStre…
Browse files Browse the repository at this point in the history
…amIterator (#4798)

## Description

This change fixes an issue with GetItemQueryStreamIterator for
distributed query where containerRid was not getting set in the
responseHeaders, leading to an exception when building the response.

We must set the container resource id in the CosmosQueryContext, prior
to building the DistributedQueryPipelineStage.

## Type of change


- [] Bug fix (non-breaking change which fixes an issue)

---------

Co-authored-by: neildsh <35383880+neildsh@users.noreply.github.com>
  • Loading branch information
sc978345 and neildsh authored Oct 15, 2024
1 parent 72eed8a commit 9eb6087
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,15 @@ private static async Task<TryCatch<IQueryPipelineStage>> TryCreateCoreContextAsy
{
using (ITrace createQueryPipelineTrace = trace.StartChild("Create Query Pipeline", TraceComponent.Query, Tracing.TraceLevel.Info))
{
CosmosQueryClient cosmosQueryClient = cosmosQueryContext.QueryClient;

ContainerQueryProperties containerQueryProperties = await cosmosQueryClient.GetCachedContainerQueryPropertiesAsync(
cosmosQueryContext.ResourceLink,
inputParameters.PartitionKey,
createQueryPipelineTrace,
cancellationToken);
cosmosQueryContext.ContainerResourceId = containerQueryProperties.ResourceId;

if (inputParameters.EnableDistributedQueryGatewayMode &&
cosmosQueryContext.ResourceTypeEnum == Documents.ResourceType.Document &&
cosmosQueryContext.OperationTypeEnum == Documents.OperationType.Query)
Expand Down Expand Up @@ -152,15 +161,6 @@ private static async Task<TryCatch<IQueryPipelineStage>> TryCreateCoreContextAsy
}
}

CosmosQueryClient cosmosQueryClient = cosmosQueryContext.QueryClient;

ContainerQueryProperties containerQueryProperties = await cosmosQueryClient.GetCachedContainerQueryPropertiesAsync(
cosmosQueryContext.ResourceLink,
inputParameters.PartitionKey,
createQueryPipelineTrace,
cancellationToken);
cosmosQueryContext.ContainerResourceId = containerQueryProperties.ResourceId;

Documents.PartitionKeyRange targetRange = await TryGetTargetRangeOptimisticDirectExecutionAsync(
inputParameters,
queryPlanFromContinuationToken,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ namespace Microsoft.Azure.Cosmos.EmulatorTests.Query
{
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.CosmosElements;
using Microsoft.Azure.Cosmos.Json;
using Microsoft.VisualStudio.TestTools.UnitTesting;

[TestClass]
Expand Down Expand Up @@ -135,6 +137,31 @@ public void TestDistributedQueryGatewayModeOverride()
Assert.IsNull(Environment.GetEnvironmentVariable(ConfigurationManager.DistributedQueryGatewayModeEnabled));
}

[TestMethod]
public async Task StreamIteratorTestsAsync()
{
static Task ImplementationAsync(Container container, IReadOnlyList<CosmosObject> _)
{
int[] pageSizes = new[] { DocumentCount };

TestCase[] testCases = new[]
{
MakeTest(
"SELECT VALUE c.x FROM c WHERE c.x < 200",
pageSizes,
Expectations.AllDocumentsLessThan200ArePresent),
};

return RunStreamIteratorTestsAsync(container, testCases);
}

await this.CreateIngestQueryDeleteAsync(
ConnectionModes.Gateway,
CollectionTypes.SinglePartition | CollectionTypes.MultiPartition,
CreateDocuments(DocumentCount),
ImplementationAsync);
}

private static async Task RunPartitionedParityTestsAsync(Container container, IEnumerable<string> testCases)
{
IReadOnlyList<FeedRange> feedRanges = await container.GetFeedRangesAsync();
Expand Down Expand Up @@ -201,7 +228,7 @@ private static async Task ContinuationTestsAsync(Container container, IEnumerabl
foreach (int pageSize in testCase.PageSizes)
{
List<int> results = await RunContinuationBasedQueryTestAsync(container, testCase.Query, pageSize);
testCase.ValidateResult(results);
Assert.IsTrue(testCase.ValidateResult(results));
}
}
}
Expand Down Expand Up @@ -260,7 +287,55 @@ private static async Task RunTestsAsync(Container container, IEnumerable<TestCas
options,
QueryDrainingMode.HoldState | QueryDrainingMode.ContinuationToken);

testCase.ValidateResult(results);
Assert.IsTrue(testCase.ValidateResult(results));
}
}
}

private static async Task RunStreamIteratorTestsAsync(Container container, IEnumerable<TestCase> testCases)
{
foreach (TestCase testCase in testCases)
{
foreach (int pageSize in testCase.PageSizes)
{
QueryRequestOptions options = new QueryRequestOptions()
{
MaxItemCount = pageSize,
EnableDistributedQueryGatewayMode = true,
};

List<int> extractedResults = new List<int>();
await foreach (ResponseMessage response in RunSimpleQueryAsync(
container,
testCase.Query,
options))
{
Assert.AreEqual(System.Net.HttpStatusCode.OK, response.StatusCode);

using (MemoryStream memoryStream = new MemoryStream())
{
response.Content.CopyTo(memoryStream);
byte[] content = memoryStream.ToArray();

IJsonNavigator navigator = JsonNavigator.Create(content);
IJsonNavigatorNode rootNode = navigator.GetRootNode();

Assert.IsTrue(navigator.TryGetObjectProperty(rootNode, "_rid", out ObjectProperty ridProperty));
string rid = navigator.GetStringValue(ridProperty.ValueNode);
Assert.IsTrue(rid.Length > 0);

Assert.IsTrue(navigator.TryGetObjectProperty(rootNode, "Documents", out ObjectProperty documentsProperty));
IEnumerable<IJsonNavigatorNode> arrayItems = navigator.GetArrayItems(documentsProperty.ValueNode);
foreach (IJsonNavigatorNode node in arrayItems)
{
Assert.AreEqual(JsonNodeType.Number64, navigator.GetNodeType(node));

extractedResults.Add((int)Number64.ToLong(navigator.GetNumber64Value(node)));
}
}
}

Assert.IsTrue(testCase.ValidateResult(extractedResults));
}
}
}
Expand Down

0 comments on commit 9eb6087

Please sign in to comment.