fix(sdks/python): send aio_put_stream events with grpc.aio#4232
fix(sdks/python): send aio_put_stream events with grpc.aio#4232juanluisdb wants to merge 7 commits into
Conversation
|
@juanluisdb is attempting to deploy a commit to the Hatchet Team on Vercel. A member of the Team first needs to authorize it. |
| """ | ||
| await asyncio.to_thread(self.put_stream, data) | ||
| try: | ||
| ix = self._increment_stream_index() |
There was a problem hiding this comment.
so there's one very important implementation detail here that I need to think about it a bit, which is that this is not concurrency-safe. We might need to wrap this in a lock to make sure that works properly, I'll think about this a bit 🤔
There was a problem hiding this comment.
ok, i see, the same counter is used by both put_stream and aio_put_stream right?
There was a problem hiding this comment.
Sorry for just circling back! But yes, it's the same counter
There was a problem hiding this comment.
Anyways, I think this is probably okay - I struggle to see a case where you'd put stream events concurrently are care about ordering
| async def put_stream_event( | ||
| request: PutStreamEventRequest, | ||
| metadata: tuple[tuple[str, str]], | ||
| ) -> PutStreamEventResponse: | ||
| return cast( | ||
| PutStreamEventResponse, | ||
| await aio_events_service_client.PutStreamEvent( # type: ignore[misc] | ||
| request, metadata=metadata | ||
| ), | ||
| ) |
There was a problem hiding this comment.
maybe we can factor this out so we don't re-create it on every call? small optimization
There was a problem hiding this comment.
makes sense. Updated this so aio_stream is not recreated per call, while keeping the aio stub lazy and the retry behavior the same.
44abe31 to
6184af5
Compare
|
Hey @juanluisdb - I sort of commandeered your PR to expand the fixes to a couple more problematic places 😅 I'm going to keep working on it a bit, and hoping to get it merged tomorrow or Weds after some eyes from someone else on the team too. Thanks for the fix here! |
Description
Fixes #4230.
This changes
ctx.aio_put_streamso it sendsPutStreamEventwith a lazygrpc.aioevents stub.Before this change,
ctx.aio_put_streamcalled the synchronousput_streampath throughasyncio.to_thread. Under high streaming load, those synchronous gRPC calls can keep the shared thread pool busy. The runner also uses that pool to read assigned actions from its queue, so task starts can be delayed.The stream index is still assigned before the RPC is awaited. This keeps the existing stream ordering behavior.
Type of change
What's Changed
aio_put_streamevents withgrpc.aioinstead ofasyncio.to_thread.Checklist
Changes have been:
Testing
I ran these checks locally with the project virtualenv:
🤖 AI Disclosure
I acknowledge that an LLM was used in the creation of this Pull Request, in accordance with Hatchet's AI_POLICY.md.
Details: I used Claude Code and Codex to work on this.