Skip to content

Commit

Permalink
Client Telemetry: Fixes consistency level issue (Azure#2758)
Browse files Browse the repository at this point in the history
1. Right now, if user is not sending consistency level in the request but sending request option with other configuration. Telemetry is logging Consistency level as "Strong" which is wrong. So fixing it as part of this PR.
2. Optimizing getting consistency level information in telemetry by removing blocking call.
3. Add consistency related tests in client telemetry.
4. Making sure that if user is not passing consistency level (in request or client), it will be stored as NULL in telemetry data.
  • Loading branch information
sourabh1007 authored Sep 30, 2021
1 parent a93d30f commit 80651dd
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 57 deletions.
7 changes: 1 addition & 6 deletions Microsoft.Azure.Cosmos/src/Handler/TelemetryHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public override async Task<ResponseMessage> SendAsync(
databaseId: request.DatabaseId,
operationType: request.OperationType,
resourceType: request.ResourceType,
consistencyLevel: this.GetConsistencyLevel(request),
consistencyLevel: request.Headers?[Documents.HttpConstants.HttpHeaders.ConsistencyLevel],
requestCharge: response.Headers.RequestCharge);
}
catch (Exception ex)
Expand All @@ -54,11 +54,6 @@ private bool IsAllowed(RequestMessage request)
return ClientTelemetryOptions.AllowedResourceTypes.Equals(request.ResourceType);
}

private ConsistencyLevel? GetConsistencyLevel(RequestMessage request)
{
return request.RequestOptions?.BaseConsistencyLevel.GetValueOrDefault();
}

