Closed
Description
Please read this first
- ✅ Have you read the docs? Agents SDK docs
- ✅ Have you searched for related issues? Yes, no existing issue was found.
Describe the bug
When using the RunResultStreaming.stream_events()
method within a streaming API endpoint in frameworks like FastAPI (or any ASGI server), the following exception is raised at the end of the stream:
ValueError: <Token var= at 0x...> was created in a different Context
This error originates from self._trace.finish(reset_current=True)
being executed across async task boundaries, which violates the requirement that a ContextVar
token be reset in the same context in which it was created.
Debug information
- Agents SDK version:
v0.0.3
- Python version:
3.12.2
- Framework:
FastAPI + Uvicorn
My code:
# the agent_func code
if stream:
return Runner.run_streamed(
starting_agent=agent,
input=input_items,
).stream_events()
# Here is the server layer:
@doc_agent_router.post("/agent/run")
async def run_agent(request: AgentRequest):
try:
agent_func = agent_map.get(request.mode)
if not agent_func:
raise ValueError("Invalid mode.")
agent_kwargs = {
"query": request.query,
"original_content": request.content,
"stream": request.stream
}
print("\n params :\n", agent_kwargs)
result = await agent_func(**agent_kwargs)
if request.stream:
async def event_stream() -> AsyncGenerator[str, None]:
async for chunk in result:
yield sse_event(chunk)
yield sse_done()
return StreamingResponse(
event_stream(),
media_type="text/event-stream"
)
else:
return BaseResponse(code=200, data=result)
except Exception as e:
raise HTTPException(status_code=500, detail=f"SLevel Internal server error: {str(e)}")
# stream_events
# .venv/lib/python3.12/site-packages/agents/result.py
@dataclass
class RunResultStreaming(RunResultBase):
"""The result of an agent run in streaming mode. You can use the `stream_events` method to
receive semantic events as they are generated.
The streaming method will raise:
- A MaxTurnsExceeded exception if the agent exceeds the max_turns limit.
- A GuardrailTripwireTriggered exception if a guardrail is tripped.
"""
current_agent: Agent[Any]
"""The current agent that is running."""
current_turn: int
"""The current turn number."""
max_turns: int
"""The maximum number of turns the agent can run for."""
final_output: Any
"""The final output of the agent. This is None until the agent has finished running."""
_current_agent_output_schema: AgentOutputSchema | None = field(repr=False)
_trace: Trace | None = field(repr=False)
is_complete: bool = False
"""Whether the agent has finished running."""
# Queues that the background run_loop writes to
_event_queue: asyncio.Queue[StreamEvent | QueueCompleteSentinel] = field(
default_factory=asyncio.Queue, repr=False
)
_input_guardrail_queue: asyncio.Queue[InputGuardrailResult] = field(
default_factory=asyncio.Queue, repr=False
)
# Store the asyncio tasks that we're waiting on
_run_impl_task: asyncio.Task[Any] | None = field(default=None, repr=False)
_input_guardrails_task: asyncio.Task[Any] | None = field(default=None, repr=False)
_output_guardrails_task: asyncio.Task[Any] | None = field(default=None, repr=False)
_stored_exception: Exception | None = field(default=None, repr=False)
@property
def last_agent(self) -> Agent[Any]:
"""The last agent that was run. Updates as the agent run progresses, so the true last agent
is only available after the agent run is complete.
"""
return self.current_agent
async def stream_events(self) -> AsyncIterator[StreamEvent]:
"""Stream deltas for new items as they are generated. We're using the types from the
OpenAI Responses API, so these are semantic events: each event has a `type` field that
describes the type of the event, along with the data for that event.
This will raise:
- A MaxTurnsExceeded exception if the agent exceeds the max_turns limit.
- A GuardrailTripwireTriggered exception if a guardrail is tripped.
"""
while True:
self._check_errors()
if self._stored_exception:
logger.debug("Breaking due to stored exception")
self.is_complete = True
break
if self.is_complete and self._event_queue.empty():
break
try:
item = await self._event_queue.get()
except asyncio.CancelledError:
break
if isinstance(item, QueueCompleteSentinel):
self._event_queue.task_done()
# Check for errors, in case the queue was completed due to an exception
self._check_errors()
break
yield item
self._event_queue.task_done()
if self._trace:
self._trace.finish(reset_current=True) # <-- the ValueError is raised here
self._cleanup_tasks()
if self._stored_exception:
raise self._stored_exception
...
Repro steps
Use the following minimal example with FastAPI:
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai_agents import create_openai_agent
from openai_agents.executor import AgentExecutor
app = FastAPI()
agent = create_openai_agent(tools=[])
@app.get("/stream")
async def stream():
executor = AgentExecutor(agent=agent)
result = await executor.run_stream(query="Hello!", stream=True)
async def stream_gen():
async for event in result.stream_events(): # ❗ Error occurs here after stream ends
yield event.delta.text
return StreamingResponse(stream_gen(), media_type="text/event-stream")
### Root Cause
This happens because `self._trace.finish(reset_current=True)` attempts to reset a `ContextVar` from a different asynchronous context than the one it was created in.
### Result
After successful streaming, the endpoint raises a ValueError.
### Expected behavior
The stream should close cleanly without internal exceptions. The tracing system should finalize safely, even across asynchronous contexts.