Skip to content

Conversation

@pgrayy
Copy link
Collaborator

@pgrayy pgrayy commented Nov 19, 2025

Description

Properly handle start, stop, cancellation, and error propagation in Agent loop.

Goal

agent.receive runs indefinitely. With the changes made in this PR, users can now control exiting (cleanly) using the following patterns:

async def send(agent):
    while True:
        # get input event from user
        await agent.send(<EVENT>)

async def receive(agent):
    async for event in agent.receive():
        # output event

async def main():
    agent = BidiAgent()
    agent.start()  # could use a with context as well instead of the start, try/finally, stop

    try:
        # TaskGroup automatically waits for all tasks and will cancel all if exception is encountered
        async with asyncio.TaskGroup() as task_group:
            task_group.create_task(send(agent))
            task_group.create_task(receive(agent))
    except KeyboardInterrupt:
        pass
    finally:
        agent.stop()

Or

async def send(agent):
    while True:
        # get input event from user
        await agent.send(<EVENT>)

async def main():
    agent = BidiAgent()
    agent.start()  # could use a with context as well instead of the start, try/finally, stop

    send_task = asyncio.create_task(send(agent))
    try:
        async for event in agent.receive():
             if <SOME CONDITION>:
                 break

       send_task.cancel()
       await send_task
    finally:
        agent.stop()

Or

async def main():
    agent = BidiAgent()
    audio_io = BidiAgentIO()
    
    run_task = asyncio.create_task(agent.run([audio_io.input()], [audio_io.output()])
    try:
        await run_task
    except KeyboardInterrupt:
        run_task.cancel()
        await run_task

Testing

  • Ran hatch run prepare: Lint and mypy checks passing.
  • Ran the following script:
import asyncio
import json

from strands import tool
from strands.experimental.bidi import BidiAgent
from strands.experimental.bidi.io import BidiAudioIO, BidiTextIO


@tool
async def time_tool() -> str:
    print("TIME")
    return "12:01"

async def main() -> None:
    print("MAIN - starting agent")
    agent = BidiAgent(tools=[time_tool, weather_tool])

    audio_io = BidiAudioIO(input_rate=16000, output_rate=16000)
    text_io = BidiTextIO()

    try:
        run_coro = agent.run(inputs=[audio_io.input()], outputs=[audio_io.output(), text_io.output()])
        await asyncio.wait_for(run_coro, timeout=90)
    except asyncio.TimeoutError:
        pass

    print(f"MAIN - stopping agent: {json.dumps(agent.messages, indent=2)}")


if __name__ == "__main__":
    asyncio.run(main())
  • Model is responsive.
  • Tool calling works as expected.
  • Interruptions are immediate.

Also tested that interrupts lead to an exception with the following:

@tool(context=True)
async def time_tool(tool_context: ToolContext) -> str:
    print("TIME")
    tool_context.interrupt("test_interrupt")

The result of asking the agent to run this tool is RuntimeError: interrupts=['test_interrupt'] | tool interrupts are not supported in bidi.

@github-actions github-actions bot added size/m and removed size/m labels Nov 19, 2025
@github-actions github-actions bot added size/s and removed size/m labels Nov 20, 2025
@pgrayy pgrayy changed the title async cancellation and error propagation cancellation - agent loop Nov 20, 2025
@github-actions github-actions bot added size/s and removed size/s labels Nov 20, 2025
@github-actions github-actions bot added size/s and removed size/s labels Nov 20, 2025
@github-actions github-actions bot added size/m and removed size/s labels Nov 20, 2025
@github-actions github-actions bot added size/m and removed size/m labels Nov 20, 2025
@github-actions github-actions bot added size/m and removed size/m labels Nov 22, 2025
self.hooks.invoke_callbacks(BidiAgentInitializedEvent(agent=self))

# TODO: Determine if full support is required
self._interrupt_state = _InterruptState()
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we are to raise an exception with a clear error message whenever a users raises an interrupt in their tools, we need to add _interrupt_state to this agent.

raise RuntimeError("loop not started | call start before receiving")

while True:
event = await self._event_queue.get()
Copy link
Collaborator Author

@pgrayy pgrayy Nov 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will break out of this loop under two conditions:

  1. Exception raised from run_model or run_tool task.
  2. User calls cancel on their receive task that calls agent.receive (whether they call it directly or through agent.run).
    • Cancel results in an asyncio.CancelledError being raised which will break out of this await self._event_queue.get() call and the method as a whole. This is why we don't need the _stop_event check anymore.


self._tasks.add(task)

async def _run_model(self) -> None:
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would encourage you to "Hide Whitespace" for this section.

if event["is_final"]:
message: Message = {"role": event["role"], "content": [{"text": event["text"]}]}
self._agent.messages.append(message)
try:
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wrapping in try/except so we can place errors into the _event_queue and reraise them from receive.

)

async for event in tool_events:
if isinstance(event, ToolInterruptEvent):
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Without setting _interrupt_state on agent, we wouldn't get this ToolInterruptEvent. Instead, we would get an "_interrupt_state attribute doesn't exist" error.

@pgrayy pgrayy marked this pull request as ready for review November 22, 2025 19:42
@pgrayy pgrayy merged commit c4181ec into main Nov 23, 2025
3 of 13 checks passed
@pgrayy pgrayy deleted the cancellation branch November 23, 2025 14:25
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants