-
Notifications
You must be signed in to change notification settings - Fork 203
feat(multiagent): introduce Swarm multi-agent orchestrator #416
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
cdb4dee
to
1397e6e
Compare
@@ -317,6 +317,9 @@ def __init__( | |||
self.hooks.add_hook(hook) | |||
self.hooks.invoke_callbacks(AgentInitializedEvent(agent=self)) | |||
|
|||
# When True, force stops the agent's event loop | |||
self.stop_event_loop = False |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not a big fan of this existing on the agent - it's sort of a per-request state living off of the agent that complicates complicates restarting the agent.
Can we use exceptions for this case instead? StopAgentException
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good call, will do in a following commit
|
||
from_node: SwarmNode | ||
to_node: SwarmNode | ||
content: str |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Content for human readable, context for extensibility/ structured data?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can content also be a ContentBlock
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will remove SwarmMessage in next commits
src/strands/multiagent/swarm.py
Outdated
max_iterations: int = 20 | ||
execution_timeout: float = 900.0 # Total execution timeout (seconds) | ||
node_timeout: float = 300.0 # Individual node timeout (seconds) | ||
ping_pong_check_nodes: int = 8 # Number of recent nodes to check for ping-pong |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could just be a knowledge gap on my end, but not sure what ping-pong here is referencing, for both fields
"""Set list of available agents.""" | ||
self.available_nodes = nodes | ||
|
||
def add_context(self, node: SwarmNode, key: str, value: Any) -> None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think for serialization purposes, @Unshure added some checks for values to ensure that all state was serializable. Should that be done here as well? I presume context is something that would be preserved if we were persisting swarms?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why would the context not live on the node itself instead of via this shared context? Esp. given that you have to pass in the node
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think for serialization purposes, @Unshure added some checks for values to ensure that all state was serializable.
Trying to understand how SharedContext is different from AgentState? I can more easily see how context that is generic to the swarm can be useful, but this implementation makes it seem like you can only set context about specific nodes.
Read through the Swarm class, and I understand now that this is populated by the execution of tools from the agents, not by the user.
self.context[node.node_id] = {} | ||
self.context[node.node_id][key] = value | ||
|
||
def get_relevant_context(self, target_node: SwarmNode) -> dict[str, Any]: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm presuming this will come from docs, but I'm not really understanding what "relevant context" is.
"""Initialize swarm configuration.""" | ||
# Validate agents have names and create SwarmNode objects | ||
for i, node in enumerate(nodes): | ||
if not hasattr(node, "name") or not node.name: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here we should/can use node.name
directly, no? No need to be extra safe with hasattr
?
|
||
return self._build_result() | ||
|
||
def _setup_swarm(self, nodes: list[Agent]) -> None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we explicitly name this as agents
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we could because at the moment it is indeed just agents. I was thinking that we might expand the valid node executor types in Swarms as well, so kept it as nodes for now
logger.info("node_id=<%d> | agent has no name, dynamically generating one", node_id) | ||
|
||
node_id = str(node.name) | ||
self.nodes[node_id] = SwarmNode(node_id=node_id, executor=node) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we have a unique-ness check here?
swarm_ref = self # Capture swarm reference | ||
|
||
@tool | ||
def get_swarm_context() -> dict[str, Any]: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And agents use context for more than just passing it into other tools, right?
|
||
for node in self.nodes.values(): | ||
# Use the agent's tool registry to process and register the tools | ||
node.executor.tool_registry.process_tools(swarm_tools) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we be looking for duplicate names or protect against it somehow?
Probably out of scope for this pr, but it would be nice to be able to stop the event loop both after a tool is called (current behavior) and after the model is called (new behavior). |
@@ -462,7 +462,7 @@ def tool_handler(tool_use: ToolUse) -> ToolGenerator: | |||
tracer = get_tracer() | |||
tracer.end_event_loop_cycle_span(span=cycle_span, message=message, tool_result_message=tool_result_message) | |||
|
|||
if invocation_state["request_state"].get("stop_event_loop", False): | |||
if agent.stop_event_loop or invocation_state["request_state"].get("stop_event_loop", False): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we remove the request_state stop_event_loop
logic here? I don't like that there is a special, undocumented key to enable this behavior.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah I will replace this with a StopAgentException
context: dict[str, dict[str, Any]] = field(default_factory=dict) | ||
node_history: list[SwarmNode] = field(default_factory=list) | ||
current_task: str | list[ContentBlock] | None = None | ||
available_nodes: list[SwarmNode] = field(default_factory=list) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Should this be a list or a set?
return { | ||
"task": self.current_task, | ||
"node_history": [node.node_id for node in self.node_history], | ||
"shared_context": {k: v for k, v in self.context.items() if v}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When would the values in the context be None?
"""Set list of available agents.""" | ||
self.available_nodes = nodes | ||
|
||
def add_context(self, node: SwarmNode, key: str, value: Any) -> None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think for serialization purposes, @Unshure added some checks for values to ensure that all state was serializable.
Trying to understand how SharedContext is different from AgentState? I can more easily see how context that is generic to the swarm can be useful, but this implementation makes it seem like you can only set context about specific nodes.
Read through the Swarm class, and I understand now that this is populated by the execution of tools from the agents, not by the user.
|
||
from_node: SwarmNode | ||
to_node: SwarmNode | ||
content: str |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can content also be a ContentBlock
?
self.completion_status = Status.FAILED | ||
return False, f"execution_timeout_{config.execution_timeout}s" | ||
|
||
# 5. Check for node ping-pong (nodes passing back and forth) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
# 5. Check for node ping-pong (nodes passing back and forth) | |
# 5. Check for node ping-pong 🏓 (nodes passing back and forth) |
message_history: list[SwarmMessage] = field(default_factory=list) | ||
iteration_count: int = 0 | ||
start_time: float = field(default_factory=time.time) | ||
last_node_sequence: list[SwarmNode] = field(default_factory=list) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some documentation on what these attributes represent would be helpful. I'm having a hard time wrapping my head around what this is, and how it is used with ping_pong_check_nodes
class SwarmResult(MultiAgentResult): | ||
"""Result from swarm execution - extends MultiAgentResult with swarm-specific details.""" | ||
|
||
status: Status = Status.PENDING |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: status
is already in MultiAgentResult
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good spot, thank you
node_history: list[SwarmNode] = field(default_factory=list) | ||
message_history: list[SwarmMessage] = field(default_factory=list) | ||
iteration_count: int = 0 | ||
final_result: str | None = None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can a result also be a ContentBlock
or list[ContentBlock]
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will remove final_result in following commits
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we have unit tests for this code?
if not target_node: | ||
return {"status": "error", "reason": f"agent_{target_agent_name}_not_found"} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a duplicate check, handoff_to_agent
already checks for this.
previous_agent.node_id, | ||
target_node.node_id, | ||
) | ||
return {"status": "success", "target_agent": target_agent_name} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to return a tool result here? The handoff_to_agent
already does that.
|
||
logger.info("swarm task completed") | ||
|
||
def _format_context(self, context_info: dict[str, Any]) -> str: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: It would be helpful to include an example of what this formatted context is supposed to look like in the docstring here.
|
||
await self._execute_swarm() | ||
|
||
if self.state.completion_status == Status.EXECUTING: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When is it expected that after self._execute_swarm()
the self.state.completion_status
is not Status.COMPLETED
? Is this an error case? If so, can we at least log here?
|
||
if not isinstance(task, str): | ||
# Include additional ContentBlocks in node input | ||
node_input = node_input + task |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we convert a task that is an instance of str to a Text ContentBlock, and append it here?
) | ||
|
||
# Store result in state | ||
self.state.results[node_name] = node_result |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If one agent in a swarm is executed multiple times, does its result get overwritten? Is this node_result history stored somewhere?
Description
Swarm
multi-agent orchestratorstop_event_loop
boolean toAgent
classRelated Issues
#214
Documentation PR
TODO
Type of Change
New feature
Testing
How have you tested the change? Verify that the changes do not break functionality or introduce warnings in consuming repositories: agents-docs, agents-tools, agents-cli
hatch run prepare
Checklist
By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.