models - bedrock - daemon thread #593
base: main
thread = asyncio.to_thread(self._stream, callback, messages, tool_specs, system_prompt)
asyncio.to_thread uses a thread pool executor that is not directly configurable. Consequently, we have to create our own threads in order to control the daemon mode.
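To illustrate the point, here is a small sketch (not part of this PR) that compares the daemon flag of the worker thread behind asyncio.to_thread with that of a thread created by hand. The helper names are made up for the example, and it assumes Python 3.9 or later, where asyncio.to_thread exists and executor worker threads are non-daemon.

```python
import asyncio
import threading


def current_daemon_flag() -> bool:
    """Report whether the thread running this function is a daemon thread."""
    return threading.current_thread().daemon


def flag_via_to_thread() -> bool:
    # asyncio.to_thread runs the function on the loop's default executor;
    # it takes no argument for choosing daemon mode.
    return asyncio.run(asyncio.to_thread(current_daemon_flag))


def flag_via_own_thread() -> bool:
    # With threading.Thread we pick daemon mode ourselves.
    seen = []
    worker = threading.Thread(
        target=lambda: seen.append(current_daemon_flag()),
        daemon=True,
    )
    worker.start()
    worker.join()
    return seen[0]
```

Under those assumptions the first helper should report a non-daemon worker and the second a daemon one, which is the control this change is after.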
thread = asyncio.to_thread(self._stream, callback, messages, tool_specs, system_prompt)
task = asyncio.create_task(thread)
threading.Thread(target=target, daemon=True).start()
One downside here is that we create a new thread on every stream invocation rather than pulling from a pool of existing threads. In this context, however, the impact should be negligible, as bedrock.converse_stream is a long-running call.
I'm not so much concerned about the cost (time/space) of creating the new thread. For highly concurrent use cases, though, the failure mode concerns me.
In the previous case, it seems we would have waited until a thread became available in the ThreadPoolExecutor, allowing the request to succeed. In the current case, we are introducing a new thread exhaustion failure mode.
This is a really good call out and should be thought through carefully. I'll note that I avoided use of a ThreadPoolExecutor here because there is no easy way to configure daemon mode. We would have to derive our own version with some overrides.
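One possible way to get back the "wait for a free slot" behavior without subclassing ThreadPoolExecutor is to gate thread creation behind an asyncio.Semaphore. The following is only a sketch of that idea, not what this PR implements; run_in_daemon_thread and limiter are hypothetical names introduced for illustration.

```python
import asyncio
import threading
from typing import Any, Callable


async def run_in_daemon_thread(
    func: Callable[..., Any], *args: Any, limiter: asyncio.Semaphore
) -> Any:
    """Run func(*args) in a daemon thread, waiting for a free slot first."""
    loop = asyncio.get_running_loop()
    future: asyncio.Future = loop.create_future()

    def resolve(setter: Callable[[Any], None], value: Any) -> None:
        # Runs on the event loop thread; skip if the awaiting task was cancelled.
        if not future.done():
            setter(value)

    def target() -> None:
        try:
            result = func(*args)
        except Exception as exc:
            loop.call_soon_threadsafe(resolve, future.set_exception, exc)
        else:
            loop.call_soon_threadsafe(resolve, future.set_result, result)

    # Callers queue here instead of spawning an unbounded number of threads.
    # Caveat: if the awaiting task is cancelled, the slot is released while
    # the thread may still be running.
    async with limiter:
        threading.Thread(target=target, daemon=True).start()
        return await future
```

With a limit of N, at most N of these threads run at once and further calls wait their turn, at the price of one more piece of shared state to size and pass around.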
while True:
    event = await queue.get()
    if event is None:
        break
    if isinstance(event, Exception):
        raise event
Python threads do not propagate raised exceptions to the main thread automatically. Consequently, we need to communicate exceptions explicitly, which we can do through our existing queue.
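For readers following along, the pattern described here can be sketched in isolation roughly as follows. This is a simplified stand-in rather than the actual provider code: stream_from_thread and produce are hypothetical names, and the real implementation passes a callback to self._stream instead of iterating a generator.

```python
import asyncio
import threading
from typing import Any, AsyncIterator, Callable, Iterable


async def stream_from_thread(
    produce: Callable[[], Iterable[Any]],
) -> AsyncIterator[Any]:
    """Yield events produced in a daemon thread, re-raising its exceptions."""
    loop = asyncio.get_running_loop()
    queue: asyncio.Queue = asyncio.Queue()

    def target() -> None:
        try:
            for event in produce():
                # asyncio.Queue is not thread-safe, so hand off via the loop.
                loop.call_soon_threadsafe(queue.put_nowait, event)
        except Exception as exc:
            # Forward the exception itself; the consumer re-raises it.
            loop.call_soon_threadsafe(queue.put_nowait, exc)
        else:
            # None marks normal completion.
            loop.call_soon_threadsafe(queue.put_nowait, None)

    threading.Thread(target=target, daemon=True).start()

    while True:
        event = await queue.get()
        if event is None:
            break
        if isinstance(event, Exception):
            raise event
        yield event
```

One consequence of reusing the queue this way is that None and Exception instances become reserved values, so they cannot also be sent as ordinary events.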
    loop.call_soon_threadsafe(queue.put_nowait, event)
    if event is None:
        return
def target() -> None:
nit: should this be private?
The diff is cut off above, but this is a nested function of stream and so is private by nature.
Description
Running the Bedrock model provider in a daemon thread so that it terminates immediately along with the main thread if the main thread goes down. This is particularly useful for agent-builder, which has special controls around keyboard interrupts.
Related Issues
#560
Type of Change
Bug fix
Testing
How have you tested the change? Verify that the changes do not break functionality or introduce warnings in consuming repositories: agents-docs, agents-tools, agents-cli
hatch run prepare
hatch run test-integ
strands: Ran a local version of agent-builder that I am updating with non-blocking threads. Ran against this updated version of the Bedrock model provider and pressed ctrl-c. The CLI now exits immediately as before.
python test_threading: Wrote the following script for testing ctrl-c response directly
Checklist
By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.