indexer-alt: try_for_each_spawned #20379
Merged
Conversation
## Description

Creates a `try_for_each_spawned` combinator that works like `try_for_each_concurrent`, but spawns a new tokio task for each unit of work it schedules from values pulled from the stream (spawning no more than the `limit` set on max concurrency). This introduces some slight overhead when run with a limit of `1` (i.e. no concurrency), but otherwise gives the tokio runtime an opportunity to schedule each unit of work on a different thread (assuming the runtime is multi-threaded).

The interface is almost a drop-in replacement for `try_for_each_concurrent`, with the following slight changes:

- The input stream does not need to yield `Result`s itself.
- The future and the returned error type must have `'static` lifetimes, because they are passed to the tokio runtime and we can't easily determine that they will not outlive the caller.

This addresses one of the main remaining differences between the existing indexer and `sui-indexer-alt` (`sui-data-ingestion-core`'s worker pool works roughly like this new abstraction, but without support for streams, etc.).

## Test plan

New unit tests:

```
sui$ cargo nextest run -p sui-indexer-alt -- task::tests
```

Running the indexer locally also confirms the system is still operable with the change.

Finally, I created some benchmarks to test that `try_for_each_concurrent` does not take advantage of parallelism and `try_for_each_spawned` does: https://gist.github.com/amnn/6c6f198693d46d1f6d30bd7ef7be001d

Benchmarking results show that on a job counting the primes up to `i` for each `2 <= i < 50,000`, the concurrent and sequential implementations complete in 2.7s. `try_for_each_spawned` without parallelism does the same work in 3.2s (slightly slower because of the overhead of creating new tasks), and `try_for_each_spawned` with a parallelism of `16` completes in about 400ms (an 8x improvement).
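For reference, the per-item benchmark job can be sketched as follows. This is a hypothetical reconstruction of the workload described above (the gist's actual code may differ); the names `is_prime` and `count_primes` are illustrative, not taken from the benchmark.

```rust
// Hypothetical sketch of the benchmark workload: count the primes up to `n`
// by trial division. Deliberately CPU-bound, so parallelism pays off.
fn is_prime(c: u32) -> bool {
    if c < 2 {
        return false;
    }
    let mut d = 2;
    while d * d <= c {
        if c % d == 0 {
            return false;
        }
        d += 1;
    }
    true
}

fn count_primes(n: u32) -> u32 {
    (2..=n).filter(|&c| is_prime(c)).count() as u32
}

fn main() {
    // The benchmark schedules one such job for each 2 <= i < 50_000.
    println!("primes up to 100: {}", count_primes(100)); // 25
}
```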
The most expensive individual task costs about 3ms to run on its own (counting the primes up to 50,000), which is roughly in line with the cost of processing. The benchmarks also include an implementation that does the same thing as `try_for_each_spawned` but is built out of existing stream extension combinators (`StreamExt::map` and `StreamExt::buffered`), which has similar performance characteristics but does not have the same behaviour w.r.t. propagating panics and cancellations.
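For intuition, the bounded-spawn pattern at the heart of `try_for_each_spawned` can be approximated with plain std threads. This is only a sketch under stated assumptions, not the PR's implementation: the real combinator spawns tokio tasks from a stream and cancels in-flight work on error, whereas here each worker only stops itself when its own callback fails. The function name and signature are invented for illustration.

```rust
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

// Illustrative std-thread analogue of the bounded-spawn pattern: at most
// `limit` workers run concurrently, each repeatedly pulling the next unit
// of work from a shared channel until it drains.
fn for_each_spawned<T, F>(items: Vec<T>, limit: usize, f: F) -> Result<(), String>
where
    T: Send + 'static,
    // Like the PR's combinator, the work and its error must be 'static,
    // because they are handed to another thread of execution.
    F: Fn(T) -> Result<(), String> + Send + Sync + 'static,
{
    let (tx, rx) = mpsc::channel();
    for item in items {
        tx.send(item).map_err(|_| "send failed".to_string())?;
    }
    drop(tx); // close the channel so workers stop once it drains

    let rx = Arc::new(Mutex::new(rx));
    let f = Arc::new(f);

    let mut handles = Vec::new();
    for _ in 0..limit {
        let rx = Arc::clone(&rx);
        let f = Arc::clone(&f);
        handles.push(thread::spawn(move || -> Result<(), String> {
            loop {
                // Hold the lock only long enough to pull the next item.
                let item = match rx.lock().unwrap().recv() {
                    Ok(item) => item,
                    Err(_) => return Ok(()), // channel drained
                };
                f(item)?;
            }
        }));
    }

    for handle in handles {
        handle.join().map_err(|_| "worker panicked".to_string())??;
    }
    Ok(())
}

fn main() {
    let sum = Arc::new(Mutex::new(0u64));
    let s = Arc::clone(&sum);
    for_each_spawned((1..=100u64).collect::<Vec<_>>(), 4, move |i| {
        *s.lock().unwrap() += i;
        Ok(())
    })
    .unwrap();
    println!("sum = {}", *sum.lock().unwrap()); // 1 + 2 + ... + 100 = 5050
}
```

With `limit = 1` this degrades to sequential processing plus scheduling overhead, matching the 3.2s vs. 2.7s gap reported above.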
bmwill reviewed Nov 21, 2024
bmwill approved these changes Nov 22, 2024
## Release notes
Check each box that your changes affect. If none of the boxes relate to your changes, release notes aren't required.
For each box you select, include information after the relevant heading that describes the impact of your changes that a user might notice and any actions they must take to implement updates.