Skip to content

Commit 2d9e21c

Browse files
[pre-commit.ci] auto fixes from pre-commit.com hooks
for more information, see https://pre-commit.ci
1 parent 0686e28 commit 2d9e21c

File tree

3 files changed

+41
-33
lines changed

3 files changed

+41
-33
lines changed

src/workflows/recipe/__init__.py

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,10 @@
33
import functools
44
import logging
55
from collections.abc import Callable
6-
from opentelemetry import trace
76
from typing import Any
87

8+
from opentelemetry import trace
9+
910
from workflows.recipe.recipe import Recipe
1011
from workflows.recipe.validate import validate_recipe
1112
from workflows.recipe.wrapper import RecipeWrapper
@@ -82,16 +83,18 @@ def unwrap_recipe(header, message):
8283
environment = message.get("environment", {})
8384
if isinstance(environment, dict):
8485
recipe_id = environment.get("ID")
85-
86+
8687
if recipe_id:
8788
span.set_attribute("recipe_id", recipe_id)
88-
span.add_event("recipe.id_extracted", attributes={"recipe_id": recipe_id})
89+
span.add_event(
90+
"recipe.id_extracted", attributes={"recipe_id": recipe_id}
91+
)
8992

9093
# Extract span_id and trace_id for logging
9194
span_context = span.get_span_context()
9295
if span_context and span_context.is_valid:
93-
span_id = format(span_context.span_id, '016x')
94-
trace_id = format(span_context.trace_id, '032x')
96+
span_id = format(span_context.span_id, "016x")
97+
trace_id = format(span_context.trace_id, "032x")
9598

9699
log_extra = {
97100
"span_id": span_id,
@@ -102,12 +105,8 @@ def unwrap_recipe(header, message):
102105
if recipe_id:
103106
log_extra["recipe_id"] = recipe_id
104107

105-
logger.info(
108+
logger.info("Processing recipe message", extra=log_extra)
106109

107-
"Processing recipe message",
108-
extra=log_extra
109-
)
110-
111110
if log_extender and rw.environment and rw.environment.get("ID"):
112111
with log_extender("recipe_ID", rw.environment["ID"]):
113112
return callback(rw, header, message.get("payload"))

src/workflows/services/common_service.py

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,15 @@
99
import time
1010
from typing import Any
1111

12-
import workflows
13-
import workflows.logging
14-
1512
from opentelemetry import trace
13+
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
14+
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
1615
from opentelemetry.sdk.trace import TracerProvider
1716
from opentelemetry.sdk.trace.export import BatchSpanProcessor
18-
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
17+
18+
import workflows
19+
import workflows.logging
1920
from workflows.transport.middleware.otel_tracing import OTELTracingMiddleware
20-
from opentelemetry.sdk.resources import Resource, SERVICE_NAME
2121

2222

2323
class Status(enum.Enum):
@@ -192,30 +192,36 @@ def start_transport(self):
192192
self.transport.subscription_callback_set_intercept(
193193
self._transport_interceptor
194194
)
195-
195+
196196
# Configure OTELTracing
197-
resource = Resource.create({
198-
SERVICE_NAME: self._service_name,
199-
})
197+
resource = Resource.create(
198+
{
199+
SERVICE_NAME: self._service_name,
200+
}
201+
)
200202

201203
self.log.debug("Configuring OTELTracing")
202204
provider = TracerProvider(resource=resource)
203205
trace.set_tracer_provider(provider)
204206

205207
# Configure BatchProcessor and OTLPSpanExporter to point to OTELCollector
206208
otlp_exporter = OTLPSpanExporter(
207-
endpoint="https://otel.tracing.diamond.ac.uk:4318/v1/traces",
208-
timeout=10
209+
endpoint="https://otel.tracing.diamond.ac.uk:4318/v1/traces", timeout=10
209210
)
210211
span_processor = BatchSpanProcessor(otlp_exporter)
211212
provider.add_span_processor(span_processor)
212213

213214
# Add OTELTracingMiddleware to the transport layer
214215
tracer = trace.get_tracer(__name__)
215-
otel_middleware = OTELTracingMiddleware(tracer, service_name=self._service_name)
216+
otel_middleware = OTELTracingMiddleware(
217+
tracer, service_name=self._service_name
218+
)
216219
self._transport.add_middleware(otel_middleware)
217220

218-
self.log.debug("OTELTracingMiddleware added to transport layer of %s", self._service_name)
221+
self.log.debug(
222+
"OTELTracingMiddleware added to transport layer of %s",
223+
self._service_name,
224+
)
219225

220226
metrics = self._environment.get("metrics")
221227
if metrics:
Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,13 @@
1+
from __future__ import annotations
2+
3+
import functools
4+
from collections.abc import Callable
5+
16
from opentelemetry import trace
7+
from opentelemetry.propagate import extract
8+
29
from workflows.transport.middleware import BaseTransportMiddleware
3-
from collections.abc import Callable
4-
import functools
5-
from opentelemetry.propagate import inject, extract
10+
611

712
class OTELTracingMiddleware(BaseTransportMiddleware):
813
def __init__(self, tracer: trace.Tracer, service_name: str):
@@ -19,18 +24,16 @@ def subscribe(self, call_next: Callable, channel, callback, **kwargs) -> int:
1924
def wrapped_callback(header, message):
2025
# Extract trace context from message headers
2126
ctx = extract(header) if header else None
22-
27+
2328
# Start a new span with the extracted context
2429
with self.tracer.start_as_current_span(
25-
"transport.subscribe",
26-
context=ctx
30+
"transport.subscribe", context=ctx
2731
) as span:
2832
span.set_attribute("service_name", self.service_name)
2933
span.set_attribute("channel", channel)
30-
31-
34+
3235
# Call the original callback
3336
return callback(header, message)
34-
37+
3538
# Call the next middleware with the wrapped callback
36-
return call_next(channel, wrapped_callback, **kwargs)
39+
return call_next(channel, wrapped_callback, **kwargs)

0 commit comments

Comments
 (0)