Skip to content

Commit

Permalink
Bulk: Fixes diagnostic traces by removing redundant info and adding c…
Browse files Browse the repository at this point in the history
…orrect retry context (Azure#2455)

* wiring trace

* adding trace as a list

* fixing tests

* Baseline tests should be independent ops

* updating baseline

* adding retry baseline

* adding test

* more tests

* removing client config duplication

* Adding AddChild

* baselines

* more tests

* sealed
  • Loading branch information
ealsur authored May 12, 2021
1 parent 7231619 commit 282c1c1
Show file tree
Hide file tree
Showing 18 changed files with 1,641 additions and 9,563 deletions.
16 changes: 7 additions & 9 deletions Microsoft.Azure.Cosmos/src/Batch/BatchAsyncBatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,10 @@ public virtual async Task DispatchAsync(
BatchPartitionMetric partitionMetric,
CancellationToken cancellationToken = default)
{
await this.clientContext.OperationHelperAsync("Batch Dispatch Async",
requestOptions: null,
task: (trace) => this.DispatchHelperAsync(trace, partitionMetric, cancellationToken),
traceComponent: TraceComponent.Batch,
traceLevel: Tracing.TraceLevel.Info);
using (ITrace trace = Tracing.Trace.GetRootTrace("Batch Dispatch Async", TraceComponent.Batch, Tracing.TraceLevel.Info))
{
await this.DispatchHelperAsync(trace, partitionMetric, cancellationToken);
}
}

private async Task<object> DispatchHelperAsync(
Expand All @@ -141,7 +140,7 @@ private async Task<object> DispatchHelperAsync(
// Any overflow goes to a new batch
foreach (ItemBatchOperation operation in pendingOperations)
{
await this.retrier(operation, trace, cancellationToken);
await this.retrier(operation, cancellationToken);
}
}
catch (Exception ex)
Expand Down Expand Up @@ -175,10 +174,10 @@ private async Task<object> DispatchHelperAsync(

if (!response.IsSuccessStatusCode)
{
Documents.ShouldRetryResult shouldRetry = await itemBatchOperation.Context.ShouldRetryAsync(response, cancellationToken);
ShouldRetryResult shouldRetry = await itemBatchOperation.Context.ShouldRetryAsync(response, cancellationToken);
if (shouldRetry.ShouldRetry)
{
await this.retrier(itemBatchOperation, trace, cancellationToken);
await this.retrier(itemBatchOperation, cancellationToken);
continue;
}
}
Expand Down Expand Up @@ -244,6 +243,5 @@ internal delegate Task<PartitionKeyRangeBatchExecutionResult> BatchAsyncBatcherE
/// <returns>An instance of <see cref="PartitionKeyRangeBatchResponse"/>.</returns>
internal delegate Task BatchAsyncBatcherRetryDelegate(
ItemBatchOperation operation,
ITrace trace,
CancellationToken cancellationToken);
}
18 changes: 12 additions & 6 deletions Microsoft.Azure.Cosmos/src/Batch/BatchAsyncContainerExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ public BatchAsyncContainerExecutor(

public virtual async Task<TransactionalBatchOperationResult> AddAsync(
ItemBatchOperation operation,
ITrace trace,
ItemRequestOptions itemRequestOptions = null,
CancellationToken cancellationToken = default)
{
Expand All @@ -82,9 +83,15 @@ public virtual async Task<TransactionalBatchOperationResult> AddAsync(

await this.ValidateOperationAsync(operation, itemRequestOptions, cancellationToken);

string resolvedPartitionKeyRangeId = await this.ResolvePartitionKeyRangeIdAsync(operation, NoOpTrace.Singleton, cancellationToken).ConfigureAwait(false);
string resolvedPartitionKeyRangeId = await this.ResolvePartitionKeyRangeIdAsync(
operation,
trace,
cancellationToken).ConfigureAwait(false);
BatchAsyncStreamer streamer = this.GetOrAddStreamerForPartitionKeyRange(resolvedPartitionKeyRangeId);
ItemBatchOperationContext context = new ItemBatchOperationContext(resolvedPartitionKeyRangeId, BatchAsyncContainerExecutor.GetRetryPolicy(this.cosmosContainer, operation.OperationType, this.retryOptions));
ItemBatchOperationContext context = new ItemBatchOperationContext(
resolvedPartitionKeyRangeId,
trace,
BatchAsyncContainerExecutor.GetRetryPolicy(this.cosmosContainer, operation.OperationType, this.retryOptions));
operation.AttachContext(context);
streamer.Add(operation);
return await context.OperationTask;
Expand Down Expand Up @@ -178,13 +185,12 @@ private static void AddHeadersToRequestMessage(RequestMessage requestMessage, st

private async Task ReBatchAsync(
ItemBatchOperation operation,
ITrace trace,
CancellationToken cancellationToken)
{
using (ITrace retryTrace = trace.StartChild("Batch Retry Async", TraceComponent.Batch, Tracing.TraceLevel.Info))
using (ITrace trace = Tracing.Trace.GetRootTrace("Batch Retry Async", TraceComponent.Batch, Tracing.TraceLevel.Verbose))
{
string resolvedPartitionKeyRangeId = await this.ResolvePartitionKeyRangeIdAsync(operation, retryTrace, cancellationToken).ConfigureAwait(false);
operation.Context.ReRouteOperation(resolvedPartitionKeyRangeId);
string resolvedPartitionKeyRangeId = await this.ResolvePartitionKeyRangeIdAsync(operation, trace, cancellationToken).ConfigureAwait(false);
operation.Context.ReRouteOperation(resolvedPartitionKeyRangeId, trace);
BatchAsyncStreamer streamer = this.GetOrAddStreamerForPartitionKeyRange(resolvedPartitionKeyRangeId);
streamer.Add(operation);
}
Expand Down
31 changes: 26 additions & 5 deletions Microsoft.Azure.Cosmos/src/Batch/ItemBatchOperationContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ namespace Microsoft.Azure.Cosmos
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Core.Trace;
using Microsoft.Azure.Cosmos.Tracing;
using Microsoft.Azure.Documents;

/// <summary>
Expand All @@ -25,29 +26,44 @@ internal class ItemBatchOperationContext : IDisposable

private readonly TaskCompletionSource<TransactionalBatchOperationResult> taskCompletionSource = new TaskCompletionSource<TransactionalBatchOperationResult>(TaskCreationOptions.RunContinuationsAsynchronously);

private readonly ITrace initialTrace;

public ItemBatchOperationContext(
string partitionKeyRangeId,
ITrace trace,
IDocumentClientRetryPolicy retryPolicy = null)
{
this.PartitionKeyRangeId = partitionKeyRangeId;
if (trace == null)
{
throw new ArgumentNullException(nameof(trace));
}

this.PartitionKeyRangeId = partitionKeyRangeId ?? throw new ArgumentNullException(nameof(partitionKeyRangeId));
this.initialTrace = trace;
this.retryPolicy = retryPolicy;
}

/// <summary>
/// Based on the Retry Policy, if a failed response should retry.
/// </summary>
public Task<ShouldRetryResult> ShouldRetryAsync(
public async Task<ShouldRetryResult> ShouldRetryAsync(
TransactionalBatchOperationResult batchOperationResult,
CancellationToken cancellationToken)
{
if (this.retryPolicy == null
|| batchOperationResult.IsSuccessStatusCode)
{
return Task.FromResult(ShouldRetryResult.NoRetry());
return ShouldRetryResult.NoRetry();
}

ResponseMessage responseMessage = batchOperationResult.ToResponseMessage();
return this.retryPolicy.ShouldRetryAsync(responseMessage, cancellationToken);
ShouldRetryResult shouldRetry = await this.retryPolicy.ShouldRetryAsync(responseMessage, cancellationToken);
if (shouldRetry.ShouldRetry)
{
this.initialTrace.AddChild(batchOperationResult.Trace);
}

return shouldRetry;
}

public void Complete(
Expand All @@ -56,6 +72,8 @@ public void Complete(
{
if (this.AssertBatcher(completer))
{
this.initialTrace.AddChild(result.Trace);
result.Trace = this.initialTrace;
this.taskCompletionSource.SetResult(result);
}

Expand All @@ -74,9 +92,12 @@ public void Fail(
this.Dispose();
}

public void ReRouteOperation(string newPartitionKeyRangeId)
public void ReRouteOperation(
string newPartitionKeyRangeId,
ITrace trace)
{
this.PartitionKeyRangeId = newPartitionKeyRangeId;
this.initialTrace.AddChild(trace);
}

public void Dispose()
Expand Down
3 changes: 3 additions & 0 deletions Microsoft.Azure.Cosmos/src/Resource/ClientContextCore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ internal override Task<ResponseMessage> ProcessResourceOperationStreamAsync(
partitionKey: partitionKey.Value,
itemId: itemId,
streamPayload: streamPayload,
trace: trace,
cancellationToken: cancellationToken);
}

Expand Down Expand Up @@ -442,6 +443,7 @@ private async Task<ResponseMessage> ProcessResourceOperationAsBulkStreamAsync(
PartitionKey partitionKey,
string itemId,
Stream streamPayload,
ITrace trace,
CancellationToken cancellationToken)
{
this.ThrowIfDisposed();
Expand All @@ -458,6 +460,7 @@ private async Task<ResponseMessage> ProcessResourceOperationAsBulkStreamAsync(

TransactionalBatchOperationResult batchOperationResult = await cosmosContainerCore.BatchExecutor.AddAsync(
itemBatchOperation,
trace,
itemRequestOptions,
cancellationToken);

Expand Down
6 changes: 6 additions & 0 deletions Microsoft.Azure.Cosmos/src/Tracing/ITrace.cs
Original file line number Diff line number Diff line change
Expand Up @@ -113,5 +113,11 @@ ITrace StartChild(
/// <param name="key">The key to associate the datum.</param>
/// <param name="value">The datum itself.</param>
void AddDatum(string key, object value);

/// <summary>
/// Adds a trace children that is already completed.
/// </summary>
/// <param name="trace">Existing trace.</param>
void AddChild(ITrace trace);
}
}
5 changes: 5 additions & 0 deletions Microsoft.Azure.Cosmos/src/Tracing/NoOpTrace.cs
Original file line number Diff line number Diff line change
Expand Up @@ -76,5 +76,10 @@ public void AddDatum(string key, object value)
{
// NoOp
}

public void AddChild(ITrace trace)
{
// NoOp
}
}
}
13 changes: 9 additions & 4 deletions Microsoft.Azure.Cosmos/src/Tracing/Trace.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ namespace Microsoft.Azure.Cosmos.Tracing

internal sealed class Trace : ITrace
{
private readonly List<Trace> children;
private readonly List<ITrace> children;
private readonly Dictionary<string, object> data;
private readonly Stopwatch stopwatch;

Expand All @@ -30,7 +30,7 @@ private Trace(
this.Level = level;
this.Component = component;
this.Parent = parent;
this.children = new List<Trace>();
this.children = new List<ITrace>();
this.data = new Dictionary<string, object>();
}

Expand Down Expand Up @@ -89,12 +89,17 @@ public ITrace StartChild(
component: component,
parent: this);

this.AddChild(child);

return child;
}

public void AddChild(ITrace child)
{
lock (this.children)
{
this.children.Add(child);
}

return child;
}

public static Trace GetRootTrace(string name)
Expand Down
7 changes: 6 additions & 1 deletion Microsoft.Azure.Cosmos/src/Tracing/TraceJoiner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,14 @@ public ITrace StartChild(string name, [CallerMemberName] string memberName = "",
public ITrace StartChild(string name, TraceComponent component, TraceLevel level, [CallerMemberName] string memberName = "", [CallerFilePath] string sourceFilePath = "", [CallerLineNumber] int sourceLineNumber = 0)
{
ITrace child = Trace.GetRootTrace(name, component, level, memberName, sourceFilePath, sourceLineNumber);
this.children.Add(child);
this.AddChild(child);
return child;
}

public void AddChild(ITrace trace)
{
this.children.Add(trace);
}
}
}
}
Loading

0 comments on commit 282c1c1

Please sign in to comment.