Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
211 changes: 134 additions & 77 deletions src/strands/event_loop/event_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@

logger = logging.getLogger(__name__)

MAX_ATTEMPTS = 6
INITIAL_DELAY = 4
MAX_DELAY = 240 # 4 minutes


def initialize_state(**kwargs: Any) -> Any:
"""Initialize the request state if not present.
Expand All @@ -51,7 +55,7 @@ def event_loop_cycle(
system_prompt: Optional[str],
messages: Messages,
tool_config: Optional[ToolConfig],
callback_handler: Any,
callback_handler: Callable[..., Any],
tool_handler: Optional[ToolHandler],
tool_execution_handler: Optional[ParallelToolExecutorInterface] = None,
**kwargs: Any,
Expand Down Expand Up @@ -130,13 +134,9 @@ def event_loop_cycle(
stop_reason: StopReason
usage: Any
metrics: Metrics
max_attempts = 6
initial_delay = 4
max_delay = 240 # 4 minutes
current_delay = initial_delay

# Retry loop for handling throttling exceptions
for attempt in range(max_attempts):
for attempt in range(MAX_ATTEMPTS):
model_id = model.config.get("model_id") if hasattr(model, "config") else None
model_invoke_span = tracer.start_model_invoke_span(
parent_span=cycle_span,
Expand Down Expand Up @@ -177,7 +177,7 @@ def event_loop_cycle(

# Handle throttling errors with exponential backoff
should_retry, current_delay = handle_throttling_error(
e, attempt, max_attempts, current_delay, max_delay, callback_handler, kwargs
e, attempt, MAX_ATTEMPTS, INITIAL_DELAY, MAX_DELAY, callback_handler, kwargs
)
if should_retry:
continue
Expand All @@ -204,80 +204,35 @@ def event_loop_cycle(

# If the model is requesting to use tools
if stop_reason == "tool_use":
tool_uses: List[ToolUse] = []
tool_results: List[ToolResult] = []
invalid_tool_use_ids: List[str] = []

# Extract and validate tools
validate_and_prepare_tools(message, tool_uses, tool_results, invalid_tool_use_ids)

# Check if tools are available for execution
if tool_uses:
if tool_handler is None:
raise ValueError("toolUse present but tool handler not set")
if tool_config is None:
raise ValueError("toolUse present but tool config not set")

# Create the tool handler process callable
tool_handler_process: Callable[[ToolUse], ToolResult] = partial(
tool_handler.process,
messages=messages,
model=model,
system_prompt=system_prompt,
tool_config=tool_config,
callback_handler=callback_handler,
**kwargs,
if not tool_handler:
raise EventLoopException(
Exception("Model requested tool use but no tool handler provided"),
kwargs["request_state"],
)

# Execute tools (parallel or sequential)
run_tools(
handler=tool_handler_process,
tool_uses=tool_uses,
event_loop_metrics=event_loop_metrics,
request_state=cast(Any, kwargs["request_state"]),
invalid_tool_use_ids=invalid_tool_use_ids,
tool_results=tool_results,
cycle_trace=cycle_trace,
parent_span=cycle_span,
parallel_tool_executor=tool_execution_handler,
if tool_config is None:
raise EventLoopException(
Exception("Model requested tool use but no tool config provided"),
kwargs["request_state"],
)

# Update state for the next cycle
kwargs = prepare_next_cycle(kwargs, event_loop_metrics)

# Create the tool result message
tool_result_message: Message = {
"role": "user",
"content": [{"toolResult": result} for result in tool_results],
}
messages.append(tool_result_message)
callback_handler(message=tool_result_message)

if cycle_span:
tracer.end_event_loop_cycle_span(
span=cycle_span, message=message, tool_result_message=tool_result_message
)

# Check if we should stop the event loop
if kwargs["request_state"].get("stop_event_loop"):
event_loop_metrics.end_cycle(cycle_start_time, cycle_trace)
return (
stop_reason,
message,
event_loop_metrics,
kwargs["request_state"],
)

# Recursive call to continue the conversation
return recurse_event_loop(
model=model,
system_prompt=system_prompt,
messages=messages,
tool_config=tool_config,
callback_handler=callback_handler,
tool_handler=tool_handler,
**kwargs,
)
# Handle tool execution
return _handle_tool_execution(
stop_reason,
message,
model,
system_prompt,
messages,
tool_config,
tool_handler,
callback_handler,
tool_execution_handler,
event_loop_metrics,
cycle_trace,
cycle_span,
cycle_start_time,
kwargs,
)

# End the cycle and return results
event_loop_metrics.end_cycle(cycle_start_time, cycle_trace)
Expand Down Expand Up @@ -377,3 +332,105 @@ def prepare_next_cycle(kwargs: Dict[str, Any], event_loop_metrics: EventLoopMetr
kwargs["event_loop_parent_cycle_id"] = kwargs["event_loop_cycle_id"]

return kwargs


def _handle_tool_execution(
stop_reason: StopReason,
message: Message,
model: Model,
system_prompt: Optional[str],
messages: Messages,
tool_config: ToolConfig,
tool_handler: ToolHandler,
callback_handler: Callable[..., Any],
tool_execution_handler: Optional[ParallelToolExecutorInterface],
event_loop_metrics: EventLoopMetrics,
cycle_trace: Trace,
cycle_span: Any,
cycle_start_time: float,
kwargs: Dict[str, Any],
) -> Tuple[StopReason, Message, EventLoopMetrics, Dict[str, Any]]:
tool_uses: List[ToolUse] = []
tool_results: List[ToolResult] = []
invalid_tool_use_ids: List[str] = []

"""
Handles the execution of tools requested by the model during an event loop cycle.

