16 changes: 16 additions & 0 deletions src/backend/base/langflow/services/tracing/arize_phoenix.py
@@ -111,6 +111,7 @@ def __init__(
self.propagator.inject(carrier=self.carrier)

self.child_spans: dict[str, Span] = {}
self._context_tokens: dict[str, object] = {}

except Exception as e: # noqa: BLE001
logger.error("[Arize/Phoenix] Error Setting Up Tracer: %s", str(e), exc_info=True)
@@ -263,6 +264,13 @@ def add_trace(
elif component_name == "ChatOutput":
self.chat_output_value = processed_inputs["input_value"]

# Attach child span to OTel context so LangChain auto-instrumented
# spans (via LangChainInstrumentor) become children of this span
from opentelemetry import context as otel_context
from opentelemetry.trace import set_span_in_context

self._context_tokens[trace_id] = otel_context.attach(set_span_in_context(child_span))

Comment on lines +267 to +273
⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

head -n 320 src/backend/base/langflow/services/tracing/arize_phoenix.py | tail -n +100

Repository: langflow-ai/langflow

Length of output: 9260


🏁 Script executed:

sed -n '260,320p' src/backend/base/langflow/services/tracing/arize_phoenix.py

Repository: langflow-ai/langflow

Length of output: 2442


🏁 Script executed:

sed -n '100,120p' src/backend/base/langflow/services/tracing/arize_phoenix.py

Repository: langflow-ai/langflow

Length of output: 1254


🌐 Web query:

OpenTelemetry Python contextvars attach detach token execution context async

💡 Result:

In OpenTelemetry Python, context propagation (across sync code and async/await) is implemented via a runtime context (typically contextvars), and the core API is:

  • opentelemetry.context.attach(ctx) -> Token: makes ctx the current context for the caller’s current execution unit (thread / async task) and returns a token representing the previous context. [1]
  • opentelemetry.context.detach(token): restores the context that was current before the matching attach(). Always pair it with attach() (use try/finally). [1][2]

Typical safe pattern (including async):

from opentelemetry import context, baggage

async def handler():
    ctx = baggage.set_baggage("user.id", "123")
    token = context.attach(ctx)
    try:
        await do_work()          # context preserved across async/await
        await do_more_work()
    finally:
        context.detach(token)    # restore previous context

If you don’t detach() after modifying/attaching context, you can get errors due to unmatched attach/detach (the SDK can detect leaked contexts). [3]

Sources: OpenTelemetry Python context API docs [1], OpenTelemetry Python context attach/detach guidance [2], example + warning about failing to detach [3].

[1] (opentelemetry-python.readthedocs.io)
[2] (deepwiki.com)
[3] (docs.honeycomb.io)


🌐 Web query:

OpenTelemetry Python context.attach context.detach same thread async task

💡 Result:

In OpenTelemetry Python, context.attach(ctx)/context.detach(token) are implemented on top of Python contextvars, so the returned token is only valid in the same “current Context” (i.e., the same logical execution unit) where it was created. If you detach() in a different contextvars context, Python raises ValueError: Token was created in a different Context (or similar). [1][2]

What “same thread” means with asyncio

Even if everything runs on the same OS thread, different asyncio.Tasks can have different contextvars contexts, and the token must be detached in the same task / same context that did the attach. OpenTelemetry describes the scope as “same thread or async task.” [3]

Common ways this breaks

  • You attach() in one coroutine/task, but detach() runs:
    • in a different task (e.g., via create_task, callbacks, background tasks), or
    • during async-generator cleanup/GeneratorExit where finalization happens under a different context. This often shows up exactly as the “created in a different Context” error in streaming/generator scenarios. [4]

Safe pattern

Do detach(token) in a finally in the same coroutine call path that did attach():

from opentelemetry import context

token = context.attach(new_ctx)
try:
    await do_work()
finally:
    context.detach(token)

If you need the same context in a new task, propagate the context value into the task at creation time (rather than moving the token across tasks), and use task-local attach/detach there.

Sources
[1] OpenTelemetry Python Context API (attach/detach) (opentelemetry-python.readthedocs.io)
[2] PEP 567: Token.reset() raises ValueError if used in a different Context (peps.python.org)
[3] OpenTelemetry Python context behavior “same thread or async task” (deepwiki.com)
[4] Example async streaming/generator teardown causing “different Context” token detach error (github.com)
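The context-scoping claim above can be reproduced without OpenTelemetry at all, since `attach()`/`detach()` are built on `contextvars`. A minimal sketch (pure stdlib; the variable name `current_span` is illustrative, not an OTel API): a `Token` created inside one `Context` and reset from another raises `ValueError`, which is exactly the failure mode if `add_trace()` and `end_trace()` run in different tasks.

```python
# A Token must be reset in the same contextvars Context that created it.
# ctx.run(...) mimics a different async task's execution context.
import contextvars

current_span = contextvars.ContextVar("current_span", default=None)

def attach_in_isolated_context():
    # set() runs inside a copied Context, so the Token belongs to that
    # Context, not the caller's.
    ctx = contextvars.copy_context()
    return ctx.run(current_span.set, "child-span")

token = attach_in_isolated_context()
try:
    current_span.reset(token)  # detach attempted from the "wrong" context
    outcome = "reset succeeded"
except ValueError:
    outcome = "ValueError"

print(outcome)  # → ValueError
```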


Wrap token detach in try/finally to prevent context leaks on exceptions.

OpenTelemetry's attach()/detach() pattern requires detach() to run even when the attribute-setting operations fail. Currently, an exception during the output/log/status attribute operations (lines 306–314) skips the detach, leaving the context attached and leaking it. Additionally, if add_trace() and end_trace() execute in different async tasks or contexts, detach() will raise ValueError because the token is scoped to the context that created it. Wrap the detach in a try/finally block within end_trace():

Suggested fix
try:
    self._set_span_status(child_span, error)
finally:
    token = self._context_tokens.pop(trace_id, None)
    if token is not None:
        from opentelemetry import context as otel_context
        otel_context.detach(token)
    child_span.end(end_time=self._get_current_timestamp())
    self.child_spans.pop(trace_id)

Also applies to: 305–311

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/backend/base/langflow/services/tracing/arize_phoenix.py` around lines 267
- 273, The context detach currently can be skipped or raise if exceptions occur;
modify end_trace() so that the span status/attribute setting (calls to
_set_span_status, any output/log/status operations) are executed inside a try
block and the context token detach and span cleanup happen in a finally: pop the
token from self._context_tokens using trace_id, call otel_context.detach(token)
only if token is not None (handle possible ValueError by swallowing/ignoring
it), then call child_span.end(end_time=self._get_current_timestamp()) and remove
the span from self.child_spans; this ensures attach() made in add_trace() is
always detached even on errors.

self.child_spans[trace_id] = child_span

@override
@@ -293,6 +301,14 @@ def end_trace(
child_span.set_attribute("logs", self._safe_json_dumps(processed_logs))

self._set_span_status(child_span, error)

# Detach from OTel context before ending the span
token = self._context_tokens.pop(trace_id, None)
if token is not None:
from opentelemetry import context as otel_context

otel_context.detach(token)

child_span.end(end_time=self._get_current_timestamp())
self.child_spans.pop(trace_id)
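The cleanup shape the review asks for can be sketched end to end with stubs. This is a hypothetical illustration, not langflow's actual classes: `FakeSpan` and the `_otel_ctx` contextvar stand in for the real span and OTel context, but the control flow matches the suggestion, attribute work in `try`, token detach and span teardown in `finally`, with a swallowed `ValueError` for cross-context tokens.

```python
# Sketch: even if end_trace()'s attribute-setting raises, the finally block
# still detaches the context token and ends the span, so nothing leaks.
import contextvars

_otel_ctx = contextvars.ContextVar("otel_ctx", default=None)

class FakeSpan:
    def __init__(self):
        self.ended = False
    def end(self):
        self.ended = True

class Tracer:
    def __init__(self):
        self.child_spans = {}
        self._context_tokens = {}

    def add_trace(self, trace_id):
        span = FakeSpan()
        self.child_spans[trace_id] = span
        self._context_tokens[trace_id] = _otel_ctx.set(span)

    def end_trace(self, trace_id, fail=False):
        span = self.child_spans[trace_id]
        try:
            if fail:  # simulates an exception while setting attributes
                raise RuntimeError("attribute error")
        finally:
            token = self._context_tokens.pop(trace_id, None)
            if token is not None:
                try:
                    _otel_ctx.reset(token)
                except ValueError:
                    pass  # token from another context; nothing to restore
            span.end()
            self.child_spans.pop(trace_id)

t = Tracer()
t.add_trace("a")
try:
    t.end_trace("a", fail=True)
except RuntimeError:
    pass

leaked = "a" in t._context_tokens or "a" in t.child_spans
print(leaked)  # → False: detach and span.end ran despite the exception
```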
