Skip to content

Commit 0b25489

Browse files
committed
instrumentation/kafka: fix handling consumer iteration if transaction not sampled
Handle the case where if the transaction is not sampled capture_span will return None instead of span. While at it fix handling of checking for KAFKA_HOST in tests. Fix #2073
1 parent 3e5ad6c commit 0b25489

File tree

2 files changed

+23
-4
lines changed

2 files changed

+23
-4
lines changed

elasticapm/instrumentation/packages/kafka.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,8 @@ def call(self, module, method, wrapped, instance, args, kwargs):
143143
try:
144144
result = wrapped(*args, **kwargs)
145145
except StopIteration:
146-
span.cancel()
146+
if span:
147+
span.cancel()
147148
raise
148149
if span and not isinstance(span, DroppedSpan):
149150
topic = result[0]

tests/instrumentation/kafka_tests.py

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,11 +45,10 @@
4545

4646
pytestmark = [pytest.mark.kafka]
4747

48-
if "KAFKA_HOST" not in os.environ:
48+
KAFKA_HOST = os.environ.get("KAFKA_HOST")
49+
if not KAFKA_HOST:
4950
pytestmark.append(pytest.mark.skip("Skipping kafka tests, no KAFKA_HOST environment variable set"))
5051

51-
KAFKA_HOST = os.environ["KAFKA_HOST"]
52-
5352

5453
@pytest.fixture(scope="function")
5554
def topics():
@@ -233,3 +232,22 @@ def test_kafka_poll_unsampled_transaction(instrument, elasticapm_client, consume
233232
elasticapm_client.end_transaction("foo")
234233
spans = elasticapm_client.events[SPAN]
235234
assert len(spans) == 0
235+
236+
237+
def test_kafka_consumer_unsampled_transaction_handles_stop_iteration(
238+
instrument, elasticapm_client, producer, consumer, topics
239+
):
240+
def delayed_send():
241+
time.sleep(0.2)
242+
producer.send("test", key=b"foo", value=b"bar")
243+
244+
thread = threading.Thread(target=delayed_send)
245+
thread.start()
246+
transaction = elasticapm_client.begin_transaction("foo")
247+
transaction.is_sampled = False
248+
for item in consumer:
249+
pass
250+
thread.join()
251+
elasticapm_client.end_transaction("foo")
252+
spans = elasticapm_client.events[SPAN]
253+
assert len(spans) == 0

0 commit comments

Comments
 (0)