
sui_v1.39.0_1732293455_ci

@amnn tagged this 22 Nov 12:18
## Description

Create a `try_for_each_spawned` which works similarly to
`try_for_each_concurrent`, but spawns a new tokio task for each unit of
work it schedules based on values pulled from the stream (up to the
`limit` set on the max concurrency).

This does introduce some slight overhead if we run it with a limit of
`1` (i.e. no concurrency), but otherwise it gives the tokio runtime an
opportunity to schedule each unit of work on a different thread
(assuming the runtime is multi-threaded).

The interface is almost a drop-in replacement for
`try_for_each_concurrent` but with the following slight changes:

- The input stream does not need to be a `Result` itself.
- The future and returned error type must have `'static` lifetimes,
because they are passed to the tokio runtime and we can't easily
determine that they will not outlive the caller.

This addresses one of the main remaining differences between the
existing indexer and `sui-indexer-alt` (`sui-data-ingestion-core`'s
worker pool works roughly like this new abstraction, but without
support for streams, etc.).

## Test plan

New unit tests:

```
sui$ cargo nextest run -p sui-indexer-alt -- task::tests
```

Running the indexer locally also makes sure the system is still operable
with the change.

Finally, I created some benchmarks in a repository to test that
`try_for_each_concurrent` does not take advantage of parallelism and
`try_for_each_spawned` does:

https://gist.github.com/amnn/6c6f198693d46d1f6d30bd7ef7be001d

Benchmarking results show that on a job counting the primes up to `i`
for each `2 <= i < 50,000`, the concurrent and sequential
implementations complete in 2.7s.

`try_for_each_spawned` without parallelism does the same work in 3.2s
(slightly slower because of the overhead of creating new tasks), and
`try_for_each_spawned` with parallelism of `16` completes in about 400ms
(an 8x improvement).

The most expensive individual task costs about 3ms to run on its own
(counting the primes up to 50,000), which is roughly in line with the
cost of processing.
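For reference, the per-task workload described above is roughly of this shape (a hypothetical reconstruction for illustration; the actual benchmark code is in the gist linked above):

```rust
// Hypothetical reconstruction of the benchmark's per-task workload:
// count the primes up to `n` by trial division.
fn count_primes(n: u64) -> usize {
    (2..=n)
        .filter(|&i| (2..i).take_while(|d| d * d <= i).all(|d| i % d != 0))
        .count()
}

fn main() {
    // In the benchmark, the most expensive task counts primes up to 50,000;
    // a small input is used here just to show the shape of the work.
    println!("{}", count_primes(50));
}
```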

The benchmarks also include an implementation that does the same thing
as `try_for_each_spawned` but is built out of existing stream extension
combinators (`StreamExt::map` and `StreamExt::buffered`), which has
similar performance characteristics but does not behave the same way
when it comes to propagating panics and cancellations.

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API: