Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// <copyright file="DuplicateFieldNameDiagnostic.cs" company="Datadog">
// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License.
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc.
// </copyright>

using Microsoft.CodeAnalysis;

namespace Datadog.Trace.SourceGenerators.MessagePackConstants.Diagnostics;

internal static class DuplicateFieldNameDiagnostic
{
internal const string Id = "DDSG005";
private const string Title = "Duplicate MessagePackField name";
private const string MessageFormat = "MessagePackField '{0}' is defined multiple times. Each field name must be unique to avoid conflicts in generated code.";

private static readonly DiagnosticDescriptor Rule = new(
Id,
Title,
MessageFormat,
category: "CodeGeneration",
defaultSeverity: DiagnosticSeverity.Error,
isEnabledByDefault: true);

public static Diagnostic Create(string fieldName) =>
Diagnostic.Create(Rule, location: null, fieldName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
using Datadog.Trace.SourceGenerators.MessagePackConstants;
using Datadog.Trace.SourceGenerators.MessagePackConstants.Diagnostics;
using Microsoft.CodeAnalysis;
using Microsoft.CodeAnalysis.CSharp.Syntax;
using Microsoft.CodeAnalysis.Text;

/// <inheritdoc />
Expand Down Expand Up @@ -64,7 +63,29 @@ private static void Execute(ImmutableArray<FieldToSerialize> fields, SourceProdu
return;
}

var source = Sources.CreateMessagePackConstants(fields);
// Detect duplicate field names
var fieldGroups = fields.GroupBy(f => f.FieldName).ToList();
var duplicateGroups = fieldGroups.Where(g => g.Count() > 1).ToList();

if (duplicateGroups.Any())
{
foreach (var group in duplicateGroups)
{
var fieldName = group.Key;

// Report diagnostic for each duplicate occurrence (skip first)
foreach (var field in group.Skip(1))
{
var diagnostic = DuplicateFieldNameDiagnostic.Create(fieldName);
context.ReportDiagnostic(diagnostic);
}
}
}

// Only keep the first occurrence of each field name
var uniqueFields = fieldGroups.Select(g => g.First()).ToImmutableArray();

var source = Sources.CreateMessagePackConstants(uniqueFields);
context.AddSource("MessagePackConstants.g.cs", SourceText.From(source, Encoding.UTF8));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ namespace Datadog.Trace.Agent.MessagePack
/// MessagePack protocol field names for span serialization.
/// These constants are marked with [MessagePackField] to generate pre-serialized byte arrays.
/// </summary>
internal static class MessagePackFieldNames
internal static partial class MessagePackFieldNames
{
// Span fields
[MessagePackField]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.Threading;
using Datadog.Trace.Agent.MessagePack;
using Datadog.Trace.Configuration;
using Datadog.Trace.ContinuousProfiler;
using Datadog.Trace.Vendors.Datadog.Sketches;
Expand All @@ -19,40 +20,14 @@ namespace Datadog.Trace.DataStreamsMonitoring.Aggregation
{
internal sealed class DataStreamsMessagePackFormatter
{
private readonly byte[] _environmentBytes = StringEncoding.UTF8.GetBytes("Env");
private readonly byte[] _serviceBytes = StringEncoding.UTF8.GetBytes("Service");
private readonly long _productMask;
private readonly bool _isInDefaultState;
private readonly bool _writeProcessTags;

// private readonly byte[] _primaryTagBytes = StringEncoding.UTF8.GetBytes("PrimaryTag");
// private readonly byte[] _primaryTagValueBytes;
private readonly byte[] _statsBytes = StringEncoding.UTF8.GetBytes("Stats");
private readonly byte[] _backlogsBytes = StringEncoding.UTF8.GetBytes("Backlogs");
private readonly byte[] _tracerVersionBytes = StringEncoding.UTF8.GetBytes("TracerVersion");
private readonly byte[] _tracerVersionValueBytes = StringEncoding.UTF8.GetBytes(TracerConstants.AssemblyVersion);
private readonly byte[] _langBytes = StringEncoding.UTF8.GetBytes("Lang");
private readonly byte[] _langValueBytes = StringEncoding.UTF8.GetBytes(TracerConstants.Language);

private readonly byte[] _startBytes = StringEncoding.UTF8.GetBytes("Start");
private readonly byte[] _durationBytes = StringEncoding.UTF8.GetBytes("Duration");

private readonly byte[] _edgeTagsBytes = StringEncoding.UTF8.GetBytes("EdgeTags");
private readonly byte[] _hashBytes = StringEncoding.UTF8.GetBytes("Hash");
private readonly byte[] _parentHashBytes = StringEncoding.UTF8.GetBytes("ParentHash");
private readonly byte[] _pathwayLatencyBytes = StringEncoding.UTF8.GetBytes("PathwayLatency");
private readonly byte[] _edgeLatencyBytes = StringEncoding.UTF8.GetBytes("EdgeLatency");
private readonly byte[] _payloadSizeBytes = StringEncoding.UTF8.GetBytes("PayloadSize");
private readonly byte[] _timestampTypeBytes = StringEncoding.UTF8.GetBytes("TimestampType");
private readonly byte[] _currentTimestampTypeBytes = StringEncoding.UTF8.GetBytes("current");
private readonly byte[] _originTimestampTypeBytes = StringEncoding.UTF8.GetBytes("origin");

private readonly byte[] _backlogTagsBytes = StringEncoding.UTF8.GetBytes("Tags");
private readonly byte[] _backlogValueBytes = StringEncoding.UTF8.GetBytes("Value");
private readonly byte[] _productMaskBytes = StringEncoding.UTF8.GetBytes("ProductMask");
private readonly byte[] _processTagsBytes = StringEncoding.UTF8.GetBytes("ProcessTags");
private readonly byte[] _isInDefaultStateBytes = StringEncoding.UTF8.GetBytes("IsInDefaultState");
// This one class isn't yet handled by Source Generators
private readonly byte[] _tracerVersionValueBytes = MessagePackSerializer.Serialize(TracerConstants.AssemblyVersion);

// Runtime value fields (determined at config changes)
private byte[] _environmentValueBytes;
private byte[] _serviceValueBytes;
private ProcessTags? _processTags;
Expand Down Expand Up @@ -127,23 +102,23 @@ public int Serialize(Stream stream, long bucketDurationNs, List<SerializableStat
// -1 because service name override is not supported
bytesWritten += MessagePackBinary.WriteMapHeader(stream, 7 + (withProcessTags ? 1 : 0));

bytesWritten += MessagePackBinary.WriteStringBytes(stream, _environmentBytes);
bytesWritten += MessagePackBinary.WriteRaw(stream, MessagePackConstants.EnvDSMBytes);
bytesWritten += MessagePackBinary.WriteStringBytes(stream, _environmentValueBytes);

bytesWritten += MessagePackBinary.WriteStringBytes(stream, _serviceBytes);
bytesWritten += MessagePackBinary.WriteRaw(stream, MessagePackConstants.ServiceDSMBytes);
bytesWritten += MessagePackBinary.WriteStringBytes(stream, _serviceValueBytes);

// We never have a primary tag currently, make sure to increase header size if/when we add it
// offset += MessagePackBinary.WriteStringBytes(stream, _primaryTagBytes);
// offset += MessagePackBinary.WriteStringBytes(stream, _primaryTagValueBytes);

bytesWritten += MessagePackBinary.WriteStringBytes(stream, _langBytes);
bytesWritten += MessagePackBinary.WriteStringBytes(stream, _langValueBytes);
bytesWritten += MessagePackBinary.WriteRaw(stream, MessagePackConstants.LangBytes);
bytesWritten += MessagePackBinary.WriteRaw(stream, MessagePackConstants.DotnetLanguageValueBytes);

bytesWritten += MessagePackBinary.WriteStringBytes(stream, _tracerVersionBytes);
bytesWritten += MessagePackBinary.WriteStringBytes(stream, _tracerVersionValueBytes);
bytesWritten += MessagePackBinary.WriteRaw(stream, MessagePackConstants.TracerVersionBytes);
bytesWritten += MessagePackBinary.WriteRaw(stream, _tracerVersionValueBytes);

bytesWritten += MessagePackBinary.WriteStringBytes(stream, _statsBytes);
bytesWritten += MessagePackBinary.WriteRaw(stream, MessagePackConstants.StatsBytes);
bytesWritten += MessagePackBinary.WriteArrayHeader(stream, statsBuckets.Count + backlogsBuckets.Count);

foreach (var backlogBucket in backlogsBuckets)
Expand All @@ -154,11 +129,11 @@ public int Serialize(Stream stream, long bucketDurationNs, List<SerializableStat
{
bytesWritten += MessagePackBinary.WriteMapHeader(stream, 2);

bytesWritten += MessagePackBinary.WriteStringBytes(stream, _backlogValueBytes);
bytesWritten += MessagePackBinary.WriteRaw(stream, MessagePackConstants.ValueBytes);
bytesWritten += MessagePackBinary.WriteInt64(stream, point.Value);

var tags = point.Tags.Split(',');
bytesWritten += MessagePackBinary.WriteStringBytes(stream, _backlogTagsBytes);
bytesWritten += MessagePackBinary.WriteRaw(stream, MessagePackConstants.TagsBytes);
bytesWritten += MessagePackBinary.WriteArrayHeader(stream, tags.Length);
foreach (var tag in tags)
{
Expand All @@ -172,8 +147,8 @@ public int Serialize(Stream stream, long bucketDurationNs, List<SerializableStat
bytesWritten += WriteBucketsHeader(stream, statsBucket.BucketStartTimeNs, bucketDurationNs, statsBucket.Bucket.Values.Count, 0);

var timestampTypeBytes = statsBucket.TimestampType == TimestampType.Current
? _currentTimestampTypeBytes
: _originTimestampTypeBytes;
? MessagePackConstants.CurrentBytes
: MessagePackConstants.OriginDSMBytes;

foreach (var point in statsBucket.Bucket.Values)
{
Expand All @@ -185,27 +160,27 @@ public int Serialize(Stream stream, long bucketDurationNs, List<SerializableStat
var itemCount = hasEdges ? 7 : 6;
bytesWritten += MessagePackBinary.WriteMapHeader(stream, itemCount);

bytesWritten += MessagePackBinary.WriteStringBytes(stream, _hashBytes);
bytesWritten += MessagePackBinary.WriteRaw(stream, MessagePackConstants.HashBytes);
bytesWritten += MessagePackBinary.WriteUInt64(stream, point.Hash.Value);

bytesWritten += MessagePackBinary.WriteStringBytes(stream, _parentHashBytes);
bytesWritten += MessagePackBinary.WriteRaw(stream, MessagePackConstants.ParentHashBytes);
bytesWritten += MessagePackBinary.WriteUInt64(stream, point.ParentHash.Value);

bytesWritten += MessagePackBinary.WriteStringBytes(stream, _timestampTypeBytes);
bytesWritten += MessagePackBinary.WriteStringBytes(stream, timestampTypeBytes);
bytesWritten += MessagePackBinary.WriteRaw(stream, MessagePackConstants.TimestampTypeBytes);
bytesWritten += MessagePackBinary.WriteRaw(stream, timestampTypeBytes);

bytesWritten += MessagePackBinary.WriteStringBytes(stream, _pathwayLatencyBytes);
bytesWritten += MessagePackBinary.WriteRaw(stream, MessagePackConstants.PathwayLatencyBytes);
bytesWritten += SerializeSketch(stream, point.PathwayLatency);

bytesWritten += MessagePackBinary.WriteStringBytes(stream, _edgeLatencyBytes);
bytesWritten += MessagePackBinary.WriteRaw(stream, MessagePackConstants.EdgeLatencyBytes);
bytesWritten += SerializeSketch(stream, point.EdgeLatency);

bytesWritten += MessagePackBinary.WriteStringBytes(stream, _payloadSizeBytes);
bytesWritten += MessagePackBinary.WriteRaw(stream, MessagePackConstants.PayloadSizeBytes);
bytesWritten += SerializeSketch(stream, point.PayloadSize);

if (hasEdges)
{
bytesWritten += MessagePackBinary.WriteStringBytes(stream, _edgeTagsBytes);
bytesWritten += MessagePackBinary.WriteRaw(stream, MessagePackConstants.EdgeTagsBytes);
bytesWritten += MessagePackBinary.WriteArrayHeader(stream, point.EdgeTags.Length);

foreach (var edgeTag in point.EdgeTags)
Expand All @@ -216,20 +191,20 @@ public int Serialize(Stream stream, long bucketDurationNs, List<SerializableStat
}
}

bytesWritten += MessagePackBinary.WriteStringBytes(stream, _productMaskBytes);
bytesWritten += MessagePackBinary.WriteRaw(stream, MessagePackConstants.ProductMaskBytes);
bytesWritten += MessagePackBinary.WriteInt64(stream, _productMask);

if (withProcessTags)
{
bytesWritten += MessagePackBinary.WriteStringBytes(stream, _processTagsBytes);
bytesWritten += MessagePackBinary.WriteRaw(stream, MessagePackConstants.ProcessTagsDSMBytes);
bytesWritten += MessagePackBinary.WriteArrayHeader(stream, _processTags!.TagsList.Count);
foreach (var tag in _processTags.TagsList)
{
bytesWritten += MessagePackBinary.WriteString(stream, tag);
}
}

bytesWritten += MessagePackBinary.WriteStringBytes(stream, _isInDefaultStateBytes);
bytesWritten += MessagePackBinary.WriteRaw(stream, MessagePackConstants.IsInDefaultStateBytes);
bytesWritten += MessagePackBinary.WriteBoolean(stream, _isInDefaultState);

return bytesWritten;
Expand Down Expand Up @@ -259,21 +234,21 @@ private int WriteBucketsHeader(Stream stream, long bucketStartTimeNs, long bucke
// https://github.com/DataDog/data-streams-go/blob/60ba06aec619850aef8ed0b9b1f0f5e310438362/datastreams/payload.go#L48
bytesWritten += MessagePackBinary.WriteMapHeader(stream, count);

bytesWritten += MessagePackBinary.WriteStringBytes(stream, _startBytes);
bytesWritten += MessagePackBinary.WriteRaw(stream, MessagePackConstants.StartDSMBytes);
bytesWritten += MessagePackBinary.WriteInt64(stream, bucketStartTimeNs);

bytesWritten += MessagePackBinary.WriteStringBytes(stream, _durationBytes);
bytesWritten += MessagePackBinary.WriteRaw(stream, MessagePackConstants.DurationDSMBytes);
bytesWritten += MessagePackBinary.WriteInt64(stream, bucketDurationNs);

if (statsBucketCount > 0)
{
bytesWritten += MessagePackBinary.WriteStringBytes(stream, _statsBytes);
bytesWritten += MessagePackBinary.WriteRaw(stream, MessagePackConstants.StatsBytes);
bytesWritten += MessagePackBinary.WriteArrayHeader(stream, statsBucketCount);
}

if (backlogBucketCount > 0)
{
bytesWritten += MessagePackBinary.WriteStringBytes(stream, _backlogsBytes);
bytesWritten += MessagePackBinary.WriteRaw(stream, MessagePackConstants.BacklogsBytes);
bytesWritten += MessagePackBinary.WriteArrayHeader(stream, backlogBucketCount);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// <copyright file="MessagePackFieldNames.DSM.cs" company="Datadog">
// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License.
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc.
// </copyright>

#nullable enable

using Datadog.Trace.SourceGenerators;

namespace Datadog.Trace.Agent.MessagePack
{
/// <summary>
/// MessagePack field names for Data Streams Monitoring serialization (DSM-specific part).
/// These constants are marked with [MessagePackField] to generate pre-serialized byte arrays.
/// </summary>
internal static partial class MessagePackFieldNames
{
// Top-level payload fields (PascalCase per DSM protocol)
// Using DSM suffix to avoid naming conflicts with span protocol fields (which use lowercase)
[MessagePackField]
public const string EnvDSM = "Env";

[MessagePackField]
public const string ServiceDSM = "Service";

[MessagePackField]
public const string Stats = "Stats";

[MessagePackField]
public const string Backlogs = "Backlogs";

[MessagePackField]
public const string TracerVersion = "TracerVersion";

[MessagePackField]
public const string Lang = "Lang";

[MessagePackField]
public const string ProductMask = "ProductMask";

[MessagePackField]
public const string ProcessTagsDSM = "ProcessTags";

[MessagePackField]
public const string IsInDefaultState = "IsInDefaultState";

// Bucket fields
[MessagePackField]
public const string StartDSM = "Start";

[MessagePackField]
public const string DurationDSM = "Duration";

// Stats point fields
[MessagePackField]
public const string Hash = "Hash";

[MessagePackField]
public const string ParentHash = "ParentHash";

[MessagePackField]
public const string TimestampType = "TimestampType";

[MessagePackField]
public const string PathwayLatency = "PathwayLatency";

[MessagePackField]
public const string EdgeLatency = "EdgeLatency";

[MessagePackField]
public const string PayloadSize = "PayloadSize";

[MessagePackField]
public const string EdgeTags = "EdgeTags";

// Backlog fields
[MessagePackField]
public const string Tags = "Tags";

[MessagePackField]
public const string Value = "Value";

// Timestamp type values
[MessagePackField]
public const string Current = "current";

[MessagePackField]
public const string OriginDSM = "origin";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,26 @@
// </copyright>

using System;
using System.IO;

namespace Datadog.Trace.Vendors.MessagePack
{
internal static partial class MessagePackBinary
{
#if NETCOREAPP
public static int WriteRaw(Stream stream, ReadOnlySpan<byte> rawMessagePackBlock)
{
stream.Write(rawMessagePackBlock);
return rawMessagePackBlock.Length;
}
#else
public static int WriteRaw(Stream stream, byte[] rawMessagePackBlock)
{
stream.Write(rawMessagePackBlock, 0, rawMessagePackBlock.Length);
return rawMessagePackBlock.Length;
}
#endif

public static int WriteRaw(ref byte[] bytes, int offset, ReadOnlySpan<byte> rawMessagePackBlock)
{
EnsureCapacity(ref bytes, offset, rawMessagePackBlock.Length);
Expand Down
Loading
Loading