/// <summary>
/// It returns the payload size after reading it from the Response content stream.
/// To avoid blocking IO calls to get the stream length, it will return response content length if stream is of Memory Type
Expand Down
8 changes: 1 addition & 7 deletions Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ internal void Collect(CosmosDiagnostics cosmosDiagnostics,
string databaseId,
OperationType operationType,
ResourceType resourceType,
Cosmos.ConsistencyLevel? consistencyLevel,
string consistencyLevel,
double requestCharge)
{
DefaultTrace.TraceVerbose("Collecting Operation data for Telemetry.");
Expand All @@ -199,12 +199,6 @@ internal void Collect(CosmosDiagnostics cosmosDiagnostics,

string regionsContacted = this.GetContactedRegions(cosmosDiagnostics);

// If consistency level is not mentioned in request then take the sdk/account level
if (consistencyLevel == null)
{
consistencyLevel = (Cosmos.ConsistencyLevel)this.documentClient.ConsistencyLevel;
}

// Recording Request Latency and Request Charge
OperationInfo payloadKey = new OperationInfo(regionsContacted: regionsContacted?.ToString(),
responseSizeInBytes: responseSizeInBytes,
Expand Down
38 changes: 10 additions & 28 deletions Microsoft.Azure.Cosmos/src/Telemetry/OperationInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ internal OperationInfo(string metricsName, string unitName)

internal OperationInfo(string regionsContacted,
long? responseSizeInBytes,
Cosmos.ConsistencyLevel? consistency,
string consistency,
string databaseName,
string containerName,
OperationType? operation,
Expand All @@ -62,7 +62,7 @@ internal OperationInfo(string regionsContacted,
{
this.GreaterThan1Kb = responseSizeInBytes > ClientTelemetryOptions.OneKbToBytes;
}
this.Consistency = OperationInfo.GetConsistencyString(consistency);
this.Consistency = consistency;
this.DatabaseName = databaseName;
this.ContainerName = containerName;
this.Operation = operation?.ToOperationTypeString();
Expand Down Expand Up @@ -93,24 +93,6 @@ public OperationInfo(string regionsContacted,
this.MetricInfo = metricInfo;
}

private static string GetConsistencyString(Cosmos.ConsistencyLevel? consistency)
{
if (consistency == null)
{
return null;
}

return consistency switch
{
Cosmos.ConsistencyLevel.Strong => "STRONG",
Cosmos.ConsistencyLevel.Session => "SESSION",
Cosmos.ConsistencyLevel.Eventual => "EVENTUAL",
Cosmos.ConsistencyLevel.ConsistentPrefix => "CONSISTENTPREFIX",
Cosmos.ConsistencyLevel.BoundedStaleness => "BOUNDEDSTALENESS",
_ => consistency.ToString().ToUpper(),
};
}

public OperationInfo Copy()
{
return new OperationInfo(this.RegionsContacted,
Expand Down Expand Up @@ -142,14 +124,14 @@ public override int GetHashCode()
public override bool Equals(object obj)
{
bool isequal = obj is OperationInfo payload &&
this.RegionsContacted != null && payload.RegionsContacted != null && this.RegionsContacted.Equals(payload.RegionsContacted) &&
this.GreaterThan1Kb != null && payload.GreaterThan1Kb != null && this.GreaterThan1Kb.Equals(payload.GreaterThan1Kb) &&
this.Consistency != null && payload.Consistency != null && this.Consistency.Equals(payload.Consistency) &&
this.DatabaseName != null && payload.DatabaseName != null && this.DatabaseName.Equals(payload.DatabaseName) &&
this.ContainerName != null && payload.ContainerName != null && this.ContainerName.Equals(payload.ContainerName) &&
this.Operation != null && payload.Operation != null && this.Operation.Equals(payload.Operation) &&
this.Resource != null && payload.Resource != null && this.Resource.Equals(payload.Resource) &&
this.StatusCode != null && payload.StatusCode != null && this.StatusCode.Equals(payload.StatusCode);
((this.RegionsContacted == null && payload.RegionsContacted == null) || (this.RegionsContacted != null && payload.RegionsContacted != null && this.RegionsContacted.Equals(payload.RegionsContacted))) &&
((this.GreaterThan1Kb == null && payload.GreaterThan1Kb == null ) || (this.GreaterThan1Kb != null && payload.GreaterThan1Kb != null && this.GreaterThan1Kb.Equals(payload.GreaterThan1Kb))) &&
((this.Consistency == null && payload.Consistency == null) || (this.Consistency != null && payload.Consistency != null && this.Consistency.Equals(payload.Consistency))) &&
((this.DatabaseName == null && payload.DatabaseName == null) || (this.DatabaseName != null && payload.DatabaseName != null && this.DatabaseName.Equals(payload.DatabaseName))) &&
((this.ContainerName == null && payload.ContainerName == null) || (this.ContainerName != null && payload.ContainerName != null && this.ContainerName.Equals(payload.ContainerName))) &&
((this.Operation == null && payload.Operation == null) || (this.Operation != null && payload.Operation != null && this.Operation.Equals(payload.Operation))) &&
((this.Resource == null && payload.Resource == null) || (this.Resource != null && payload.Resource != null && this.Resource.Equals(payload.Resource))) &&
((this.StatusCode == null && payload.StatusCode == null) || (this.StatusCode != null && payload.StatusCode != null && this.StatusCode.Equals(payload.StatusCode)));

return isequal;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,18 +134,23 @@ public async Task PointReadFailureOperationsTest(ConnectionMode mode)
// Fail Read
try
{
Container container = await this.GetContainer(mode);
Container container = await this.GetContainer(mode, ConsistencyLevel.ConsistentPrefix);
await container.ReadItemAsync<JObject>(
new Guid().ToString(),
new Cosmos.PartitionKey(new Guid().ToString()));
new Cosmos.PartitionKey(new Guid().ToString()),
new ItemRequestOptions()
{
BaseConsistencyLevel = ConsistencyLevel.Eventual // overriding client level consistency
});
}
catch (CosmosException ce) when (ce.StatusCode == HttpStatusCode.NotFound)
{
string message = ce.ToString();
Assert.IsNotNull(message);
}

await this.WaitAndAssert(2);
await this.WaitAndAssert(expectedOperationCount: 2,
expectedConsistencyLevel: ConsistencyLevel.Eventual);
}

[TestMethod]
Expand All @@ -160,15 +165,20 @@ public async Task StreamReadFailureOperationsTest(ConnectionMode mode)
{
await container.ReadItemStreamAsync(
new Guid().ToString(),
new Cosmos.PartitionKey(new Guid().ToString()));
new Cosmos.PartitionKey(new Guid().ToString()),
new ItemRequestOptions()
{
BaseConsistencyLevel = ConsistencyLevel.ConsistentPrefix // Request level consistency
});
}
catch (CosmosException ce) when (ce.StatusCode == HttpStatusCode.NotFound)
{
string message = ce.ToString();
Assert.IsNotNull(message);
}

await this.WaitAndAssert(2);
await this.WaitAndAssert(expectedOperationCount: 2,
expectedConsistencyLevel: ConsistencyLevel.ConsistentPrefix);
}

[TestMethod]
Expand Down Expand Up @@ -213,7 +223,7 @@ await container
[DataRow(ConnectionMode.Gateway)]
public async Task BatchOperationsTest(ConnectionMode mode)
{
Container container = await this.GetContainer(mode);
Container container = await this.GetContainer(mode, ConsistencyLevel.Eventual); // Client level consistency
using (BatchAsyncContainerExecutor executor =
new BatchAsyncContainerExecutor(
(ContainerInlineCore)container,
Expand All @@ -231,7 +241,8 @@ public async Task BatchOperationsTest(ConnectionMode mode)
await Task.WhenAll(tasks);
}

await this.WaitAndAssert(2);
await this.WaitAndAssert(expectedOperationCount: 2,
expectedConsistencyLevel: ConsistencyLevel.Eventual);
}

[TestMethod]
Expand Down Expand Up @@ -264,7 +275,7 @@ public async Task QueryOperationTest(ConnectionMode mode)
await this.WaitAndAssert(4);
}

private async Task WaitAndAssert(int expectedOperationCount)
private async Task WaitAndAssert(int expectedOperationCount, ConsistencyLevel? expectedConsistencyLevel = null)
{
Assert.IsNotNull(this.actualInfo, "Telemetry Information not available");

Expand Down Expand Up @@ -320,7 +331,7 @@ private async Task WaitAndAssert(int expectedOperationCount)
Assert.IsNotNull(operation.Resource, "Resource Type is null");
Assert.IsNotNull(operation.ResponseSizeInBytes, "ResponseSizeInBytes is null");
Assert.IsNotNull(operation.StatusCode, "StatusCode is null");
Assert.IsNotNull(operation.Consistency, "Consistency is null");
Assert.AreEqual(expectedConsistencyLevel?.ToString(), operation.Consistency, $"Consistency is not {expectedConsistencyLevel}");

Assert.IsNotNull(operation.MetricInfo, "MetricInfo is null");
Assert.IsNotNull(operation.MetricInfo.MetricsName, "MetricsName is null");
Expand Down Expand Up @@ -352,17 +363,16 @@ private static ItemBatchOperation CreateItem(string itemId)
return new ItemBatchOperation(Documents.OperationType.Create, 0, new Cosmos.PartitionKey(itemId), itemId, TestCommon.SerializerCore.ToStream(testItem));
}

private async Task<Container> GetContainer(ConnectionMode mode)
private async Task<Container> GetContainer(ConnectionMode mode, ConsistencyLevel? consistency = null)
{
if (mode == ConnectionMode.Direct)
if (consistency.HasValue)
{
this.cosmosClient = this.cosmosClientBuilder.WithConnectionModeDirect().Build();
this.cosmosClientBuilder = this.cosmosClientBuilder.WithConsistencyLevel(consistency.Value);
}

if (mode == ConnectionMode.Gateway)
{
this.cosmosClient = this.cosmosClientBuilder.WithConnectionModeGateway().Build();
}
this.cosmosClient = mode == ConnectionMode.Gateway
? this.cosmosClientBuilder.WithConnectionModeGateway().Build()
: this.cosmosClientBuilder.Build();

this.database = await this.cosmosClient.CreateDatabaseAsync(Guid.NewGuid().ToString());
return await this.database.CreateContainerAsync(Guid.NewGuid().ToString(), "/id");
Expand Down

0 comments on commit 80651dd

Please sign in to comment.