[SVC-421] Implement primitive for spawning many calls #3052

Open
wants to merge 2 commits into
base: main
from

gongy
Contributor

@gongy gongy commented Apr 22, 2025

Describe your changes

  • Provide Linear issue reference (e.g. MOD-1234) if available.

Added a method for spawning many calls in parallel. The underlying implementation creates 256 worker threads which consume a queue of inputs.

Approximate performance at time of testing: ~1400 calls/s, above default user rate limits.
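The worker pattern described above can be sketched roughly as follows. This is a minimal illustration, not the PR's implementation: it uses asyncio tasks and a plain `asyncio.Queue` rather than the PR's `_SynchronizedQueue`, and `spawn_one`, `spawn_many`, and `MAX_WORKERS` are hypothetical names introduced only for this example.

```python
import asyncio

MAX_WORKERS = 4  # hypothetical small value; the PR uses 256


async def spawn_one(x: int) -> str:
    """Hypothetical stand-in for issuing a single spawn request."""
    await asyncio.sleep(0)
    return f"call-{x}"


async def spawn_many(inputs: list[int]) -> list[str]:
    # Fill the queue up front; a fixed pool of workers drains it.
    queue: asyncio.Queue = asyncio.Queue()
    for item in inputs:
        queue.put_nowait(item)
    results: list[str] = []

    async def worker() -> None:
        # Each worker pulls inputs until the queue is empty, so at most
        # MAX_WORKERS requests are in flight at any time.
        while True:
            try:
                item = queue.get_nowait()
            except asyncio.QueueEmpty:
                return
            results.append(await spawn_one(item))

    await asyncio.gather(*(worker() for _ in range(MAX_WORKERS)))
    return results


call_ids = asyncio.run(spawn_many(list(range(10))))
```

A fixed pool bounds concurrency without creating one task per input, which is the trade-off the description refers to.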

The shared method _spawn_map_generator is an async generator, consumed in one of two ways:

  • Async case: polled by an async coroutine in the main thread
  • Sync case: converted into a sync iterator and polled in the main thread
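The two consumption modes listed above can be illustrated with a small sketch. It assumes nothing about the library's internals: `shared_generator`, `consume_async`, and `iterate_sync` are hypothetical names, and the sync path here simply drives the generator on a private event loop, which is one possible approach rather than the one the PR takes.

```python
import asyncio
from collections.abc import AsyncGenerator, Iterator


async def shared_generator(n: int) -> AsyncGenerator[int, None]:
    """Hypothetical shared async generator."""
    for i in range(n):
        await asyncio.sleep(0)
        yield i


async def consume_async(n: int) -> list[int]:
    # Async case: iterate the generator directly from a coroutine.
    return [item async for item in shared_generator(n)]


def iterate_sync(n: int) -> Iterator[int]:
    # Sync case: step the same generator one item at a time on a private loop.
    loop = asyncio.new_event_loop()
    agen = shared_generator(n)

    async def next_item() -> int:
        return await agen.__anext__()

    try:
        while True:
            try:
                yield loop.run_until_complete(next_item())
            except StopAsyncIteration:
                break
    finally:
        # Close the generator and the loop even if iteration stops early.
        loop.run_until_complete(agen.aclose())
        loop.close()


async_items = asyncio.run(consume_async(3))
sync_items = list(iterate_sync(3))
```

Note that the sync wrapper in this sketch cannot be called from inside a running event loop, since `run_until_complete` would raise there.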

Changelog

  • Added a method for spawning many calls in parallel.

@gongy gongy force-pushed the richard/spawn-map branch from 737bebd to bd9a11f Compare April 22, 2025 22:06
@gongy gongy force-pushed the richard/spawn-map branch from bd9a11f to 1f107dd Compare April 22, 2025 22:08
@gongy gongy requested review from freider and mwaskom April 22, 2025 22:08
@gongy gongy changed the title [CLI-375] Implement primitive for spawning many calls [SVC-421] Implement primitive for spawning many calls Apr 22, 2025
_SynchronizedQueue is used instead of asyncio.Queue so that the main thread can put
items in the queue safely.

Worker pattern is 10% faster than creating a new asyncio task for every input.
Contributor

Interesting!

Contributor

@freider freider left a comment

Mostly makes sense to me, but I think I'd prefer not using AsyncOrSyncIterable here, since the return value isn't expected to be iterated.

@@ -379,6 +379,29 @@ def log_stats():
await log_debug_stats_task


async def _feed_raw_input_queue_from_iterators(
Contributor

nice

Worker pattern is 10% faster than creating a new asyncio task for every input.
"""

MAX_CONCURRENT_SPAWNS = 256
Contributor

Why 256 specifically?

Contributor Author

From benchmarking, 128 doesn't saturate our server rate limits for clients, and there is limited improvement above 300. I am somewhat nervous about request concurrency (from within a container, that is currently limited to 500).

"You can't run Function.spawn_map() from an async function. Use Function.map.aio() instead."
),
)
)[0]
Contributor

Maybe we should add a comment that this list is expected to have a single None element once all items have been spawned, to make this understandable.

from the input iterators and creates async function calls for each.

We create a common generator that is shared by `_spawn_map_async` and `_spawn_map_sync` because
it is easier to wrap async generators than coroutines.
Contributor

I think I'd have a slight preference for making this a simple coroutine and using Runner.run instead of internally using AsyncOrSyncIterable here.

iirc, AsyncOrSyncIterable was mostly made to externally provide a "synchronicity-like" interface that is both async for and for:able (for backwards compatibility reasons where we didn't require .aio to do async iterations...)

Contributor Author

Thank you! I'm going to get this going. This is exactly the feedback I needed ❤️
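The alternative suggested in this thread, a plain coroutine run to completion instead of a wrapped async generator, might look roughly like the sketch below. The thread does not say which Runner is meant, so `asyncio.run` is used here purely as a stand-in; `spawn_one`, `spawn_map_coro`, and `spawn_map_blocking` are hypothetical names.

```python
import asyncio


async def spawn_one(x: int) -> None:
    """Hypothetical stand-in for a single spawn request."""
    await asyncio.sleep(0)


async def spawn_map_coro(inputs: list[int]) -> int:
    # Plain coroutine: spawns everything and returns a count.
    # Nothing is yielded, so callers have nothing to iterate over.
    await asyncio.gather(*(spawn_one(x) for x in inputs))
    return len(inputs)


def spawn_map_blocking(inputs: list[int]) -> int:
    # asyncio.run is an assumption standing in for the project's own runner.
    return asyncio.run(spawn_map_coro(inputs))


spawned_count = spawn_map_blocking([1, 2, 3])
```

Because the result is not meant to be iterated, a coroutine keeps the blocking entry point to a single call rather than a generator that has to be drained.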

Co-authored-by: Elias Freider <freider@users.noreply.github.com>

3 participants