Args:
stop_reason (StopReason): The reason the model stopped generating.
message (Message): The message from the model that may contain tool use requests.
model (Model): The model provider instance.
system_prompt (Optional[str]): The system prompt instructions for the model.
messages (Messages): The conversation history messages.
tool_config (ToolConfig): Configuration for available tools.
tool_handler (ToolHandler): Handler for tool execution.
callback_handler (Callable[..., Any]): Callback for processing events as they happen.
tool_execution_handler (Optional[ParallelToolExecutorInterface]): Optional handler for parallel tool execution.
event_loop_metrics (EventLoopMetrics): Metrics tracking object for the event loop.
cycle_trace (Trace): Trace object for the current event loop cycle.
cycle_span (Any): Span object for tracing the cycle (type may vary).
cycle_start_time (float): Start time of the current cycle.
kwargs (Dict[str, Any]): Additional keyword arguments, including request state.

Returns:
Tuple[StopReason, Message, EventLoopMetrics, Dict[str, Any]]:
- The stop reason,
- The updated message,
- The updated event loop metrics,
- The updated request state.
"""
validate_and_prepare_tools(message, tool_uses, tool_results, invalid_tool_use_ids)

if not tool_uses:
return stop_reason, message, event_loop_metrics, kwargs["request_state"]

tool_handler_process = partial(
tool_handler.process,
messages=messages,
model=model,
system_prompt=system_prompt,
tool_config=tool_config,
callback_handler=callback_handler,
**kwargs,
)

run_tools(
handler=tool_handler_process,
tool_uses=tool_uses,
event_loop_metrics=event_loop_metrics,
request_state=cast(Any, kwargs["request_state"]),
invalid_tool_use_ids=invalid_tool_use_ids,
tool_results=tool_results,
cycle_trace=cycle_trace,
parent_span=cycle_span,
parallel_tool_executor=tool_execution_handler,
)

kwargs = prepare_next_cycle(kwargs, event_loop_metrics)

tool_result_message: Message = {
"role": "user",
"content": [{"toolResult": result} for result in tool_results],
}

messages.append(tool_result_message)
callback_handler(message=tool_result_message)

if cycle_span:
tracer = get_tracer()
tracer.end_event_loop_cycle_span(span=cycle_span, message=message, tool_result_message=tool_result_message)

if kwargs["request_state"].get("stop_event_loop", False):
event_loop_metrics.end_cycle(cycle_start_time, cycle_trace)
return stop_reason, message, event_loop_metrics, kwargs["request_state"]

return recurse_event_loop(
model=model,
system_prompt=system_prompt,
messages=messages,
tool_config=tool_config,
callback_handler=callback_handler,
tool_handler=tool_handler,
**kwargs,
)