Skip to content

Commit

Permalink
CosmosDiagnostics: Fixes concurrent operations (#2116)
Browse files Browse the repository at this point in the history
This makes the CosmosDiagnostics thread safe by adding a lock to adding information to the diagnostic list. Benchmarks showed no changes from adding the lock which is likely because most of the diagnostics is not done concurrently.
  • Loading branch information
j82w authored Jan 12, 2021
1 parent bd3f1f1 commit 2b82875
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,9 @@ namespace Microsoft.Azure.Cosmos
{
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Diagnostics;
using System.Linq;
using System.Security.Policy;
using System.Threading;
using Microsoft.Azure.Cosmos.Diagnostics;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Rntbd;

/// <summary>
/// This represents the core diagnostics object used in the SDK.
Expand Down Expand Up @@ -105,14 +101,16 @@ internal override IDisposable CreateScope(string name)
{
CosmosDiagnosticScope scope = new CosmosDiagnosticScope(name);

this.ContextList.Add(scope);
this.AddToContextList(scope);

return scope;
}

internal override IDisposable CreateRequestHandlerScopeScope(RequestHandler requestHandler)
{
RequestHandlerScope requestHandlerScope = new RequestHandlerScope(requestHandler);
this.ContextList.Add(requestHandlerScope);
this.AddToContextList(requestHandlerScope);

return requestHandlerScope;
}

Expand All @@ -123,7 +121,7 @@ internal override void AddDiagnosticsInternal(CosmosSystemInfo processInfo)
throw new ArgumentNullException(nameof(processInfo));
}

this.ContextList.Add(processInfo);
this.AddToContextList(processInfo);
}

internal override void AddDiagnosticsInternal(PointOperationStatistics pointOperationStatistics)
Expand All @@ -134,8 +132,7 @@ internal override void AddDiagnosticsInternal(PointOperationStatistics pointOper
}

this.AddResponseCount((int)pointOperationStatistics.StatusCode);

this.ContextList.Add(pointOperationStatistics);
this.AddToContextList(pointOperationStatistics);
}

internal override void AddDiagnosticsInternal(StoreResponseStatistics storeResponseStatistics)
Expand All @@ -145,22 +142,22 @@ internal override void AddDiagnosticsInternal(StoreResponseStatistics storeRespo
this.AddResponseCount((int)storeResponseStatistics.StoreResult.StatusCode);
}

this.ContextList.Add(storeResponseStatistics);
this.AddToContextList(storeResponseStatistics);
}

internal override void AddDiagnosticsInternal(AddressResolutionStatistics addressResolutionStatistics)
{
this.ContextList.Add(addressResolutionStatistics);
this.AddToContextList(addressResolutionStatistics);
}

internal override void AddDiagnosticsInternal(CosmosClientSideRequestStatistics clientSideRequestStatistics)
{
this.ContextList.Add(clientSideRequestStatistics);
this.AddToContextList(clientSideRequestStatistics);
}

internal override void AddDiagnosticsInternal(FeedRangeStatistics feedRangeStatistics)
{
this.ContextList.Add(feedRangeStatistics);
this.AddToContextList(feedRangeStatistics);
}

internal override void AddDiagnosticsInternal(QueryPageDiagnostics queryPageDiagnostics)
Expand All @@ -175,14 +172,14 @@ internal override void AddDiagnosticsInternal(QueryPageDiagnostics queryPageDiag
this.AddSummaryInfo(queryPageDiagnostics.DiagnosticsContext);
}

this.ContextList.Add(queryPageDiagnostics);
this.AddToContextList(queryPageDiagnostics);
}

internal override void AddDiagnosticsInternal(CosmosDiagnosticsContext newContext)
{
this.AddSummaryInfo(newContext);

this.ContextList.Add(newContext);
this.AddToContextList(newContext);
}

