From 282c1c1a0f5335885f3b72f5ee0a52fea902c863 Mon Sep 17 00:00:00 2001 From: Matias Quaranta Date: Wed, 12 May 2021 13:52:51 -0700 Subject: [PATCH] Bulk: Fixes diagnostic traces by removing redundant info and adding correct retry context (#2455) * wiring trace * adding trace as a list * fixing tests * Baseline tests should be independent ops * updating baseline * adding retry baseline * adding test * more tests * removing client config duplication * Adding AddChild * baselines * more tests * sealed --- .../src/Batch/BatchAsyncBatcher.cs | 16 +- .../src/Batch/BatchAsyncContainerExecutor.cs | 18 +- .../src/Batch/ItemBatchOperationContext.cs | 31 +- .../src/Resource/ClientContextCore.cs | 3 + Microsoft.Azure.Cosmos/src/Tracing/ITrace.cs | 6 + .../src/Tracing/NoOpTrace.cs | 5 + Microsoft.Azure.Cosmos/src/Tracing/Trace.cs | 13 +- .../src/Tracing/TraceJoiner.cs | 7 +- ...riterBaselineTests.BulkOperationsAsync.xml | 10789 ++-------------- .../Batch/BatchAsyncContainerExecutorTests.cs | 3 +- .../EndToEndTraceWriterBaselineTests.cs | 64 +- .../Batch/BatchAsyncBatcherTests.cs | 74 +- .../Batch/BatchAsyncContainerExecutorTests.cs | 8 +- .../Batch/BatchAsyncOperationContextTests.cs | 109 +- .../Batch/BatchAsyncStreamerTests.cs | 4 +- .../CosmosItemUnitTests.cs | 26 +- .../Tracing/TraceTests.cs | 17 + .../Tracing/TraceWriterBaselineTests.cs | 11 +- 18 files changed, 1641 insertions(+), 9563 deletions(-) diff --git a/Microsoft.Azure.Cosmos/src/Batch/BatchAsyncBatcher.cs b/Microsoft.Azure.Cosmos/src/Batch/BatchAsyncBatcher.cs index b2afc0b60f..987264f6bb 100644 --- a/Microsoft.Azure.Cosmos/src/Batch/BatchAsyncBatcher.cs +++ b/Microsoft.Azure.Cosmos/src/Batch/BatchAsyncBatcher.cs @@ -113,11 +113,10 @@ public virtual async Task DispatchAsync( BatchPartitionMetric partitionMetric, CancellationToken cancellationToken = default) { - await this.clientContext.OperationHelperAsync("Batch Dispatch Async", - requestOptions: null, - task: (trace) => this.DispatchHelperAsync(trace, partitionMetric, cancellationToken), - traceComponent: TraceComponent.Batch, - traceLevel: Tracing.TraceLevel.Info); + using (ITrace trace = Tracing.Trace.GetRootTrace("Batch Dispatch Async", TraceComponent.Batch, Tracing.TraceLevel.Info)) + { + await this.DispatchHelperAsync(trace, partitionMetric, cancellationToken); + } } private async Task DispatchHelperAsync( @@ -141,7 +140,7 @@ private async Task DispatchHelperAsync( // Any overflow goes to a new batch foreach (ItemBatchOperation operation in pendingOperations) { - await this.retrier(operation, trace, cancellationToken); + await this.retrier(operation, cancellationToken); } } catch (Exception ex) @@ -175,10 +174,10 @@ private async Task DispatchHelperAsync( if (!response.IsSuccessStatusCode) { - Documents.ShouldRetryResult shouldRetry = await itemBatchOperation.Context.ShouldRetryAsync(response, cancellationToken); + ShouldRetryResult shouldRetry = await itemBatchOperation.Context.ShouldRetryAsync(response, cancellationToken); if (shouldRetry.ShouldRetry) { - await this.retrier(itemBatchOperation, trace, cancellationToken); + await this.retrier(itemBatchOperation, cancellationToken); continue; } } @@ -244,6 +243,5 @@ internal delegate Task BatchAsyncBatcherE /// An instance of . internal delegate Task BatchAsyncBatcherRetryDelegate( ItemBatchOperation operation, - ITrace trace, CancellationToken cancellationToken); } diff --git a/Microsoft.Azure.Cosmos/src/Batch/BatchAsyncContainerExecutor.cs b/Microsoft.Azure.Cosmos/src/Batch/BatchAsyncContainerExecutor.cs index 0bb7b3e326..ce4f8ccfe2 100644 --- a/Microsoft.Azure.Cosmos/src/Batch/BatchAsyncContainerExecutor.cs +++ b/Microsoft.Azure.Cosmos/src/Batch/BatchAsyncContainerExecutor.cs @@ -72,6 +72,7 @@ public BatchAsyncContainerExecutor( public virtual async Task AddAsync( ItemBatchOperation operation, + ITrace trace, ItemRequestOptions itemRequestOptions = null, CancellationToken cancellationToken = default) { @@ -82,9 +83,15 @@ public virtual async Task AddAsync( await this.ValidateOperationAsync(operation, itemRequestOptions, cancellationToken); - string resolvedPartitionKeyRangeId = await this.ResolvePartitionKeyRangeIdAsync(operation, NoOpTrace.Singleton, cancellationToken).ConfigureAwait(false); + string resolvedPartitionKeyRangeId = await this.ResolvePartitionKeyRangeIdAsync( + operation, + trace, + cancellationToken).ConfigureAwait(false); BatchAsyncStreamer streamer = this.GetOrAddStreamerForPartitionKeyRange(resolvedPartitionKeyRangeId); - ItemBatchOperationContext context = new ItemBatchOperationContext(resolvedPartitionKeyRangeId, BatchAsyncContainerExecutor.GetRetryPolicy(this.cosmosContainer, operation.OperationType, this.retryOptions)); + ItemBatchOperationContext context = new ItemBatchOperationContext( + resolvedPartitionKeyRangeId, + trace, + BatchAsyncContainerExecutor.GetRetryPolicy(this.cosmosContainer, operation.OperationType, this.retryOptions)); operation.AttachContext(context); streamer.Add(operation); return await context.OperationTask; @@ -178,13 +185,12 @@ private static void AddHeadersToRequestMessage(RequestMessage requestMessage, st private async Task ReBatchAsync( ItemBatchOperation operation, - ITrace trace, CancellationToken cancellationToken) { - using (ITrace retryTrace = trace.StartChild("Batch Retry Async", TraceComponent.Batch, Tracing.TraceLevel.Info)) + using (ITrace trace = Tracing.Trace.GetRootTrace("Batch Retry Async", TraceComponent.Batch, Tracing.TraceLevel.Verbose)) { - string resolvedPartitionKeyRangeId = await this.ResolvePartitionKeyRangeIdAsync(operation, retryTrace, cancellationToken).ConfigureAwait(false); - operation.Context.ReRouteOperation(resolvedPartitionKeyRangeId); + string resolvedPartitionKeyRangeId = await this.ResolvePartitionKeyRangeIdAsync(operation, trace, cancellationToken).ConfigureAwait(false); + operation.Context.ReRouteOperation(resolvedPartitionKeyRangeId, trace); BatchAsyncStreamer streamer = this.GetOrAddStreamerForPartitionKeyRange(resolvedPartitionKeyRangeId); streamer.Add(operation); } diff --git a/Microsoft.Azure.Cosmos/src/Batch/ItemBatchOperationContext.cs b/Microsoft.Azure.Cosmos/src/Batch/ItemBatchOperationContext.cs index 126bcb97a5..e4316eace4 100644 --- a/Microsoft.Azure.Cosmos/src/Batch/ItemBatchOperationContext.cs +++ b/Microsoft.Azure.Cosmos/src/Batch/ItemBatchOperationContext.cs @@ -8,6 +8,7 @@ namespace Microsoft.Azure.Cosmos using System.Threading; using System.Threading.Tasks; using Microsoft.Azure.Cosmos.Core.Trace; + using Microsoft.Azure.Cosmos.Tracing; using Microsoft.Azure.Documents; /// @@ -25,29 +26,44 @@ internal class ItemBatchOperationContext : IDisposable private readonly TaskCompletionSource taskCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + private readonly ITrace initialTrace; + public ItemBatchOperationContext( string partitionKeyRangeId, + ITrace trace, IDocumentClientRetryPolicy retryPolicy = null) { - this.PartitionKeyRangeId = partitionKeyRangeId; + if (trace == null) + { + throw new ArgumentNullException(nameof(trace)); + } + + this.PartitionKeyRangeId = partitionKeyRangeId ?? throw new ArgumentNullException(nameof(partitionKeyRangeId)); + this.initialTrace = trace; this.retryPolicy = retryPolicy; } /// /// Based on the Retry Policy, if a failed response should retry. /// - public Task ShouldRetryAsync( + public async Task ShouldRetryAsync( TransactionalBatchOperationResult batchOperationResult, CancellationToken cancellationToken) { if (this.retryPolicy == null || batchOperationResult.IsSuccessStatusCode) { - return Task.FromResult(ShouldRetryResult.NoRetry()); + return ShouldRetryResult.NoRetry(); } ResponseMessage responseMessage = batchOperationResult.ToResponseMessage(); - return this.retryPolicy.ShouldRetryAsync(responseMessage, cancellationToken); + ShouldRetryResult shouldRetry = await this.retryPolicy.ShouldRetryAsync(responseMessage, cancellationToken); + if (shouldRetry.ShouldRetry) + { + this.initialTrace.AddChild(batchOperationResult.Trace); + } + + return shouldRetry; } public void Complete( @@ -56,6 +72,8 @@ public void Complete( { if (this.AssertBatcher(completer)) { + this.initialTrace.AddChild(result.Trace); + result.Trace = this.initialTrace; this.taskCompletionSource.SetResult(result); } @@ -74,9 +92,12 @@ public void Fail( this.Dispose(); } - public void ReRouteOperation(string newPartitionKeyRangeId) + public void ReRouteOperation( + string newPartitionKeyRangeId, + ITrace trace) { this.PartitionKeyRangeId = newPartitionKeyRangeId; + this.initialTrace.AddChild(trace); } public void Dispose() diff --git a/Microsoft.Azure.Cosmos/src/Resource/ClientContextCore.cs b/Microsoft.Azure.Cosmos/src/Resource/ClientContextCore.cs index ce43da78ce..bc5b514928 100644 --- a/Microsoft.Azure.Cosmos/src/Resource/ClientContextCore.cs +++ b/Microsoft.Azure.Cosmos/src/Resource/ClientContextCore.cs @@ -286,6 +286,7 @@ internal override Task ProcessResourceOperationStreamAsync( partitionKey: partitionKey.Value, itemId: itemId, streamPayload: streamPayload, + trace: trace, cancellationToken: cancellationToken); } @@ -442,6 +443,7 @@ private async Task ProcessResourceOperationAsBulkStreamAsync( PartitionKey partitionKey, string itemId, Stream streamPayload, + ITrace trace, CancellationToken cancellationToken) { this.ThrowIfDisposed(); @@ -458,6 +460,7 @@ private async Task ProcessResourceOperationAsBulkStreamAsync( TransactionalBatchOperationResult batchOperationResult = await cosmosContainerCore.BatchExecutor.AddAsync( itemBatchOperation, + trace, itemRequestOptions, cancellationToken); diff --git a/Microsoft.Azure.Cosmos/src/Tracing/ITrace.cs b/Microsoft.Azure.Cosmos/src/Tracing/ITrace.cs index 4ff13a1acb..751af98467 100644 --- a/Microsoft.Azure.Cosmos/src/Tracing/ITrace.cs +++ b/Microsoft.Azure.Cosmos/src/Tracing/ITrace.cs @@ -113,5 +113,11 @@ ITrace StartChild( /// The key to associate the datum. /// The datum itself. void AddDatum(string key, object value); + + /// + /// Adds a trace children that is already completed. + /// + /// Existing trace. + void AddChild(ITrace trace); } } diff --git a/Microsoft.Azure.Cosmos/src/Tracing/NoOpTrace.cs b/Microsoft.Azure.Cosmos/src/Tracing/NoOpTrace.cs index 3b6c68426b..f28c8c3019 100644 --- a/Microsoft.Azure.Cosmos/src/Tracing/NoOpTrace.cs +++ b/Microsoft.Azure.Cosmos/src/Tracing/NoOpTrace.cs @@ -76,5 +76,10 @@ public void AddDatum(string key, object value) { // NoOp } + + public void AddChild(ITrace trace) + { + // NoOp + } } } diff --git a/Microsoft.Azure.Cosmos/src/Tracing/Trace.cs b/Microsoft.Azure.Cosmos/src/Tracing/Trace.cs index cfd4aea1ef..5a5d7df45c 100644 --- a/Microsoft.Azure.Cosmos/src/Tracing/Trace.cs +++ b/Microsoft.Azure.Cosmos/src/Tracing/Trace.cs @@ -11,7 +11,7 @@ namespace Microsoft.Azure.Cosmos.Tracing internal sealed class Trace : ITrace { - private readonly List children; + private readonly List children; private readonly Dictionary data; private readonly Stopwatch stopwatch; @@ -30,7 +30,7 @@ private Trace( this.Level = level; this.Component = component; this.Parent = parent; - this.children = new List(); + this.children = new List(); this.data = new Dictionary(); } @@ -89,12 +89,17 @@ public ITrace StartChild( component: component, parent: this); + this.AddChild(child); + + return child; + } + + public void AddChild(ITrace child) + { lock (this.children) { this.children.Add(child); } - - return child; } public static Trace GetRootTrace(string name) diff --git a/Microsoft.Azure.Cosmos/src/Tracing/TraceJoiner.cs b/Microsoft.Azure.Cosmos/src/Tracing/TraceJoiner.cs index 61a5376d16..acbcdc1994 100644 --- a/Microsoft.Azure.Cosmos/src/Tracing/TraceJoiner.cs +++ b/Microsoft.Azure.Cosmos/src/Tracing/TraceJoiner.cs @@ -89,9 +89,14 @@ public ITrace StartChild(string name, [CallerMemberName] string memberName = "", public ITrace StartChild(string name, TraceComponent component, TraceLevel level, [CallerMemberName] string memberName = "", [CallerFilePath] string sourceFilePath = "", [CallerLineNumber] int sourceLineNumber = 0) { ITrace child = Trace.GetRootTrace(name, component, level, memberName, sourceFilePath, sourceLineNumber); - this.children.Add(child); + this.AddChild(child); return child; } + + public void AddChild(ITrace trace) + { + this.children.Add(trace); + } } } } diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/BaselineTest/TestBaseline/EndToEndTraceWriterBaselineTests.BulkOperationsAsync.xml b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/BaselineTest/TestBaseline/EndToEndTraceWriterBaselineTests.BulkOperationsAsync.xml index ecd8b7733e..71d3d31f17 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/BaselineTest/TestBaseline/EndToEndTraceWriterBaselineTests.BulkOperationsAsync.xml +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/BaselineTest/TestBaseline/EndToEndTraceWriterBaselineTests.BulkOperationsAsync.xml @@ -7,7 +7,7 @@ CosmosClient bulkClient = TestCommon.CreateCosmosClient(builder => builder.WithBulkExecution(true)); Container bulkContainer = bulkClient.GetContainer(database.Id, container.Id); List>> createItemsTasks = new List>>(); - for (int i = 0; i < 100; i++) + for (int i = 0; i < 10; i++) { ToDoActivity item = ToDoActivity.CreateRandomToDoActivity(pk: pkValue); createItemsTasks.Add(bulkContainer.CreateItemAsync(item, new PartitionKey(item.id))); @@ -25,8718 +25,47 @@ traces.Add(trace); } - ITrace joinedTrace = TraceJoiner.JoinTraces(traces); ]]> - + + + + + + Bulk Operation + builder.WithBulkExecution(true)); + Container bulkContainer = bulkClient.GetContainer(database.Id, container.Id); + List>> createItemsTasks = new List>>(); + for (int i = 0; i < 10; i++) { - "name": "Batch Dispatch Async", + ToDoActivity item = ToDoActivity.CreateRandomToDoActivity(pk: pkValue); + createItemsTasks.Add(bulkContainer.CreateItemAsync(item, new PartitionKey(item.id))); + } + + await Task.WhenAll(createItemsTasks); + + List traces = new List(); + foreach (Task> createTask in createItemsTasks) + { + ItemResponse itemResponse = await createTask; + Assert.IsNotNull(itemResponse); + + ITrace trace = ((CosmosTraceDiagnostics)itemResponse.Diagnostics).Value; + traces.Add(trace); + } + +]]> + + + + + + + + + Bulk Operation + builder.WithBulkExecution(true)); + Container bulkContainer = bulkClient.GetContainer(database.Id, container.Id); + List>> createItemsTasks = new List>>(); + for (int i = 0; i < 10; i++) + { + ToDoActivity item = ToDoActivity.CreateRandomToDoActivity(pk: pkValue); + createItemsTasks.Add(bulkContainer.CreateItemAsync(item, new PartitionKey(item.id))); + } + + await Task.WhenAll(createItemsTasks); + + List traces = new List(); + foreach (Task> createTask in createItemsTasks) + { + ItemResponse itemResponse = await createTask; + Assert.IsNotNull(itemResponse); + + ITrace trace = ((CosmosTraceDiagnostics)itemResponse.Diagnostics).Value; + traces.Add(trace); + } + +]]> + + + + + + + + + Bulk Operation + builder.WithBulkExecution(true)); + Container bulkContainer = bulkClient.GetContainer(database.Id, container.Id); + List>> createItemsTasks = new List>>(); + for (int i = 0; i < 10; i++) + { + ToDoActivity item = ToDoActivity.CreateRandomToDoActivity(pk: pkValue); + createItemsTasks.Add(bulkContainer.CreateItemAsync(item, new PartitionKey(item.id))); + } + + await Task.WhenAll(createItemsTasks); + + List traces = new List(); + foreach (Task> createTask in createItemsTasks) + { + ItemResponse itemResponse = await createTask; + Assert.IsNotNull(itemResponse); + + ITrace trace = ((CosmosTraceDiagnostics)itemResponse.Diagnostics).Value; + traces.Add(trace); + } + +]]> + + + + + + + + + Bulk Operation + builder.WithBulkExecution(true)); + Container bulkContainer = bulkClient.GetContainer(database.Id, container.Id); + List>> createItemsTasks = new List>>(); + for (int i = 0; i < 10; i++) + { + ToDoActivity item = ToDoActivity.CreateRandomToDoActivity(pk: pkValue); + createItemsTasks.Add(bulkContainer.CreateItemAsync(item, new PartitionKey(item.id))); + } + + await Task.WhenAll(createItemsTasks); + + List traces = new List(); + foreach (Task> createTask in createItemsTasks) + { + ItemResponse itemResponse = await createTask; + Assert.IsNotNull(itemResponse); + + ITrace trace = ((CosmosTraceDiagnostics)itemResponse.Diagnostics).Value; + traces.Add(trace); + } + +]]> + + + + + + + + + Bulk Operation + builder.WithBulkExecution(true)); + Container bulkContainer = bulkClient.GetContainer(database.Id, container.Id); + List>> createItemsTasks = new List>>(); + for (int i = 0; i < 10; i++) + { + ToDoActivity item = ToDoActivity.CreateRandomToDoActivity(pk: pkValue); + createItemsTasks.Add(bulkContainer.CreateItemAsync(item, new PartitionKey(item.id))); + } + + await Task.WhenAll(createItemsTasks); + + List traces = new List(); + foreach (Task> createTask in createItemsTasks) + { + ItemResponse itemResponse = await createTask; + Assert.IsNotNull(itemResponse); + + ITrace trace = ((CosmosTraceDiagnostics)itemResponse.Diagnostics).Value; + traces.Add(trace); + } + +]]> + + + + + + + + + Bulk Operation + builder.WithBulkExecution(true)); + Container bulkContainer = bulkClient.GetContainer(database.Id, container.Id); + List>> createItemsTasks = new List>>(); + for (int i = 0; i < 10; i++) + { + ToDoActivity item = ToDoActivity.CreateRandomToDoActivity(pk: pkValue); + createItemsTasks.Add(bulkContainer.CreateItemAsync(item, new PartitionKey(item.id))); + } + + await Task.WhenAll(createItemsTasks); + + List traces = new List(); + foreach (Task> createTask in createItemsTasks) + { + ItemResponse itemResponse = await createTask; + Assert.IsNotNull(itemResponse); + + ITrace trace = ((CosmosTraceDiagnostics)itemResponse.Diagnostics).Value; + traces.Add(trace); + } + +]]> + + + + + + + + + Bulk Operation + builder.WithBulkExecution(true)); + Container bulkContainer = bulkClient.GetContainer(database.Id, container.Id); + List>> createItemsTasks = new List>>(); + for (int i = 0; i < 10; i++) + { + ToDoActivity item = ToDoActivity.CreateRandomToDoActivity(pk: pkValue); + createItemsTasks.Add(bulkContainer.CreateItemAsync(item, new PartitionKey(item.id))); + } + + await Task.WhenAll(createItemsTasks); + + List traces = new List(); + foreach (Task> createTask in createItemsTasks) + { + ItemResponse itemResponse = await createTask; + Assert.IsNotNull(itemResponse); + + ITrace trace = ((CosmosTraceDiagnostics)itemResponse.Diagnostics).Value; + traces.Add(trace); + } + +]]> + + + + + + + + + Bulk Operation + builder.WithBulkExecution(true)); + Container bulkContainer = bulkClient.GetContainer(database.Id, container.Id); + List>> createItemsTasks = new List>>(); + for (int i = 0; i < 10; i++) + { + ToDoActivity item = ToDoActivity.CreateRandomToDoActivity(pk: pkValue); + createItemsTasks.Add(bulkContainer.CreateItemAsync(item, new PartitionKey(item.id))); + } + + await Task.WhenAll(createItemsTasks); + + List traces = new List(); + foreach (Task> createTask in createItemsTasks) + { + ItemResponse itemResponse = await createTask; + Assert.IsNotNull(itemResponse); + + ITrace trace = ((CosmosTraceDiagnostics)itemResponse.Diagnostics).Value; + traces.Add(trace); + } + +]]> + + + + + + + + + Bulk Operation + builder.WithBulkExecution(true)); + Container bulkContainer = bulkClient.GetContainer(database.Id, container.Id); + List>> createItemsTasks = new List>>(); + for (int i = 0; i < 10; i++) + { + ToDoActivity item = ToDoActivity.CreateRandomToDoActivity(pk: pkValue); + createItemsTasks.Add(bulkContainer.CreateItemAsync(item, new PartitionKey(item.id))); + } + + await Task.WhenAll(createItemsTasks); + + List traces = new List(); + foreach (Task> createTask in createItemsTasks) + { + ItemResponse itemResponse = await createTask; + Assert.IsNotNull(itemResponse); + + ITrace trace = ((CosmosTraceDiagnostics)itemResponse.Diagnostics).Value; + traces.Add(trace); + } + +]]> + + + + + + + + + Bulk Operation With Throttle + + builder.WithThrottlingRetryOptions(TimeSpan.FromSeconds(5), 3) + .WithBulkExecution(true) + .WithTransportClientHandlerFactory(transportClient => new TransportClientWrapper( + transportClient, + (uri, resourceOperation, request) => TransportClientHelper.ReturnThrottledStoreResponseOnItemOperation( + uri, + resourceOperation, + request, + exceptionActivityId, + errorMessage))) + ); + + ItemRequestOptions requestOptions = new ItemRequestOptions(); + Container containerWithThrottleException = throttleClient.GetContainer( + database.Id, + container.Id); + + ToDoActivity testItem = ToDoActivity.CreateRandomToDoActivity(); + ITrace trace = null; + try + { + ItemResponse createResponse = await containerWithThrottleException.CreateItemAsync( + item: testItem, + partitionKey: new PartitionKey(testItem.id), + requestOptions: requestOptions); + Assert.Fail("Should have thrown a throttling exception"); + } + catch (CosmosException ce) when ((int)ce.StatusCode == (int)Documents.StatusCodes.TooManyRequests) + { + trace = ((CosmosTraceDiagnostics)ce.Diagnostics).Value; + } +]]> + + + + > tasks = new List>(); for (int i = 0; i < 100; i++) { - tasks.Add(executor.AddAsync(CreateItem(i.ToString()), null, default(CancellationToken))); + tasks.Add(executor.AddAsync(CreateItem(i.ToString()), NoOpTrace.Singleton)); } await Task.WhenAll(tasks); diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Tracing/EndToEndTraceWriterBaselineTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Tracing/EndToEndTraceWriterBaselineTests.cs index 02194958a5..8bdbb0c8a4 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Tracing/EndToEndTraceWriterBaselineTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Tracing/EndToEndTraceWriterBaselineTests.cs @@ -879,7 +879,7 @@ public async Task BulkOperationsAsync() CosmosClient bulkClient = TestCommon.CreateCosmosClient(builder => builder.WithBulkExecution(true)); Container bulkContainer = bulkClient.GetContainer(database.Id, container.Id); List>> createItemsTasks = new List>>(); - for (int i = 0; i < 100; i++) + for (int i = 0; i < 10; i++) { ToDoActivity item = ToDoActivity.CreateRandomToDoActivity(pk: pkValue); createItemsTasks.Add(bulkContainer.CreateItemAsync(item, new PartitionKey(item.id))); @@ -897,13 +897,60 @@ public async Task BulkOperationsAsync() traces.Add(trace); } - ITrace joinedTrace = TraceJoiner.JoinTraces(traces); endLineNumber = GetLineNumber(); - inputs.Add(new Input("Bulk Operation", joinedTrace, startLineNumber, endLineNumber)); + foreach (ITrace trace in traces) + { + inputs.Add(new Input("Bulk Operation", trace, startLineNumber, endLineNumber)); + } } //---------------------------------------------------------------- + //---------------------------------------------------------------- + // Bulk with retry on throttle + //---------------------------------------------------------------- + { + startLineNumber = GetLineNumber(); + string errorMessage = "Mock throttle exception" + Guid.NewGuid().ToString(); + Guid exceptionActivityId = Guid.NewGuid(); + // Set a small retry count to reduce test time + CosmosClient throttleClient = TestCommon.CreateCosmosClient(builder => + builder.WithThrottlingRetryOptions(TimeSpan.FromSeconds(5), 3) + .WithBulkExecution(true) + .WithTransportClientHandlerFactory(transportClient => new TransportClientWrapper( + transportClient, + (uri, resourceOperation, request) => TransportClientHelper.ReturnThrottledStoreResponseOnItemOperation( + uri, + resourceOperation, + request, + exceptionActivityId, + errorMessage))) + ); + + ItemRequestOptions requestOptions = new ItemRequestOptions(); + Container containerWithThrottleException = throttleClient.GetContainer( + database.Id, + container.Id); + + ToDoActivity testItem = ToDoActivity.CreateRandomToDoActivity(); + ITrace trace = null; + try + { + ItemResponse createResponse = await containerWithThrottleException.CreateItemAsync( + item: testItem, + partitionKey: new PartitionKey(testItem.id), + requestOptions: requestOptions); + Assert.Fail("Should have thrown a throttling exception"); + } + catch (CosmosException ce) when ((int)ce.StatusCode == (int)Documents.StatusCodes.TooManyRequests) + { + trace = ((CosmosTraceDiagnostics)ce.Diagnostics).Value; + } + endLineNumber = GetLineNumber(); + + inputs.Add(new Input("Bulk Operation With Throttle", trace, startLineNumber, endLineNumber)); + } + this.ExecuteTestSuite(inputs); } @@ -1202,7 +1249,7 @@ public override void SerializeAsXml(XmlWriter xmlWriter) private sealed class TraceForBaselineTesting : ITrace { public readonly Dictionary data; - public readonly List children; + public readonly List children; public TraceForBaselineTesting( string name, @@ -1214,7 +1261,7 @@ public TraceForBaselineTesting( this.Level = level; this.Component = component; this.Parent = parent; - this.children = new List(); + this.children = new List(); this.data = new Dictionary(); } @@ -1266,10 +1313,15 @@ public ITrace StartChild(string name, [CallerMemberName] string memberName = "", public ITrace StartChild(string name, TraceComponent component, TraceLevel level, [CallerMemberName] string memberName = "", [CallerFilePath] string sourceFilePath = "", [CallerLineNumber] int sourceLineNumber = 0) { TraceForBaselineTesting child = new TraceForBaselineTesting(name, level, component, parent: this); - this.children.Add(child); + this.AddChild(child); return child; } + public void AddChild(ITrace trace) + { + this.children.Add(trace); + } + public static TraceForBaselineTesting GetRootTrace() { return new TraceForBaselineTesting("Trace For Baseline Testing", TraceLevel.Info, TraceComponent.Unknown, parent: null); diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/BatchAsyncBatcherTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/BatchAsyncBatcherTests.cs index 5de3e67fb7..98787e4660 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/BatchAsyncBatcherTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/BatchAsyncBatcherTests.cs @@ -33,7 +33,7 @@ private ItemBatchOperation CreateItemBatchOperation(bool withContext = false) resourceStream: new MemoryStream(new byte[] { 0x41, 0x42 }, index: 0, count: 2, writable: false, publiclyVisible: true)); if (withContext) { - operation.AttachContext(new ItemBatchOperationContext(string.Empty)); + operation.AttachContext(new ItemBatchOperationContext(string.Empty, NoOpTrace.Singleton)); } return operation; @@ -310,7 +310,7 @@ private readonly BatchAsyncBatcherExecuteDelegate ExecutorWithLessResponses private readonly BatchAsyncBatcherExecuteDelegate ExecutorWithFailure = (PartitionKeyRangeServerBatchRequest request, ITrace trace, CancellationToken cancellationToken) => throw expectedException; - private readonly BatchAsyncBatcherRetryDelegate Retrier = (ItemBatchOperation operation, ITrace trace, CancellationToken cancellation) => Task.CompletedTask; + private readonly BatchAsyncBatcherRetryDelegate Retrier = (ItemBatchOperation operation, CancellationToken cancellation) => Task.CompletedTask; [DataTestMethod] [ExpectedException(typeof(ArgumentOutOfRangeException))] @@ -378,9 +378,9 @@ public async Task ExceptionsFailOperationsAsync() BatchAsyncBatcher batchAsyncBatcher = new BatchAsyncBatcher(2, 1000, MockCosmosUtil.Serializer, this.ExecutorWithFailure, this.Retrier, BatchAsyncBatcherTests.MockClientContext()); ItemBatchOperation operation1 = this.CreateItemBatchOperation(); ItemBatchOperation operation2 = this.CreateItemBatchOperation(); - ItemBatchOperationContext context1 = new ItemBatchOperationContext(string.Empty); + ItemBatchOperationContext context1 = new ItemBatchOperationContext(string.Empty, NoOpTrace.Singleton); operation1.AttachContext(context1); - ItemBatchOperationContext context2 = new ItemBatchOperationContext(string.Empty); + ItemBatchOperationContext context2 = new ItemBatchOperationContext(string.Empty, NoOpTrace.Singleton); operation2.AttachContext(context2); batchAsyncBatcher.TryAdd(operation1); batchAsyncBatcher.TryAdd(operation2); @@ -405,7 +405,7 @@ public async Task DispatchProcessInOrderAsync() partitionKey: new Cosmos.PartitionKey(i.ToString()), id: i.ToString()); - ItemBatchOperationContext context = new ItemBatchOperationContext(string.Empty); + ItemBatchOperationContext context = new ItemBatchOperationContext(string.Empty, NoOpTrace.Singleton); operation.AttachContext(context); operations.Add(operation); Assert.IsTrue(batchAsyncBatcher.TryAdd(operation)); @@ -431,7 +431,7 @@ public async Task DispatchWithLessResponses() for (int i = 0; i < 10; i++) { ItemBatchOperation operation = new ItemBatchOperation(OperationType.Create, i, Cosmos.PartitionKey.Null, i.ToString()); - ItemBatchOperationContext context = new ItemBatchOperationContext(string.Empty); + ItemBatchOperationContext context = new ItemBatchOperationContext(string.Empty, NoOpTrace.Singleton); operation.AttachContext(context); operations.Add(operation); Assert.IsTrue(batchAsyncBatcher.TryAdd(operation)); @@ -496,7 +496,7 @@ public async Task CannotAddToDispatchedBatch() { BatchAsyncBatcher batchAsyncBatcher = new BatchAsyncBatcher(1, 1000, MockCosmosUtil.Serializer, this.Executor, this.Retrier, BatchAsyncBatcherTests.MockClientContext()); ItemBatchOperation operation = this.CreateItemBatchOperation(); - operation.AttachContext(new ItemBatchOperationContext(string.Empty)); + operation.AttachContext(new ItemBatchOperationContext(string.Empty, NoOpTrace.Singleton)); Assert.IsTrue(batchAsyncBatcher.TryAdd(operation)); await batchAsyncBatcher.DispatchAsync(metric); Assert.IsFalse(batchAsyncBatcher.TryAdd(this.CreateItemBatchOperation())); @@ -517,8 +517,8 @@ public async Task RetrierGetsCalledOnSplit() ItemBatchOperation operation1 = this.CreateItemBatchOperation(); ItemBatchOperation operation2 = this.CreateItemBatchOperation(); - operation1.AttachContext(new ItemBatchOperationContext(string.Empty, retryPolicy1)); - operation2.AttachContext(new ItemBatchOperationContext(string.Empty, retryPolicy2)); + operation1.AttachContext(new ItemBatchOperationContext(string.Empty, NoOpTrace.Singleton, retryPolicy1)); + operation2.AttachContext(new ItemBatchOperationContext(string.Empty, NoOpTrace.Singleton, retryPolicy2)); Mock retryDelegate = new Mock(); @@ -526,9 +526,9 @@ public async Task RetrierGetsCalledOnSplit() Assert.IsTrue(batchAsyncBatcher.TryAdd(operation1)); Assert.IsTrue(batchAsyncBatcher.TryAdd(operation2)); await batchAsyncBatcher.DispatchAsync(metric); - retryDelegate.Verify(a => a(It.Is(o => o == operation1), It.IsAny(), It.IsAny()), Times.Once); - retryDelegate.Verify(a => a(It.Is(o => o == operation2), It.IsAny(), It.IsAny()), Times.Once); - retryDelegate.Verify(a => a(It.IsAny(), It.IsAny(), It.IsAny()), Times.Exactly(2)); + retryDelegate.Verify(a => a(It.Is(o => o == operation1), It.IsAny()), Times.Once); + retryDelegate.Verify(a => a(It.Is(o => o == operation2), It.IsAny()), Times.Once); + retryDelegate.Verify(a => a(It.IsAny(), It.IsAny()), Times.Exactly(2)); } [TestMethod] @@ -546,8 +546,8 @@ public async Task RetrierGetsCalledOnCompletingSplit() ItemBatchOperation operation1 = this.CreateItemBatchOperation(); ItemBatchOperation operation2 = this.CreateItemBatchOperation(); - operation1.AttachContext(new ItemBatchOperationContext(string.Empty, retryPolicy1)); - operation2.AttachContext(new ItemBatchOperationContext(string.Empty, retryPolicy2)); + operation1.AttachContext(new ItemBatchOperationContext(string.Empty, NoOpTrace.Singleton, retryPolicy1)); + operation2.AttachContext(new ItemBatchOperationContext(string.Empty, NoOpTrace.Singleton, retryPolicy2)); Mock retryDelegate = new Mock(); @@ -555,9 +555,9 @@ public async Task RetrierGetsCalledOnCompletingSplit() Assert.IsTrue(batchAsyncBatcher.TryAdd(operation1)); Assert.IsTrue(batchAsyncBatcher.TryAdd(operation2)); await batchAsyncBatcher.DispatchAsync(metric); - retryDelegate.Verify(a => a(It.Is(o => o == operation1), It.IsAny(), It.IsAny()), Times.Once); - retryDelegate.Verify(a => a(It.Is(o => o == operation2), It.IsAny(), It.IsAny()), Times.Once); - retryDelegate.Verify(a => a(It.IsAny(), It.IsAny(), It.IsAny()), Times.Exactly(2)); + retryDelegate.Verify(a => a(It.Is(o => o == operation1), It.IsAny()), Times.Once); + retryDelegate.Verify(a => a(It.Is(o => o == operation2), It.IsAny()), Times.Once); + retryDelegate.Verify(a => a(It.IsAny(), It.IsAny()), Times.Exactly(2)); } [TestMethod] @@ -575,8 +575,8 @@ public async Task RetrierGetsCalledOnCompletingPartitionMigration() ItemBatchOperation operation1 = this.CreateItemBatchOperation(); ItemBatchOperation operation2 = this.CreateItemBatchOperation(); - operation1.AttachContext(new ItemBatchOperationContext(string.Empty, retryPolicy1)); - operation2.AttachContext(new ItemBatchOperationContext(string.Empty, retryPolicy2)); + operation1.AttachContext(new ItemBatchOperationContext(string.Empty, NoOpTrace.Singleton, retryPolicy1)); + operation2.AttachContext(new ItemBatchOperationContext(string.Empty, NoOpTrace.Singleton, retryPolicy2)); Mock retryDelegate = new Mock(); @@ -584,9 +584,9 @@ public async Task RetrierGetsCalledOnCompletingPartitionMigration() Assert.IsTrue(batchAsyncBatcher.TryAdd(operation1)); Assert.IsTrue(batchAsyncBatcher.TryAdd(operation2)); await batchAsyncBatcher.DispatchAsync(metric); - retryDelegate.Verify(a => a(It.Is(o => o == operation1), It.IsAny(), It.IsAny()), Times.Once); - retryDelegate.Verify(a => a(It.Is(o => o == operation2), It.IsAny(), It.IsAny()), Times.Once); - retryDelegate.Verify(a => a(It.IsAny(), It.IsAny(), It.IsAny()), Times.Exactly(2)); + retryDelegate.Verify(a => a(It.Is(o => o == operation1), It.IsAny()), Times.Once); + retryDelegate.Verify(a => a(It.Is(o => o == operation2), It.IsAny()), Times.Once); + retryDelegate.Verify(a => a(It.IsAny(), It.IsAny()), Times.Exactly(2)); } [TestMethod] @@ -594,8 +594,8 @@ public async Task RetrierGetsCalledOnOverFlow() { ItemBatchOperation operation1 = this.CreateItemBatchOperation(); ItemBatchOperation operation2 = this.CreateItemBatchOperation(); - operation1.AttachContext(new ItemBatchOperationContext(string.Empty)); - operation2.AttachContext(new ItemBatchOperationContext(string.Empty)); + operation1.AttachContext(new ItemBatchOperationContext(string.Empty, NoOpTrace.Singleton)); + operation2.AttachContext(new ItemBatchOperationContext(string.Empty, NoOpTrace.Singleton)); Mock retryDelegate = new Mock(); Mock executeDelegate = new Mock(); @@ -604,9 +604,9 @@ public async Task RetrierGetsCalledOnOverFlow() Assert.IsTrue(batchAsyncBatcher.TryAdd(operation1)); Assert.IsTrue(batchAsyncBatcher.TryAdd(operation2)); await batchAsyncBatcher.DispatchAsync(metric); - retryDelegate.Verify(a => a(It.Is(o => o == operation1), It.IsAny(), It.IsAny()), Times.Never); - retryDelegate.Verify(a => a(It.Is(o => o == operation2), It.IsAny(), It.IsAny()), Times.Once); - retryDelegate.Verify(a => a(It.IsAny(), It.IsAny(), It.IsAny()), Times.Once); + retryDelegate.Verify(a => a(It.Is(o => o == operation1), It.IsAny()), Times.Never); + retryDelegate.Verify(a => a(It.Is(o => o == operation2), It.IsAny()), Times.Once); + retryDelegate.Verify(a => a(It.IsAny(), It.IsAny()), Times.Once); } [TestMethod] @@ -624,8 +624,8 @@ public async Task RetrierGetsCalledOn413_OnRead() ItemBatchOperation operation1 = this.CreateItemBatchOperation(); ItemBatchOperation operation2 = this.CreateItemBatchOperation(); - operation1.AttachContext(new ItemBatchOperationContext(string.Empty, retryPolicy1)); - operation2.AttachContext(new ItemBatchOperationContext(string.Empty, retryPolicy2)); + operation1.AttachContext(new ItemBatchOperationContext(string.Empty, NoOpTrace.Singleton, retryPolicy1)); + operation2.AttachContext(new ItemBatchOperationContext(string.Empty, NoOpTrace.Singleton, retryPolicy2)); Mock retryDelegate = new Mock(); @@ -633,9 +633,9 @@ public async Task RetrierGetsCalledOn413_OnRead() Assert.IsTrue(batchAsyncBatcher.TryAdd(operation1)); Assert.IsTrue(batchAsyncBatcher.TryAdd(operation2)); await batchAsyncBatcher.DispatchAsync(metric); - retryDelegate.Verify(a => a(It.Is(o => o == operation1), It.IsAny(), It.IsAny()), Times.Never); - retryDelegate.Verify(a => a(It.Is(o => o == operation2), It.IsAny(), It.IsAny()), Times.Once); - retryDelegate.Verify(a => a(It.IsAny(), It.IsAny(), It.IsAny()), Times.Once); + retryDelegate.Verify(a => a(It.Is(o => o == operation1), It.IsAny()), Times.Never); + retryDelegate.Verify(a => a(It.Is(o => o == operation2), It.IsAny()), Times.Once); + retryDelegate.Verify(a => a(It.IsAny(), It.IsAny()), Times.Once); } [TestMethod] @@ -653,8 +653,8 @@ public async Task RetrierGetsCalledOn413_OnWrite() ItemBatchOperation operation1 = this.CreateItemBatchOperation(); ItemBatchOperation operation2 = this.CreateItemBatchOperation(); - operation1.AttachContext(new ItemBatchOperationContext(string.Empty, retryPolicy1)); - operation2.AttachContext(new ItemBatchOperationContext(string.Empty, retryPolicy2)); + operation1.AttachContext(new ItemBatchOperationContext(string.Empty, NoOpTrace.Singleton, retryPolicy1)); + operation2.AttachContext(new ItemBatchOperationContext(string.Empty, NoOpTrace.Singleton, retryPolicy2)); Mock retryDelegate = new Mock(); @@ -662,9 +662,9 @@ public async Task RetrierGetsCalledOn413_OnWrite() Assert.IsTrue(batchAsyncBatcher.TryAdd(operation1)); Assert.IsTrue(batchAsyncBatcher.TryAdd(operation2)); await batchAsyncBatcher.DispatchAsync(metric); - retryDelegate.Verify(a => a(It.Is(o => o == operation1), It.IsAny(), It.IsAny()), Times.Never); - retryDelegate.Verify(a => a(It.Is(o => o == operation2), It.IsAny(), It.IsAny()), Times.Never); - retryDelegate.Verify(a => a(It.IsAny(), It.IsAny(), It.IsAny()), Times.Never); + retryDelegate.Verify(a => a(It.Is(o => o == operation1), It.IsAny()), Times.Never); + retryDelegate.Verify(a => a(It.Is(o => o == operation2), It.IsAny()), Times.Never); + retryDelegate.Verify(a => a(It.IsAny(), It.IsAny()), Times.Never); } private static ContainerInternal GetSplitEnabledContainer() diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/BatchAsyncContainerExecutorTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/BatchAsyncContainerExecutorTests.cs index 227426f14d..96154f65f6 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/BatchAsyncContainerExecutorTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/BatchAsyncContainerExecutorTests.cs @@ -64,7 +64,7 @@ public async Task RetryOnSplit() string.Empty); mockContainer.Setup(x => x.GetRoutingMapAsync(It.IsAny())).Returns(Task.FromResult(routingMap)); BatchAsyncContainerExecutor executor = new BatchAsyncContainerExecutor(mockContainer.Object, mockedContext.Object, 20, BatchAsyncContainerExecutorCache.DefaultMaxBulkRequestBodySizeInBytes); - TransactionalBatchOperationResult result = await executor.AddAsync(itemBatchOperation); + TransactionalBatchOperationResult result = await executor.AddAsync(itemBatchOperation, NoOpTrace.Singleton); Mock.Get(mockContainer.Object) .Verify(x => x.GetCachedContainerPropertiesAsync(It.IsAny(), It.IsAny(), It.IsAny()), Times.Exactly(2)); @@ -122,7 +122,7 @@ public async Task RetryOnNameStale() string.Empty); mockContainer.Setup(x => x.GetRoutingMapAsync(It.IsAny())).Returns(Task.FromResult(routingMap)); BatchAsyncContainerExecutor executor = new BatchAsyncContainerExecutor(mockContainer.Object, mockedContext.Object, 20, BatchAsyncContainerExecutorCache.DefaultMaxBulkRequestBodySizeInBytes); - TransactionalBatchOperationResult result = await executor.AddAsync(itemBatchOperation); + TransactionalBatchOperationResult result = await executor.AddAsync(itemBatchOperation, NoOpTrace.Singleton); Mock.Get(mockContainer.Object) .Verify(x => x.GetCachedContainerPropertiesAsync(It.IsAny(), It.IsAny(), It.IsAny()), Times.Exactly(2)); @@ -180,7 +180,7 @@ public async Task RetryOn429() string.Empty); mockContainer.Setup(x => x.GetRoutingMapAsync(It.IsAny())).Returns(Task.FromResult(routingMap)); BatchAsyncContainerExecutor executor = new BatchAsyncContainerExecutor(mockContainer.Object, mockedContext.Object, 20, BatchAsyncContainerExecutorCache.DefaultMaxBulkRequestBodySizeInBytes); - TransactionalBatchOperationResult result = await executor.AddAsync(itemBatchOperation); + TransactionalBatchOperationResult result = await executor.AddAsync(itemBatchOperation, NoOpTrace.Singleton); Mock.Get(mockContainer.Object) .Verify(x => x.GetCachedContainerPropertiesAsync(It.IsAny(), It.IsAny(), It.IsAny()), Times.Exactly(2)); @@ -237,7 +237,7 @@ public async Task DoesNotRecalculatePartitionKeyRangeOnNoSplits() string.Empty); mockContainer.Setup(x => x.GetRoutingMapAsync(It.IsAny())).Returns(Task.FromResult(routingMap)); BatchAsyncContainerExecutor executor = new BatchAsyncContainerExecutor(mockContainer.Object, mockedContext.Object, 20, BatchAsyncContainerExecutorCache.DefaultMaxBulkRequestBodySizeInBytes); - TransactionalBatchOperationResult result = await executor.AddAsync(itemBatchOperation); + TransactionalBatchOperationResult result = await executor.AddAsync(itemBatchOperation, NoOpTrace.Singleton); Mock.Get(mockContainer.Object) .Verify(x => x.GetCachedContainerPropertiesAsync(It.IsAny(), It.IsAny(), It.IsAny()), Times.Once); diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/BatchAsyncOperationContextTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/BatchAsyncOperationContextTests.cs index cc01ad6cce..3f4abb694e 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/BatchAsyncOperationContextTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/BatchAsyncOperationContextTests.cs @@ -18,12 +18,93 @@ namespace Microsoft.Azure.Cosmos.Tests [TestClass] public class BatchAsyncOperationContextTests { + [TestMethod] + public async Task TraceIsJoinedOnCompletionWithRetry() + { + IDocumentClientRetryPolicy retryPolicy = new BulkExecutionRetryPolicy( + Mock.Of(), + OperationType.Read, + new ResourceThrottleRetryPolicy(1)); + + Trace rootTrace = Trace.GetRootTrace(name: "RootTrace"); + + ItemBatchOperation operation = new ItemBatchOperation(OperationType.Create, 0, Cosmos.PartitionKey.Null); + + // Start with the base trace + ItemBatchOperationContext batchAsyncOperationContext = new ItemBatchOperationContext(Guid.NewGuid().ToString(), rootTrace, retryPolicy); + operation.AttachContext(batchAsyncOperationContext); + + // Simulate a retry scenario that should append to the context traces + Trace retryTrace = Trace.GetRootTrace(name: "TransportTrace"); + TransactionalBatchOperationResult retryResult = new TransactionalBatchOperationResult(HttpStatusCode.TooManyRequests) + { + Trace = retryTrace + }; + ShouldRetryResult shouldRetryResult = await batchAsyncOperationContext.ShouldRetryAsync(retryResult, default); + Assert.IsTrue(shouldRetryResult.ShouldRetry); + + // Simulate the completion that should append to the context traces + Trace transportTrace = Trace.GetRootTrace(name: "TransportTrace"); + TransactionalBatchOperationResult result = new TransactionalBatchOperationResult(HttpStatusCode.OK) + { + Trace = transportTrace + }; + + batchAsyncOperationContext.Complete(null, result); + + Assert.AreEqual(result, await batchAsyncOperationContext.OperationTask); + Assert.AreEqual(2, result.Trace.Children.Count, "The final trace should have the initial trace, plus the retries, plus the final trace"); + Assert.AreEqual(rootTrace, result.Trace, "The first trace child should be the initial root"); + Assert.AreEqual(retryTrace, result.Trace.Children[0], "The second trace child should be the one from the retry"); + Assert.AreEqual(transportTrace, result.Trace.Children[1], "The third trace child should be the one from the final result"); + } + + [TestMethod] + public async Task TraceIsJoinedOnCompletionWithoutRetry() + { + IDocumentClientRetryPolicy retryPolicy = new BulkExecutionRetryPolicy( + Mock.Of(), + OperationType.Read, + new ResourceThrottleRetryPolicy(1)); + + Trace rootTrace = Trace.GetRootTrace(name: "RootTrace"); + + ItemBatchOperation operation = new ItemBatchOperation(OperationType.Create, 0, Cosmos.PartitionKey.Null); + + // Start with the base trace + ItemBatchOperationContext batchAsyncOperationContext = new ItemBatchOperationContext(Guid.NewGuid().ToString(), rootTrace, retryPolicy); + operation.AttachContext(batchAsyncOperationContext); + + // Simulate a retry scenario that should not append to the context traces + Trace retryTrace = Trace.GetRootTrace(name: "TransportTrace"); + TransactionalBatchOperationResult retryResult = new TransactionalBatchOperationResult(HttpStatusCode.Forbidden) + { + Trace = retryTrace + }; + ShouldRetryResult shouldRetryResult = await batchAsyncOperationContext.ShouldRetryAsync(retryResult, default); + Assert.IsFalse(shouldRetryResult.ShouldRetry); + + // Simulate the completion that should append to the context traces + Trace transportTrace = Trace.GetRootTrace(name: "TransportTrace"); + TransactionalBatchOperationResult result = new TransactionalBatchOperationResult(HttpStatusCode.OK) + { + Trace = transportTrace + }; + + batchAsyncOperationContext.Complete(null, result); + + Assert.AreEqual(result, await batchAsyncOperationContext.OperationTask); + Assert.AreEqual(1, result.Trace.Children.Count, "The final trace should have the initial trace, plus the final trace, since the result is not retried, it should not capture it"); + Assert.AreEqual(rootTrace, result.Trace, "The first trace child should be the initial root"); + Assert.AreEqual(transportTrace, result.Trace.Children[0], "The second trace child should be the one from the final result"); + } + [TestMethod] public void PartitionKeyRangeIdIsSetOnInitialization() { string expectedPkRangeId = Guid.NewGuid().ToString(); ItemBatchOperation operation = new ItemBatchOperation(OperationType.Create, 0, Cosmos.PartitionKey.Null); - ItemBatchOperationContext batchAsyncOperationContext = new ItemBatchOperationContext(expectedPkRangeId); + ItemBatchOperationContext batchAsyncOperationContext = new ItemBatchOperationContext(expectedPkRangeId, NoOpTrace.Singleton); operation.AttachContext(batchAsyncOperationContext); Assert.IsNotNull(batchAsyncOperationContext.OperationTask); @@ -36,7 +117,7 @@ public void PartitionKeyRangeIdIsSetOnInitialization() public void TaskIsCreatedOnInitialization() { ItemBatchOperation operation = new ItemBatchOperation(OperationType.Create, 0, Cosmos.PartitionKey.Null); - ItemBatchOperationContext batchAsyncOperationContext = new ItemBatchOperationContext(string.Empty); + ItemBatchOperationContext batchAsyncOperationContext = new ItemBatchOperationContext(string.Empty, NoOpTrace.Singleton); operation.AttachContext(batchAsyncOperationContext); Assert.IsNotNull(batchAsyncOperationContext.OperationTask); @@ -48,7 +129,7 @@ public void TaskIsCreatedOnInitialization() public async Task TaskResultIsSetOnCompleteAsync() { ItemBatchOperation operation = new ItemBatchOperation(OperationType.Create, 0, Cosmos.PartitionKey.Null); - ItemBatchOperationContext batchAsyncOperationContext = new ItemBatchOperationContext(string.Empty); + ItemBatchOperationContext batchAsyncOperationContext = new ItemBatchOperationContext(string.Empty, NoOpTrace.Singleton); operation.AttachContext(batchAsyncOperationContext); TransactionalBatchOperationResult expected = new TransactionalBatchOperationResult(HttpStatusCode.OK); @@ -64,7 +145,7 @@ public async Task ExceptionIsSetOnFailAsync() { Exception failure = new Exception("It failed"); ItemBatchOperation operation = new ItemBatchOperation(OperationType.Create, 0, Cosmos.PartitionKey.Null); - ItemBatchOperationContext batchAsyncOperationContext = new ItemBatchOperationContext(string.Empty); + ItemBatchOperationContext batchAsyncOperationContext = new ItemBatchOperationContext(string.Empty, NoOpTrace.Singleton); operation.AttachContext(batchAsyncOperationContext); batchAsyncOperationContext.Fail(null, failure); @@ -78,8 +159,8 @@ public async Task ExceptionIsSetOnFailAsync() public void CannotAttachMoreThanOnce() { ItemBatchOperation operation = new ItemBatchOperation(OperationType.Create, 0, Cosmos.PartitionKey.Null); - operation.AttachContext(new ItemBatchOperationContext(string.Empty)); - Assert.ThrowsException(() => operation.AttachContext(new ItemBatchOperationContext(string.Empty))); + operation.AttachContext(new ItemBatchOperationContext(string.Empty, NoOpTrace.Singleton)); + Assert.ThrowsException(() => operation.AttachContext(new ItemBatchOperationContext(string.Empty, NoOpTrace.Singleton))); } [TestMethod] @@ -87,7 +168,7 @@ public async Task ShouldRetry_NoPolicy() { TransactionalBatchOperationResult result = new TransactionalBatchOperationResult(HttpStatusCode.OK); ItemBatchOperation operation = new ItemBatchOperation(OperationType.Create, 0, Cosmos.PartitionKey.Null); - operation.AttachContext(new ItemBatchOperationContext(string.Empty)); + operation.AttachContext(new ItemBatchOperationContext(string.Empty, NoOpTrace.Singleton)); ShouldRetryResult shouldRetryResult = await operation.Context.ShouldRetryAsync(result, default); Assert.IsFalse(shouldRetryResult.ShouldRetry); } @@ -101,7 +182,7 @@ public async Task ShouldRetry_WithPolicy_OnSuccess() new ResourceThrottleRetryPolicy(1)); TransactionalBatchOperationResult result = new TransactionalBatchOperationResult(HttpStatusCode.OK); ItemBatchOperation operation = new ItemBatchOperation(OperationType.Create, 0, Cosmos.PartitionKey.Null); - operation.AttachContext(new ItemBatchOperationContext(string.Empty, retryPolicy)); + operation.AttachContext(new ItemBatchOperationContext(string.Empty, NoOpTrace.Singleton, retryPolicy)); ShouldRetryResult shouldRetryResult = await operation.Context.ShouldRetryAsync(result, default); Assert.IsFalse(shouldRetryResult.ShouldRetry); } @@ -115,7 +196,7 @@ public async Task ShouldRetry_WithPolicy_On429() new ResourceThrottleRetryPolicy(1)); TransactionalBatchOperationResult result = new TransactionalBatchOperationResult((HttpStatusCode)StatusCodes.TooManyRequests); ItemBatchOperation operation = new ItemBatchOperation(OperationType.Create, 0, Cosmos.PartitionKey.Null); - operation.AttachContext(new ItemBatchOperationContext(string.Empty, retryPolicy)); + operation.AttachContext(new ItemBatchOperationContext(string.Empty, NoOpTrace.Singleton, retryPolicy)); ShouldRetryResult shouldRetryResult = await operation.Context.ShouldRetryAsync(result, default); Assert.IsTrue(shouldRetryResult.ShouldRetry); } @@ -129,7 +210,7 @@ public async Task ShouldRetry_WithPolicy_On413_OnRead() new ResourceThrottleRetryPolicy(1)); TransactionalBatchOperationResult result = new TransactionalBatchOperationResult(HttpStatusCode.RequestEntityTooLarge); ItemBatchOperation operation = new ItemBatchOperation(OperationType.Create, 0, Cosmos.PartitionKey.Null); - operation.AttachContext(new ItemBatchOperationContext(string.Empty, retryPolicy)); + operation.AttachContext(new ItemBatchOperationContext(string.Empty, NoOpTrace.Singleton, retryPolicy)); ShouldRetryResult shouldRetryResult = await operation.Context.ShouldRetryAsync(result, default); Assert.IsTrue(shouldRetryResult.ShouldRetry); } @@ -143,7 +224,7 @@ public async Task ShouldRetry_WithPolicy_On413_OnWrite() new ResourceThrottleRetryPolicy(1)); TransactionalBatchOperationResult result = new TransactionalBatchOperationResult(HttpStatusCode.RequestEntityTooLarge); ItemBatchOperation operation = new ItemBatchOperation(OperationType.Create, 0, Cosmos.PartitionKey.Null); - operation.AttachContext(new ItemBatchOperationContext(string.Empty, retryPolicy)); + operation.AttachContext(new ItemBatchOperationContext(string.Empty, NoOpTrace.Singleton, retryPolicy)); ShouldRetryResult shouldRetryResult = await operation.Context.ShouldRetryAsync(result, default); Assert.IsFalse(shouldRetryResult.ShouldRetry); } @@ -157,7 +238,7 @@ public async Task ShouldRetry_WithPolicy_OnSplit() new ResourceThrottleRetryPolicy(1)); TransactionalBatchOperationResult result = new TransactionalBatchOperationResult(HttpStatusCode.Gone) { SubStatusCode = SubStatusCodes.PartitionKeyRangeGone }; ItemBatchOperation operation = new ItemBatchOperation(OperationType.Create, 0, Cosmos.PartitionKey.Null); - operation.AttachContext(new ItemBatchOperationContext(string.Empty, retryPolicy)); + operation.AttachContext(new ItemBatchOperationContext(string.Empty, NoOpTrace.Singleton, retryPolicy)); ShouldRetryResult shouldRetryResult = await operation.Context.ShouldRetryAsync(result, default); Assert.IsTrue(shouldRetryResult.ShouldRetry); } @@ -171,7 +252,7 @@ public async Task ShouldRetry_WithPolicy_OnCompletingSplit() new ResourceThrottleRetryPolicy(1)); TransactionalBatchOperationResult result = new TransactionalBatchOperationResult(HttpStatusCode.Gone) { SubStatusCode = SubStatusCodes.CompletingSplit }; ItemBatchOperation operation = new ItemBatchOperation(OperationType.Create, 0, Cosmos.PartitionKey.Null); - operation.AttachContext(new ItemBatchOperationContext(string.Empty, retryPolicy)); + operation.AttachContext(new ItemBatchOperationContext(string.Empty, NoOpTrace.Singleton, retryPolicy)); ShouldRetryResult shouldRetryResult = await operation.Context.ShouldRetryAsync(result, default); Assert.IsTrue(shouldRetryResult.ShouldRetry); } @@ -185,7 +266,7 @@ public async Task ShouldRetry_WithPolicy_OnCompletingPartitionMigration() new ResourceThrottleRetryPolicy(1)); TransactionalBatchOperationResult result = new TransactionalBatchOperationResult(HttpStatusCode.Gone) { SubStatusCode = SubStatusCodes.CompletingPartitionMigration }; ItemBatchOperation operation = new ItemBatchOperation(OperationType.Create, 0, Cosmos.PartitionKey.Null); - operation.AttachContext(new ItemBatchOperationContext(string.Empty, retryPolicy)); + operation.AttachContext(new ItemBatchOperationContext(string.Empty, NoOpTrace.Singleton, retryPolicy)); ShouldRetryResult shouldRetryResult = await operation.Context.ShouldRetryAsync(result, default); Assert.IsTrue(shouldRetryResult.ShouldRetry); } diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/BatchAsyncStreamerTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/BatchAsyncStreamerTests.cs index 00cf2bb543..408b0ce172 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/BatchAsyncStreamerTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/BatchAsyncStreamerTests.cs @@ -68,7 +68,7 @@ private readonly BatchAsyncBatcherExecuteDelegate Executor private readonly BatchAsyncBatcherExecuteDelegate ExecutorWithFailure = (PartitionKeyRangeServerBatchRequest request, ITrace trace, CancellationToken cancellationToken) => throw expectedException; - private readonly BatchAsyncBatcherRetryDelegate Retrier = (ItemBatchOperation operation, ITrace trace, CancellationToken cancellation) => Task.CompletedTask; + private readonly BatchAsyncBatcherRetryDelegate Retrier = (ItemBatchOperation operation, CancellationToken cancellation) => Task.CompletedTask; [DataTestMethod] [ExpectedException(typeof(ArgumentOutOfRangeException))] @@ -190,7 +190,7 @@ public async Task DispatchesAsync() private static ItemBatchOperationContext AttachContext(ItemBatchOperation operation) { - ItemBatchOperationContext context = new ItemBatchOperationContext(string.Empty); + ItemBatchOperationContext context = new ItemBatchOperationContext(string.Empty, NoOpTrace.Singleton); operation.AttachContext(context); return context; } diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosItemUnitTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosItemUnitTests.cs index b67f8484dd..b1aedb0fce 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosItemUnitTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosItemUnitTests.cs @@ -201,7 +201,7 @@ public async Task AllowBatchingRequestsSendsToExecutor_CreateStream() partitionKey: partitionKey, streamPayload: itemStream)) { - mockedExecutor.Verify(c => c.AddAsync(It.IsAny(), It.IsAny(), It.IsAny()), Times.Once); + mockedExecutor.Verify(c => c.AddAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny()), Times.Once); } } } @@ -225,7 +225,7 @@ public async Task AllowBatchingRequestsSendsToExecutor_UpsertStream() partitionKey: partitionKey, streamPayload: itemStream)) { - mockedExecutor.Verify(c => c.AddAsync(It.IsAny(), It.IsAny(), It.IsAny()), Times.Once); + mockedExecutor.Verify(c => c.AddAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny()), Times.Once); } } } @@ -250,7 +250,7 @@ public async Task AllowBatchingRequestsSendsToExecutor_ReplaceStream() id: testItem.id, streamPayload: itemStream)) { - mockedExecutor.Verify(c => c.AddAsync(It.IsAny(), It.IsAny(), It.IsAny()), Times.Once); + mockedExecutor.Verify(c => c.AddAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny()), Times.Once); } } } @@ -274,7 +274,7 @@ public async Task AllowBatchingRequestsSendsToExecutor_ReadStream() partitionKey: partitionKey, id: testItem.id)) { - mockedExecutor.Verify(c => c.AddAsync(It.IsAny(), It.IsAny(), It.IsAny()), Times.Once); + mockedExecutor.Verify(c => c.AddAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny()), Times.Once); } } } @@ -296,7 +296,7 @@ public async Task AllowBatchingRequestsSendsToExecutor_DeleteStream() partitionKey: partitionKey, id: testItem.id)) { - mockedExecutor.Verify(c => c.AddAsync(It.IsAny(), It.IsAny(), It.IsAny()), Times.Once); + mockedExecutor.Verify(c => c.AddAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny()), Times.Once); } } @@ -323,7 +323,7 @@ public async Task AllowBatchingRequestsSendsToExecutor_PatchStream() id: testItem.id, patchOperations: patch)) { - mockedExecutor.Verify(c => c.AddAsync(It.IsAny(), It.IsAny(), It.IsAny()), Times.Once); + mockedExecutor.Verify(c => c.AddAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny()), Times.Once); } } @@ -342,7 +342,7 @@ public async Task AllowBatchingRequestsSendsToExecutor_Create() ItemResponse response = await container.CreateItemAsync( testItem, partitionKey: partitionKey); - mockedExecutor.Verify(c => c.AddAsync(It.IsAny(), It.IsAny(), It.IsAny()), Times.Once); + mockedExecutor.Verify(c => c.AddAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny()), Times.Once); } [TestMethod] @@ -360,7 +360,7 @@ public async Task AllowBatchingRequestsSendsToExecutor_Upsert() ItemResponse response = await container.UpsertItemAsync( testItem, partitionKey: partitionKey); - mockedExecutor.Verify(c => c.AddAsync(It.IsAny(), It.IsAny(), It.IsAny()), Times.Once); + mockedExecutor.Verify(c => c.AddAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny()), Times.Once); } [TestMethod] @@ -378,7 +378,7 @@ public async Task AllowBatchingRequestsSendsToExecutor_Replace() ItemResponse response = await container.ReplaceItemAsync( testItem, testItem.id); - mockedExecutor.Verify(c => c.AddAsync(It.IsAny(), It.IsAny(), It.IsAny()), Times.Once); + mockedExecutor.Verify(c => c.AddAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny()), Times.Once); } [TestMethod] @@ -396,7 +396,7 @@ public async Task AllowBatchingRequestsSendsToExecutor_Read() ItemResponse response = await container.ReadItemAsync( id: testItem.id, partitionKey: partitionKey); - mockedExecutor.Verify(c => c.AddAsync(It.IsAny(), It.IsAny(), It.IsAny()), Times.Once); + mockedExecutor.Verify(c => c.AddAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny()), Times.Once); } [TestMethod] @@ -415,7 +415,7 @@ public async Task AllowBatchingRequestsSendsToExecutor_Delete() partitionKey: partitionKey, id: testItem.id); - mockedExecutor.Verify(c => c.AddAsync(It.IsAny(), It.IsAny(), It.IsAny()), Times.Once); + mockedExecutor.Verify(c => c.AddAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny()), Times.Once); } [TestMethod] @@ -440,7 +440,7 @@ public async Task AllowBatchingRequestsSendsToExecutor_Patch() partitionKey, patch); - mockedExecutor.Verify(c => c.AddAsync(It.IsAny(), It.IsAny(), It.IsAny()), Times.Once); + mockedExecutor.Verify(c => c.AddAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny()), Times.Once); } [TestMethod] @@ -928,7 +928,7 @@ private Mock GetMockedBatchExcecutor() Mock mockedExecutor = new Mock(); mockedExecutor - .Setup(e => e.AddAsync(It.IsAny(), It.IsAny(), It.IsAny())) + .Setup(e => e.AddAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) .ReturnsAsync(new TransactionalBatchOperationResult(HttpStatusCode.OK) { }); diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Tracing/TraceTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Tracing/TraceTests.cs index c72eaad251..c6195deec8 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Tracing/TraceTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Tracing/TraceTests.cs @@ -37,6 +37,23 @@ public void TestRootTrace() Assert.IsTrue(rootTrace.Duration > TimeSpan.Zero); } + [TestMethod] + public void TestAddChild() + { + Trace oneChild = Trace.GetRootTrace(name: "OneChild"); + Trace twoChild = Trace.GetRootTrace(name: "TwoChild"); + Trace rootTrace; + using (rootTrace = Trace.GetRootTrace(name: "RootTrace")) + { + rootTrace.AddChild(oneChild); + rootTrace.AddChild(twoChild); + } + + Assert.AreEqual(2, rootTrace.Children.Count); + Assert.AreEqual(oneChild, rootTrace.Children[0]); + Assert.AreEqual(twoChild, rootTrace.Children[1]); + } + [TestMethod] public void TestTraceChildren() { diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Tracing/TraceWriterBaselineTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Tracing/TraceWriterBaselineTests.cs index 7901bd162a..d0227143f1 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Tracing/TraceWriterBaselineTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Tracing/TraceWriterBaselineTests.cs @@ -822,7 +822,7 @@ public override void SerializeAsXml(XmlWriter xmlWriter) private sealed class TraceForBaselineTesting : ITrace { private readonly Dictionary data; - private readonly List children; + private readonly List children; public TraceForBaselineTesting( string name, @@ -834,7 +834,7 @@ public TraceForBaselineTesting( this.Level = level; this.Component = component; this.Parent = parent; - this.children = new List(); + this.children = new List(); this.data = new Dictionary(); } @@ -880,10 +880,15 @@ public ITrace StartChild(string name, [CallerMemberName] string memberName = "", public ITrace StartChild(string name, TraceComponent component, TraceLevel level, [CallerMemberName] string memberName = "", [CallerFilePath] string sourceFilePath = "", [CallerLineNumber] int sourceLineNumber = 0) { TraceForBaselineTesting child = new TraceForBaselineTesting(name, level, component, parent: this); - this.children.Add(child); + this.AddChild(child); return child; } + public void AddChild(ITrace trace) + { + this.children.Add(trace); + } + public static TraceForBaselineTesting GetRootTrace() { return new TraceForBaselineTesting("Trace For Baseline Testing", TraceLevel.Info, TraceComponent.Unknown, parent: null);