Commit f83a435

attempt adding kafka_cluster_id to dsm checkpoints
1 parent b5cadb6 · commit f83a435

6 files changed: +79 -20 lines changed

tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Kafka/KafkaConsumerCommitAllIntegration.cs

Lines changed: 8 additions & 4 deletions

@@ -32,14 +32,18 @@ internal static CallTargetReturn<TResponse> OnMethodEnd<TTarget, TResponse>(TTar
     var dataStreams = Tracer.Instance.TracerManager.DataStreamsManager;
     if (exception is null && response.Instance is not null && dataStreams.IsEnabled && instance != null)
     {
-        ConsumerCache.TryGetConsumerGroup(instance, out var groupId, out var _, out var _);
+        ConsumerCache.TryGetConsumerGroup(instance, out var groupId, out var _, out var clusterId);

         for (var i = 0; i < response.Count; i++)
         {
             var item = response[i];
-            dataStreams.TrackBacklog(
-                $"consumer_group:{groupId},partition:{item.Partition.Value},topic:{item.Topic},type:kafka_commit",
-                item.Offset.Value);
+            var backlogTags = $"consumer_group:{groupId},partition:{item.Partition.Value},topic:{item.Topic},type:kafka_commit";
+            if (!string.IsNullOrEmpty(clusterId))
+            {
+                backlogTags = $"kafka_cluster_id:{clusterId},{backlogTags}";
+            }
+
+            dataStreams.TrackBacklog(backlogTags, item.Offset.Value);
         }
     }
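
For reference, a minimal sketch of the commit-side backlog tag string this change produces (the group, cluster, partition, and topic values below are made-up placeholders, not taken from the commit):

    // Hypothetical values, for illustration only.
    var groupId = "orders-consumers";
    var clusterId = "lkc-abc123";
    var partition = 3;
    var topic = "orders";

    // Same construction and prefixing as in the integration above.
    var backlogTags = $"consumer_group:{groupId},partition:{partition},topic:{topic},type:kafka_commit";
    if (!string.IsNullOrEmpty(clusterId))
    {
        backlogTags = $"kafka_cluster_id:{clusterId},{backlogTags}";
    }

    // backlogTags is now:
    // "kafka_cluster_id:lkc-abc123,consumer_group:orders-consumers,partition:3,topic:orders,type:kafka_commit"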

tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Kafka/KafkaConsumerCommitIntegration.cs

Lines changed: 8 additions & 4 deletions

@@ -37,15 +37,19 @@ internal static CallTargetReturn OnMethodEnd<TTarget>(TTarget instance, Exceptio
     var dataStreams = Tracer.Instance.TracerManager.DataStreamsManager;
     if (exception is null && state.State is IEnumerable<object> offsets && dataStreams.IsEnabled && instance != null)
     {
-        ConsumerCache.TryGetConsumerGroup(instance, out var groupId, out var _, out var _);
+        ConsumerCache.TryGetConsumerGroup(instance, out var groupId, out var _, out var clusterId);

         foreach (var offset in offsets)
         {
             if (offset.TryDuckCast<ITopicPartitionOffset>(out var item))
             {
-                dataStreams.TrackBacklog(
-                    $"consumer_group:{groupId},partition:{item.Partition.Value},topic:{item.Topic},type:kafka_commit",
-                    item.Offset.Value);
+                var backlogTags = $"consumer_group:{groupId},partition:{item.Partition.Value},topic:{item.Topic},type:kafka_commit";
+                if (!string.IsNullOrEmpty(clusterId))
+                {
+                    backlogTags = $"kafka_cluster_id:{clusterId},{backlogTags}";
+                }
+
+                dataStreams.TrackBacklog(backlogTags, item.Offset.Value);
             }
         }
     }

tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Kafka/KafkaHelper.cs

Lines changed: 37 additions & 3 deletions

@@ -229,13 +229,15 @@ private static long GetMessageSize<T>(T message)
     tags.Offset = offset.ToString();
 }

+var consumerClusterId = string.Empty;
 if (ConsumerCache.TryGetConsumerGroup(consumer, out var groupId, out var bootstrapServers, out var clusterId))
 {
     tags.ConsumerGroup = groupId;
     tags.BootstrapServers = bootstrapServers;
     if (!string.IsNullOrEmpty(clusterId))
     {
         tags.ClusterId = clusterId;
+        consumerClusterId = clusterId;
         DebugLog($"Added cluster_id tag to Kafka consumer span: {clusterId}");
     }
     else

@@ -270,9 +272,21 @@ private static long GetMessageSize<T>(T message)
 {
     // TODO: we could pool these arrays to reduce allocations
     // NOTE: the tags must be sorted in alphabetical order
-    var edgeTags = string.IsNullOrEmpty(topic)
+    string[] edgeTags;
+    if (!string.IsNullOrEmpty(consumerClusterId))
+    {
+        DebugLog($"DataStreams consume checkpoint - cluster_id: {consumerClusterId}, group: {groupId}, topic: {topic}");
+        // Include cluster_id in edge tags (sorted alphabetically)
+        edgeTags = string.IsNullOrEmpty(topic)
+            ? new[] { $"kafka_cluster_id:{consumerClusterId}", "direction:in", $"group:{groupId}", "type:kafka" }
+            : new[] { $"kafka_cluster_id:{consumerClusterId}", "direction:in", $"group:{groupId}", $"topic:{topic}", "type:kafka" };
+    }
+    else
+    {
+        edgeTags = string.IsNullOrEmpty(topic)
         ? new[] { "direction:in", $"group:{groupId}", "type:kafka" }
         : new[] { "direction:in", $"group:{groupId}", $"topic:{topic}", "type:kafka" };
+    }

     span.SetDataStreamsCheckpoint(
         dataStreamsManager,

@@ -365,9 +379,29 @@ internal static void TryInjectHeaders<TTopicPartitionMarker, TMessage>(

 if (dataStreamsManager.IsEnabled)
 {
-    var edgeTags = string.IsNullOrEmpty(topic)
+    // Try to get cluster_id from span tags (it was set in CreateProducerScope)
+    var producerClusterId = string.Empty;
+    if (span.Tags is KafkaTags kafkaTags && !string.IsNullOrEmpty(kafkaTags.ClusterId))
+    {
+        producerClusterId = kafkaTags.ClusterId;
+    }
+
+    string[] edgeTags;
+    if (!string.IsNullOrEmpty(producerClusterId))
+    {
+        DebugLog($"DataStreams produce checkpoint - cluster_id: {producerClusterId}, topic: {topic}");
+        // Include cluster_id in edge tags (sorted alphabetically)
+        edgeTags = string.IsNullOrEmpty(topic)
+            ? new[] { $"kafka_cluster_id:{producerClusterId}", "direction:out", "type:kafka" }
+            : new[] { $"kafka_cluster_id:{producerClusterId}", "direction:out", $"topic:{topic}", "type:kafka" };
+    }
+    else
+    {
+        edgeTags = string.IsNullOrEmpty(topic)
         ? DefaultProduceEdgeTags
-        : ["direction:out", $"topic:{topic}", "type:kafka"];
+        : new[] { "direction:out", $"topic:{topic}", "type:kafka" };
+    }
+
     var msgSize = dataStreamsManager.IsInDefaultState ? 0 : GetMessageSize(message);
     // produce is always the start of the edge, so defaultEdgeStartMs is always 0
     span.SetDataStreamsCheckpoint(dataStreamsManager, CheckpointKind.Produce, edgeTags, msgSize, 0);
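
Putting the two new branches together, a minimal sketch of the edge tags that reach SetDataStreamsCheckpoint when a cluster id is available (all values below are illustrative placeholders, not taken from the commit):

    // Hypothetical values, for illustration only.
    var clusterId = "lkc-abc123";
    var groupId = "orders-consumers";
    var topic = "orders";

    // Consume checkpoint (cluster id known, topic known), as built above:
    var consumeEdgeTags = new[] { $"kafka_cluster_id:{clusterId}", "direction:in", $"group:{groupId}", $"topic:{topic}", "type:kafka" };

    // Produce checkpoint (span tags carried a ClusterId), as built above:
    var produceEdgeTags = new[] { $"kafka_cluster_id:{clusterId}", "direction:out", $"topic:{topic}", "type:kafka" };

Both branches fall back to the pre-existing arrays when no cluster id is known, so checkpoints without a cluster id keep their previous tag sets.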

tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Kafka/KafkaProduceAsyncIntegration.cs

Lines changed: 8 additions & 3 deletions

@@ -100,9 +100,14 @@ internal static TResponse OnAsyncMethodEnd<TTarget, TResponse>(TTarget instance,
         var dataStreams = Tracer.Instance.TracerManager.DataStreamsManager;
         if (dataStreams.IsEnabled)
         {
-            dataStreams.TrackBacklog(
-                $"partition:{deliveryResult.Partition.Value},topic:{deliveryResult.Topic},type:kafka_produce",
-                deliveryResult.Offset.Value);
+            // Try to get cluster_id from span tags
+            var backlogTags = $"partition:{deliveryResult.Partition.Value},topic:{deliveryResult.Topic},type:kafka_produce";
+            if (tags.ClusterId != null)
+            {
+                backlogTags = $"kafka_cluster_id:{tags.ClusterId},{backlogTags}";
+            }
+
+            dataStreams.TrackBacklog(backlogTags, deliveryResult.Offset.Value);
         }
     }
 }
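
As with the commit path, a minimal sketch of the produce-side backlog tag string (values are illustrative placeholders; in the integration the cluster id comes from the span's KafkaTags):

    // Hypothetical values, for illustration only.
    string clusterId = "lkc-abc123"; // stands in for tags.ClusterId above
    var partition = 3;
    var topic = "orders";

    var backlogTags = $"partition:{partition},topic:{topic},type:kafka_produce";
    if (clusterId != null)
    {
        backlogTags = $"kafka_cluster_id:{clusterId},{backlogTags}";
    }

    // backlogTags is now:
    // "kafka_cluster_id:lkc-abc123,partition:3,topic:orders,type:kafka_produce"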

tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Kafka/KafkaProduceSyncDeliveryHandlerIntegration.cs

Lines changed: 8 additions & 3 deletions

@@ -147,9 +147,14 @@ internal static Action<TDeliveryReport> WrapAction<TDeliveryReport>(Action<TDeli
     if (!isError)
     {
         var dataStreams = Tracer.Instance.TracerManager.DataStreamsManager;
-        dataStreams.TrackBacklog(
-            $"partition:{report.Partition.Value},topic:{report.Topic},type:kafka_produce",
-            report.Offset.Value);
+        // Try to get cluster_id from span tags
+        var backlogTags = $"partition:{report.Partition.Value},topic:{report.Topic},type:kafka_produce";
+        if (tags.ClusterId != null)
+        {
+            backlogTags = $"kafka_cluster_id:{tags.ClusterId},{backlogTags}";
+        }
+
+        dataStreams.TrackBacklog(backlogTags, report.Offset.Value);
     }
 }

tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Kafka/OffsetsCommittedCallbacks.cs

Lines changed: 10 additions & 3 deletions

@@ -32,12 +32,19 @@ public void OnDelegateEnd(object? sender, Exception? exception, object? state)
     if (result.TryDuckCast<ICommittedOffsets>(out var committedOffsets))
     {
         var dataStreams = Tracer.Instance.TracerManager.DataStreamsManager;
+
+        // Get cluster_id from ConsumerCache when the callback is invoked
+        string? clusterId = null;
+        if (sender != null)
+        {
+            ConsumerCache.TryGetConsumerGroup(sender, out var _, out var _, out clusterId);
+        }
+
         for (var i = 0; i < committedOffsets?.Offsets.Count; i++)
        {
             var item = committedOffsets.Offsets[i];
-            dataStreams.TrackBacklog(
-                $"consumer_group:{GroupId},partition:{item.Partition.Value},topic:{item.Topic},type:kafka_commit",
-                item.Offset.Value);
+            var backlogTags = $"kafka_cluster_id:{clusterId ?? string.Empty},consumer_group:{GroupId},partition:{item.Partition.Value},topic:{item.Topic},type:kafka_commit";
+            dataStreams.TrackBacklog(backlogTags, item.Offset.Value);
         }
     }
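
One behavioral difference worth noting: unlike the commit integrations above, this callback always prepends the kafka_cluster_id key, so a missing cluster id shows up as an empty value rather than being omitted. A minimal sketch with illustrative placeholder values:

    // Hypothetical values, for illustration only.
    string? clusterId = null;            // ConsumerCache lookup found nothing
    var groupId = "orders-consumers";    // stands in for the GroupId field used above
    var partition = 0;
    var topic = "orders";

    var backlogTags = $"kafka_cluster_id:{clusterId ?? string.Empty},consumer_group:{groupId},partition:{partition},topic:{topic},type:kafka_commit";

    // backlogTags is now:
    // "kafka_cluster_id:,consumer_group:orders-consumers,partition:0,topic:orders,type:kafka_commit"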
