Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BREAK: Introduce otel middleware and require kstream >= 0.17 #7

Merged
merged 1 commit into from
Oct 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
BREAK: Introduce otel middleware and require kstream >= 0.17
  • Loading branch information
woile committed Oct 2, 2024
commit eba7f8c65e2410ee84cbad0ffde9f4fae0c66703
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

Version: `0.3.0`

> [!IMPORTANT]
> This instrumentation works only with [ksterams middlewares](https://kpn.github.io/kstreams/middleware/) after `v0.17.0`

## Installation

```sh
Expand Down
52 changes: 26 additions & 26 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ python = "^3.8"
opentelemetry-api = "^1.27.0"
opentelemetry-instrumentation = "^0.48b0"
opentelemetry-semantic-conventions = "^0.48b0"
kstreams = { version = ">=0.12.0", optional = true }
kstreams = { version = ">=0.17.0", optional = true }

[tool.poetry.group.dev.dependencies]
ruff = "^0.6"
Expand All @@ -31,6 +31,9 @@ opentelemetry-test-utils = "^0.48b0"
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

[tool.ruff.lint]
select = ["I"]

[tool.ruff.lint.isort]
known-first-party = ["opentelemetry_instrumentation_kstreams", "tests"]

Expand Down
19 changes: 12 additions & 7 deletions src/opentelemetry_instrumentation_kstreams/instrumentor.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
from typing import Any, Collection

from kstreams import StreamEngine, Stream
from kstreams import Stream, StreamEngine
from opentelemetry import trace
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor # type: ignore
from opentelemetry.instrumentation.utils import unwrap
from wrapt import wrap_function_wrapper

from .package import _instruments
from .version import __version__
from .utils import (
from .wrappers import (
# _wrap_getone,
_wrap_build_stream_middleware_stack,
_wrap_send,
_wrap_getone,
)

from .package import _instruments


class KStreamsInstrumentor(BaseInstrumentor):
"""Instrument kstreams with OpenTelemetry.
Expand All @@ -37,8 +38,12 @@ def _instrument(self, **kwargs: Any):
schema_url="https://opentelemetry.io/schemas/1.11.0",
)
wrap_function_wrapper(StreamEngine, "send", _wrap_send(tracer))
wrap_function_wrapper(Stream, "getone", _wrap_getone(tracer))
wrap_function_wrapper(
StreamEngine,
"build_stream_middleware_stack",
_wrap_build_stream_middleware_stack(tracer),
)

def _uninstrument(self, **kwargs: Any):
unwrap(StreamEngine, "send")
unwrap(Stream, "getone")
unwrap(Stream, "build_stream_middleware_stack")
111 changes: 111 additions & 0 deletions src/opentelemetry_instrumentation_kstreams/middlewares.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
from typing import Any, Optional

from kstreams import (
ConsumerRecord,
middleware,
)
from kstreams.backends.kafka import Kafka
from opentelemetry import context, propagate, trace
from opentelemetry.context.context import Context

# Enable after 0.49 is released
# from opentelemetry.semconv._incubating.attributes import messaging_attributes as SpanAttributes
from opentelemetry.trace import SpanKind, Tracer

from . import utils
from .utils import (
KStreamsKafkaExtractor,
_get_span_name,
_kstreams_getter,
)


class OpenTelemetryMiddleware(middleware.BaseMiddleware):
"""
Middleware for integrating OpenTelemetry tracing with Kafka Streams.

This middleware extracts tracing information from Kafka consumer records and
creates spans for tracing the processing of these records.

Attributes:
tracer: The OpenTelemetry tracer instance used for creating spans.

Methods:
__call__(cr: ConsumerRecord) -> Any:
Asynchronously processes a Kafka consumer record, creating and enriching
an OpenTelemetry span with tracing information.
"""

def __init__(self, *, tracer: Optional[Tracer] = None, **kwargs) -> None:
super().__init__(**kwargs)
if tracer is None:
tracer = trace.get_tracer(__name__)

# The current tracer instance
self.tracer = tracer

# Initialize variables computed once which are injected into the span
if not isinstance(self.stream.backend, Kafka):
raise NotImplementedError("Only Kafka backend is supported for now")
self.bootstrap_servers = KStreamsKafkaExtractor.extract_bootstrap_servers(
self.stream.backend
)
self.consumer_group = KStreamsKafkaExtractor.extract_consumer_group(
self.stream.consumer
)
self.client_id = KStreamsKafkaExtractor.extract_consumer_client_id(self.stream)

async def __call__(self, cr: ConsumerRecord) -> Any:
"""
Asynchronously processes a ConsumerRecord by creating and managing a span.

Args:
cr (ConsumerRecord): The consumer record to be processed.

Returns:
Any: The result of the next call in the processing chain.

This method performs the following steps:
1. Extracts the context from the record headers.
2. Starts a new span with the extracted context.
3. Enriches the span with base and record-specific information.
4. Optionally sets the consumer group attribute (currently commented out).
5. Calls the next processing function in the chain.
6. Detaches the context token.
"""
tracer = self.tracer
record = cr
bootstrap_servers = self.bootstrap_servers
client_id = self.client_id
span_name = _get_span_name("receive", record.topic)
extracted_context: Context = propagate.extract(
record.headers, getter=_kstreams_getter
)

with tracer.start_as_current_span(
span_name,
context=extracted_context,
end_on_exit=True,
kind=SpanKind.CONSUMER,
) as span:
new_context = trace.set_span_in_context(span, extracted_context)
context_token = context.attach(new_context)

utils._enrich_base_span(
span,
bootstrap_servers,
record.topic,
client_id,
)
utils._enrich_span_with_record_info(
span, record.topic, record.partition, record.offset
)

# TODO: enable after 0.49 is released
# if self.consumer_group is not None:
# span.set_attribute(
# SpanAttributes.MESSAGING_CONSUMER_GROUP_NAME, self.consumer_group
# )

await self.next_call(cr)
context.detach(context_token)
2 changes: 1 addition & 1 deletion src/opentelemetry_instrumentation_kstreams/package.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
_instruments = ("kstreams >= 0.13.0",)
_instruments = ("kstreams >= 0.17.0",)
Loading
Loading