diff --git a/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/utils.py b/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/utils.py index 7e5d989eb7..0c8438b740 100644 --- a/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/utils.py +++ b/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/utils.py @@ -145,6 +145,8 @@ def _enrich_base_span( bootstrap_servers: Union[str, List[str]], client_id: str, topic: str, + partition: Optional[int], + key: Optional[Any], ) -> None: span.set_attribute( messaging_attributes.MESSAGING_SYSTEM, @@ -156,6 +158,17 @@ def _enrich_base_span( span.set_attribute(messaging_attributes.MESSAGING_CLIENT_ID, client_id) span.set_attribute(messaging_attributes.MESSAGING_DESTINATION_NAME, topic) + if partition is not None: + span.set_attribute( + messaging_attributes.MESSAGING_DESTINATION_PARTITION_ID, + str(partition), + ) + + if key is not None: + span.set_attribute( + messaging_attributes.MESSAGING_KAFKA_MESSAGE_KEY, key + ) + def _enrich_send_span( span: Span, @@ -174,25 +187,16 @@ def _enrich_send_span( bootstrap_servers=bootstrap_servers, client_id=client_id, topic=topic, + partition=partition, + key=key, ) - if partition is not None: - span.set_attribute( - messaging_attributes.MESSAGING_DESTINATION_PARTITION_ID, - str(partition), - ) - span.set_attribute(messaging_attributes.MESSAGING_OPERATION_NAME, "send") span.set_attribute( messaging_attributes.MESSAGING_OPERATION_TYPE, messaging_attributes.MessagingOperationTypeValues.PUBLISH.value, ) - if key is not None: - span.set_attribute( - messaging_attributes.MESSAGING_KAFKA_MESSAGE_KEY, str(key) - ) - def _enrich_anext_span( span: Span, @@ -213,6 +217,8 @@ def _enrich_anext_span( bootstrap_servers=bootstrap_servers, client_id=client_id, topic=topic, + partition=partition, + key=key, ) if consumer_group is not None: @@ -220,12 +226,6 @@ def _enrich_anext_span( messaging_attributes.MESSAGING_CONSUMER_GROUP_NAME, consumer_group ) - if partition is not None: - span.set_attribute( - messaging_attributes.MESSAGING_DESTINATION_PARTITION_ID, - str(partition), - ) - span.set_attribute( messaging_attributes.MESSAGING_OPERATION_NAME, "receive" ) @@ -234,15 +234,18 @@ def _enrich_anext_span( messaging_attributes.MessagingOperationTypeValues.RECEIVE.value, ) - if key is not None: - span.set_attribute( - messaging_attributes.MESSAGING_KAFKA_MESSAGE_KEY, key - ) - span.set_attribute( messaging_attributes.MESSAGING_KAFKA_MESSAGE_OFFSET, offset ) + # https://stackoverflow.com/questions/65935155/identify-and-find-specific-message-in-kafka-topic + # A message within Kafka is uniquely defined by its topic name, topic partition and offset. + if partition is not None: + span.set_attribute( + messaging_attributes.MESSAGING_MESSAGE_ID, + f"{topic}.{partition}.{offset}", + ) + def _get_span_name(operation: str, topic: str): return f"{topic} {operation}"