Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/OpenTelemetry.Exporter.Geneva/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
set using the `PrivatePreviewLogMessagePackStringSizeLimit=<CharCount>`
connection string parameter.
([#2813](https://github.com/open-telemetry/opentelemetry-dotnet-contrib/pull/2813))
* Add httpUrl for HTTP server spans mapped from multiple attributes.
([#2818](https://github.com/open-telemetry/opentelemetry-dotnet-contrib/pull/2818))

## 1.12.0

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System.Diagnostics;
using System.Globalization;
using System.Runtime.InteropServices;
using System.Text;
using OpenTelemetry.Exporter.Geneva.Transports;
using OpenTelemetry.Internal;

Expand All @@ -32,14 +33,29 @@ internal sealed class MsgPackTraceExporter : MsgPackExporter, IDisposable
["messaging.url"] = "messagingUrl",
};

internal static readonly Dictionary<string, int> CS40_PART_B_HTTPURL_MAPPING_DICTIONARY = new()
{
// Mapping from HTTP semconv to httpUrl
// Combination of url.scheme, server.address, server.port, url.path and url.query attributes for HTTP server spans
["url.scheme"] = 0,
["server.address"] = 1,
["server.port"] = 2,
["url.path"] = 3,
["url.query"] = 4,
};

#if NET
internal static readonly FrozenDictionary<string, string> CS40_PART_B_MAPPING = CS40_PART_B_MAPPING_DICTIONARY.ToFrozenDictionary();
internal static readonly FrozenDictionary<string, int> CS40_PART_B_HTTPURL_MAPPING = CS40_PART_B_HTTPURL_MAPPING_DICTIONARY.ToFrozenDictionary();
#else
internal static readonly Dictionary<string, string> CS40_PART_B_MAPPING = CS40_PART_B_MAPPING_DICTIONARY;
internal static readonly Dictionary<string, int> CS40_PART_B_HTTPURL_MAPPING = CS40_PART_B_HTTPURL_MAPPING_DICTIONARY;
#endif

internal readonly ThreadLocal<byte[]> Buffer = new();

internal readonly ThreadLocal<object?[]> HttpUrlParts = new();

#if NET
internal readonly FrozenSet<string>? CustomFields;

Expand Down Expand Up @@ -243,6 +259,7 @@ public void Dispose()
{
(this.dataTransport as IDisposable)?.Dispose();
this.Buffer.Dispose();
this.HttpUrlParts.Dispose();
}
catch (Exception ex)
{
Expand All @@ -252,6 +269,50 @@ public void Dispose()
this.isDisposed = true;
}

internal static bool CacheIfPartOfHttpUrl(KeyValuePair<string, object?> entry, object?[] httpUrlParts)
{
if (CS40_PART_B_HTTPURL_MAPPING.TryGetValue(entry.Key, out var index))
{
if (index < httpUrlParts.Length)
{
httpUrlParts[index] = entry.Value;
return true;
}
}

return false;
}

internal static string? GetHttpUrl(object?[] httpUrlParts)
{
// OpenTelemetry Semantic Convention: https://github.com/open-telemetry/semantic-conventions/blob/v1.28.0/docs/http/http-spans.md#http-server-semantic-conventions
var scheme = httpUrlParts[0]?.ToString() ?? string.Empty; // 0 => CS40_PART_B_HTTPURL_MAPPING["url.scheme"]
var address = httpUrlParts[1]?.ToString() ?? string.Empty; // 1 => CS40_PART_B_HTTPURL_MAPPING["server.address"]
var port = httpUrlParts[2]?.ToString(); // 2 => CS40_PART_B_HTTPURL_MAPPING["server.port"]
port = port != null ? $":{port}" : string.Empty;
var path = httpUrlParts[3]?.ToString() ?? string.Empty; // 3 => CS40_PART_B_HTTPURL_MAPPING["url.path"]
var query = httpUrlParts[4]?.ToString(); // 4 => CS40_PART_B_HTTPURL_MAPPING["url.query"]
query = query != null ? $"?{query}" : string.Empty;

var length = scheme.Length + Uri.SchemeDelimiter.Length + address.Length + port.Length + path.Length + query.Length;

// No URL elements found, i.e. no scheme, no address, no port, no path, no query
if (length == Uri.SchemeDelimiter.Length)
{
return null;
}

var urlStringBuilder = new StringBuilder(length)
.Append(scheme)
.Append(Uri.SchemeDelimiter)
.Append(address)
.Append(port)
.Append(path)
.Append(query);

return urlStringBuilder.ToString();
}

internal ArraySegment<byte> SerializeActivity(Activity activity)
{
var buffer = this.Buffer.Value;
Expand Down Expand Up @@ -367,8 +428,27 @@ internal ArraySegment<byte> SerializeActivity(Activity activity)
var isStatusSuccess = true;
string? statusDescription = null;

var isServerActivity = activity.Kind == ActivityKind.Server;
var httpUrlParts = this.HttpUrlParts.Value ?? new object?[CS40_PART_B_HTTPURL_MAPPING.Count];
if (isServerActivity)
{
if (this.HttpUrlParts.Value == null)
{
this.HttpUrlParts.Value = httpUrlParts;
}
else
{
Array.Clear(httpUrlParts, 0, httpUrlParts.Length);
}
}

foreach (ref readonly var entry in activity.EnumerateTagObjects())
{
if (isServerActivity && CacheIfPartOfHttpUrl(entry, httpUrlParts))
{
continue; // Skip this entry, since it is part of httpUrl.
}

// TODO: check name collision
if (CS40_PART_B_MAPPING.TryGetValue(entry.Key, out var replacementKey))
{
Expand All @@ -393,6 +473,18 @@ internal ArraySegment<byte> SerializeActivity(Activity activity)
cntFields += 1;
}

if (isServerActivity)
{
var httpUrl = GetHttpUrl(httpUrlParts);
if (httpUrl != null)
{
// If the activity is a server activity and has http.url, we need to add it as a dedicated field.
cursor = MessagePackSerializer.SerializeAsciiString(buffer, cursor, "httpUrl");
cursor = MessagePackSerializer.SerializeUnicodeString(buffer, cursor, httpUrl);
cntFields += 1;
}
}

if (hasEnvProperties)
{
// Iteration #2 - Get all "other" fields and collapse them into single field
Expand Down
149 changes: 149 additions & 0 deletions test/OpenTelemetry.Exporter.Geneva.Tests/GenevaTraceExporterTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,89 @@ public void GenevaTraceExporter_Serialization_Success(bool hasTableNameMapping,
}
}

[Fact]
public void GenevaTraceExporter_ServerSpan_HttpUrl_Success()
{
var path = string.Empty;
Socket server = null;
try
{
var invocationCount = 0;
var exporterOptions = new GenevaExporterOptions();
if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
{
exporterOptions.ConnectionString = "EtwSession=OpenTelemetry";
}
else
{
path = GetRandomFilePath();
exporterOptions.ConnectionString = "Endpoint=unix:" + path;
var endpoint = new UnixDomainSocketEndPoint(path);
server = new Socket(AddressFamily.Unix, SocketType.Stream, ProtocolType.IP);
server.Bind(endpoint);
server.Listen(1);
}

using var exporter = new MsgPackTraceExporter(exporterOptions);

var m_buffer = exporter.Buffer;

// Add an ActivityListener to serialize the activity and assert that it was valid on ActivityStopped event

// Set the ActivitySourceName to the unique value of the test method name to avoid interference with
// the ActivitySource used by other unit tests.
var sourceName = GetTestMethodName();

using var listener = new ActivityListener();
listener.ShouldListenTo = (activitySource) => activitySource.Name == sourceName;
listener.Sample = (ref ActivityCreationOptions<ActivityContext> options) => ActivitySamplingResult.AllDataAndRecorded;
listener.ActivityStopped = (activity) =>
{
_ = exporter.SerializeActivity(activity);
var fluentdData = MessagePack.MessagePackSerializer.Deserialize<object>(m_buffer.Value, MessagePack.Resolvers.ContractlessStandardResolver.Options);
this.AssertHttpUrlForActivity(exporterOptions, fluentdData, activity);
invocationCount++;
};
ActivitySource.AddActivityListener(listener);

var source = new ActivitySource(sourceName);

// HTTP semconv: Combination of url.scheme, server.address, server.port, url.path and url.query
// attributes for HTTP server spans.
using (var parent = source.StartActivity("HttpIn", ActivityKind.Server))
{
parent.SetTag("http.request.method", "GET");
parent.SetTag("url.scheme", "https");
parent.SetTag("server.address", "localhost");
parent.SetTag("server.port", 443);
parent.SetTag("url.path", "/wiki/Rabbit");

// HTTP semconv: url.full attribute for HTTP client spans.
using (var child = source.StartActivity("HttpOut", ActivityKind.Client))
{
child.SetTag("http.request.method", "GET");
child.SetTag("url.full", "https://www.wikipedia.org/wiki/Rabbit?id=7");
child.SetTag("http.status_code", 404);
}

parent?.SetTag("http.response.status_code", 200);
}

Assert.Equal(2, invocationCount);
}
finally
{
server?.Dispose();
try
{
File.Delete(path);
}
catch
{
}
}
}

[SkipUnlessPlatformMatchesFact(TestPlatform.Linux)]
public void GenevaTraceExporter_Constructor_Missing_Agent_Linux()
{
Expand Down Expand Up @@ -778,4 +861,70 @@ private void AssertFluentdForwardModeForActivity(GenevaExporterOptions exporterO

customChecksForActivity?.Invoke(mapping);
}

private void AssertHttpUrlForActivity(GenevaExporterOptions exporterOptions, object fluentdData, Activity activity)
{
/* Fluentd Forward Mode:
[
"Span",
[
[ <timestamp>, { "env_ver": "4.0", ... } ]
],
{ "TimeFormat": "DateTime" }
]
*/

var signal = (fluentdData as object[])[0] as string;
var TimeStampAndMappings = ((fluentdData as object[])[1] as object[])[0];
var timeStamp = (DateTime)(TimeStampAndMappings as object[])[0];
var mapping = (TimeStampAndMappings as object[])[1] as Dictionary<object, object>;

Assert.Equal((byte)activity.Kind, mapping["kind"]);
var tags = activity.TagObjects.ToDictionary(tag => tag.Key, tag => tag.Value);

if (activity.Kind == ActivityKind.Server)
{
// For HTTP server spans, they might contain these attributes for URL:
// Unstable HTTP semconv: Combination of http.scheme, net.host.name, net.host.port, and http.target attributes.
// Stable HTTP semconv: Combination of url.scheme, server.address, server.port, url.path and url.query attributes.
// They will be mapped to httpUrl by Geneva exporter in MsgPackTraceExporter.
Assert.DoesNotContain("http.scheme", mapping.Keys);
Assert.DoesNotContain("net.host.name", mapping.Keys);
Assert.DoesNotContain("net.host.port", mapping.Keys);
Assert.DoesNotContain("http.target", mapping.Keys);
Assert.DoesNotContain("url.scheme", mapping.Keys);
Assert.DoesNotContain("server.address", mapping.Keys);
Assert.DoesNotContain("server.port", mapping.Keys);
Assert.DoesNotContain("url.path", mapping.Keys);
Assert.DoesNotContain("url.query", mapping.Keys);

Assert.Equal("GET", mapping["httpMethod"]);
Assert.Equal("https://localhost:443/wiki/Rabbit", mapping["httpUrl"]);

Assert.DoesNotContain("http.status_code", mapping.Keys);
Assert.DoesNotContain("http.response.status_code", mapping.Keys);
Assert.Equal(200, Convert.ToInt32(mapping["httpStatusCode"]));
}
else if (activity.Kind == ActivityKind.Client)
{
// For HTTP client spans, they might contain this attribute for URL:
// Unstable HTTP semconv: http.url attribute.
// Stable HTTP semconv: url.full attribute.
// They will be mapped to httpUrl by Geneva exporter in MsgPackTraceExporter.
Assert.DoesNotContain("http.url", mapping.Keys);
Assert.DoesNotContain("url.full", mapping.Keys);

Assert.Equal("GET", mapping["httpMethod"]);

Assert.Equal(tags["url.full"], mapping["httpUrl"]);

Assert.DoesNotContain("http.status_code", mapping.Keys);
Assert.DoesNotContain("http.response.status_code", mapping.Keys);
Assert.Equal(404, Convert.ToInt32(mapping["httpStatusCode"]));
}
else
{
throw new InvalidOperationException($"Unexpected ActivityKind: {activity.Kind}. Expected either Server or Client.");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

using OpenTelemetry.Exporter.Geneva.MsgPack;
using Xunit;

namespace OpenTelemetry.Exporter.Geneva.Tests;

public class MsgPackTraceExporterTests
{
[Fact]
public void CacheIfPartOfHttpUrl_KeyPresent_IndexInRange_SetsValueAndReturnsTrue()
{
var entry = new KeyValuePair<string, object?>("url.scheme", "https");
var arr = new object?[MsgPackTraceExporter.CS40_PART_B_HTTPURL_MAPPING.Count];
var result = MsgPackTraceExporter.CacheIfPartOfHttpUrl(entry, arr);
Assert.True(result);
Assert.Equal("https", arr[0]);
}

[Fact]
public void CacheIfPartOfHttpUrl_KeyPresent_IndexOutOfRange_ReturnsFalse()
{
var entry = new KeyValuePair<string, object?>("url.scheme", "https");
var arr = Array.Empty<object?>(); // zero-length array
var result = MsgPackTraceExporter.CacheIfPartOfHttpUrl(entry, arr);
Assert.False(result);
}

[Fact]
public void CacheIfPartOfHttpUrl_KeyNotPresent_ReturnsFalse()
{
var entry = new KeyValuePair<string, object?>("not.a.key", "value");
var arr = new object?[MsgPackTraceExporter.CS40_PART_B_HTTPURL_MAPPING.Count];
var result = MsgPackTraceExporter.CacheIfPartOfHttpUrl(entry, arr);
Assert.False(result);
}

[Fact]
public void CacheIfPartOfHttpUrl_NullValue_SetsNull()
{
var entry = new KeyValuePair<string, object?>("url.scheme", null);
var arr = new object?[MsgPackTraceExporter.CS40_PART_B_HTTPURL_MAPPING.Count];
var result = MsgPackTraceExporter.CacheIfPartOfHttpUrl(entry, arr);
Assert.True(result);
Assert.Null(arr[0]);
}

[Theory]
[InlineData("", "", "", "", null)]
[InlineData("http", "host", "", "", "http://host")]
[InlineData("http", "host", "8080", "/x", "http://host:8080/x")]
[InlineData("https", "server", "443", "/api", "https://server:443/api")]
[InlineData("http", "host", "", "/x?y=1", "http://host/x?y=1")]
public void GetHttpUrl_ReturnsExpectedUrl(string scheme, string hostOrAddress, string port, string pathAndQuery, string? expected)
{
var arr = new object?[MsgPackTraceExporter.CS40_PART_B_MAPPING_DICTIONARY.Count];
arr[0] = scheme;
arr[1] = hostOrAddress;
arr[2] = string.IsNullOrEmpty(port) ? null : port;
if (!string.IsNullOrEmpty(pathAndQuery) && pathAndQuery.Contains('?'))
{
var split = pathAndQuery.Split(['?'], 2);
arr[3] = split[0];
arr[4] = split.Length > 1 ? split[1] : null;
}
else
{
arr[3] = string.IsNullOrEmpty(pathAndQuery) ? null : pathAndQuery;
}

var url = MsgPackTraceExporter.GetHttpUrl(arr);
Assert.Equal(expected, url);
}

[Fact]
public void GetHttpUrl_UnknownMethod_ReturnsNull()
{
var arr = new object?[MsgPackTraceExporter.CS40_PART_B_HTTPURL_MAPPING.Count];
var url = MsgPackTraceExporter.GetHttpUrl(arr);
Assert.Null(url);
}
}
Loading