Skip to content

Commit

Permalink
Preview of Bulk Stream (#741)
Browse files Browse the repository at this point in the history
* creating executor

* Adding public API

* Updating contract

* Adding ToResponseMessage

* Wiring to executor

* Mockable executor

* Itemrequestoptions to batchitemrequestoptions

* cleanup

* Tests

* Wiring to executor

* Unit tests on executor

* Emulator tests on container

* executor retry handler

* Retry tests

* Handler tests

* under preview

* else

* Upsert support

* Using ResourceThrottleRetryPolicy

* Generating new context on retry

* Correctly obtaining retrypolicy

* Fixing unrelated test

* Removing unneeded constants

* Fixing conflict tests

* Replace and Delete support

* Adding Read support

* private instead of internal

* Fix for Read and DeleteItemAsync

* Typed API support

* Rename property

* Rename

* Refactor on ClientContext

* Refactoring through policies and batcher

* ItemId is required

* Undo unnecessary changes

* Rename

* Undoing comment

* Description

* Renaming variable

* Fixing test

* Rename
  • Loading branch information
ealsur authored and kirankumarkolli committed Sep 5, 2019
1 parent b2f8f82 commit 272aa1e
Show file tree
Hide file tree
Showing 26 changed files with 1,318 additions and 52 deletions.
21 changes: 10 additions & 11 deletions Microsoft.Azure.Cosmos/src/Batch/BatchAsyncBatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -152,22 +152,21 @@ public virtual bool TryAdd(ItemBatchOperation operation)
try
{
PartitionKeyRangeBatchExecutionResult result = await this.executor(serverRequest, cancellationToken);

if (result.IsSplit())
{
foreach (ItemBatchOperation operationToRetry in result.Operations)
{
await this.retrier(operationToRetry, cancellationToken);
}

return;
}

using (PartitionKeyRangeBatchResponse batchResponse = new PartitionKeyRangeBatchResponse(serverRequest.Operations.Count, result.ServerResponse, this.cosmosSerializer))
{
foreach (ItemBatchOperation itemBatchOperation in batchResponse.Operations)
{
BatchOperationResult response = batchResponse[itemBatchOperation.OperationIndex];
if (!response.IsSuccessStatusCode)
{
Documents.ShouldRetryResult shouldRetry = await itemBatchOperation.Context.ShouldRetryAsync(response, cancellationToken);
if (shouldRetry.ShouldRetry)
{
await this.retrier(itemBatchOperation, cancellationToken);
continue;
}
}

itemBatchOperation.Context.Complete(this, response);
}
}
Expand Down
29 changes: 23 additions & 6 deletions Microsoft.Azure.Cosmos/src/Batch/BatchAsyncContainerExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ namespace Microsoft.Azure.Cosmos
/// <seealso cref="BatchAsyncStreamer"/>
internal class BatchAsyncContainerExecutor : IDisposable
{
private const int DefaultDispatchTimer = 10;
private const int DefaultDispatchTimerInSeconds = 1;
private const int MinimumDispatchTimerInSeconds = 1;

private readonly ContainerCore cosmosContainer;
Expand All @@ -36,13 +36,21 @@ internal class BatchAsyncContainerExecutor : IDisposable
private readonly ConcurrentDictionary<string, BatchAsyncStreamer> streamersByPartitionKeyRange = new ConcurrentDictionary<string, BatchAsyncStreamer>();
private readonly ConcurrentDictionary<string, SemaphoreSlim> limitersByPartitionkeyRange = new ConcurrentDictionary<string, SemaphoreSlim>();
private readonly TimerPool timerPool;
private readonly RetryOptions retryOptions;

/// <summary>
/// For unit testing.
/// </summary>
internal BatchAsyncContainerExecutor()
{
}

public BatchAsyncContainerExecutor(
ContainerCore cosmosContainer,
CosmosClientContext cosmosClientContext,
int maxServerRequestOperationCount,
int maxServerRequestBodyLength,
int dispatchTimerInSeconds = BatchAsyncContainerExecutor.DefaultDispatchTimer)
int dispatchTimerInSeconds = BatchAsyncContainerExecutor.DefaultDispatchTimerInSeconds)
{
if (cosmosContainer == null)
{
Expand Down Expand Up @@ -70,9 +78,10 @@ public BatchAsyncContainerExecutor(
this.maxServerRequestOperationCount = maxServerRequestOperationCount;
this.dispatchTimerInSeconds = dispatchTimerInSeconds;
this.timerPool = new TimerPool(BatchAsyncContainerExecutor.MinimumDispatchTimerInSeconds);
this.retryOptions = cosmosClientContext.ClientOptions.GetConnectionPolicy().RetryOptions;
}

public async Task<BatchOperationResult> AddAsync(
public virtual async Task<BatchOperationResult> AddAsync(
ItemBatchOperation operation,
ItemRequestOptions itemRequestOptions = null,
CancellationToken cancellationToken = default(CancellationToken))
Expand All @@ -86,10 +95,10 @@ public async Task<BatchOperationResult> AddAsync(

string resolvedPartitionKeyRangeId = await this.ResolvePartitionKeyRangeIdAsync(operation, cancellationToken).ConfigureAwait(false);
BatchAsyncStreamer streamer = this.GetOrAddStreamerForPartitionKeyRange(resolvedPartitionKeyRangeId);
ItemBatchOperationContext context = new ItemBatchOperationContext(resolvedPartitionKeyRangeId);
ItemBatchOperationContext context = new ItemBatchOperationContext(resolvedPartitionKeyRangeId, BatchAsyncContainerExecutor.GetRetryPolicy(this.retryOptions));
operation.AttachContext(context);
streamer.Add(operation);
return await context.Task;
return await context.OperationTask;
}

public void Dispose()
Expand All @@ -107,7 +116,7 @@ public void Dispose()
this.timerPool.Dispose();
}

internal async Task ValidateOperationAsync(
internal virtual async Task ValidateOperationAsync(
ItemBatchOperation operation,
ItemRequestOptions itemRequestOptions = null,
CancellationToken cancellationToken = default(CancellationToken))
Expand Down Expand Up @@ -135,6 +144,14 @@ internal async Task ValidateOperationAsync(
}
}

private static IDocumentClientRetryPolicy GetRetryPolicy(RetryOptions retryOptions)
{
return new BulkPartitionKeyRangeGoneRetryPolicy(
new ResourceThrottleRetryPolicy(
retryOptions.MaxRetryAttemptsOnThrottledRequests,
retryOptions.MaxRetryWaitTimeInSeconds));
}

private static bool ValidateOperationEPK(
ItemBatchOperation operation,
ItemRequestOptions itemRequestOptions)
Expand Down
17 changes: 17 additions & 0 deletions Microsoft.Azure.Cosmos/src/Batch/BatchItemRequestOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,22 @@ class BatchItemRequestOptions : RequestOptions
/// <seealso cref="Microsoft.Azure.Cosmos.IndexingPolicy"/>
/// <seealso cref="IndexingDirective"/>
public IndexingDirective? IndexingDirective { get; set; }

internal static BatchItemRequestOptions FromItemRequestOptions(ItemRequestOptions itemRequestOptions)
{
if (itemRequestOptions == null)
{
return null;
}

RequestOptions requestOptions = itemRequestOptions as RequestOptions;
BatchItemRequestOptions batchItemRequestOptions = new BatchItemRequestOptions();
batchItemRequestOptions.IndexingDirective = itemRequestOptions.IndexingDirective;
batchItemRequestOptions.IfMatchEtag = requestOptions.IfMatchEtag;
batchItemRequestOptions.IfNoneMatchEtag = requestOptions.IfNoneMatchEtag;
batchItemRequestOptions.Properties = requestOptions.Properties;
batchItemRequestOptions.IsEffectivePartitionKeyRouting = requestOptions.IsEffectivePartitionKeyRouting;
return batchItemRequestOptions;
}
}
}
10 changes: 10 additions & 0 deletions Microsoft.Azure.Cosmos/src/Batch/BatchOperationResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,16 @@ private static Result ReadOperationResult(ref RowReader reader, out BatchOperati

return Result.Success;
}

internal ResponseMessage ToResponseMessage()
{
ResponseMessage responseMessage = new ResponseMessage(this.StatusCode);
responseMessage.Headers.SubStatusCode = this.SubStatusCode;
responseMessage.Headers.ETag = this.ETag;
responseMessage.Headers.RetryAfter = this.RetryAfter;
responseMessage.Content = this.ResourceStream;
return responseMessage;
}
}

/// <summary>
Expand Down
28 changes: 26 additions & 2 deletions Microsoft.Azure.Cosmos/src/Batch/ItemBatchOperationContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
namespace Microsoft.Azure.Cosmos
{
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Core.Trace;
using Microsoft.Azure.Documents;

/// <summary>
/// Context for a particular Batch operation.
Expand All @@ -17,13 +19,35 @@ internal class ItemBatchOperationContext : IDisposable

public BatchAsyncBatcher CurrentBatcher { get; set; }

public Task<BatchOperationResult> Task => this.taskCompletionSource.Task;
public Task<BatchOperationResult> OperationTask => this.taskCompletionSource.Task;

private readonly IDocumentClientRetryPolicy retryPolicy;

private TaskCompletionSource<BatchOperationResult> taskCompletionSource = new TaskCompletionSource<BatchOperationResult>();

public ItemBatchOperationContext(string partitionKeyRangeId)
public ItemBatchOperationContext(
string partitionKeyRangeId,
IDocumentClientRetryPolicy retryPolicy = null)
{
this.PartitionKeyRangeId = partitionKeyRangeId;
this.retryPolicy = retryPolicy;
}

/// <summary>
/// Based on the Retry Policy, if a failed response should retry.
/// </summary>
public Task<ShouldRetryResult> ShouldRetryAsync(
BatchOperationResult batchOperationResult,
CancellationToken cancellationToken)
{
if (this.retryPolicy == null
|| batchOperationResult.IsSuccessStatusCode)
{
return Task.FromResult(ShouldRetryResult.NoRetry());
}

ResponseMessage responseMessage = batchOperationResult.ToResponseMessage();
return this.retryPolicy.ShouldRetryAsync(responseMessage, cancellationToken);
}

public void Complete(
Expand Down
96 changes: 96 additions & 0 deletions Microsoft.Azure.Cosmos/src/BulkPartitionKeyRangeGoneRetryPolicy.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------

namespace Microsoft.Azure.Cosmos
{
using System;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Documents;

/// <summary>
/// Used only in the context of Bulk Stream operations.
/// </summary>
/// <see cref="BatchAsyncBatcher"/>
/// <see cref="ItemBatchOperationContext"/>
internal sealed class BulkPartitionKeyRangeGoneRetryPolicy : IDocumentClientRetryPolicy
{
private const int MaxRetries = 1;

private readonly IDocumentClientRetryPolicy nextRetryPolicy;

private int retriesAttempted;

public BulkPartitionKeyRangeGoneRetryPolicy(IDocumentClientRetryPolicy nextRetryPolicy)
{
this.nextRetryPolicy = nextRetryPolicy;
}

public Task<ShouldRetryResult> ShouldRetryAsync(
Exception exception,
CancellationToken cancellationToken)
{
DocumentClientException clientException = exception as DocumentClientException;

ShouldRetryResult shouldRetryResult = this.ShouldRetryInternal(
clientException?.StatusCode,
clientException?.GetSubStatus(),
clientException?.ResourceAddress);

if (shouldRetryResult != null)
{
return Task.FromResult(shouldRetryResult);
}

if (this.nextRetryPolicy == null)
{
return Task.FromResult(ShouldRetryResult.NoRetry());
}

return this.nextRetryPolicy.ShouldRetryAsync(exception, cancellationToken);
}

public Task<ShouldRetryResult> ShouldRetryAsync(
ResponseMessage cosmosResponseMessage,
CancellationToken cancellationToken)
{
ShouldRetryResult shouldRetryResult = this.ShouldRetryInternal(cosmosResponseMessage?.StatusCode,
cosmosResponseMessage?.Headers.SubStatusCode,
cosmosResponseMessage?.GetResourceAddress());
if (shouldRetryResult != null)
{
return Task.FromResult(shouldRetryResult);
}

if (this.nextRetryPolicy == null)
{
return Task.FromResult(ShouldRetryResult.NoRetry());
}

return this.nextRetryPolicy.ShouldRetryAsync(cosmosResponseMessage, cancellationToken);
}

public void OnBeforeSendRequest(DocumentServiceRequest request)
{
this.nextRetryPolicy.OnBeforeSendRequest(request);
}

private ShouldRetryResult ShouldRetryInternal(
HttpStatusCode? statusCode,
SubStatusCodes? subStatusCode,
string resourceIdOrFullName)
{
if (statusCode == HttpStatusCode.Gone
&& subStatusCode == SubStatusCodes.PartitionKeyRangeGone
&& this.retriesAttempted < MaxRetries)
{
this.retriesAttempted++;
return ShouldRetryResult.RetryAfter(TimeSpan.Zero);
}

return null;
}
}
}
10 changes: 10 additions & 0 deletions Microsoft.Azure.Cosmos/src/CosmosClientOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,16 @@ public CosmosSerializer Serializer
}
}

/// <summary>
/// Allows optimistic batching of requests to service. Setting this option might impact the latency of the operations. Hence this option is recommended for non-latency sensitive scenarios only.
/// </summary>
#if PREVIEW
public
#else
internal
#endif
bool AllowBulkExecution { get; set; }

/// <summary>
/// A JSON serializer used by the CosmosClient to serialize or de-serialize cosmos request/responses.
/// The default serializer is always used for all system owned types like DatabaseProperties.
Expand Down
17 changes: 17 additions & 0 deletions Microsoft.Azure.Cosmos/src/Fluent/CosmosClientBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,23 @@ public CosmosClientBuilder WithCustomSerializer(CosmosSerializer cosmosJsonSeria
return this;
}

/// <summary>
/// Allows optimistic batching of requests to service. Setting this option might impact the latency of the operations. Hence this option is recommended for non-latency sensitive scenarios only.
/// </summary>
/// <param name="enabled">Whether <see cref="CosmosClientOptions.AllowBulkExecution"/> is enabled.</param>
/// <returns>The <see cref="CosmosClientBuilder"/> object</returns>
/// <seealso cref="CosmosClientOptions.AllowBulkExecution"/>
#if PREVIEW
public
#else
internal
#endif
CosmosClientBuilder WithBulkexecution(bool enabled)
{
this.clientOptions.AllowBulkExecution = enabled;
return this;
}

/// <summary>
/// The event handler to be invoked before the request is sent.
/// </summary>
Expand Down
Loading

0 comments on commit 272aa1e

Please sign in to comment.