Skip to content

Commit

Permalink
Query: Fixes cancellation token support for the lazy + buffering path. (
Browse files Browse the repository at this point in the history
#2092)

* fixed passing in cancellation token

* missing

Co-authored-by: Samer Boshra <sboshra@microsoft.com>
  • Loading branch information
bchong95 and sboshra authored Jan 6, 2021
1 parent 30f1979 commit 77fcfe3
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ namespace Microsoft.Azure.Cosmos.Pagination
using System;
using System.Collections;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Net;
using System.Threading;
Expand All @@ -16,7 +15,6 @@ namespace Microsoft.Azure.Cosmos.Pagination
using Microsoft.Azure.Cosmos.Query.Core.Collections;
using Microsoft.Azure.Cosmos.Query.Core.Monads;
using Microsoft.Azure.Cosmos.Tracing;
using Microsoft.Azure.Documents;

/// <summary>
/// Coordinates draining pages from multiple <see cref="PartitionRangePageAsyncEnumerator{TPage, TState}"/>, while maintaining a global sort order and handling repartitioning (splits, merge).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,8 @@ public static async Task<TryCatch<IQueryPipelineStage>> TryCreateFromPartitioned
cosmosQueryContext,
inputParameters,
partitionedQueryExecutionInfo,
targetRanges);
targetRanges,
cancellationToken);
}

return tryCreatePipelineStage;
Expand Down Expand Up @@ -378,7 +379,8 @@ private static TryCatch<IQueryPipelineStage> TryCreateSpecializedDocumentQueryEx
CosmosQueryContext cosmosQueryContext,
InputParameters inputParameters,
PartitionedQueryExecutionInfo partitionedQueryExecutionInfo,
List<Documents.PartitionKeyRange> targetRanges)
List<Documents.PartitionKeyRange> targetRanges,
CancellationToken cancellationToken)
{
QueryInfo queryInfo = partitionedQueryExecutionInfo.QueryInfo;

Expand Down Expand Up @@ -425,7 +427,7 @@ private static TryCatch<IQueryPipelineStage> TryCreateSpecializedDocumentQueryEx
pageSize: (int)optimalPageSize,
maxConcurrency: inputParameters.MaxConcurrency,
requestContinuationToken: inputParameters.InitialUserContinuationToken,
requestCancellationToken: default);
requestCancellationToken: cancellationToken);
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,6 @@ public override Task<FeedResponse<T>> ReadNextAsync(CancellationToken cancellati

public override async Task<FeedResponse<T>> ReadNextAsync(ITrace trace, CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();

if (trace == null)
{
throw new ArgumentNullException(nameof(trace));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ namespace Microsoft.Azure.Cosmos.EmulatorTests.Query
using System.Linq;
using System.Net;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Core.Utf8;
using Microsoft.Azure.Cosmos.CosmosElements;
Expand Down Expand Up @@ -924,5 +925,84 @@ static async Task ImplementationAsync(Container container, IReadOnlyList<CosmosO
}
}
}

[TestMethod]
public async Task TestCancellationTokenAsync()
{
int seed = (int)(DateTime.UtcNow - new DateTime(1970, 1, 1)).TotalSeconds;
uint numberOfDocuments = 100;
QueryOracleUtil util = new QueryOracle2(seed);
IEnumerable<string> inputDocuments = util.GetDocuments(numberOfDocuments);

await this.CreateIngestQueryDeleteAsync(
ConnectionModes.Direct,
CollectionTypes.MultiPartition,
inputDocuments,
ImplementationAsync);

static async Task ImplementationAsync(Container container, IReadOnlyList<CosmosObject> documents)
{
foreach (string query in new string[] { "SELECT c.id FROM c", "SELECT c._ts, c.id FROM c ORDER BY c._ts" })
{
QueryRequestOptions queryRequestOptions = new QueryRequestOptions
{
MaxBufferedItemCount = 7000,
MaxConcurrency = 10,
MaxItemCount = 10,
ReturnResultsInDeterministicOrder = true,
};

// See if cancellation token is honored for first request
try
{
CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
cancellationTokenSource.Cancel();
FeedIteratorInternal<CosmosElement> feedIterator = (FeedIteratorInternal<CosmosElement>)container.GetItemQueryIterator<CosmosElement>(
queryText: query,
requestOptions: queryRequestOptions);
await feedIterator.ReadNextAsync(cancellationTokenSource.Token);

Assert.Fail("Expected exception.");
}
catch (OperationCanceledException)
{
}

// See if cancellation token is honored for second request
try
{
CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
cancellationTokenSource.Cancel();
FeedIteratorInternal<CosmosElement> feedIterator = (FeedIteratorInternal<CosmosElement>)container.GetItemQueryIterator<CosmosElement>(
queryText: query,
requestOptions: queryRequestOptions);
await feedIterator.ReadNextAsync(default);
await feedIterator.ReadNextAsync(cancellationTokenSource.Token);

Assert.Fail("Expected exception.");
}
catch (OperationCanceledException)
{
}

// See if cancellation token is honored mid draining
try
{
CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
FeedIteratorInternal<CosmosElement> feedIterator = (FeedIteratorInternal<CosmosElement>)container.GetItemQueryIterator<CosmosElement>(
queryText: query,
requestOptions: queryRequestOptions);
await feedIterator.ReadNextAsync(cancellationTokenSource.Token);
cancellationTokenSource.Cancel();
await feedIterator.ReadNextAsync(cancellationTokenSource.Token);

Assert.Fail("Expected exception.");
}
catch (OperationCanceledException)
{
}
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ namespace Microsoft.Azure.Cosmos.Tests.Query.Pipeline
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.CosmosElements;
using Microsoft.Azure.Cosmos.Pagination;
Expand Down

0 comments on commit 77fcfe3

Please sign in to comment.