Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Internal ReadFeed: Adds pagination library adoption #1947

Merged
merged 22 commits into from
Nov 2, 2020
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 24 additions & 7 deletions Microsoft.Azure.Cosmos/src/Pagination/DocumentContainer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ namespace Microsoft.Azure.Cosmos.Pagination
using Microsoft.Azure.Cosmos.Query.Core;
using Microsoft.Azure.Cosmos.Query.Core.Monads;
using Microsoft.Azure.Cosmos.Query.Core.Pipeline;
using Microsoft.Azure.Cosmos.ReadFeed.Pagination;
using Microsoft.Azure.Documents;

/// <summary>
Expand Down Expand Up @@ -83,24 +84,40 @@ public Task<Record> ReadItemAsync(
cancellationToken),
cancellationToken);

public Task<TryCatch<DocumentContainerPage>> MonadicReadFeedAsync(
public Task<TryCatch<ReadFeedPage>> MonadicReadFeedAsync(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please run the MockedItemBenchmark.ReadFeed benchmark

ReadFeedState readFeedState,
FeedRangeInternal feedRange,
ResourceId resourceIdentifer,
QueryDefinition queryDefinition,
QueryRequestOptions queryRequestOptions,
string resourceLink,
ResourceType resourceType,
int pageSize,
CancellationToken cancellationToken) => this.monadicDocumentContainer.MonadicReadFeedAsync(
readFeedState,
feedRange,
resourceIdentifer,
queryDefinition,
queryRequestOptions,
resourceLink,
resourceType,
pageSize,
cancellationToken);

public Task<DocumentContainerPage> ReadFeedAsync(
public Task<ReadFeedPage> ReadFeedAsync(
ReadFeedState readFeedState,
FeedRangeInternal feedRange,
ResourceId resourceIdentifier,
QueryDefinition queryDefinition,
QueryRequestOptions queryRequestOptions,
string resourceLink,
ResourceType resourceType,
int pageSize,
CancellationToken cancellationToken) => TryCatch<DocumentContainerPage>.UnsafeGetResultAsync(
CancellationToken cancellationToken) => TryCatch<ReadFeedPage>.UnsafeGetResultAsync(
this.MonadicReadFeedAsync(
readFeedState,
feedRange,
resourceIdentifier,
queryDefinition,
queryRequestOptions,
resourceLink,
resourceType,
pageSize,
cancellationToken),
cancellationToken);
Expand Down
22 changes: 0 additions & 22 deletions Microsoft.Azure.Cosmos/src/Pagination/DocumentContainerPage.cs

This file was deleted.

18 changes: 0 additions & 18 deletions Microsoft.Azure.Cosmos/src/Pagination/DocumentContainerState.cs

This file was deleted.

10 changes: 2 additions & 8 deletions Microsoft.Azure.Cosmos/src/Pagination/IDocumentContainer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ namespace Microsoft.Azure.Cosmos.Pagination
using Microsoft.Azure.Cosmos.ChangeFeed.Pagination;
using Microsoft.Azure.Cosmos.CosmosElements;
using Microsoft.Azure.Cosmos.Query.Core.Pipeline.CrossPartition;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Cosmos.ReadFeed.Pagination;

internal interface IDocumentContainer : IMonadicDocumentContainer, IFeedRangeProvider, IQueryDataSource, IChangeFeedDataSource
internal interface IDocumentContainer : IMonadicDocumentContainer, IFeedRangeProvider, IQueryDataSource, IReadFeedDataSource, IChangeFeedDataSource
{
Task<Record> CreateItemAsync(
CosmosObject payload,
Expand All @@ -22,12 +22,6 @@ Task<Record> ReadItemAsync(
string identifier,
CancellationToken cancellationToken);

Task<DocumentContainerPage> ReadFeedAsync(
FeedRangeInternal feedRange,
ResourceId resourceIdentifier,
int pageSize,
CancellationToken cancellationToken);

Task SplitAsync(
FeedRangeInternal feedRange,
CancellationToken cancellationToken);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ namespace Microsoft.Azure.Cosmos.Pagination
using Microsoft.Azure.Cosmos.CosmosElements;
using Microsoft.Azure.Cosmos.Query.Core.Monads;
using Microsoft.Azure.Cosmos.Query.Core.Pipeline.CrossPartition;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Cosmos.ReadFeed.Pagination;

internal interface IMonadicDocumentContainer : IMonadicFeedRangeProvider, IMonadicQueryDataSource, IMonadicChangeFeedDataSource
internal interface IMonadicDocumentContainer : IMonadicFeedRangeProvider, IMonadicQueryDataSource, IMonadicReadFeedDataSource, IMonadicChangeFeedDataSource
{
Task<TryCatch<Record>> MonadicCreateItemAsync(
CosmosObject payload,
Expand All @@ -23,12 +23,6 @@ Task<TryCatch<Record>> MonadicReadItemAsync(
string identifer,
CancellationToken cancellationToken);

Task<TryCatch<DocumentContainerPage>> MonadicReadFeedAsync(
FeedRangeInternal feedRange,
ResourceId resourceIdentifer,
int pageSize,
CancellationToken cancellationToken);

Task<TryCatch> MonadicSplitAsync(
FeedRangeInternal feedRange,
CancellationToken cancellationToken);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,24 @@ namespace Microsoft.Azure.Cosmos.Pagination
using Microsoft.Azure.Cosmos.Query.Core.Monads;
using Microsoft.Azure.Cosmos.Query.Core.Pipeline;
using Microsoft.Azure.Cosmos.Query.Core.QueryClient;
using Microsoft.Azure.Cosmos.ReadFeed.Pagination;
using Microsoft.Azure.Documents;

internal sealed class NetworkAttachedDocumentContainer : IMonadicDocumentContainer
{
private readonly ContainerCore container;
private readonly ContainerInternal container;
private readonly CosmosQueryClient cosmosQueryClient;
private readonly CosmosClientContext cosmosClientContext;
private readonly QueryRequestOptions queryRequestOptions;
private readonly CosmosDiagnosticsContext diagnosticsContext;

public NetworkAttachedDocumentContainer(
ContainerCore container,
ContainerInternal container,
CosmosQueryClient cosmosQueryClient,
CosmosClientContext cosmosClientContext,
CosmosDiagnosticsContext diagnosticsContext,
QueryRequestOptions queryRequestOptions = null)
{
this.container = container ?? throw new ArgumentNullException(nameof(container));
this.cosmosQueryClient = cosmosQueryClient ?? throw new ArgumentNullException(nameof(cosmosQueryClient));
this.cosmosClientContext = cosmosClientContext ?? throw new ArgumentNullException(nameof(cosmosClientContext));
this.diagnosticsContext = diagnosticsContext;
this.queryRequestOptions = queryRequestOptions;
}
Expand Down Expand Up @@ -91,7 +89,7 @@ public async Task<TryCatch<List<FeedRangeEpk>>> MonadicGetChildRangeAsync(
{
try
{
ContainerProperties containerProperties = await this.cosmosClientContext.GetCachedContainerPropertiesAsync(
ContainerProperties containerProperties = await this.container.ClientContext.GetCachedContainerPropertiesAsync(
this.container.LinkUri,
cancellationToken);
List<PartitionKeyRange> overlappingRanges = await this.cosmosQueryClient.GetTargetPartitionKeyRangeByFeedRangeAsync(
Expand All @@ -113,13 +111,70 @@ await this.container.GetRIDAsync(cancellationToken),
}
}

public Task<TryCatch<DocumentContainerPage>> MonadicReadFeedAsync(
public async Task<TryCatch<ReadFeedPage>> MonadicReadFeedAsync(
ReadFeedState readFeedState,
FeedRangeInternal feedRange,
ResourceId resourceIdentifer,
QueryDefinition queryDefinition,
QueryRequestOptions queryRequestOptions,
bchong95 marked this conversation as resolved.
Show resolved Hide resolved
string resourceLink,
ResourceType resourceType,
int pageSize,
CancellationToken cancellationToken)
{
throw new NotImplementedException();
cancellationToken.ThrowIfCancellationRequested();

ResponseMessage responseMessage = await this.container.ClientContext.ProcessResourceOperationStreamAsync(
resourceUri: resourceLink,
resourceType: resourceType,
operationType: OperationType.ReadFeed,
requestOptions: queryRequestOptions,
cosmosContainerCore: this.container,
requestEnricher: request =>
{
if (readFeedState != null)
{
request.Headers.ContinuationToken = (readFeedState.ContinuationToken as CosmosString).Value;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense to guard against any potential nullref by checking if the continuation is a CosmosString in the upper if?

if (!(readFeedState.ContinuationToken is CosmosNull) && readFeedState.Continuation is CosmosString continuation)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will add polymorphism to the type.


In reply to: 516222386 [](ancestors = 516222386)

}

feedRange.Accept(FeedRangeRequestMessagePopulatorVisitor.Singleton, request);
request.Headers.PageSize = pageSize.ToString();

if (queryDefinition != null)
{
request.Headers.Add(HttpConstants.HttpHeaders.ContentType, RuntimeConstants.MediaTypes.QueryJson);
request.Headers.Add(HttpConstants.HttpHeaders.IsQuery, bool.TrueString);
}
},
partitionKey: queryRequestOptions.PartitionKey,
streamPayload: default,
diagnosticsContext: default,
cancellationToken: cancellationToken);

TryCatch<ReadFeedPage> monadicReadFeedPage;
if (responseMessage.StatusCode == HttpStatusCode.OK)
{
ReadFeedPage readFeedPage = new ReadFeedPage(
responseMessage.Content,
responseMessage.Headers.RequestCharge,
responseMessage.Headers.ActivityId,
new ReadFeedState(CosmosString.Create(responseMessage.Headers.ETag)));

monadicReadFeedPage = TryCatch<ReadFeedPage>.FromResult(readFeedPage);
}
else
{
CosmosException cosmosException = new CosmosException(
responseMessage.ErrorMessage,
statusCode: responseMessage.StatusCode,
(int)responseMessage.Headers.SubStatusCode,
responseMessage.Headers.ActivityId,
responseMessage.Headers.RequestCharge);
cosmosException.Headers.ContinuationToken = responseMessage.Headers.ContinuationToken;

monadicReadFeedPage = TryCatch<ReadFeedPage>.FromException(cosmosException);
}

return monadicReadFeedPage;
}

public async Task<TryCatch<QueryPage>> MonadicQueryAsync(
Expand All @@ -135,7 +190,7 @@ public async Task<TryCatch<QueryPage>> MonadicQueryAsync(
{
case FeedRangePartitionKey feedRangePartitionKey:
{
ContainerProperties containerProperties = await this.cosmosClientContext.GetCachedContainerPropertiesAsync(
ContainerProperties containerProperties = await this.container.ClientContext.GetCachedContainerPropertiesAsync(
this.container.LinkUri,
cancellationToken);
PartitionKeyDefinition partitionKeyDefinition = await this.container.GetPartitionKeyDefinitionAsync(cancellationToken);
Expand Down Expand Up @@ -202,7 +257,7 @@ await this.container.GetRIDAsync(cancellationToken),

case FeedRangeEpk feedRangeEpk:
{
ContainerProperties containerProperties = await this.cosmosClientContext.GetCachedContainerPropertiesAsync(
ContainerProperties containerProperties = await this.container.ClientContext.GetCachedContainerPropertiesAsync(
this.container.LinkUri,
cancellationToken);
List<PartitionKeyRange> overlappingRanges = await this.cosmosQueryClient.GetTargetPartitionKeyRangeByFeedRangeAsync(
Expand Down Expand Up @@ -260,7 +315,7 @@ public async Task<TryCatch<ChangeFeedPage>> MonadicChangeFeedAsync(
if (feedRange is FeedRangeEpk feedRangeEpk)
{
// convert into physical range or throw a split exception
ContainerProperties containerProperties = await this.cosmosClientContext.GetCachedContainerPropertiesAsync(
ContainerProperties containerProperties = await this.container.ClientContext.GetCachedContainerPropertiesAsync(
this.container.LinkUri,
cancellationToken);
List<PartitionKeyRange> overlappingRanges = await this.cosmosQueryClient.GetTargetPartitionKeyRangeByFeedRangeAsync(
Expand All @@ -285,7 +340,7 @@ await this.container.GetRIDAsync(cancellationToken),
feedRange = new FeedRangePartitionKeyRange(overlappingRanges[0].Id);
}

ResponseMessage responseMessage = await this.cosmosClientContext.ProcessResourceOperationStreamAsync(
ResponseMessage responseMessage = await this.container.ClientContext.ProcessResourceOperationStreamAsync(
resourceUri: this.container.LinkUri,
resourceType: ResourceType.Document,
operationType: OperationType.ReadFeed,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ public static QueryIterator Create(
NetworkAttachedDocumentContainer networkAttachedDocumentContainer = new NetworkAttachedDocumentContainer(
containerCore,
client,
clientContext,
queryPipelineCreationDiagnostics,
queryRequestOptions);
DocumentContainer documentContainer = new DocumentContainer(networkAttachedDocumentContainer);
Expand Down
Loading