Skip to content

Conversation

@mkmeral
Copy link

@mkmeral mkmeral commented Nov 12, 2025

Callable IO Refactoring

Overview

Refactored the bidirectional streaming IO system from a protocol-based approach to a callable-based approach, enabling truly non-blocking, concurrent input/output handling with proper backpressure management.

What Changed

Before: Protocol-Based IO

class BidiIO(Protocol):
    async def start() -> dict
    async def send() -> dict
    async def receive(event: dict) -> None
    def stop() -> None

# Usage
audio_io = AudioIO()
await agent.run(io_channels=[audio_io])

After: Callable-Based IO

class BidiInput(Protocol):
    def __call__() -> Awaitable[BidiInputEvent]

class BidiOutput(Protocol):
    def __call__(event: BidiOutputEvent) -> Awaitable[None]

# Usage
audio_io = AudioIO()
await audio_io.start()
await agent.run(
    inputs=[audio_io.input],
    outputs=[audio_io.output]
)
await audio_io.stop()

Key Benefits

1. True Non-Blocking Operation

  • Multiple inputs run concurrently without blocking each other
  • Multiple outputs run concurrently without blocking each other
  • Audio, text, websocket, etc. can all work simultaneously

2. Automatic Backpressure Handling

  • Each output has its own queue (max 100 events)
  • Slow outputs drop events instead of blocking fast outputs
  • Critical outputs (audio playback) unaffected by slow outputs (network upload)

3. Better Error Isolation

  • Errors in one input don't crash other inputs
  • Errors in one output don't crash other outputs
  • Proper exception logging for debugging

4. Cleaner API

  • Separate inputs and outputs parameters
  • Support for multiple inputs and outputs naturally
  • Better type hints with Protocol

Architecture

┌─────────────────────────────────────────────────────────────┐
│                        BidiAgent                            │
│                                                             │
│  ┌──────────────────┐              ┌──────────────────┐   │
│  │  Input Readers   │              │  Output Writers  │   │
│  │                  │              │                  │   │
│  │  ┌────────────┐  │              │  ┌────────────┐ │   │
│  │  │ Audio In   │──┼──┐        ┌──┼─▶│ Audio Out  │ │   │
│  │  └────────────┘  │  │        │  │  └────────────┘ │   │
│  │  ┌────────────┐  │  │        │  │  ┌────────────┐ │   │
│  │  │ Text In    │──┼──┼─Queue──┼──┼─▶│ Logger     │ │   │
│  │  └────────────┘  │  │        │  │  └────────────┘ │   │
│  │  ┌────────────┐  │  │        │  │  ┌────────────┐ │   │
│  │  │ WebSocket  │──┼──┘        └──┼─▶│ UI Display │ │   │
│  │  └────────────┘  │              │  └────────────┘ │   │
│  └──────────────────┘              └──────────────────┘   │
│         ▲                                    ▲             │
│         │                                    │             │
│         └────────── Agent Loop ──────────────┘             │
└─────────────────────────────────────────────────────────────┘

Example Usage

Basic Audio IO

from strands.experimental.bidirectional_streaming import BidiAgent, AudioIO

audio_io = AudioIO(audio_config={"input_sample_rate": 16000})
await audio_io.start()

agent = BidiAgent(model="nova-sonic", tools=[get_time])
await agent.run(
    inputs=[audio_io.input],
    outputs=[audio_io.output]
)

await audio_io.stop()

Multiple Inputs and Outputs

# Multiple input sources
audio_io = AudioIO()
text_input = ConsoleInput()
websocket_input = WebSocketInput()

# Multiple output destinations
logger = FileLogger()
ui_display = UIDisplay()

await agent.run(
    inputs=[
        audio_io.input,      # Microphone
        text_input,          # Console
        websocket_input,     # WebSocket
    ],
    outputs=[
        audio_io.output,     # Speakers (critical - fast)
        logger,              # File logging (fast)
        ui_display,          # UI updates (fast)
        network_upload,      # Cloud upload (slow - won't block others)
    ]
)

Custom IO Implementation

class CustomInput:
    async def __call__(self) -> BidiTextInputEvent:
        # Your custom input logic
        data = await read_from_source()
        return BidiTextInputEvent(text=data, role="user")

class CustomOutput:
    async def __call__(self, event: BidiOutputEvent) -> None:
        # Your custom output logic
        if isinstance(event, BidiTranscriptStreamEvent):
            await write_to_destination(event["text"])

# Use with agent
await agent.run(
    inputs=[CustomInput()],
    outputs=[CustomOutput()]
)

Implementation Details

Input Handling

  • Each input runs in its own input_reader task
  • All inputs feed into a shared queue (max 1000 events)
  • Events processed in arrival order (FIFO)
  • Errors logged but don't crash other inputs

Output Handling

  • Each output runs in its own output_writer task
  • Each output has its own queue (max 100 events)
  • Queue full = drop events for that output only
  • Errors logged but don't crash other outputs

Backpressure

  • Output queues use put_nowait() to avoid blocking
  • asyncio.QueueFull exception = drop event + log warning
  • Fast outputs continue normally when slow outputs drop events

Shutdown

  • All tasks properly cancelled on agent stop
  • CancelledError handled gracefully
  • Queues drained automatically

Testing

Unit tests cover:

  • Parameter validation (inputs/outputs required)
  • Protocol compliance (callable signatures)
  • Error handling and isolation

Run tests:

hatch test tests/strands/experimental/bidirectional_streaming/agent/test_agent.py

Migration Guide

Old Code

audio_io = AudioIO()
await agent.run(io_channels=[audio_io])

New Code

audio_io = AudioIO()
await audio_io.start()
await agent.run(
    inputs=[audio_io.input],
    outputs=[audio_io.output]
)
await audio_io.stop()

Performance Characteristics

  • Latency: Minimal - direct queue-based communication
  • Throughput: High - concurrent processing of all I/O
  • Memory: Bounded - queue size limits prevent unbounded growth
  • CPU: Efficient - async/await with proper yielding

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant