Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(confluent-kafka): Add instrumentation to consume method #1786

Merged
merged 25 commits into from
Aug 30, 2023
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
edb9ccc
feat(confluent-kafka): Add instrumentation to consume method
javferrod May 6, 2023
f6e5f98
feat(confluent-kafka): Add instrumentation to consume method
javferrod May 6, 2023
9f4b4ee
Changed email
javferrod May 6, 2023
fbfc055
chore(changelog): Added pull request link
javferrod May 6, 2023
f042dea
fix(confluent-kafka): Fix wrong partition and offset if
javferrod May 20, 2023
7e25294
chore(confluent-kafka): Add test for poll and consume
javferrod May 20, 2023
2d11cb4
Merge branch 'main' into issue-1747
javferrod May 20, 2023
3a31f6b
chore(confluent-kafka): Removed unused import
javferrod May 20, 2023
49b65c5
fix(confluent-kafka): Removed non relevant argument in poll
javferrod May 20, 2023
b0d3ac1
Merge branch 'main' into issue-1747
javferrod Jun 10, 2023
e31e21c
fix(confluent-kafka): Fixed identation
javferrod Jun 26, 2023
b7d08dc
Merge branch 'issue-1747' of https://github.com/javferrod/opentelemet…
javferrod Jun 26, 2023
7a86555
fix(confluent-kafka): Removed unused pylint directive
javferrod Jun 27, 2023
4c9cce6
fix(confluent-kafka): Fixed some lint errors
javferrod Jun 27, 2023
2138a8f
Merge branch 'main' into issue-1747
ocelotl Jul 6, 2023
ac9ec26
fix(confluent-kafka): Fixed lint errors
javferrod Jul 9, 2023
8a1cf98
Merge branch 'issue-1747' of https://github.com/javferrod/opentelemet…
javferrod Jul 9, 2023
8959aae
Merge branch 'main' into issue-1747
ocelotl Jul 28, 2023
6ce77cf
Merge branch 'main' into issue-1747
ocelotl Aug 10, 2023
a552778
fix((confluent-kafka): Removed unused span_list
javferrod Aug 13, 2023
8a76540
Merge branch 'issue-1747' of https://github.com/javferrod/opentelemet…
javferrod Aug 13, 2023
1815057
fix((confluent-kafka): Extracted some shared logic between poll and c…
javferrod Aug 13, 2023
51167dd
fix(confluent-kafka): changed function order
javferrod Aug 19, 2023
296be7f
Merge branch 'main' into issue-1747
javferrod Aug 19, 2023
e275ec2
Merge branch 'main' into issue-1747
ocelotl Aug 30, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Add optional distro and configurator selection for auto-instrumentation
([#1823](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1823))

### Added
- `opentelemetry-instrumentation-kafka-python` Add instrumentation to `consume` method
([#1786](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1786))

## Version 1.18.0/0.39b0 (2023-05-10)

- Update runtime metrics to follow semantic conventions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ def instrument_consumer(consumer: Consumer, tracer_provider=None)
from .utils import (
KafkaPropertiesExtractor,
_enrich_span,
_get_links_from_records,
_get_span_name,
_kafka_getter,
_kafka_setter,
Expand All @@ -137,6 +138,12 @@ def __init__(self, config):
def poll(self, timeout=-1): # pylint: disable=useless-super-delegation
return super().poll(timeout)

# This method is deliberately implemented in order to allow wrapt to wrap this function
def consume(
self, *args, **kwargs
): # pylint: disable=useless-super-delegation
return super().consume(*args, **kwargs)


class ProxiedProducer(Producer):
def __init__(self, producer: Producer, tracer: Tracer):
Expand Down Expand Up @@ -177,10 +184,14 @@ def committed(self, partitions, timeout=-1):
def commit(self, *args, **kwargs):
return self._consumer.commit(*args, **kwargs)

def consume(
self, num_messages=1, *args, **kwargs
ocelotl marked this conversation as resolved.
Show resolved Hide resolved
): # pylint: disable=keyword-arg-before-vararg
return self._consumer.consume(num_messages, *args, **kwargs)
def consume(self, *args, **kwargs):
return ConfluentKafkaInstrumentor.wrap_consume(
self._consumer.consume,
self,
self._tracer,
args,
kwargs,
)

def get_watermark_offsets(
self, partition, timeout=-1, *args, **kwargs
Expand Down Expand Up @@ -275,6 +286,11 @@ def _inner_wrap_poll(func, instance, args, kwargs):
func, instance, self._tracer, args, kwargs
)

def _inner_wrap_consume(func, instance, args, kwargs):
return ConfluentKafkaInstrumentor.wrap_consume(
func, instance, self._tracer, args, kwargs
)

wrapt.wrap_function_wrapper(
AutoInstrumentedProducer,
"produce",
Expand All @@ -287,6 +303,12 @@ def _inner_wrap_poll(func, instance, args, kwargs):
_inner_wrap_poll,
)

wrapt.wrap_function_wrapper(
AutoInstrumentedConsumer,
"consume",
_inner_wrap_consume,
)

def _uninstrument(self, **kwargs):
confluent_kafka.Producer = self._original_kafka_producer
confluent_kafka.Consumer = self._original_kafka_consumer
Expand Down Expand Up @@ -336,13 +358,7 @@ def wrap_poll(func, instance, tracer, args, kwargs):
):
record = func(*args, **kwargs)
if record:
links = []
ctx = propagate.extract(record.headers(), getter=_kafka_getter)
if ctx:
for item in ctx.values():
if hasattr(item, "get_span_context"):
links.append(Link(context=item.get_span_context()))

links = _get_links_from_records([record])
instance._current_consume_span = tracer.start_span(
name=f"{record.topic()} process",
links=links,
Expand All @@ -361,3 +377,35 @@ def wrap_poll(func, instance, tracer, args, kwargs):
)

return record

@staticmethod
def wrap_consume(func, instance, tracer, args, kwargs):
if instance._current_consume_span:
context.detach(instance._current_context_token)
instance._current_context_token = None
instance._current_consume_span.end()
instance._current_consume_span = None

with tracer.start_as_current_span(
"recv", end_on_exit=True, kind=trace.SpanKind.CONSUMER
javferrod marked this conversation as resolved.
Show resolved Hide resolved
):
records = func(*args, **kwargs)
if len(records) > 0:
links = _get_links_from_records(records)
instance._current_consume_span = tracer.start_span(
name=f"{records[0].topic()} process",
links=links,
kind=SpanKind.CONSUMER,
)

_enrich_span(
javferrod marked this conversation as resolved.
Show resolved Hide resolved
instance._current_consume_span,
records[0].topic(),
operation=MessagingOperationValues.PROCESS,
)

instance._current_context_token = context.attach(
trace.set_span_in_context(instance._current_consume_span)
)

return records
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from logging import getLogger
from typing import List, Optional

from opentelemetry import propagate
from opentelemetry.trace import SpanKind, Link
from opentelemetry.propagators import textmap
from opentelemetry.semconv.trace import (
MessagingDestinationKindValues,
Expand Down Expand Up @@ -94,7 +96,7 @@ def _enrich_span(
span.set_attribute(SpanAttributes.MESSAGING_SYSTEM, "kafka")
span.set_attribute(SpanAttributes.MESSAGING_DESTINATION, topic)

if partition:
if partition is not None:
span.set_attribute(SpanAttributes.MESSAGING_KAFKA_PARTITION, partition)

span.set_attribute(
Expand All @@ -109,13 +111,25 @@ def _enrich_span(

# https://stackoverflow.com/questions/65935155/identify-and-find-specific-message-in-kafka-topic
# A message within Kafka is uniquely defined by its topic name, topic partition and offset.
if partition and offset and topic:
if partition is not None and offset is not None and topic:
span.set_attribute(
SpanAttributes.MESSAGING_MESSAGE_ID,
f"{topic}.{partition}.{offset}",
)


def _get_links_from_records(records):
javferrod marked this conversation as resolved.
Show resolved Hide resolved
links = []
for record in records:
ctx = propagate.extract(record.headers(), getter=_kafka_getter)
if ctx:
for item in ctx.values():
if hasattr(item, "get_span_context"):
links.append(Link(context=item.get_span_context()))

return links


_kafka_setter = KafkaContextSetter()


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,12 @@

# pylint: disable=no-name-in-module

from unittest import TestCase
from opentelemetry.semconv.trace import (
SpanAttributes,
MessagingDestinationKindValues,
)
from opentelemetry.test.test_base import TestBase
from .utils import MockConsumer, MockedMessage

from confluent_kafka import Consumer, Producer

Expand All @@ -29,7 +34,7 @@
)


class TestConfluentKafka(TestCase):
class TestConfluentKafka(TestBase):
def test_instrument_api(self) -> None:
instrumentation = ConfluentKafkaInstrumentor()

Expand Down Expand Up @@ -104,3 +109,140 @@ def test_context_getter(self) -> None:
context_setter.set(carrier_list, "key1", "val1")
self.assertEqual(context_getter.get(carrier_list, "key1"), ["val1"])
self.assertEqual(["key1"], context_getter.keys(carrier_list))

def test_poll(self) -> None:
instrumentation = ConfluentKafkaInstrumentor()
mocked_messages = [
MockedMessage("topic-10", 0, 0, []),
MockedMessage("topic-20", 2, 4, []),
MockedMessage("topic-30", 1, 3, []),
]
expected_spans = [
{"name": "recv", "attributes": {}},
{
"name": "topic-10 process",
"attributes": {
SpanAttributes.MESSAGING_OPERATION: "process",
SpanAttributes.MESSAGING_KAFKA_PARTITION: 0,
SpanAttributes.MESSAGING_SYSTEM: "kafka",
SpanAttributes.MESSAGING_DESTINATION: "topic-10",
SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value,
SpanAttributes.MESSAGING_MESSAGE_ID: "topic-10.0.0",
},
},
{"name": "recv", "attributes": {}},
{
"name": "topic-20 process",
"attributes": {
SpanAttributes.MESSAGING_OPERATION: "process",
SpanAttributes.MESSAGING_KAFKA_PARTITION: 2,
SpanAttributes.MESSAGING_SYSTEM: "kafka",
SpanAttributes.MESSAGING_DESTINATION: "topic-20",
SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value,
SpanAttributes.MESSAGING_MESSAGE_ID: "topic-20.2.4",
},
},
{"name": "recv", "attributes": {}},
{
"name": "topic-30 process",
"attributes": {
SpanAttributes.MESSAGING_OPERATION: "process",
SpanAttributes.MESSAGING_KAFKA_PARTITION: 1,
SpanAttributes.MESSAGING_SYSTEM: "kafka",
SpanAttributes.MESSAGING_DESTINATION: "topic-30",
SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value,
SpanAttributes.MESSAGING_MESSAGE_ID: "topic-30.1.3",
},
},
{"name": "recv", "attributes": {}},
]

consumer = MockConsumer(
mocked_messages,
{
"bootstrap.servers": "localhost:29092",
"group.id": "mygroup",
"auto.offset.reset": "earliest",
},
)
span_list = self.memory_exporter.clear()
consumer = instrumentation.instrument_consumer(consumer)
consumer.poll()
consumer.poll()
consumer.poll()
consumer.poll()

span_list = self.memory_exporter.get_finished_spans()
self._compare_spans(span_list, expected_spans)

def test_consume(self) -> None:
instrumentation = ConfluentKafkaInstrumentor()
mocked_messages = [
MockedMessage("topic-1", 0, 0, []),
MockedMessage("topic-1", 2, 1, []),
MockedMessage("topic-1", 3, 2, []),
MockedMessage("topic-2", 0, 0, []),
MockedMessage("topic-3", 0, 3, []),
MockedMessage("topic-2", 0, 1, []),
]
expected_spans = [
{"name": "recv", "attributes": {}},
{
"name": "topic-1 process",
"attributes": {
SpanAttributes.MESSAGING_OPERATION: "process",
SpanAttributes.MESSAGING_SYSTEM: "kafka",
SpanAttributes.MESSAGING_DESTINATION: "topic-1",
SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value,
},
},
{"name": "recv", "attributes": {}},
{
"name": "topic-2 process",
"attributes": {
SpanAttributes.MESSAGING_OPERATION: "process",
SpanAttributes.MESSAGING_SYSTEM: "kafka",
SpanAttributes.MESSAGING_DESTINATION: "topic-2",
SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value,
},
},
{"name": "recv", "attributes": {}},
{
"name": "topic-3 process",
"attributes": {
SpanAttributes.MESSAGING_OPERATION: "process",
SpanAttributes.MESSAGING_SYSTEM: "kafka",
SpanAttributes.MESSAGING_DESTINATION: "topic-3",
SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value,
},
},
{"name": "recv", "attributes": {}},
]

consumer = MockConsumer(
mocked_messages,
{
"bootstrap.servers": "localhost:29092",
"group.id": "mygroup",
"auto.offset.reset": "earliest",
},
)

span_list = self.memory_exporter.clear()
javferrod marked this conversation as resolved.
Show resolved Hide resolved
consumer = instrumentation.instrument_consumer(consumer)
consumer.consume(3)
consumer.consume(1)
consumer.consume(2)
consumer.consume(1)
span_list = self.memory_exporter.get_finished_spans()
self._compare_spans(span_list, expected_spans)

def _compare_spans(self, spans, expected_spans):
for span, expected_span in zip(spans, expected_spans):
self.assertEqual(expected_span["name"], span.name)
for attribute_key, expected_attribute_value in expected_span[
"attributes"
].items():
self.assertEqual(
expected_attribute_value, span.attributes[attribute_key]
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from confluent_kafka import Consumer


class MockConsumer(Consumer):
def __init__(self, queue, config):
self._queue = queue
super().__init__(config)

def consume(
self, num_messages=1, *args, **kwargs
): # pylint: disable=keyword-arg-before-vararg
messages = self._queue[:num_messages]
self._queue = self._queue[num_messages:]
return messages

def poll(self, timeout=None):
if len(self._queue) > 0:
return self._queue.pop(0)
return None


class MockedMessage:
def __init__(self, topic: str, partition: int, offset: int, headers):
self._topic = topic
self._partition = partition
self._offset = offset
self._headers = headers

def topic(self):
return self._topic

def partition(self):
return self._partition

def offset(self):
return self._offset

def headers(self):
return self._headers
Loading