Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bulk: Fixes diagnostic traces by removing redundant info and adding correct retry context #2455

Merged
15 commits merged on May 12, 2021
7 changes: 3 additions & 4 deletions Microsoft.Azure.Cosmos/src/Batch/BatchAsyncBatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ private async Task<object> DispatchHelperAsync(
// Any overflow goes to a new batch
foreach (ItemBatchOperation operation in pendingOperations)
{
await this.retrier(operation, trace, cancellationToken);
await this.retrier(operation, cancellationToken);
ealsur marked this conversation as resolved.
Show resolved Hide resolved
}
}
catch (Exception ex)
Expand Down Expand Up @@ -175,10 +175,10 @@ private async Task<object> DispatchHelperAsync(

if (!response.IsSuccessStatusCode)
{
Documents.ShouldRetryResult shouldRetry = await itemBatchOperation.Context.ShouldRetryAsync(response, cancellationToken);
ShouldRetryResult shouldRetry = await itemBatchOperation.Context.ShouldRetryAsync(response, cancellationToken);
if (shouldRetry.ShouldRetry)
{
await this.retrier(itemBatchOperation, trace, cancellationToken);
await this.retrier(itemBatchOperation, cancellationToken);
continue;
}
}
Expand Down Expand Up @@ -244,6 +244,5 @@ internal delegate Task<PartitionKeyRangeBatchExecutionResult> BatchAsyncBatcherE
/// <returns>A <see cref="Task"/> representing the asynchronous retry of the operation.</returns>
internal delegate Task BatchAsyncBatcherRetryDelegate(
ItemBatchOperation operation,
ITrace trace,
CancellationToken cancellationToken);
}
18 changes: 12 additions & 6 deletions Microsoft.Azure.Cosmos/src/Batch/BatchAsyncContainerExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ public BatchAsyncContainerExecutor(

public virtual async Task<TransactionalBatchOperationResult> AddAsync(
ItemBatchOperation operation,
ITrace trace,
ItemRequestOptions itemRequestOptions = null,
CancellationToken cancellationToken = default)
{
Expand All @@ -82,9 +83,15 @@ public virtual async Task<TransactionalBatchOperationResult> AddAsync(

await this.ValidateOperationAsync(operation, itemRequestOptions, cancellationToken);

string resolvedPartitionKeyRangeId = await this.ResolvePartitionKeyRangeIdAsync(operation, NoOpTrace.Singleton, cancellationToken).ConfigureAwait(false);
string resolvedPartitionKeyRangeId = await this.ResolvePartitionKeyRangeIdAsync(
operation,
trace,
cancellationToken).ConfigureAwait(false);
BatchAsyncStreamer streamer = this.GetOrAddStreamerForPartitionKeyRange(resolvedPartitionKeyRangeId);
ItemBatchOperationContext context = new ItemBatchOperationContext(resolvedPartitionKeyRangeId, BatchAsyncContainerExecutor.GetRetryPolicy(this.cosmosContainer, operation.OperationType, this.retryOptions));
ItemBatchOperationContext context = new ItemBatchOperationContext(
resolvedPartitionKeyRangeId,
trace,
BatchAsyncContainerExecutor.GetRetryPolicy(this.cosmosContainer, operation.OperationType, this.retryOptions));
operation.AttachContext(context);
streamer.Add(operation);
return await context.OperationTask;
Expand Down Expand Up @@ -178,13 +185,12 @@ private static void AddHeadersToRequestMessage(RequestMessage requestMessage, st

private async Task ReBatchAsync(
ItemBatchOperation operation,
ITrace trace,
CancellationToken cancellationToken)
{
using (ITrace retryTrace = trace.StartChild("Batch Retry Async", TraceComponent.Batch, Tracing.TraceLevel.Info))
using (ITrace trace = Tracing.Trace.GetRootTrace("Batch Retry Async", TraceComponent.Batch, Tracing.TraceLevel.Info))
{
string resolvedPartitionKeyRangeId = await this.ResolvePartitionKeyRangeIdAsync(operation, retryTrace, cancellationToken).ConfigureAwait(false);
operation.Context.ReRouteOperation(resolvedPartitionKeyRangeId);
string resolvedPartitionKeyRangeId = await this.ResolvePartitionKeyRangeIdAsync(operation, trace, cancellationToken).ConfigureAwait(false);
operation.Context.ReRouteOperation(resolvedPartitionKeyRangeId, trace);
BatchAsyncStreamer streamer = this.GetOrAddStreamerForPartitionKeyRange(resolvedPartitionKeyRangeId);
streamer.Add(operation);
}
Expand Down
35 changes: 30 additions & 5 deletions Microsoft.Azure.Cosmos/src/Batch/ItemBatchOperationContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
namespace Microsoft.Azure.Cosmos
{
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Core.Trace;
using Microsoft.Azure.Cosmos.Tracing;
using Microsoft.Azure.Documents;

/// <summary>
Expand All @@ -25,29 +27,47 @@ internal class ItemBatchOperationContext : IDisposable

private readonly TaskCompletionSource<TransactionalBatchOperationResult> taskCompletionSource = new TaskCompletionSource<TransactionalBatchOperationResult>(TaskCreationOptions.RunContinuationsAsynchronously);

private readonly List<ITrace> traces;

public ItemBatchOperationContext(
string partitionKeyRangeId,
ITrace trace,
IDocumentClientRetryPolicy retryPolicy = null)
{
this.PartitionKeyRangeId = partitionKeyRangeId;
if (trace == null)
{
throw new ArgumentNullException(nameof(trace));
}

this.PartitionKeyRangeId = partitionKeyRangeId ?? throw new ArgumentNullException(nameof(partitionKeyRangeId));
this.traces = new List<ITrace>
{
trace
};
this.retryPolicy = retryPolicy;
}

/// <summary>
/// Determines, based on the retry policy, whether a failed response should be retried.
/// </summary>
public Task<ShouldRetryResult> ShouldRetryAsync(
public async Task<ShouldRetryResult> ShouldRetryAsync(
TransactionalBatchOperationResult batchOperationResult,
CancellationToken cancellationToken)
{
if (this.retryPolicy == null
|| batchOperationResult.IsSuccessStatusCode)
{
return Task.FromResult(ShouldRetryResult.NoRetry());
return ShouldRetryResult.NoRetry();
}

ResponseMessage responseMessage = batchOperationResult.ToResponseMessage();
return this.retryPolicy.ShouldRetryAsync(responseMessage, cancellationToken);
ShouldRetryResult shouldRetry = await this.retryPolicy.ShouldRetryAsync(responseMessage, cancellationToken);
if (shouldRetry.ShouldRetry)
{
this.traces.Add(batchOperationResult.Trace);
}

return shouldRetry;
}

public void Complete(
Expand All @@ -56,6 +76,8 @@ public void Complete(
{
if (this.AssertBatcher(completer))
{
this.traces.Add(result.Trace);
result.Trace = TraceJoiner.JoinTraces(this.traces);
this.taskCompletionSource.SetResult(result);
}

Expand All @@ -74,9 +96,12 @@ public void Fail(
this.Dispose();
}

public void ReRouteOperation(string newPartitionKeyRangeId)
public void ReRouteOperation(
string newPartitionKeyRangeId,
ITrace trace)
{
this.PartitionKeyRangeId = newPartitionKeyRangeId;
this.traces.Add(trace);
}

public void Dispose()
Expand Down
3 changes: 3 additions & 0 deletions Microsoft.Azure.Cosmos/src/Resource/ClientContextCore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ internal override Task<ResponseMessage> ProcessResourceOperationStreamAsync(
partitionKey: partitionKey.Value,
itemId: itemId,
streamPayload: streamPayload,
trace: trace,
cancellationToken: cancellationToken);
}

Expand Down Expand Up @@ -442,6 +443,7 @@ private async Task<ResponseMessage> ProcessResourceOperationAsBulkStreamAsync(
PartitionKey partitionKey,
string itemId,
Stream streamPayload,
ITrace trace,
CancellationToken cancellationToken)
{
this.ThrowIfDisposed();
Expand All @@ -458,6 +460,7 @@ private async Task<ResponseMessage> ProcessResourceOperationAsBulkStreamAsync(

TransactionalBatchOperationResult batchOperationResult = await cosmosContainerCore.BatchExecutor.AddAsync(
itemBatchOperation,
trace,
itemRequestOptions,
cancellationToken);

Expand Down
Loading