Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AI integration: Refactor code how container and database name is flowing to opentelemetry module #3532

Merged
merged 11 commits into from
Nov 15, 2022
Prev Previous commit
Next Next commit
WIP
  • Loading branch information
sourabh1007 committed Nov 15, 2022
commit 02068f9a04da8bdb27b7c8adc5e5832fd84be964
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ public override async Task<ResponseMessage> ReadNextAsync(CancellationToken canc
return await this.clientContext.OperationHelperAsync("Change Feed Iterator Read Next Async",
requestOptions: this.changeFeedRequestOptions,
task: (trace) => this.ReadNextInternalAsync(trace, cancellationToken),
openTelemetry: (response) => new OpenTelemetryResponse(
onSuccess: (response) => new OpenTelemetryResponse(
responseMessage: response,
containerName: this.container?.Id,
databaseName: this.container?.Database?.Id),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public override Task<ResponseMessage> ReadNextAsync(CancellationToken cancellati
return this.clientContext.OperationHelperAsync("Change Feed Processor Read Next Async",
requestOptions: this.changeFeedOptions,
task: (trace) => this.ReadNextAsync(trace, cancellationToken),
openTelemetry: (response) => new OpenTelemetryResponse(
onSuccess: (response) => new OpenTelemetryResponse(
responseMessage: response,
containerName: this.container?.Id,
databaseName: this.container?.Database?.Id),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public override Task<FeedResponse<ChangeFeedProcessorState>> ReadNextAsync(Cance
return this.monitoredContainer.ClientContext.OperationHelperAsync("Change Feed Estimator Read Next Async",
requestOptions: null,
task: (trace) => this.ReadNextAsync(trace, cancellationToken),
openTelemetry: (response) => new OpenTelemetryResponse<ChangeFeedProcessorState>(
onSuccess: (response) => new OpenTelemetryResponse<ChangeFeedProcessorState>(
responseMessage: response,
containerName: this.monitoredContainer?.Id,
databaseName: this.monitoredContainer?.Database?.Id ?? this.databaseName),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public override Task<ResponseMessage> DeleteAsync(
operationName: nameof(DeleteAsync),
requestOptions: null,
task: (trace) => base.DeleteAsync(conflict, partitionKey, trace, cancellationToken),
openTelemetry: (response) => new OpenTelemetryResponse(response));
onSuccess: (response) => new OpenTelemetryResponse(response));
}

public override FeedIterator GetConflictQueryStreamIterator(
Expand Down Expand Up @@ -88,7 +88,7 @@ public override Task<ItemResponse<T>> ReadCurrentAsync<T>(
operationName: nameof(ReadCurrentAsync),
requestOptions: null,
task: (trace) => base.ReadCurrentAsync<T>(cosmosConflict, partitionKey, trace, cancellationToken),
openTelemetry: (response) => new OpenTelemetryResponse<T>(response));
onSuccess: (response) => new OpenTelemetryResponse<T>(response));
}

public override T ReadConflictContent<T>(ConflictProperties cosmosConflict)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ namespace Microsoft.Azure.Cosmos
using Microsoft.Azure.Cosmos.Query.Core.QueryClient;
using Microsoft.Azure.Cosmos.ReadFeed;
using Microsoft.Azure.Cosmos.Serializer;
using Microsoft.Azure.Cosmos.Telemetry;
using Microsoft.Azure.Cosmos.Tracing;

// This class acts as a wrapper for environments that use SynchronizationContext.
Expand Down Expand Up @@ -193,7 +194,11 @@ public override Task<ItemResponse<T>> CreateItemAsync<T>(T item,
nameof(CreateItemAsync),
requestOptions,
(trace) => base.CreateItemAsync<T>(item, trace, partitionKey, requestOptions, cancellationToken),
(response) => new OpenTelemetryResponse<T>(response));
onSuccess: (response) => new OpenTelemetryResponse<T>(response),
onException: (exception) => new OpenTelemetryException(
containerName: this.Id,
databaseName: this.Database.Id,
exception: exception));
}

public override Task<ResponseMessage> ReadItemStreamAsync(
Expand All @@ -206,7 +211,11 @@ public override Task<ResponseMessage> ReadItemStreamAsync(
nameof(ReadItemStreamAsync),
requestOptions,
(trace) => base.ReadItemStreamAsync(id, partitionKey, trace, requestOptions, cancellationToken),
(response) => new OpenTelemetryResponse(response));
onSuccess: (response) => new OpenTelemetryResponse(response),
onException: (exception) => new OpenTelemetryException(
containerName: this.Id,
databaseName: this.Database.Id,
exception: exception));
}

public override Task<ItemResponse<T>> ReadItemAsync<T>(
Expand All @@ -219,7 +228,11 @@ public override Task<ItemResponse<T>> ReadItemAsync<T>(
nameof(ReadItemAsync),
requestOptions,
(trace) => base.ReadItemAsync<T>(id, partitionKey, trace, requestOptions, cancellationToken),
(response) => new OpenTelemetryResponse<T>(response));
onSuccess: (response) => new OpenTelemetryResponse<T>(response),
onException: (exception) => new OpenTelemetryException(
containerName: this.Id,
databaseName: this.Database.Id,
exception: exception));
}

public override Task<ResponseMessage> UpsertItemStreamAsync(
Expand All @@ -232,7 +245,11 @@ public override Task<ResponseMessage> UpsertItemStreamAsync(
nameof(UpsertItemStreamAsync),
requestOptions,
(trace) => base.UpsertItemStreamAsync(streamPayload, partitionKey, trace, requestOptions, cancellationToken),
(response) => new OpenTelemetryResponse(response));
onSuccess: (response) => new OpenTelemetryResponse(response),
onException: (exception) => new OpenTelemetryException(
containerName: this.Id,
databaseName: this.Database.Id,
exception: exception));
}

public override Task<ItemResponse<T>> UpsertItemAsync<T>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ internal abstract Task<TResult> OperationHelperAsync<TResult>(
string operationName,
RequestOptions requestOptions,
Func<ITrace, Task<TResult>> task,
Func<TResult, OpenTelemetryAttributes> openTelemetry = null,
Func<TResult, OpenTelemetryAttributes> onSuccess = null,
Func<Exception, OpenTelemetryException> onException = null,
TraceComponent traceComponent = TraceComponent.Transport,
TraceLevel traceLevel = TraceLevel.Info);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public override Task<ResponseMessage> ReadNextAsync(CancellationToken cancellati
return this.clientContext.OperationHelperAsync("FeedIterator Read Next Async",
requestOptions: null,
task: (trace) => this.feedIteratorInternal.ReadNextAsync(trace, cancellationToken),
openTelemetry: (response) =>
onSuccess: (response) =>
{
if (this.container == null)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public override Task<FeedResponse<T>> ReadNextAsync(CancellationToken cancellati
return this.clientContext.OperationHelperAsync("Typed FeedIterator ReadNextAsync",
requestOptions: null,
task: trace => this.feedIteratorInternal.ReadNextAsync(trace, cancellationToken),
openTelemetry: (response) =>
onSuccess: (response) =>
{
if (this.container == null)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ namespace Microsoft.Azure.Cosmos.Telemetry
using System;
using System.Collections.Generic;
using Diagnostics;
using global::Azure;
using global::Azure.Core.Pipeline;
using Microsoft.Azure.Documents;

internal struct OpenTelemetryCoreRecorder : IDisposable
{
Expand Down Expand Up @@ -108,8 +110,16 @@ public void MarkFailed(OpenTelemetryException openTelemetryException)
{
if (this.IsEnabled)
{
Exception exception = openTelemetryException.OriginalException;
this.scope.AddAttribute(OpenTelemetryAttributeKeys.RequestContentLength, openTelemetryException.RequestContentLength);
this.scope.AddAttribute(OpenTelemetryAttributeKeys.ResponseContentLength, openTelemetryException.ResponseContentLength);
this.scope.AddAttribute(OpenTelemetryAttributeKeys.ItemCount, openTelemetryException.ItemCount);
this.scope.AddAttribute(OpenTelemetryAttributeKeys.OperationType, openTelemetryException.OperationType);

this.scope.AddAttribute(OpenTelemetryAttributeKeys.DbName, openTelemetryException.DatabaseName);
this.scope.AddAttribute(OpenTelemetryAttributeKeys.ContainerName, openTelemetryException.ContainerName);

Exception exception = openTelemetryException.OriginalException;

this.scope.AddAttribute(OpenTelemetryAttributeKeys.ExceptionStacktrace, exception.StackTrace);
this.scope.AddAttribute(OpenTelemetryAttributeKeys.ExceptionType, exception.GetType());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,15 @@
namespace Microsoft.Azure.Cosmos.Telemetry
{
using System;
using System.Collections.Generic;
using System.Text;


internal class OpenTelemetryException : OpenTelemetryAttributes
{
internal Exception OriginalException { get; set; }
internal OpenTelemetryException(string containerName, string databaseName, Exception exception)
: base(null/*need to check*/, containerName, databaseName)
{
this.OriginalException = exception;
}

internal Exception OriginalException { get; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -760,6 +760,7 @@ private static CosmosClientContext MockClientContext()
It.IsAny<RequestOptions>(),
It.IsAny<Func<ITrace, Task<object>>>(),
It.IsAny<Func<object, OpenTelemetryAttributes>>(),
It.IsAny<Func<Exception, OpenTelemetryException>>(),
It.IsAny<TraceComponent>(),
It.IsAny<TraceLevel>()))
.Returns<string, RequestOptions, Func<ITrace, Task<object>>, Func<object, OpenTelemetryAttributes>, TraceComponent, TraceLevel>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,7 @@ private Mock<CosmosClientContext> MockClientContext()
It.IsAny<RequestOptions>(),
It.IsAny<Func<ITrace, Task<object>>>(),
It.IsAny<Func<object, OpenTelemetryAttributes>>(),
It.IsAny<Func<Exception, OpenTelemetryException>>(),
It.IsAny<TraceComponent>(),
It.IsAny<TraceLevel>()))
.Returns<string, RequestOptions, Func<ITrace, Task<object>>, Func<object, OpenTelemetryAttributes>, TraceComponent, TraceLevel>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ private CosmosClientContext GetMockClientContext()
It.IsAny<RequestOptions>(),
It.IsAny<Func<ITrace, Task<object>>>(),
It.IsAny<Func<object, OpenTelemetryAttributes>>(),
It.IsAny<Func<Exception, OpenTelemetryException>>(),
It.IsAny<TraceComponent>(),
It.IsAny<TraceLevel>()))
.Returns<string, RequestOptions, Func<ITrace, Task<object>>, Func<object, OpenTelemetryAttributes>, TraceComponent, TraceLevel>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ static FeedIteratorInternal feedCreator(DocumentServiceLease lease, string conti
It.IsAny<RequestOptions>(),
It.IsAny<Func<ITrace, Task<FeedResponse<ChangeFeedProcessorState>>>>(),
It.IsAny<Func<FeedResponse<ChangeFeedProcessorState>, OpenTelemetryAttributes>>(),
It.IsAny<Func<Exception, OpenTelemetryException>>(),
It.Is<TraceComponent>(tc => tc == TraceComponent.ChangeFeed),
It.IsAny<TraceLevel>()))
.Returns<string, RequestOptions, Func<ITrace, Task<FeedResponse<ChangeFeedProcessorState>>>, Func<FeedResponse<ChangeFeedProcessorState>, OpenTelemetryAttributes>, TraceComponent, TraceLevel>(
Expand Down Expand Up @@ -430,6 +431,7 @@ private static ContainerInternal GetMockedContainer()
It.IsAny<RequestOptions>(),
It.IsAny<Func<ITrace, Task<FeedResponse<ChangeFeedProcessorState>>>>(),
It.IsAny<Func<FeedResponse<ChangeFeedProcessorState>, OpenTelemetryAttributes>>(),
It.IsAny<Func<Exception, OpenTelemetryException>>(),
It.Is<TraceComponent>(tc => tc == TraceComponent.ChangeFeed),
It.IsAny<TraceLevel>()))
.Returns<string, RequestOptions, Func<ITrace, Task<FeedResponse<ChangeFeedProcessorState>>>, Func<FeedResponse<ChangeFeedProcessorState>, OpenTelemetryAttributes>, TraceComponent, TraceLevel>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public async Task EtagPassesContinuation()
It.IsAny<RequestOptions>(),
It.IsAny<Func<ITrace, Task<ResponseMessage>>>(),
It.IsAny<Func<ResponseMessage, OpenTelemetryAttributes>>(),
It.IsAny<Func<Exception, OpenTelemetryException>>(),
It.Is<TraceComponent>(tc => tc == TraceComponent.ChangeFeed),
It.IsAny<TraceLevel>()))
.Returns<string, RequestOptions, Func<ITrace, Task<ResponseMessage>>,Func<ResponseMessage, OpenTelemetryAttributes>, TraceComponent, TraceLevel>(
Expand Down Expand Up @@ -118,6 +119,7 @@ public async Task NextReadHasUpdatedContinuation()
It.IsAny<RequestOptions>(),
It.IsAny<Func<ITrace, Task<ResponseMessage>>>(),
It.IsAny<Func<ResponseMessage, OpenTelemetryAttributes>>(),
It.IsAny<Func<Exception, OpenTelemetryException>>(),
It.Is<TraceComponent>(tc => tc == TraceComponent.ChangeFeed),
It.IsAny<TraceLevel>()))
.Returns<string, RequestOptions, Func<ITrace, Task<ResponseMessage>>, Func<ResponseMessage, OpenTelemetryAttributes>, TraceComponent, TraceLevel>(
Expand Down Expand Up @@ -182,6 +184,7 @@ public async Task ShouldSetFeedRangePartitionKeyRange()
It.IsAny<RequestOptions>(),
It.IsAny<Func<ITrace, Task<ResponseMessage>>>(),
It.IsAny<Func<ResponseMessage, OpenTelemetryAttributes>>(),
It.IsAny<Func<Exception, OpenTelemetryException>>(),
It.Is<TraceComponent>(tc => tc == TraceComponent.ChangeFeed),
It.IsAny<TraceLevel>()))
.Returns<string, RequestOptions, Func<ITrace, Task<ResponseMessage>>, Func<ResponseMessage, OpenTelemetryAttributes>, TraceComponent, TraceLevel>(
Expand Down Expand Up @@ -258,6 +261,7 @@ public async Task ShouldUseFeedRangeEpk()
It.IsAny<RequestOptions>(),
It.IsAny<Func<ITrace, Task<ResponseMessage>>>(),
It.IsAny<Func<ResponseMessage, OpenTelemetryAttributes>>(),
It.IsAny<Func<Exception, OpenTelemetryException>>(),
It.Is<TraceComponent>(tc => tc == TraceComponent.ChangeFeed),
It.IsAny<TraceLevel>()))
.Returns<string, RequestOptions, Func<ITrace, Task<ResponseMessage>>, Func<ResponseMessage, OpenTelemetryAttributes>, TraceComponent, TraceLevel>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -714,6 +714,7 @@ public async Task TestMultipleNestedPartitionKeyValueFromStreamAsync()
It.IsAny<RequestOptions>(),
It.IsAny<Func<ITrace, Task<ResponseMessage>>>(),
It.IsAny<Func<ResponseMessage, OpenTelemetryAttributes>>(),
It.IsAny<Func<Exception, OpenTelemetryException>>(),
It.IsAny<TraceComponent>(),
It.IsAny<TraceLevel>()))
.Returns<string, RequestOptions, Func<ITrace, Task<ResponseMessage>>, Func<ResponseMessage, OpenTelemetryAttributes>, TraceComponent, TraceLevel>(
Expand All @@ -724,6 +725,7 @@ public async Task TestMultipleNestedPartitionKeyValueFromStreamAsync()
It.IsAny<RequestOptions>(),
It.IsAny<Func<ITrace, Task<ItemResponse<dynamic>>>>(),
It.IsAny<Func<ItemResponse<dynamic>, OpenTelemetryAttributes>>(),
It.IsAny<Func<Exception, OpenTelemetryException>>(),
It.IsAny<TraceComponent>(),
It.IsAny<TraceLevel>()))
.Returns<string, RequestOptions, Func<ITrace, Task<ItemResponse<dynamic>>>, Func<ItemResponse<dynamic>, OpenTelemetryAttributes>, TraceComponent, TraceLevel>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,7 @@ private CosmosClientContext MockClientContext()
It.IsAny<RequestOptions>(),
It.IsAny<Func<ITrace, Task<ResponseMessage>>>(),
It.IsAny<Func<ResponseMessage, OpenTelemetryAttributes>>(),
It.IsAny<Func<Exception, OpenTelemetryException>>(),
It.IsAny<TraceComponent>(),
It.IsAny<TraceLevel>()))
.Returns<string, RequestOptions, Func<ITrace, Task<ResponseMessage>>, Func<ResponseMessage, OpenTelemetryAttributes>, TraceComponent, TraceLevel>(
Expand Down