
RunResultStreaming.stream_events() raises ValueError when reset_current=True in async environments #515

Closed
@JilinJL

Description


Please read this first

  • Have you read the docs? Agents SDK docs
  • Have you searched for related issues? Yes, no existing issue was found.

Describe the bug

When `RunResultStreaming.stream_events()` is consumed inside a streaming API endpoint in a framework such as FastAPI (or any other ASGI application), the following exception is raised at the end of the stream:

ValueError: <Token var= at 0x...> was created in a different Context

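This error originates from `self._trace.finish(reset_current=True)` at the end of `stream_events()`. The trace's `ContextVar` token is reset across an async task boundary, in a different context from the one that set it, but a `ContextVar` token can only be reset in the same `Context` in which it was created.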
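The constraint can be reproduced with the standard library alone. In this minimal sketch `current_trace` is a hypothetical stand-in for the SDK's internal `ContextVar`, and `contextvars.copy_context().run(...)` plays the role of a different async task: resetting the token there fails with the same message, while resetting it in the original `Context` succeeds.

```python
import contextvars
from typing import Optional

# Hypothetical stand-in for the tracing module's "current trace" variable.
current_trace: contextvars.ContextVar[Optional[str]] = contextvars.ContextVar(
    "current_trace", default=None
)

# set() returns a Token that is bound to the Context it was created in.
token = current_trace.set("trace-1")


def finish(tok: contextvars.Token) -> None:
    current_trace.reset(tok)


# Running finish() inside a copied Context mimics crossing a task boundary.
try:
    contextvars.copy_context().run(finish, token)
    cross_context_error = None
except ValueError as exc:
    cross_context_error = str(exc)

print(cross_context_error)  # ends with "was created in a different Context"

# Resetting in the Context that created the token works and restores the default.
finish(token)
print(current_trace.get())  # None
```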


Debug information

  • Agents SDK version: v0.0.3
  • Python version: 3.12.2
  • Framework: FastAPI + Uvicorn

My code:

# the agent_func code (excerpt from inside agent_func)
if stream:
    return Runner.run_streamed(
        starting_agent=agent,
        input=input_items,
    ).stream_events()

# Here is the server layer:
# (agent_map, sse_event, sse_done, AgentRequest and BaseResponse are project helpers, not shown)
from typing import AsyncGenerator

from fastapi import HTTPException
from fastapi.responses import StreamingResponse


@doc_agent_router.post("/agent/run")
async def run_agent(request: AgentRequest):
    try:
        agent_func = agent_map.get(request.mode)
        if not agent_func:
            raise ValueError("Invalid mode.")
        agent_kwargs = {
            "query": request.query,
            "original_content": request.content,
            "stream": request.stream,
        }

        print("\n params :\n", agent_kwargs)
        result = await agent_func(**agent_kwargs)

        if request.stream:
            # The stream is iterated here, after run_agent() has already returned.
            async def event_stream() -> AsyncGenerator[str, None]:
                async for chunk in result:
                    yield sse_event(chunk)
                yield sse_done()

            return StreamingResponse(
                event_stream(),
                media_type="text/event-stream",
            )
        else:
            return BaseResponse(code=200, data=result)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"SLevel Internal server error: {str(e)}")
# stream_events
# .venv/lib/python3.12/site-packages/agents/result.py
@dataclass
class RunResultStreaming(RunResultBase):
    """The result of an agent run in streaming mode. You can use the `stream_events` method to
    receive semantic events as they are generated.

    The streaming method will raise:
    - A MaxTurnsExceeded exception if the agent exceeds the max_turns limit.
    - A GuardrailTripwireTriggered exception if a guardrail is tripped.
    """

    current_agent: Agent[Any]
    """The current agent that is running."""

    current_turn: int
    """The current turn number."""

    max_turns: int
    """The maximum number of turns the agent can run for."""

    final_output: Any
    """The final output of the agent. This is None until the agent has finished running."""

    _current_agent_output_schema: AgentOutputSchema | None = field(repr=False)

    _trace: Trace | None = field(repr=False)

    is_complete: bool = False
    """Whether the agent has finished running."""

    # Queues that the background run_loop writes to
    _event_queue: asyncio.Queue[StreamEvent | QueueCompleteSentinel] = field(
        default_factory=asyncio.Queue, repr=False
    )
    _input_guardrail_queue: asyncio.Queue[InputGuardrailResult] = field(
        default_factory=asyncio.Queue, repr=False
    )

    # Store the asyncio tasks that we're waiting on
    _run_impl_task: asyncio.Task[Any] | None = field(default=None, repr=False)
    _input_guardrails_task: asyncio.Task[Any] | None = field(default=None, repr=False)
    _output_guardrails_task: asyncio.Task[Any] | None = field(default=None, repr=False)
    _stored_exception: Exception | None = field(default=None, repr=False)

    @property
    def last_agent(self) -> Agent[Any]:
        """The last agent that was run. Updates as the agent run progresses, so the true last agent
        is only available after the agent run is complete.
        """
        return self.current_agent

    async def stream_events(self) -> AsyncIterator[StreamEvent]:
        """Stream deltas for new items as they are generated. We're using the types from the
        OpenAI Responses API, so these are semantic events: each event has a `type` field that
        describes the type of the event, along with the data for that event.

        This will raise:
        - A MaxTurnsExceeded exception if the agent exceeds the max_turns limit.
        - A GuardrailTripwireTriggered exception if a guardrail is tripped.
        """
        while True:
            self._check_errors()
            if self._stored_exception:
                logger.debug("Breaking due to stored exception")
                self.is_complete = True
                break

            if self.is_complete and self._event_queue.empty():
                break

            try:
                item = await self._event_queue.get()
            except asyncio.CancelledError:
                break

            if isinstance(item, QueueCompleteSentinel):
                self._event_queue.task_done()
                # Check for errors, in case the queue was completed due to an exception
                self._check_errors()
                break

            yield item
            self._event_queue.task_done()

        if self._trace:
            self._trace.finish(reset_current=True)  # <- the ValueError is raised here

        self._cleanup_tasks()

        if self._stored_exception:
            raise self._stored_exception
  ...

Repro steps

Use the following minimal example with FastAPI:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai.types.responses import ResponseTextDeltaEvent
from agents import Agent, Runner

app = FastAPI()
agent = Agent(name="Assistant", instructions="You are a helpful assistant.")

@app.get("/stream")
async def stream():
    # run_streamed() is called in the endpoint's context
    result = Runner.run_streamed(agent, input="Hello!")

    async def stream_gen():
        async for event in result.stream_events():  # ❗ Error occurs here after stream ends
            # Forward only the text deltas from the raw model response events
            if event.type == "raw_response_event" and isinstance(event.data, ResponseTextDeltaEvent):
                yield event.data.delta

    return StreamingResponse(stream_gen(), media_type="text/event-stream")

### Root Cause

This happens because `self._trace.finish(reset_current=True)` tries to reset a `ContextVar` with a token that was created in a different asynchronous context (a different `contextvars.Context`) from the one in which `finish()` runs.
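The same failure can be reproduced across asyncio tasks, since each task runs in its own copy of the `Context`. This sketch assumes, without confirmation from the issue, that the ASGI server iterates the `StreamingResponse` body in a different task from the one that ran the endpoint; `current_trace`, `run_streamed`, `handler_broken`, `handler_fixed` and `serve` are hypothetical stand-ins, not SDK or FastAPI APIs. It contrasts starting the stream in the endpoint with starting it inside the body generator, so that the token is set and reset in the same `Context`.

```python
import asyncio
import contextvars
from typing import AsyncIterator, Awaitable, Callable, Optional

# Hypothetical stand-in for the SDK's "current trace" ContextVar.
current_trace: contextvars.ContextVar[Optional[str]] = contextvars.ContextVar(
    "current_trace", default=None
)


def run_streamed() -> AsyncIterator[str]:
    """Hypothetical stand-in for Runner.run_streamed(...).stream_events().

    The trace is marked current synchronously, in the caller's Context, and the
    token is reset only when the event generator is exhausted.
    """
    token = current_trace.set("trace-1")

    async def stream_events() -> AsyncIterator[str]:
        for chunk in ("Hel", "lo!"):
            yield chunk
        current_trace.reset(token)  # must run in the Context that created `token`

    return stream_events()


async def handler_broken() -> AsyncIterator[str]:
    events = run_streamed()  # trace set in the endpoint's Context

    async def body() -> AsyncIterator[str]:
        async for chunk in events:
            yield chunk

    return body()


async def handler_fixed() -> AsyncIterator[str]:
    async def body() -> AsyncIterator[str]:
        events = run_streamed()  # trace set in the Context that iterates the body
        async for chunk in events:
            yield chunk

    return body()


async def serve(handler: Callable[[], Awaitable[AsyncIterator[str]]]) -> str:
    body = await handler()

    async def drain() -> str:
        return "".join([chunk async for chunk in body])

    # Assumption: the response body is consumed in a separate task.
    return await asyncio.create_task(drain())


fixed_output = asyncio.run(serve(handler_fixed))
print(fixed_output)  # Hello!

try:
    asyncio.run(serve(handler_broken))
    broken_error = None
except ValueError as exc:
    broken_error = str(exc)
print(broken_error)  # ends with "was created in a different Context"
```

Under that assumption, the analogous change to the server layer would be to call `Runner.run_streamed(...)` inside `event_stream()` rather than in `agent_func`, so the trace is started and finished by the same task; this is an untested workaround, not a fix in the SDK itself.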

### Result
After successful streaming, the endpoint raises a ValueError.

### Expected behavior
The stream should close cleanly without internal exceptions. The tracing system should finalize safely, even across asynchronous contexts.
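One way a tracing object could finalize without raising is to fall back when the reset token belongs to another `Context`. This is a minimal sketch of a hypothetical `SketchTrace` class (with a hypothetical `_current` variable and `finish_in_task` helper), not the SDK's `Trace` implementation; its `reset_current` parameter only loosely mirrors `finish(reset_current=True)` from the excerpt above. The fallback sets the variable back to the value that was current before `start()`, within whatever `Context` `finish()` runs in.

```python
import asyncio
import contextvars
from typing import Optional, Tuple

# Hypothetical "current trace" variable used by this sketch.
_current: contextvars.ContextVar[Optional["SketchTrace"]] = contextvars.ContextVar(
    "current_trace", default=None
)


class SketchTrace:
    """Hypothetical trace object; not the Agents SDK's Trace class."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.finished = False
        self._token: Optional[contextvars.Token] = None
        self._previous: Optional["SketchTrace"] = None

    def start(self, mark_as_current: bool = False) -> None:
        if mark_as_current:
            self._previous = _current.get()
            self._token = _current.set(self)

    def finish(self, reset_current: bool = False) -> None:
        self.finished = True
        if reset_current and self._token is not None:
            try:
                # Normal path: undo the set() in the Context that performed it.
                _current.reset(self._token)
            except ValueError:
                # The token belongs to another Context (e.g. another asyncio
                # task): restore the previous value in this Context instead.
                _current.set(self._previous)
            self._token = None


async def finish_in_task(trace: SketchTrace) -> Optional[SketchTrace]:
    trace.finish(reset_current=True)
    return _current.get()


async def main() -> Tuple[bool, Optional[SketchTrace], bool]:
    trace = SketchTrace("demo")
    trace.start(mark_as_current=True)
    # Finish from a separate task, i.e. from a copy of this task's Context.
    current_in_task = await asyncio.create_task(finish_in_task(trace))
    return trace.finished, current_in_task, _current.get() is trace


result = asyncio.run(main())
print(result)  # (True, None, True)
```

The third value shows the limitation of this approach: the fallback only cleans up the `Context` in which `finish()` runs, so the task that called `start()` still sees the trace as current until that task ends.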

Metadata


Assignees: No one assigned
Labels: bug (Something isn't working)
Type: No type
Projects: No projects
Milestone: No milestone
Relationships: None yet
Development: No branches or pull requests
