[SVC-421] Implement primitive for spawning many calls #3052
_SynchronizedQueue is used instead of asyncio.Queue so that the main thread can put
items in the queue safely.

Worker pattern is 10% faster than creating a new asyncio task for every input.
Interesting!
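For readers unfamiliar with the problem the docstring describes: `asyncio.Queue` is not thread-safe, so a producer running on another thread cannot call its methods directly. The sketch below is not the PR's `_SynchronizedQueue`; it shows one common standard-library alternative, assuming a hypothetical helper `consume_from_thread` and a `None` sentinel to mark the end of input.

```python
import asyncio
import threading


async def consume_from_thread(items: list) -> list:
    """Illustrative only: receive items that a separate thread produces."""
    loop = asyncio.get_running_loop()
    queue: asyncio.Queue = asyncio.Queue()

    def producer() -> None:
        # asyncio.Queue must only be touched from the event loop thread,
        # so every put is scheduled onto the loop rather than called directly.
        for item in items:
            loop.call_soon_threadsafe(queue.put_nowait, item)
        # None is a sentinel meaning "no more items" (so items must not contain None).
        loop.call_soon_threadsafe(queue.put_nowait, None)

    thread = threading.Thread(target=producer)
    thread.start()

    received = []
    while (item := await queue.get()) is not None:
        received.append(item)
    thread.join()
    return received
```

Calling `asyncio.run(consume_from_thread([1, 2, 3]))` drains the queue in the order the producer thread submitted the items.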
Mostly makes sense to me, but I think I'd prefer not using AsyncOrSyncIterable here since the return value isn't expected to be iterated
@@ -379,6 +379,29 @@ def log_stats():
await log_debug_stats_task


async def _feed_raw_input_queue_from_iterators(
nice
Worker pattern is 10% faster than creating a new asyncio task for every input.
"""

MAX_CONCURRENT_SPAWNS = 256
Why 256 specifically?
From benchmarking, 128 doesn't saturate our server rate limits for clients, and there is limited improvement above 300. I am somewhat nervous about request concurrency (from within a container, that is currently limited to 500).
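To make the trade-off in this thread concrete, here is a minimal sketch of a fixed-size worker pool, assuming a hypothetical `fake_spawn` coroutine in place of the real spawn request. Only the constant `MAX_CONCURRENT_SPAWNS = 256` comes from the diff; the function names and structure are illustrative and are not the PR's implementation.

```python
import asyncio

MAX_CONCURRENT_SPAWNS = 256  # value from the diff; everything else is illustrative


async def fake_spawn(item: int) -> int:
    """Hypothetical stand-in for a single spawn request to the server."""
    await asyncio.sleep(0)  # yield to the event loop, as a real request would
    return item * 2


async def spawn_all(items, num_workers: int = MAX_CONCURRENT_SPAWNS) -> list:
    """Process every item with a fixed pool of worker tasks sharing one queue."""
    queue: asyncio.Queue = asyncio.Queue()
    for index, item in enumerate(items):
        queue.put_nowait((index, item))
    results = [None] * queue.qsize()

    async def worker() -> None:
        # Each worker keeps pulling inputs until the queue is empty, so at most
        # num_workers requests are in flight at any moment.
        while True:
            try:
                index, item = queue.get_nowait()
            except asyncio.QueueEmpty:
                return
            results[index] = await fake_spawn(item)

    await asyncio.gather(*(worker() for _ in range(num_workers)))
    return results
```

With this shape, the worker count is the single knob that bounds request concurrency, which is why the choice between 128, 256, and 300 can be settled by benchmarking against the rate limit.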
"You can't run Function.spawn_map() from an async function. Use Function.map.aio() instead."
),
)
)[0]
Maybe we should add a comment that this list is expected to have a single None-element once all items have been spawned, to make this understandable
from the input iterators and creates async function calls for each.

We create a common generator that is shared by `_spawn_map_async` and `_spawn_map_sync` because
it is easier to wrap async generators than coroutines.
I think I'd have a slight preference for making this a simple coroutine and using Runner.run instead of internally using AsyncOrSyncIterable here.
iirc, AsyncOrSyncIterable was mostly made to externally provide a "synchronicity-like" interface that is both `async for` and `for`:able (for backwards compatibility reasons where we didn't require .aio to do async iterations...)
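As a rough illustration of the suggestion above, the sketch below shows a plain coroutine being driven to completion from synchronous code. It uses the standard-library `asyncio.run` as a stand-in for the `Runner.run` mentioned in the comment, and the names `spawn_everything` and `spawn_everything_sync` are hypothetical, not functions from the PR.

```python
import asyncio


async def spawn_everything(inputs) -> int:
    """Hypothetical coroutine: does all the work and returns a single value."""
    count = 0
    for _ in inputs:
        await asyncio.sleep(0)  # placeholder for the real per-input work
        count += 1
    return count


def spawn_everything_sync(inputs) -> int:
    # Because the result is one value rather than a stream, no iterable
    # wrapper is needed; the coroutine is simply run to completion.
    # asyncio.run raises RuntimeError if an event loop is already running
    # in this thread, which is the situation a sync entry point must reject.
    return asyncio.run(spawn_everything(inputs))
```

The sync wrapper stays a one-liner because nothing is iterated by the caller, which is the point the reviewer makes about the return value.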
Thank you! I'm going to get this going. This is exactly the feedback I needed ❤️
Co-authored-by: Elias Freider <freider@users.noreply.github.com>
Describe your changes
Added a method for spawning many calls in parallel. The underlying implementation creates 256 worker threads which consume a queue of inputs.
Approximate performance at time of testing: ~1400 calls/s, above default user rate limits.
The shared method _spawn_map_generator is an async generator.
Changelog