Skip to content

Commit

Permalink
fix(confluent-kafka): Fixed lint errors
Browse files Browse the repository at this point in the history
  • Loading branch information
javferrod committed Jul 9, 2023
1 parent 4c9cce6 commit ac9ec26
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,9 @@ def poll(self, timeout=-1): # pylint: disable=useless-super-delegation
return super().poll(timeout)

# This method is deliberately implemented in order to allow wrapt to wrap this function
def consume(self, *args, **kwargs): # pylint: disable=useless-super-delegation
def consume(
self, *args, **kwargs
): # pylint: disable=useless-super-delegation
return super().consume(*args, **kwargs)


Expand Down Expand Up @@ -184,7 +186,11 @@ def commit(self, *args, **kwargs):

def consume(self, *args, **kwargs):
return ConfluentKafkaInstrumentor.wrap_consume(
self._consumer.consume, self, self._tracer, args, kwargs,
self._consumer.consume,
self,
self._tracer,
args,
kwargs,
)

def get_watermark_offsets(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@

# pylint: disable=no-name-in-module

from opentelemetry.semconv.trace import SpanAttributes, MessagingDestinationKindValues
from opentelemetry.semconv.trace import (
SpanAttributes,
MessagingDestinationKindValues,
)
from opentelemetry.test.test_base import TestBase
from .utils import MockConsumer, MockedMessage

Expand Down Expand Up @@ -106,19 +109,16 @@ def test_context_getter(self) -> None:
context_setter.set(carrier_list, "key1", "val1")
self.assertEqual(context_getter.get(carrier_list, "key1"), ["val1"])
self.assertEqual(["key1"], context_getter.keys(carrier_list))

def test_poll(self) -> None:
instrumentation = ConfluentKafkaInstrumentor()
mocked_messages = [
MockedMessage("topic-10", 0, 0, []),
MockedMessage("topic-20", 2, 4, []),
MockedMessage("topic-30", 1, 3, []),
]
expected_spans= [
{
"name": "recv",
"attributes": {}
},
expected_spans = [
{"name": "recv", "attributes": {}},
{
"name": "topic-10 process",
"attributes": {
Expand All @@ -128,12 +128,9 @@ def test_poll(self) -> None:
SpanAttributes.MESSAGING_DESTINATION: "topic-10",
SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value,
SpanAttributes.MESSAGING_MESSAGE_ID: "topic-10.0.0",
}
},
{
"name": "recv",
"attributes": {}
},
},
{"name": "recv", "attributes": {}},
{
"name": "topic-20 process",
"attributes": {
Expand All @@ -143,12 +140,9 @@ def test_poll(self) -> None:
SpanAttributes.MESSAGING_DESTINATION: "topic-20",
SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value,
SpanAttributes.MESSAGING_MESSAGE_ID: "topic-20.2.4",
}
},
{
"name": "recv",
"attributes": {}
},
},
{"name": "recv", "attributes": {}},
{
"name": "topic-30 process",
"attributes": {
Expand All @@ -158,32 +152,29 @@ def test_poll(self) -> None:
SpanAttributes.MESSAGING_DESTINATION: "topic-30",
SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value,
SpanAttributes.MESSAGING_MESSAGE_ID: "topic-30.1.3",
}
},
{
"name": "recv",
"attributes": {}
},
},
{"name": "recv", "attributes": {}},
]

consumer = MockConsumer(
mocked_messages,
{
"bootstrap.servers": "localhost:29092",
"group.id": "mygroup",
"auto.offset.reset": "earliest",
}
},
)
span_list = self.memory_exporter.clear()
consumer = instrumentation.instrument_consumer(consumer)
consumer.poll()
consumer.poll()
consumer.poll()
consumer.poll()

span_list = self.memory_exporter.get_finished_spans()
self._compare_spans(span_list, expected_spans)

def test_consume(self) -> None:
instrumentation = ConfluentKafkaInstrumentor()
mocked_messages = [
Expand All @@ -194,61 +185,49 @@ def test_consume(self) -> None:
MockedMessage("topic-3", 0, 3, []),
MockedMessage("topic-2", 0, 1, []),
]
expected_spans= [
{
"name": "recv",
"attributes": {}
},
expected_spans = [
{"name": "recv", "attributes": {}},
{
"name": "topic-1 process",
"attributes": {
SpanAttributes.MESSAGING_OPERATION: "process",
SpanAttributes.MESSAGING_SYSTEM: "kafka",
SpanAttributes.MESSAGING_DESTINATION: "topic-1",
SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value,
}
},
{
"name": "recv",
"attributes": {}
},
},
{"name": "recv", "attributes": {}},
{
"name": "topic-2 process",
"attributes": {
SpanAttributes.MESSAGING_OPERATION: "process",
SpanAttributes.MESSAGING_SYSTEM: "kafka",
SpanAttributes.MESSAGING_DESTINATION: "topic-2",
SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value,
}
},
{
"name": "recv",
"attributes": {}
},
},
{"name": "recv", "attributes": {}},
{
"name": "topic-3 process",
"attributes": {
SpanAttributes.MESSAGING_OPERATION: "process",
SpanAttributes.MESSAGING_SYSTEM: "kafka",
SpanAttributes.MESSAGING_DESTINATION: "topic-3",
SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value,
}
},
{
"name": "recv",
"attributes": {}
},
},
{"name": "recv", "attributes": {}},
]

consumer = MockConsumer(
mocked_messages,
{
"bootstrap.servers": "localhost:29092",
"group.id": "mygroup",
"auto.offset.reset": "earliest",
}
},
)

span_list = self.memory_exporter.clear()
consumer = instrumentation.instrument_consumer(consumer)
consumer.consume(3)
Expand All @@ -259,7 +238,11 @@ def test_consume(self) -> None:
self._compare_spans(span_list, expected_spans)

def _compare_spans(self, spans, expected_spans):
for (span, expected_span) in zip(spans, expected_spans):
self.assertEqual(expected_span['name'], span.name)
for attribute_key, expected_attribute_value in expected_span['attributes'].items():
self.assertEqual(expected_attribute_value, span.attributes[attribute_key])
for span, expected_span in zip(spans, expected_spans):
self.assertEqual(expected_span["name"], span.name)
for attribute_key, expected_attribute_value in expected_span[
"attributes"
].items():
self.assertEqual(
expected_attribute_value, span.attributes[attribute_key]
)
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,17 @@


class MockConsumer(Consumer):

def __init__(self, queue, config):
self._queue = queue
super().__init__(config)

def consume(self, num_messages=1, *args, **kwargs): # pylint: disable=keyword-arg-before-vararg
def consume(
self, num_messages=1, *args, **kwargs
): # pylint: disable=keyword-arg-before-vararg
messages = self._queue[:num_messages]
self._queue = self._queue[num_messages:]
return messages

def poll(self, timeout=None):
if len(self._queue) > 0:
return self._queue.pop(0)
Expand Down

0 comments on commit ac9ec26

Please sign in to comment.