Skip to content

Commit

Permalink
add MESSAGING_MESSAGE_ID
Browse files Browse the repository at this point in the history
  • Loading branch information
dimastbk committed Sep 14, 2024
1 parent 1e7af5e commit e7ae511
Showing 1 changed file with 25 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ def _enrich_base_span(
bootstrap_servers: Union[str, List[str]],
client_id: str,
topic: str,
partition: Optional[int],
key: Optional[Any],
) -> None:
span.set_attribute(
messaging_attributes.MESSAGING_SYSTEM,
Expand All @@ -156,6 +158,17 @@ def _enrich_base_span(
span.set_attribute(messaging_attributes.MESSAGING_CLIENT_ID, client_id)
span.set_attribute(messaging_attributes.MESSAGING_DESTINATION_NAME, topic)

if partition is not None:
span.set_attribute(
messaging_attributes.MESSAGING_DESTINATION_PARTITION_ID,
str(partition),
)

if key is not None:
span.set_attribute(
messaging_attributes.MESSAGING_KAFKA_MESSAGE_KEY, key
)


def _enrich_send_span(
span: Span,
Expand All @@ -174,25 +187,16 @@ def _enrich_send_span(
bootstrap_servers=bootstrap_servers,
client_id=client_id,
topic=topic,
partition=partition,
key=key,
)

if partition is not None:
span.set_attribute(
messaging_attributes.MESSAGING_DESTINATION_PARTITION_ID,
str(partition),
)

span.set_attribute(messaging_attributes.MESSAGING_OPERATION_NAME, "send")
span.set_attribute(
messaging_attributes.MESSAGING_OPERATION_TYPE,
messaging_attributes.MessagingOperationTypeValues.PUBLISH.value,
)

if key is not None:
span.set_attribute(
messaging_attributes.MESSAGING_KAFKA_MESSAGE_KEY, str(key)
)


def _enrich_anext_span(
span: Span,
Expand All @@ -213,19 +217,15 @@ def _enrich_anext_span(
bootstrap_servers=bootstrap_servers,
client_id=client_id,
topic=topic,
partition=partition,
key=key,
)

if consumer_group is not None:
span.set_attribute(
messaging_attributes.MESSAGING_CONSUMER_GROUP_NAME, consumer_group
)

if partition is not None:
span.set_attribute(
messaging_attributes.MESSAGING_DESTINATION_PARTITION_ID,
str(partition),
)

span.set_attribute(
messaging_attributes.MESSAGING_OPERATION_NAME, "receive"
)
Expand All @@ -234,15 +234,18 @@ def _enrich_anext_span(
messaging_attributes.MessagingOperationTypeValues.RECEIVE.value,
)

if key is not None:
span.set_attribute(
messaging_attributes.MESSAGING_KAFKA_MESSAGE_KEY, key
)

span.set_attribute(
messaging_attributes.MESSAGING_KAFKA_MESSAGE_OFFSET, offset
)

# https://stackoverflow.com/questions/65935155/identify-and-find-specific-message-in-kafka-topic
# A message within Kafka is uniquely defined by its topic name, topic partition and offset.
if partition is not None:
span.set_attribute(
messaging_attributes.MESSAGING_MESSAGE_ID,
f"{topic}.{partition}.{offset}",
)


def _get_span_name(operation: str, topic: str):
return f"{topic} {operation}"
Expand Down

0 comments on commit e7ae511

Please sign in to comment.