Skip to content

Commit

Permalink
Diagnostics: Fixes missing POCO deserialization Trace. (#2236)
Browse files Browse the repository at this point in the history
* added trace for POCO

* added more traces for the general resource iterator
  • Loading branch information
bchong95 authored Feb 19, 2021
1 parent 3723188 commit b9cd8d0
Show file tree
Hide file tree
Showing 6 changed files with 565 additions and 299 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ namespace Microsoft.Azure.Cosmos
using System;
using System.IO;
using System.Net;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.CosmosElements;
using Microsoft.Azure.Cosmos.Json;
using Microsoft.Azure.Cosmos.Query.Core;
using Microsoft.Azure.Cosmos.Serializer;
using Microsoft.Azure.Cosmos.Tracing;
Expand Down Expand Up @@ -66,13 +68,8 @@ public override Task<ResponseMessage> ReadNextAsync(CancellationToken cancellati
return this.ReadNextAsync(NoOpTrace.Singleton);
}

public override Task<ResponseMessage> ReadNextAsync(ITrace trace, CancellationToken cancellationToken = default)
{
return this.ReadNextInternalAsync(trace, cancellationToken);
}

private async Task<ResponseMessage> ReadNextInternalAsync(
ITrace trace,
public override async Task<ResponseMessage> ReadNextAsync(
ITrace trace,
CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();
Expand All @@ -82,13 +79,21 @@ private async Task<ResponseMessage> ReadNextInternalAsync(
throw new ArgumentNullException(nameof(trace));
}

Stream stream = null;
Stream stream;
OperationType operation = OperationType.ReadFeed;
if (this.querySpec != null)
{
stream = this.clientContext.SerializerCore.ToStreamSqlQuerySpec(this.querySpec, this.resourceType);
using (ITrace querySpecStreamTrace = trace.StartChild("QuerySpec to Stream", TraceComponent.Poco, TraceLevel.Info))
{
stream = this.clientContext.SerializerCore.ToStreamSqlQuerySpec(this.querySpec, this.resourceType);
}

operation = OperationType.Query;
}
else
{
stream = null;
}

ResponseMessage responseMessage = await this.clientContext.ProcessResourceOperationStreamAsync(
resourceUri: this.resourceLink,
Expand All @@ -115,7 +120,7 @@ private async Task<ResponseMessage> ReadNextInternalAsync(

if (responseMessage.Content != null)
{
await CosmosElementSerializer.RewriteStreamAsTextAsync(responseMessage, this.requestOptions);
await RewriteStreamAsTextAsync(responseMessage, this.requestOptions, trace);
}

return responseMessage;
Expand All @@ -125,6 +130,68 @@ public override CosmosElement GetCosmosElementContinuationToken()
{
throw new NotImplementedException();
}

private static async Task RewriteStreamAsTextAsync(ResponseMessage responseMessage, QueryRequestOptions requestOptions, ITrace trace)
{
using (ITrace rewriteTrace = trace.StartChild("Rewrite Stream as Text", TraceComponent.Json, TraceLevel.Info))
{
// Rewrite the payload to be in the specified format.
// If it's already in the correct format, then the following will be a memcpy.
MemoryStream memoryStream;
if (responseMessage.Content is MemoryStream responseContentAsMemoryStream)
{
memoryStream = responseContentAsMemoryStream;
}
else
{
memoryStream = new MemoryStream();
await responseMessage.Content.CopyToAsync(memoryStream);
}

ReadOnlyMemory<byte> buffer;
if (memoryStream.TryGetBuffer(out ArraySegment<byte> segment))
{
buffer = segment.Array.AsMemory().Slice(start: segment.Offset, length: segment.Count);
}
else
{
buffer = memoryStream.ToArray();
}

IJsonNavigator jsonNavigator = JsonNavigator.Create(buffer);
if (jsonNavigator.SerializationFormat == JsonSerializationFormat.Text)
{
// Exit to avoid the memory allocation.
return;
}

IJsonWriter jsonWriter;
if (requestOptions?.CosmosSerializationFormatOptions != null)
{
jsonWriter = requestOptions.CosmosSerializationFormatOptions.CreateCustomWriterCallback();
}
else
{
jsonWriter = JsonWriter.Create(JsonSerializationFormat.Text);
}

jsonNavigator.WriteNode(jsonNavigator.GetRootNode(), jsonWriter);

ReadOnlyMemory<byte> result = jsonWriter.GetResult();
MemoryStream rewrittenMemoryStream;
if (MemoryMarshal.TryGetArray(result, out ArraySegment<byte> rewrittenSegment))
{
rewrittenMemoryStream = new MemoryStream(rewrittenSegment.Array, index: rewrittenSegment.Offset, count: rewrittenSegment.Count, writable: false, publiclyVisible: true);
}
else
{
byte[] toArray = result.ToArray();
rewrittenMemoryStream = new MemoryStream(toArray, index: 0, count: toArray.Length, writable: false, publiclyVisible: true);
}

responseMessage.Content = rewrittenMemoryStream;
}
}
}

/// <summary>
Expand Down Expand Up @@ -177,7 +244,10 @@ public override async Task<FeedResponse<T>> ReadNextAsync(ITrace trace, Cancella
}

ResponseMessage response = await this.feedIterator.ReadNextAsync(trace, cancellationToken);
return this.responseCreator(response);
using (ITrace childTrace = trace.StartChild("POCO Materialization", TraceComponent.Poco, TraceLevel.Info))
{
return this.responseCreator(response);
}
}

protected override void Dispose(bool disposing)
Expand Down
59 changes: 0 additions & 59 deletions Microsoft.Azure.Cosmos/src/Serializer/CosmosElementSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -89,65 +89,6 @@ internal static MemoryStream ToStream(
return GetMemoryStreamFromJsonWriter(jsonWriter);
}

public static async Task RewriteStreamAsTextAsync(ResponseMessage responseMessage, QueryRequestOptions requestOptions)
{
// Rewrite the payload to be in the specified format.
// If it's already in the correct format, then the following will be a memcpy.
MemoryStream memoryStream;
if (responseMessage.Content is MemoryStream responseContentAsMemoryStream)
{
memoryStream = responseContentAsMemoryStream;
}
else
{
memoryStream = new MemoryStream();
await responseMessage.Content.CopyToAsync(memoryStream);
}

ReadOnlyMemory<byte> buffer;
if (memoryStream.TryGetBuffer(out ArraySegment<byte> segment))
{
buffer = segment.Array.AsMemory().Slice(start: segment.Offset, length: segment.Count);
}
else
{
buffer = memoryStream.ToArray();
}

IJsonNavigator jsonNavigator = JsonNavigator.Create(buffer);
if (jsonNavigator.SerializationFormat == JsonSerializationFormat.Text)
{
// Exit to avoid the memory allocation.
return;
}

IJsonWriter jsonWriter;
if (requestOptions?.CosmosSerializationFormatOptions != null)
{
jsonWriter = requestOptions.CosmosSerializationFormatOptions.CreateCustomWriterCallback();
}
else
{
jsonWriter = JsonWriter.Create(JsonSerializationFormat.Text);
}

jsonNavigator.WriteNode(jsonNavigator.GetRootNode(), jsonWriter);

ReadOnlyMemory<byte> result = jsonWriter.GetResult();
MemoryStream rewrittenMemoryStream;
if (MemoryMarshal.TryGetArray(result, out ArraySegment<byte> rewrittenSegment))
{
rewrittenMemoryStream = new MemoryStream(rewrittenSegment.Array, index: rewrittenSegment.Offset, count: rewrittenSegment.Count, writable: false, publiclyVisible: true);
}
else
{
byte[] toArray = result.ToArray();
rewrittenMemoryStream = new MemoryStream(toArray, index: 0, count: toArray.Length, writable: false, publiclyVisible: true);
}

responseMessage.Content = rewrittenMemoryStream;
}

internal static IReadOnlyList<T> GetResources<T>(
IReadOnlyList<CosmosElement> cosmosArray,
CosmosSerializerCore serializerCore)
Expand Down
5 changes: 5 additions & 0 deletions Microsoft.Azure.Cosmos/src/Tracing/TraceComponent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ enum TraceComponent
/// </summary>
Pagination,

/// <summary>
/// Component that handles materializing a POCO.
/// </summary>
Poco,

/// <summary>
/// Component that handles client side aggregation of distributed query results.
/// </summary>
Expand Down
Loading

0 comments on commit b9cd8d0

Please sign in to comment.