-
Notifications
You must be signed in to change notification settings - Fork 866
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Distributed Tracing: Producer OpenTelemetry instrumentation #1838
base: master
Are you sure you want to change the base?
Distributed Tracing: Producer OpenTelemetry instrumentation #1838
Conversation
|
@mhowlett I'd love your feedback on this implementation proposal 😊 |
i'd love to have time to give it to you.. i'll ping product about this feature. |
if (activity == null) | ||
return null; | ||
|
||
using (activity) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why have this using
? Won't that cause a stopped Activity
to be returned?
|
||
using (activity) | ||
{ | ||
activity?.AddDefaultOpenTelemetryTags(topicPartition, message); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't need the null-conditional here (activity?
) because line 41 already verified it is non-null.
TopicPartition topicPartition, | ||
Message<TKey, TValue> message) | ||
{ | ||
activity?.AddTag(OpenTelemetryMessaging.SYSTEM, "kafka"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think you need any of the null-conditionals in here because this is only ever called for non-null activity.
Message<TKey, TValue> message) | ||
{ | ||
activity?.AddTag(OpenTelemetryMessaging.SYSTEM, "kafka"); | ||
activity?.AddTag(OpenTelemetryMessaging.DESTINATION, topicPartition.Topic); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Generally we use Activity.SetTag as opposed to AddTag
. SetTag
implements the OTel spec logic for null
behavior, de-dupe, etc.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
open-telemetry/opentelemetry-dotnet#5173 Based on this, we may be able to suggest AddTag is certain scenarios as well to save some perf.
int messagePayloadBytes = Encoding.UTF8.GetByteCount(message.Value.ToString()); | ||
activity?.AddTag(OpenTelemetryMessaging.MESSAGE_PAYLOAD_SIZE_BYTES, messagePayloadBytes.ToString()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- Is
message.Value.ToString()
going to be very expensive? Ideally getting the size would either be cheap or it would be an opt-in thing for users who need it enough to pay for it. messagePayloadBytes.ToString()
why not just push the numeric value? Basically you can add anint
as a tag it doesn't need to be forced into astring
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
at the very least, check activity.IsAllDataRequested before doing anything which is non-trivial in terms of cost. This ensures that the cost is paid only if the activity has a chance of being exported to some telemetry destination.
|
||
internal static Activity Start<TKey, TValue>(TopicPartition topicPartition, Message<TKey, TValue> message) | ||
{ | ||
Activity activity = ActivitySource.StartActivity(ActivityName); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A bit of an unknown here. StartActivity
is when sampling happens. The OpenTelemetry specification states...
The API documentation MUST state that adding attributes at span creation is preferred to calling SetAttribute later, as samplers can only consider information already present during span creation.
In this code there are no attributes/tags on start so users won't be able to make interesting sampling decisions based on details about the spans/activity instances. This is kind of an ongoing discussion, not sure if there will eventually be a list of attributes that will be required or it will always be optional. Just sharing the state of things 😄
/cc @cijothomas
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For things which are known already, I'd suggest passing it to StartActivityCall.
One example for sql, where the db system is known (and static), and is passed at startActivity time itself.
https://github.com/open-telemetry/opentelemetry-dotnet/blob/main/src/OpenTelemetry.Instrumentation.SqlClient/Implementation/SqlClientDiagnosticListener.cs#L64
/// </summary> | ||
internal static class Producer | ||
{ | ||
private const string ActivityName = ActivitySourceName + ".MessageProduced"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ActivitySourceName should not be prefixed for ActivityName.
} | ||
} | ||
|
||
private static Activity AddDefaultOpenTelemetryTags<TKey, TValue>( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is another area with some unknowns. The semantic conventions are still marked as Experimental
. Meaning: They are likely to change. There is this whole concept of a schema url that should be attached to instrumentation before it can be declared stable. One option could be we create the Activity
here and then provide a callback instrumentation (such as OpenTelemetry) or users can use in order to add the tags that make sense for their domain. The nice thing about that approach would be Confluent Kafka doesn't need to worry about a spec that is changing 😄 That approach (more or less) is what we have done with dotnet itself. The runtime starts things but OpenTelemetry uses hooks in its instrumentation to add the tags according to the spec.
{ | ||
/// <summary> | ||
/// Provides the OpenTelemetry messaging attributes. | ||
/// The complete list of messaging attributes specification is available here: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md#messaging-attributes |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the conventions in OTel are still work-in-progress. I'd suggest to mark this overall feature as experimental/non-stable, from the perspective that the name of the tags could change in the future, until the otel conventions become stable.
// Prepare the activity events listener | ||
string activityName = "Confluent.Kafka.MessageProduced"; | ||
ActivityEventsRecorder eventsRecorder = new(activityName); | ||
ActivityListener listener = eventsRecorder.BuildActivityListener(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another alternative way to test would be to use OpenTelemetry's InMemoryExporter. Example : https://github.com/open-telemetry/opentelemetry-dotnet/blob/main/test/OpenTelemetry.Instrumentation.SqlClient.Tests/SqlClientTests.cs#L90
Thanks for this @fedeoliv! I did a quick review and left some comments about where we are with the spec, etc. Happy to help out however I can. |
@@ -907,6 +916,8 @@ internal Producer(ProducerBuilder<TKey, TValue> builder) | |||
ex); | |||
} | |||
|
|||
Activity activity = Diagnostics.Producer.Start(topicPartition, message); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use using and remove activity.stop in finally?
Sample = (ref ActivityCreationOptions<ActivityContext> _) => ActivitySamplingResult.AllData, | ||
ActivityStarted = activity => | ||
{ | ||
if (activity.DisplayName == activityName) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
string.equals for string comparison
Does it look like development has stopped? |
@mhowlett It's been 5 months, is there any update on whether and how Kafka is going to support distributed tracing? |
Folks, any update on this? it's been months @mhowlett |
Is context propagation implemented as well? Maybe I missed it, but I can't find any code using propagator.Inject()/propagator.ExtractTraceIdAndState()... |
Can we get an update? |
@anchitj any news on this? |
This PR introduces a distributed tracing instrumentation for the Producer by leveraging the use of
Activity
objects with tags mapping the OpenTelemetry messaging attributes specification.The implementation is inspired by the great discussion on issue #1269 and the @CodeBlanch's proposal on the PR #1278.