public override void Accept(CosmosDiagnosticsInternalVisitor cosmosDiagnosticsInternalVisitor)
Expand All @@ -200,9 +197,20 @@ public override IEnumerator<CosmosDiagnosticsInternal> GetEnumerator()
// Using a for loop with a yield prevents Issue #1467 which causes
// ThrowInvalidOperationException if a new diagnostics is getting added
// while the enumerator is being used.
for (int i = 0; i < this.ContextList.Count; i++)
lock (this.ContextList)
{
for (int i = 0; i < this.ContextList.Count; i++)
{
yield return this.ContextList[i];
}
}
}

private void AddToContextList(CosmosDiagnosticsInternal cosmosDiagnosticsInternal)
{
lock (this.ContextList)
{
yield return this.ContextList[i];
this.ContextList.Add(cosmosDiagnosticsInternal);
}
}

Expand All @@ -211,12 +219,12 @@ private void AddResponseCount(int statusCode)
this.totalResponseCount++;
if (statusCode < 200 || statusCode > 299)
{
this.failedResponseCount++;
Interlocked.Increment(ref this.failedResponseCount);
}

if (statusCode == (int)StatusCodes.TooManyRequests || statusCode == (int)StatusCodes.RetryWith)
{
this.retriableResponseCount++;
Interlocked.Increment(ref this.retriableResponseCount);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@
namespace Microsoft.Azure.Cosmos.Tests
{
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Runtime.InteropServices.ComTypes;
using System.Text;
using System.Text.RegularExpressions;
using System.Threading;
Expand Down Expand Up @@ -269,6 +272,70 @@ public void ValidateDiagnosticsAppendContext()
Assert.IsTrue(diagnostics.Contains("CosmosDiagnostics2Scope"));
}

[TestMethod]
public void ValidateDiagnosticsAppendContextConcurrentCalls()
{
int threadCount = 10;
int itemCountPerThread = 100000;
ConcurrentStack<Exception> concurrentStack = new ConcurrentStack<Exception>();
CosmosDiagnosticsContext cosmosDiagnostics = new CosmosDiagnosticsContextCore(
nameof(ValidateDiagnosticsAppendContext),
"MyCustomUserAgentString");
using (cosmosDiagnostics.GetOverallScope())
{
// Test all the different operations on diagnostics context
using (cosmosDiagnostics.CreateScope("ValidateScope"))
{
Thread.Sleep(TimeSpan.FromSeconds(2));
}

List<Thread> threads = new List<Thread>(threadCount);
for (int i = 0; i < threadCount; i++)
{
Thread thread = new Thread(() =>
this.AddDiagnosticsInBackgroundLoop(
itemCountPerThread,
cosmosDiagnostics,
concurrentStack));
thread.Start();
threads.Add(thread);
}

foreach (Thread thread in threads)
{
thread.Join();
}
}

Assert.AreEqual(0, concurrentStack.Count, $"Exceptions count: {concurrentStack.Count} Exceptions: {string.Join(';', concurrentStack)}");
int count = cosmosDiagnostics.Count();
Assert.AreEqual((threadCount * itemCountPerThread) + 1, count);
}

private void AddDiagnosticsInBackgroundLoop(
int count,
CosmosDiagnosticsContext cosmosDiagnostics,
ConcurrentStack<Exception> concurrentStack)
{
CosmosDiagnosticsContext cosmosDiagnostics2 = new CosmosDiagnosticsContextCore(
nameof(ValidateDiagnosticsAppendContext),
"MyCustomUserAgentString");
Random random = new Random();
cosmosDiagnostics2.GetOverallScope().Dispose();

for (int i = 0; i < count; i++)
{
try
{
cosmosDiagnostics.AddDiagnosticsInternal(cosmosDiagnostics2);
}
catch (Exception e)
{
concurrentStack.Append(e);
}
}
}

[TestMethod]
public void ValidateClientSideRequestStatisticsToString()
{
Expand Down

0 comments on commit 2b82875

Please sign in to comment.