A modular Python package for real-time agent message streaming and filtering, designed for LLM/AI agent systems. Provides composable stream filters and streaming utilities for RabbitMQ and more.
- Composable stream filters (by function call, text, source, etc.)
- Pluggable streaming backends (RabbitMQ, more coming)
- Clean separation of filter logic and streaming logic
- Environment-variable-based configuration for managers
- Extensible for new message types and streaming backends
pip install agent-stream
You can use the package in your code as follows:
from agent_stream.managers import RabbitMQManager
rabbitmq_manager = RabbitMQManager()
await rabbitmq_manager.connect()
# ... use rabbitmq_manager ...
await rabbitmq_manager.close()
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.base import Handoff
from autogen_agentchat.teams import Swarm
from agent_stream.streams import RabbitMQStream
from agent_stream.filters import FunctionCallStreamFilter, StreamChunkStreamFilter
from agent_stream.managers import RabbitMQManager
# Define your agents and team (simplified example)
leader_agent = AssistantAgent(
"leader_agent",
tools=[],
model_client=your_llm_client,
model_client_stream=True,
system_message="You are the leader.",
handoffs=[Handoff(target="coder_agent", name="request_code_implementation")]
)
coder_agent = AssistantAgent(
"coder_agent",
tools=[],
model_client=your_llm_client,
model_client_stream=True,
system_message="You are the coder.",
handoffs=[Handoff(target="leader_agent", name="provide_implementation")]
)
team = Swarm([leader_agent, coder_agent])
# Initialize RabbitMQ manager
rabbitmq_manager = RabbitMQManager()
await rabbitmq_manager.connect()
# Run the team and stream results through RabbitMQStream with filters
result = await RabbitMQStream(
stream=team.run_stream(task="Your task message"),
rabbitmq=rabbitmq_manager,
queue_name="agent-stream-demo",
filters=[
FunctionCallStreamFilter("signal_edit_file"),
FunctionCallStreamFilter("signal_read_file"),
StreamChunkStreamFilter("leader_agent"),
StreamChunkStreamFilter("coder_agent")
]
)
await rabbitmq_manager.close()
You can use SSEStream
to stream agent or LLM events to clients over HTTP using Server-Sent Events (SSE). This is useful for real-time web applications.
from agent_stream.streams import SSEStream
from agent_stream.filters import FunctionCallStreamFilter, StreamChunkStreamFilter
# Example usage in an async context (e.g., inside a FastAPI/Starlette endpoint)
async for sse_event in SSEStream(
stream=team.run_stream(task="Your task message"),
filters=[
FunctionCallStreamFilter("signal_edit_file"),
StreamChunkStreamFilter("leader_agent")
]
):
# sse_event is a string formatted for SSE, e.g. 'event: ...\ndata: ...\nsource: ...\n\n'
# You can yield this directly in a FastAPI/Starlette StreamingResponse
print(sse_event)
This will yield SSE-formatted strings for each event, which you can send to the client in real time. Each event includes the event type, data, and source fields.
RABBITMQ_HOST
,RABBITMQ_PORT
,RABBITMQ_USERNAME
,RABBITMQ_PASSWORD
(for RabbitMQManager)
- Add new filters in
filters/
- Add new streaming backends in
streams/
- Add new managers in
managers/
Filtering in agent-stream
allows you to process only the messages or events you care about from a stream of agent or LLM outputs. Filters are composable and can be combined using logical operators to create complex filtering logic.
- FunctionCallStreamFilter: Passes through only messages that represent a function/tool call with a specific name.
- TextMessageStreamFilter: Passes through only messages containing a specific text.
- StreamChunkStreamFilter: Passes through only messages from a specific agent or stream chunk.
from agent_stream.filters import FunctionCallStreamFilter, TextMessageStreamFilter, StreamChunkStreamFilter
# Filter for tool call events with a specific function name
filter1 = FunctionCallStreamFilter("signal_edit_code")
# Filter for messages containing a specific text
filter2 = TextMessageStreamFilter("error")
# Filter for messages from a specific agent
filter3 = StreamChunkStreamFilter("leader_agent")
# Combine filters with | (OR) and & (AND)
combined_filter = (filter1 | filter2) & filter3
You can pass a single filter or a list of filters to RabbitMQStream
or other streaming utilities. When multiple filters are provided, a message must pass all filters to be included (logical AND).
from agent_stream.streams import RabbitMQStream
from agent_stream.filters import FunctionCallStreamFilter, StreamChunkStreamFilter
result = await RabbitMQStream(
stream=your_async_stream,
rabbitmq=rabbitmq_manager,
queue_name="your-queue-name",
filters=[
FunctionCallStreamFilter("signal_edit_file"),
StreamChunkStreamFilter("leader_agent")
]
)
This will only stream messages that are function calls to signal_edit_file
and are from the leader_agent
.
You can create your own filters by subclassing the base filter class and implementing the __call__
method.
from agent_stream.filters import BaseStreamFilter
class CustomFilter(BaseStreamFilter):
def __call__(self, message):
# Your custom logic here
return "important" in message.get("content", "")
MIT