Skip to content

Commit 6ba29d0

Browse files
committed
Applying Black to increase readability and Modularizing Event Loop
1 parent 63cef21 commit 6ba29d0

File tree

5 files changed

+200
-107
lines changed

5 files changed

+200
-107
lines changed

src/strands/event_loop/error_handler.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,10 @@
1313
from ..types.exceptions import ContextWindowOverflowException, ModelThrottledException
1414
from ..types.models import Model
1515
from ..types.streaming import StopReason
16-
from .message_processor import find_last_message_with_tool_results, truncate_tool_results
16+
from .message_processor import (
17+
find_last_message_with_tool_results,
18+
truncate_tool_results,
19+
)
1720

1821
logger = logging.getLogger(__name__)
1922

@@ -100,7 +103,10 @@ def handle_input_too_long_error(
100103

101104
# If we found a message with toolResult
102105
if last_message_with_tool_results is not None:
103-
logger.debug("message_index=<%s> | found message with tool results at index", last_message_with_tool_results)
106+
logger.debug(
107+
"message_index=<%s> | found message with tool results at index",
108+
last_message_with_tool_results,
109+
)
104110

105111
# Truncate the tool results in this message
106112
truncate_tool_results(messages, last_message_with_tool_results)

src/strands/event_loop/event_loop.py

Lines changed: 136 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,11 @@
1818
from ..tools.executor import run_tools, validate_and_prepare_tools
1919
from ..types.content import Message, Messages
2020
from ..types.event_loop import ParallelToolExecutorInterface
21-
from ..types.exceptions import ContextWindowOverflowException, EventLoopException, ModelThrottledException
21+
from ..types.exceptions import (
22+
ContextWindowOverflowException,
23+
EventLoopException,
24+
ModelThrottledException,
25+
)
2226
from ..types.models import Model
2327
from ..types.streaming import Metrics, StopReason
2428
from ..types.tools import ToolConfig, ToolHandler, ToolResult, ToolUse
@@ -28,8 +32,12 @@
2832

2933
logger = logging.getLogger(__name__)
3034

35+
MAX_ATTEMPTS = 6
36+
INITIAL_DELAY = 4
37+
MAX_DELAY = 240 # 4 minutes
3138

32-
def initialize_state(**kwargs: Any) -> Any:
39+
40+
def initialize_state(kwargs: Dict[str, Any]) -> Dict[str, Any]:
3341
"""Initialize the request state if not present.
3442
3543
Creates an empty request_state dictionary if one doesn't already exist in the
@@ -51,7 +59,7 @@ def event_loop_cycle(
5159
system_prompt: Optional[str],
5260
messages: Messages,
5361
tool_config: Optional[ToolConfig],
54-
callback_handler: Any,
62+
callback_handler: Callable[..., Any],
5563
tool_handler: Optional[ToolHandler],
5664
tool_execution_handler: Optional[ParallelToolExecutorInterface] = None,
5765
**kwargs: Any,
@@ -100,10 +108,12 @@ def event_loop_cycle(
100108
# Initialize cycle state
101109
kwargs["event_loop_cycle_id"] = uuid.uuid4()
102110

103-
event_loop_metrics: EventLoopMetrics = kwargs.get("event_loop_metrics", EventLoopMetrics())
111+
event_loop_metrics: EventLoopMetrics = kwargs.get(
112+
"event_loop_metrics", EventLoopMetrics()
113+
)
104114

105115
# Initialize state and get cycle trace
106-
kwargs = initialize_state(**kwargs)
116+
kwargs = initialize_state(kwargs)
107117
cycle_start_time, cycle_trace = event_loop_metrics.start_cycle()
108118
kwargs["event_loop_cycle_trace"] = cycle_trace
109119

@@ -130,9 +140,9 @@ def event_loop_cycle(
130140
stop_reason: StopReason
131141
usage: Any
132142
metrics: Metrics
133-
max_attempts = 6
134-
initial_delay = 4
135-
max_delay = 240 # 4 minutes
143+
max_attempts = MAX_ATTEMPTS
144+
initial_delay = INITIAL_DELAY
145+
max_delay = MAX_DELAY
136146
current_delay = initial_delay
137147

138148
# Retry loop for handling throttling exceptions
@@ -145,13 +155,15 @@ def event_loop_cycle(
145155
)
146156

147157
try:
148-
stop_reason, message, usage, metrics, kwargs["request_state"] = stream_messages(
149-
model,
150-
system_prompt,
151-
messages,
152-
tool_config,
153-
callback_handler,
154-
**kwargs,
158+
stop_reason, message, usage, metrics, kwargs["request_state"] = (
159+
stream_messages(
160+
model,
161+
system_prompt,
162+
messages,
163+
tool_config,
164+
callback_handler,
165+
**kwargs,
166+
)
155167
)
156168
if model_invoke_span:
157169
tracer.end_model_invoke_span(model_invoke_span, message, usage)
@@ -177,7 +189,13 @@ def event_loop_cycle(
177189

178190
# Handle throttling errors with exponential backoff
179191
should_retry, current_delay = handle_throttling_error(
180-
e, attempt, max_attempts, current_delay, max_delay, callback_handler, kwargs
192+
e,
193+
attempt,
194+
max_attempts,
195+
current_delay,
196+
max_delay,
197+
callback_handler,
198+
kwargs,
181199
)
182200
if should_retry:
183201
continue
@@ -204,80 +222,29 @@ def event_loop_cycle(
204222

205223
# If the model is requesting to use tools
206224
if stop_reason == "tool_use":
207-
tool_uses: List[ToolUse] = []
208-
tool_results: List[ToolResult] = []
209-
invalid_tool_use_ids: List[str] = []
210-
211-
# Extract and validate tools
212-
validate_and_prepare_tools(message, tool_uses, tool_results, invalid_tool_use_ids)
213-
214-
# Check if tools are available for execution
215-
if tool_uses:
216-
if tool_handler is None:
217-
raise ValueError("toolUse present but tool handler not set")
218-
if tool_config is None:
219-
raise ValueError("toolUse present but tool config not set")
220-
221-
# Create the tool handler process callable
222-
tool_handler_process: Callable[[ToolUse], ToolResult] = partial(
223-
tool_handler.process,
224-
messages=messages,
225-
model=model,
226-
system_prompt=system_prompt,
227-
tool_config=tool_config,
228-
callback_handler=callback_handler,
229-
**kwargs,
225+
if not tool_handler:
226+
raise EventLoopException(
227+
"Model requested tool use but no tool handler provided",
228+
kwargs["request_state"],
230229
)
231230

232-
# Execute tools (parallel or sequential)
233-
run_tools(
234-
handler=tool_handler_process,
235-
tool_uses=tool_uses,
236-
event_loop_metrics=event_loop_metrics,
237-
request_state=cast(Any, kwargs["request_state"]),
238-
invalid_tool_use_ids=invalid_tool_use_ids,
239-
tool_results=tool_results,
240-
cycle_trace=cycle_trace,
241-
parent_span=cycle_span,
242-
parallel_tool_executor=tool_execution_handler,
243-
)
244-
245-
# Update state for the next cycle
246-
kwargs = prepare_next_cycle(kwargs, event_loop_metrics)
247-
248-
# Create the tool result message
249-
tool_result_message: Message = {
250-
"role": "user",
251-
"content": [{"toolResult": result} for result in tool_results],
252-
}
253-
messages.append(tool_result_message)
254-
callback_handler(message=tool_result_message)
255-
256-
if cycle_span:
257-
tracer.end_event_loop_cycle_span(
258-
span=cycle_span, message=message, tool_result_message=tool_result_message
259-
)
260-
261-
# Check if we should stop the event loop
262-
if kwargs["request_state"].get("stop_event_loop"):
263-
event_loop_metrics.end_cycle(cycle_start_time, cycle_trace)
264-
return (
265-
stop_reason,
266-
message,
267-
event_loop_metrics,
268-
kwargs["request_state"],
269-
)
270-
271-
# Recursive call to continue the conversation
272-
return recurse_event_loop(
273-
model=model,
274-
system_prompt=system_prompt,
275-
messages=messages,
276-
tool_config=tool_config,
277-
callback_handler=callback_handler,
278-
tool_handler=tool_handler,
279-
**kwargs,
280-
)
231+
# Handle tool execution
232+
return _handle_tool_execution(
233+
stop_reason,
234+
message,
235+
model,
236+
system_prompt,
237+
messages,
238+
tool_config,
239+
tool_handler,
240+
callback_handler,
241+
tool_execution_handler,
242+
event_loop_metrics,
243+
cycle_trace,
244+
cycle_span,
245+
cycle_start_time,
246+
kwargs,
247+
)
281248

282249
# End the cycle and return results
283250
event_loop_metrics.end_cycle(cycle_start_time, cycle_trace)
@@ -359,7 +326,9 @@ def recurse_event_loop(
359326
)
360327

361328

362-
def prepare_next_cycle(kwargs: Dict[str, Any], event_loop_metrics: EventLoopMetrics) -> Dict[str, Any]:
329+
def prepare_next_cycle(
330+
kwargs: Dict[str, Any], event_loop_metrics: EventLoopMetrics
331+
) -> Dict[str, Any]:
363332
"""Prepare state for the next event loop cycle.
364333
365334
Updates the keyword arguments with the current event loop metrics and stores the current cycle ID as the parent
@@ -377,3 +346,81 @@ def prepare_next_cycle(kwargs: Dict[str, Any], event_loop_metrics: EventLoopMetr
377346
kwargs["event_loop_parent_cycle_id"] = kwargs["event_loop_cycle_id"]
378347

379348
return kwargs
349+
350+
351+
def _handle_tool_execution(
352+
stop_reason: StopReason,
353+
message: Message,
354+
model: Model,
355+
system_prompt: Optional[str],
356+
messages: Messages,
357+
tool_config: ToolConfig,
358+
tool_handler: ToolHandler,
359+
callback_handler: Callable[..., Any],
360+
tool_execution_handler: Optional[ParallelToolExecutorInterface],
361+
event_loop_metrics: EventLoopMetrics,
362+
cycle_trace: Trace,
363+
cycle_span: Any,
364+
cycle_start_time: float,
365+
kwargs: Dict[str, Any],
366+
) -> Tuple[StopReason, Message, EventLoopMetrics, Dict[str, Any]]:
367+
tool_uses: List[ToolUse] = []
368+
tool_results: List[ToolResult] = []
369+
invalid_tool_use_ids: List[str] = []
370+
371+
validate_and_prepare_tools(message, tool_uses, tool_results, invalid_tool_use_ids)
372+
373+
if not tool_uses:
374+
return stop_reason, message, event_loop_metrics, kwargs["request_state"]
375+
376+
tool_handler_process = partial(
377+
tool_handler.process,
378+
messages=messages,
379+
model=model,
380+
system_prompt=system_prompt,
381+
tool_config=tool_config,
382+
callback_handler=callback_handler,
383+
**kwargs,
384+
)
385+
386+
run_tools(
387+
handler=tool_handler_process,
388+
tool_uses=tool_uses,
389+
event_loop_metrics=event_loop_metrics,
390+
request_state=cast(Any, kwargs["request_state"]),
391+
invalid_tool_use_ids=invalid_tool_use_ids,
392+
tool_results=tool_results,
393+
cycle_trace=cycle_trace,
394+
parent_span=cycle_span,
395+
parallel_tool_executor=tool_execution_handler,
396+
)
397+
398+
kwargs = prepare_next_cycle(kwargs, event_loop_metrics)
399+
400+
tool_result_message: Message = {
401+
"role": "user",
402+
"content": [{"toolResult": result} for result in tool_results],
403+
}
404+
405+
messages.append(tool_result_message)
406+
callback_handler(message=tool_result_message)
407+
408+
if cycle_span:
409+
tracer = get_tracer()
410+
tracer.end_event_loop_cycle_span(
411+
span=cycle_span, message=message, tool_result_message=tool_result_message
412+
)
413+
414+
if kwargs["request_state"].get("stop_event_loop", False):
415+
event_loop_metrics.end_cycle(cycle_start_time, cycle_trace)
416+
return stop_reason, message, event_loop_metrics, kwargs["request_state"]
417+
418+
return recurse_event_loop(
419+
model=model,
420+
system_prompt=system_prompt,
421+
messages=messages,
422+
tool_config=tool_config,
423+
callback_handler=callback_handler,
424+
tool_handler=tool_handler,
425+
**kwargs,
426+
)

src/strands/event_loop/message_processor.py

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -68,14 +68,20 @@ def clean_orphaned_empty_tool_uses(messages: Messages) -> bool:
6868
tool_results.add(tool_id)
6969

7070
# Filter for orphaned empty toolUses (no corresponding toolResult)
71-
orphaned_tool_uses = {tool_id: info for tool_id, info in empty_tool_uses.items() if tool_id not in tool_results}
71+
orphaned_tool_uses = {
72+
tool_id: info
73+
for tool_id, info in empty_tool_uses.items()
74+
if tool_id not in tool_results
75+
}
7276

7377
# Apply fixes in reverse order of occurrence (to avoid index shifting)
7478
if not orphaned_tool_uses:
7579
return False
7680

7781
# Sort by message index and content index in reverse order
78-
sorted_orphaned = sorted(orphaned_tool_uses.items(), key=lambda x: (x[1][0], x[1][1]), reverse=True)
82+
sorted_orphaned = sorted(
83+
orphaned_tool_uses.items(), key=lambda x: (x[1][0], x[1][1]), reverse=True
84+
)
7985

8086
# Apply fixes
8187
for tool_id, (msg_idx, content_idx, tool_name) in sorted_orphaned:
@@ -91,13 +97,22 @@ def clean_orphaned_empty_tool_uses(messages: Messages) -> bool:
9197
# Check if this is the sole content in the message
9298
if len(messages[msg_idx]["content"]) == 1:
9399
# Replace with a message indicating the attempted tool
94-
messages[msg_idx]["content"] = [{"text": f"[Attempted to use {tool_name}, but operation was canceled]"}]
95-
logger.debug("message_index=<%s> | replaced content with context message", msg_idx)
100+
messages[msg_idx]["content"] = [
101+
{
102+
"text": f"[Attempted to use {tool_name}, but operation was canceled]"
103+
}
104+
]
105+
logger.debug(
106+
"message_index=<%s> | replaced content with context message",
107+
msg_idx,
108+
)
96109
else:
97110
# Simply remove the orphaned toolUse entry
98111
messages[msg_idx]["content"].pop(content_idx)
99112
logger.debug(
100-
"message_index=<%s>, content_index=<%s> | removed content item from message", msg_idx, content_idx
113+
"message_index=<%s>, content_index=<%s> | removed content item from message",
114+
msg_idx,
115+
content_idx,
101116
)
102117
except Exception as e:
103118
logger.warning("failed to fix orphaned tool use | %s", e)
@@ -156,7 +171,9 @@ def truncate_tool_results(messages: Messages, msg_idx: int) -> bool:
156171
if isinstance(content, dict) and "toolResult" in content:
157172
# Update status to error with informative message
158173
message["content"][i]["toolResult"]["status"] = "error"
159-
message["content"][i]["toolResult"]["content"] = [{"text": "The tool result was too large!"}]
174+
message["content"][i]["toolResult"]["content"] = [
175+
{"text": "The tool result was too large!"}
176+
]
160177
changes_made = True
161178

162179
return changes_made

0 commit comments

Comments
 (0)