Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/strands/experimental/bidirectional_streaming/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
# IO channels - Hardware abstraction
from .io.audio import AudioIO

# IO protocols - For typing custom IO implementations
from .types.io import BidiInput, BidiOutput

# Model interface (for custom implementations)
from .models.bidirectional_model import BidiModel

Expand Down Expand Up @@ -45,6 +48,9 @@
"BidiAgent",
# IO channels
"AudioIO",
# IO protocols
"BidiInput",
"BidiOutput",
# Model providers
"BidiGeminiLiveModel",
"BidiNovaSonicModel",
Expand Down
115 changes: 68 additions & 47 deletions src/strands/experimental/bidirectional_streaming/agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
from ..models.novasonic import BidiNovaSonicModel
from ..types.agent import BidiAgentInput
from ..types.events import BidiAudioInputEvent, BidiImageInputEvent, BidiTextInputEvent, BidiInputEvent, BidiOutputEvent
from ..types import BidiIO
from ..types.io import BidiInput, BidiOutput
from ....experimental.tools import ToolProvider

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -422,73 +422,94 @@ def active(self) -> bool:
"""
return self._agent_loop is not None and self._agent_loop.active

async def run(self, io_channels: list[BidiIO | tuple[Callable, Callable]]) -> None:
"""Run the agent using provided IO channels or transport tuples for bidirectional communication.
async def run(
self,
inputs: list[BidiInput],
outputs: list[BidiOutput],
) -> None:
"""Run the agent using provided input and output callables.

Beta limitation: Currently supports only single input and single output.
Multiple inputs/outputs will be supported in a future release.

Args:
io_channels: List containing either BidiIO instances or (sender, receiver) tuples.
- BidiIO: IO channel instance with send(), receive(), and end() methods
- tuple: (sender_callable, receiver_callable) for custom transport
inputs: List containing a single input callable that returns BidiInputEvent.
outputs: List containing a single output callable that accepts BidiOutputEvent.

Example:
```python
# With IO channel
audio_io = AudioIO(audio_config={"input_sample_rate": 16000})

agent = BidiAgent(model=model, tools=[calculator])
await agent.run(io_channels=[audio_io])

# With tuple (backward compatibility)
await agent.run(io_channels=[(sender_function, receiver_function)])
await agent.run(
inputs=[audio_io.input],
outputs=[audio_io.output]
)

await audio_io.stop()
```

Raises:
ValueError: If io_channels list is empty or contains invalid items.
Exception: Any exception from the transport layer.
ValueError: If inputs or outputs lists are empty or contain multiple items.
Exception: Any exception from the IO layer.
"""
if not io_channels:
raise ValueError("io_channels parameter cannot be empty. Provide either an IO channel or (sender, receiver) tuple.")
if not inputs:
raise ValueError("inputs parameter cannot be empty. Provide at least one input callable.")
if not outputs:
raise ValueError("outputs parameter cannot be empty. Provide at least one output callable.")

transport = io_channels[0]

# Set IO channel tracking for cleanup
if hasattr(transport, 'send') and hasattr(transport, 'receive'):
self._current_adapters = [transport] # IO channel needs cleanup
elif isinstance(transport, tuple) and len(transport) == 2:
self._current_adapters = [] # Tuple needs no cleanup
else:
raise ValueError("io_channels list must contain either BidiIO instances or (sender, receiver) tuples.")
# Beta limitation: single input/output only
if len(inputs) > 1:
raise ValueError("Beta limitation: Multiple inputs not yet supported. Please provide a single input callable.")
if len(outputs) > 1:
raise ValueError("Beta limitation: Multiple outputs not yet supported. Please provide a single output callable.")

# Auto-manage session lifecycle
if self.active:
await self._run_with_transport(transport)
await self._run_simple(inputs[0], outputs[0])
else:
async with self:
await self._run_with_transport(transport)
await self._run_simple(inputs[0], outputs[0])

async def _run_with_transport(
async def _run_simple(
self,
transport: BidiIO | tuple[Callable, Callable],
input_callable: BidiInput,
output_callable: BidiOutput,
) -> None:
"""Internal method to run send/receive loops with an active connection."""

async def receive_from_agent():
"""Receive events from agent and send to transport."""
async for event in self.receive():
if hasattr(transport, 'receive'):
await transport.receive(event)
else:
await transport[0](event)

async def send_to_agent():
"""Receive events from transport and send to agent."""
while self.active:
if hasattr(transport, 'send'):
event = await transport.send()
else:
event = await transport[1]()
await self.send(event)
"""Internal method to run send/receive loops with single input/output.

Simplified implementation for beta release.

Args:
input_callable: Single input callable.
output_callable: Single output callable.
"""

await asyncio.gather(receive_from_agent(), send_to_agent(), return_exceptions=True)
async def send_loop():
"""Read from input and send to agent."""
while self.active:
try:
event = await input_callable()
await self.send(event)
# Yield control to prevent event loop starvation
await asyncio.sleep(0)
except asyncio.CancelledError:
break
except Exception as e:
logger.warning("Error in send loop: %s", e)
await asyncio.sleep(0.1)

async def receive_loop():
"""Receive from agent and write to output."""
try:
async for event in self.receive():
await output_callable(event)
except asyncio.CancelledError:
pass
except Exception as e:
logger.warning("Error in receive loop: %s", e)

await asyncio.gather(send_loop(), receive_loop(), return_exceptions=True)

def _validate_active_connection(self) -> None:
"""Validate that an active connection exists.
Expand Down
Loading
Loading