Skip to content

Commit a923dbf

Browse files
committed
Add commit method for ConfluentKafkaInstrumentor's ProxiedConsumer
1 parent df32e8c commit a923dbf

File tree

3 files changed

+39
-1
lines changed

3 files changed

+39
-1
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
4646
([#1435](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1435))
4747
- mongo db - fix db statement capturing
4848
([#1512](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1512))
49+
- Add commit method for ConfluentKafkaInstrumentor's ProxiedConsumer
50+
([#1656](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1656))
4951

5052
## Version 1.15.0/0.36b0 (2022-12-10)
5153

instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,17 @@ def __init__(self, consumer: Consumer, tracer: Tracer):
173173
def committed(self, partitions, timeout=-1):
174174
return self._consumer.committed(partitions, timeout)
175175

176+
def commit(self, message=None, offsets=None, asynchronous=True):
177+
if message is not None and offsets is not None:
178+
raise ValueError(
179+
"message and offsets are mutually exclusive for confluent_kafka.Consumer.commit"
180+
)
181+
if message is not None:
182+
return self._consumer.commit(message=message, asynchronous=asynchronous)
183+
if offsets is not None:
184+
return self._consumer.commit(offsets=offsets, asynchronous=asynchronous)
185+
return self._consumer.commit(asynchronous=asynchronous)
186+
176187
def consume(
177188
self, num_messages=1, *args, **kwargs
178189
): # pylint: disable=keyword-arg-before-vararg

instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
from unittest import TestCase
1818

19-
from confluent_kafka import Consumer, Producer
19+
from confluent_kafka import Consumer, Producer, TopicPartition
2020

2121
from opentelemetry.instrumentation.confluent_kafka import (
2222
ConfluentKafkaInstrumentor,
@@ -58,3 +58,28 @@ def test_instrument_api(self) -> None:
5858

5959
consumer = instrumentation.uninstrument_consumer(consumer)
6060
self.assertEqual(consumer.__class__, Consumer)
61+
62+
def test_consumer_commit_method(self) -> None:
63+
instrumentation = ConfluentKafkaInstrumentor()
64+
65+
consumer = Consumer(
66+
{
67+
"bootstrap.servers": "localhost:29092",
68+
"group.id": "mygroup",
69+
"auto.offset.reset": "earliest",
70+
}
71+
)
72+
73+
consumer = instrumentation.instrument_consumer(consumer)
74+
self.assertEqual(consumer.__class__, ProxiedConsumer)
75+
self.assertTrue(hasattr(consumer, "commit"))
76+
# test asserts that calling consumer.commit, with values passed for both message and offsets will raise an error.
77+
valueError = "message and offsets are mutually exclusive for confluent_kafka.Consumer.commit"
78+
self.assertRaisesRegex(
79+
ValueError,
80+
valueError,
81+
consumer.commit,
82+
message={"topic": "test"},
83+
offsets=[TopicPartition(topic="test")],
84+
asynchronous=True,
85+
)

0 commit comments

Comments
 (0)