Skip to content

models - bedrock - daemon thread #593

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

pgrayy
Copy link
Member

@pgrayy pgrayy commented Aug 1, 2025

Description

Running Bedrock model provider in a daemon thread so that it terminates immediately with the main thread if the main thread goes down. This is particularly useful for agent-builder which has special controls around keyboard interrupts.

Related Issues

#560

Type of Change

Bug fix

Testing

How have you tested the change? Verify that the changes do not break functionality or introduce warnings in consuming repositories: agents-docs, agents-tools, agents-cli

  • I ran hatch run prepare
  • hatch run test-integ
  • strands: Ran a local version of agent-builder that I am updating with non-blocking threads. Ran against this updated version of the Bedrock model provider and pressed ctrl-c. The CLI now exits immediately as before.
  • python test_threading: Wrote the following script for testing ctrl-c response directly
import asyncio
from strands import Agent

async def main():
    agent = Agent()
    await agent.invoke_async("What is 2+2? Please think through the answer")

asyncio.run(main())

Checklist

  • I have read the CONTRIBUTING document
  • I have added any necessary tests that prove my fix is effective or my feature works
  • I have updated the documentation accordingly
  • I have added an appropriate example to the documentation to outline the feature, or no new docs are needed
  • My changes generate no new warnings
  • Any dependent changes have been merged and published

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.


thread = asyncio.to_thread(self._stream, callback, messages, tool_specs, system_prompt)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

asyncio.to_thread uses a thread pool executor that is not directly configurable. Consequently, we have to create our own threads in order to control the daemon mode.


thread = asyncio.to_thread(self._stream, callback, messages, tool_specs, system_prompt)
task = asyncio.create_task(thread)
threading.Thread(target=target, daemon=True).start()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One down side here is that we create a new thread on every stream invocation rather than pull from a pool of existing threads. In this context however, the impact should be negligible as bedrock.converse_stream is a long running call.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not as much concerned about the cost (time/space) of creating the new thread. But for highly concurrent use cases this concerns me because of the failure mode.

In the previous case it seems we would have waited until a thread became in the ThreadPoolExecutor allowing the request to succeed. In the current case we are introducing a new thread exhaustion failure mode.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a really good call out and should be thought through carefully. I'll note that I avoided use of a ThreadPoolExecutor here because there is no easy way to configure daemon mode. We would have to derive our own version with some overrides.


while True:
event = await queue.get()
if event is None:
break
if isinstance(event, Exception):
raise event
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Python threads do not propagate raised exceptions to the main thread automatically. Consequently, we need to explicitly communicate exceptions which we can do so through our existing queue.

@pgrayy pgrayy marked this pull request as ready for review August 1, 2025 18:11
loop.call_soon_threadsafe(queue.put_nowait, event)
if event is None:
return
def target() -> None:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: should this be private?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The diff cutoff above but this is a nested function of stream and so is private by that nature.


thread = asyncio.to_thread(self._stream, callback, messages, tool_specs, system_prompt)
task = asyncio.create_task(thread)
threading.Thread(target=target, daemon=True).start()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not as much concerned about the cost (time/space) of creating the new thread. But for highly concurrent use cases this concerns me because of the failure mode.

In the previous case it seems we would have waited until a thread became in the ThreadPoolExecutor allowing the request to succeed. In the current case we are introducing a new thread exhaustion failure mode.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants