Skip to content

Add confluent kafka docs #1668

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Feb 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ asyncpg>=0.12.0
boto~=2.0
botocore~=1.0
celery>=4.0
confluent-kafka>=1.8.2,<2.0.0
elasticsearch>=2.0,<9.0
flask~=2.0
falcon~=2.0
Expand Down
27 changes: 8 additions & 19 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,25 +126,14 @@ def getlistcfg(strval):
]


if "class_references" in mcfg:
class_references = getlistcfg(mcfg["class_references"])
for class_reference in class_references:
nitpick_ignore.append(
(
"py:class",
class_reference,
)
)

if "anys" in mcfg:
anys = getlistcfg(mcfg["anys"])
for _any in anys:
nitpick_ignore.append(
(
"any",
_any,
)
)
# Categories of cross-reference names to suppress in Sphinx "nitpicky" mode,
# read from nitpick-exceptions.ini. The ini section keys use "-" (e.g.
# "py-class") because ":" is not valid in an ini key; Sphinx's nitpick_ignore
# expects the ":" form (e.g. "py:class"), hence the replace() below.
ignore_categories = ["py-class", "py-func", "py-exc", "any"]

for category in ignore_categories:
    if category in mcfg:
        # getlistcfg splits the multi-line ini value into individual names.
        items = getlistcfg(mcfg[category])
        for item in items:
            nitpick_ignore.append((category.replace("-", ":"), item))


# Add any paths that contain templates here, relative to this directory.
templates_path = ["_templates"]
Expand Down
7 changes: 7 additions & 0 deletions docs/instrumentation/confluent_kafka/confluent_kafka.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
.. include:: ../../../instrumentation/opentelemetry-instrumentation-confluent-kafka/README.rst

.. automodule:: opentelemetry.instrumentation.confluent_kafka
:members:
:undoc-members:
:show-inheritance:
:noindex:
23 changes: 21 additions & 2 deletions docs/nitpick-exceptions.ini
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[default]
class_references=
py-class=
; TODO: Understand why sphinx is not able to find this local class
opentelemetry.propagators.textmap.CarrierT
opentelemetry.propagators.textmap.Setter
Expand All @@ -11,6 +11,8 @@ class_references=
opentelemetry.propagators.textmap.Getter
; - AWSXRayPropagator
opentelemetry.sdk.trace.id_generator.IdGenerator
opentelemetry.instrumentation.confluent_kafka.ProxiedProducer
opentelemetry.instrumentation.confluent_kafka.ProxiedConsumer
; - AwsXRayIdGenerator
TextMapPropagator
CarrierT
Expand All @@ -26,8 +28,16 @@ class_references=
httpx.AsyncByteStream
httpx.Response
yarl.URL
cimpl.Producer
cimpl.Consumer
func
Message
TopicPartition
callable
Consumer
confluent_kafka.Message

anys=
any=
; API
opentelemetry.propagators.textmap.TextMapPropagator.fields
; - AWSXRayPropagator
Expand All @@ -44,3 +54,12 @@ anys=
; - instrumentation.*
Setter
httpx
;
py-func=
poll
flush
Message.error

py-exc=
KafkaException
KafkaError
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@
# limitations under the License.

"""
Instrument `confluent-kafka-python` to report instrumentation-confluent-kafka produced and consumed messages
Instrument confluent-kafka-python to report instrumentation-confluent-kafka produced and consumed messages

Usage
-----

..code:: python
.. code-block:: python

from opentelemetry.instrumentation.confluent_kafka import ConfluentKafkaInstrumentor
from confluent_kafka import Producer, Consumer
Expand All @@ -30,24 +30,21 @@
conf1 = {'bootstrap.servers': "localhost:9092"}
producer = Producer(conf1)
producer.produce('my-topic',b'raw_bytes')

conf2 = {'bootstrap.servers': "localhost:9092",
'group.id': "foo",
'auto.offset.reset': 'smallest'}
conf2 = {'bootstrap.servers': "localhost:9092", 'group.id': "foo", 'auto.offset.reset': 'smallest'}
# report a span of type consumer with the default settings
consumer = Consumer(conf2)

def basic_consume_loop(consumer, topics):
try:
consumer.subscribe(topics)
running = True
while running:
msg = consumer.poll(timeout=1.0)
if msg is None: continue

if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
# End of partition event
sys.stderr.write(f"{msg.topic()} [{msg.partition()}] reached end at offset {msg.offset()}}\n")
sys.stderr.write(f"{msg.topic()} [{msg.partition()}] reached end at offset {msg.offset()}\n")
elif msg.error():
raise KafkaException(msg.error())
else:
Expand All @@ -57,19 +54,26 @@ def basic_consume_loop(consumer, topics):
consumer.close()

basic_consume_loop(consumer, "my-topic")
---

The _instrument method accepts the following keyword args:
tracer_provider (TracerProvider) - an optional tracer provider

instrument_producer (Callable) - a function with extra user-defined logic to be performed before sending the message
this function signature is:

def instrument_producer(producer: Producer, tracer_provider=None)

instrument_consumer (Callable) - a function with extra user-defined logic to be performed after consuming a message
this function signature is:

def instrument_consumer(consumer: Consumer, tracer_provider=None)
for example:

.. code:: python

The `_instrument` method accepts the following keyword args:
tracer_provider (TracerProvider) - an optional tracer provider
instrument_producer (Callable) - a function with extra user-defined logic to be performed before sending the message
this function signature is:
def instrument_producer(producer: Producer, tracer_provider=None)
instrument_consumer (Callable) - a function with extra user-defined logic to be performed after consuming a message
this function signature is:
def instrument_consumer(consumer: Consumer, tracer_provider=None)
for example:
.. code-block:: python
from opentelemetry.instrumentation.confluent_kafka import ConfluentKafkaInstrumentor

from confluent_kafka import Producer, Consumer

inst = ConfluentKafkaInstrumentor()
Expand All @@ -85,15 +89,12 @@ def instrument_consumer(consumer: Consumer, tracer_provider=None)
p = inst.instrument_producer(p, tracer_provider)
c = inst.instrument_consumer(c, tracer_provider=tracer_provider)


# Using kafka as normal now will automatically generate spans,
# including user custom attributes added from the hooks
conf = {'bootstrap.servers': "localhost:9092"}
p.produce('my-topic',b'raw_bytes')
msg = c.poll()


API
___
"""
from typing import Collection
Expand Down