perf: process chunks in par for get logs in block range eth_getLogs
#16675
Conversation
crates/rpc/rpc/src/eth/filter.rs
Outdated
```rust
// adjust based on system capabilities
//TODO: configurable, 4 is a good default for both I/O and CPU balance
let concurrency = std::cmp::min(ranges.len(), 4);
```
The get_logs_in_block_range_inner fn involves both I/O operations (fetching headers and receipts) and CPU work (filtering logs), so I left a value of 4, which provides a balance that works well for this mixed workload.
can we add a constant here for the concurrency value? and move to its definition the comment about why 4.
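A sketch of what this could look like (the constant name is hypothetical; the rationale for 4 moves to the definition as suggested):

```rust
/// Default number of block-range chunks processed concurrently.
///
/// Hypothetical constant name. `get_logs_in_block_range_inner` mixes I/O
/// (fetching headers and receipts) with CPU work (filtering logs), and 4 is a
/// reasonable balance for both. TODO: make configurable.
const DEFAULT_BLOCK_RANGE_CONCURRENCY: usize = 4;

/// Never spawn more concurrent tasks than there are ranges to process.
fn effective_concurrency(num_ranges: usize) -> usize {
    std::cmp::min(num_ranges, DEFAULT_BLOCK_RANGE_CONCURRENCY)
}

fn main() {
    assert_eq!(effective_concurrency(1), 1);
    assert_eq!(effective_concurrency(10), 4);
}
```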
Yes, it is possible, do you want me to do it? Or will you take care of it once PR #16441 is merged?
crates/rpc/rpc/src/eth/filter.rs
Outdated
```rust
for (from, to) in
    BlockRangeInclusiveIter::new(from_block..=to_block, self.max_headers_range)
```
```rust
// for small ranges (less than 2 chunks)
if ranges.len() <= 1 {
```
still keep the original sequential impl for small ranges, because if there is only one chunk it's better to process it directly.
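This fallback can be sketched as follows (hypothetical helper name, with block-range tuples standing in for the real chunk type):

```rust
/// With at most one chunk there is nothing to fan out, so the original
/// sequential path avoids task-spawning overhead entirely.
fn choose_path<T>(ranges: &[T]) -> &'static str {
    if ranges.len() <= 1 {
        "sequential" // small ranges (less than 2 chunks)
    } else {
        "parallel"
    }
}

fn main() {
    assert_eq!(choose_path(&[(0u64, 999u64)]), "sequential");
    assert_eq!(choose_path(&[(0u64, 999u64), (1000, 1999)]), "parallel");
}
```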
mattsse
left a comment
thanks, all of this makes sense, but we're also working on similar specialization #16441
this pr would introduce conflicts, so I'm parking this for a bit
fgimenez
left a comment
looks great, smol nit
i think the changes in this PR can complement very well the approach proposed in #16441:
- for recent blocks, we can keep using CachedMode
- for old blocks with a small range, use range queries
- for old blocks with a large range, we can combine both range queries and the parallelization introduced in this PR

this way we get the best of both PRs: fewer, more efficient operations executed in parallel
we would need to merge #16441 first though, i can take care of fixing conflicts here when done
@fgimenez Thank you for the awesome work! Having it merged is more than we could ask for. I'll happily rebase and complete this PR afterwards 🙏
@thaodt sure, #16441 introduced two modes for getting logs, CachedMode and RangeMode
nice. I will rework this PR
Force-pushed 02d3934 to 15368ae
Hi @fgimenez, I rebased and pushed updates, can you please take a look whether this is what you meant? 🙏
fgimenez
left a comment
looks very good! just one question about avoiding the cache check
crates/rpc/rpc/src/eth/filter.rs
Outdated
```rust
let mut chunk_results = Vec::new();

for header in chunk_headers {
    // First check if already cached to avoid unnecessary provider calls
```
i believe at this point we don't need to check if the results are cached, RangeMode will be activated for requests of logs older than what we have in the cache, wdyt? would it make sense to directly fetch from the provider
hey I made changes as per your review, can you please take another look? 🙏
fgimenez
left a comment
lgtm, thx! pending @mattsse
crates/rpc/rpc/src/eth/filter.rs
Outdated
```rust
let task = async move {
    let mut chunk_results = Vec::new();

    for header in chunk_headers {
        // Fetch directly from provider - RangeMode is used for older blocks unlikely to
        // be cached
        let receipts =
            match filter_inner.provider().receipts_by_block(header.hash().into())? {
                Some(receipts) => Arc::new(receipts),
                None => continue, // No receipts found
            };

        if !receipts.is_empty() {
            chunk_results.push(ReceiptBlockResult {
                receipts,
                recovered_block: None,
                header,
            });
        }
    }
```
this most likely doesn't improve anything, because this isn't actually async: these chunks just end up being processed sequentially, since receipts_by_block is blocking
```rust
match filter_inner.provider().receipts_by_block(header.hash().into())? {
    Some(receipts) => Arc::new(receipts),
    None => continue, // No receipts found
};
```
so we'd need to spawn_blocking this here in particular or the entire async scope
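The effect of the suggestion can be illustrated with std threads (a stand-in for tokio::task::spawn_blocking, which moves blocking work off the async executor's threads); fetch_receipts here is a hypothetical stand-in for provider().receipts_by_block:

```rust
use std::thread;

/// Hypothetical stand-in for provider().receipts_by_block: a blocking call.
fn fetch_receipts(block: u64) -> Option<Vec<u64>> {
    Some(vec![block]) // pretend each block has one receipt
}

/// Run each chunk's blocking fetches on its own thread and join in spawn
/// order, so chunks truly run in parallel instead of serializing on the
/// executor the way a non-yielding async block would.
fn fetch_all(chunks: Vec<Vec<u64>>) -> Vec<Vec<Vec<u64>>> {
    let handles: Vec<_> = chunks
        .into_iter()
        .map(|chunk| {
            thread::spawn(move || {
                chunk.into_iter().filter_map(fetch_receipts).collect::<Vec<_>>()
            })
        })
        .collect();
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}

fn main() {
    let results = fetch_all(vec![vec![0, 1], vec![2, 3]]);
    assert_eq!(results[0], vec![vec![0], vec![1]]);
    assert_eq!(results[1], vec![vec![2], vec![3]]);
}
```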
crates/rpc/rpc/src/eth/filter.rs
Outdated
```rust
// Process results and maintain order
let mut ordered_results: Vec<Vec<ReceiptBlockResult<Eth::Provider>>> =
    (0..results.len()).map(|_| Vec::new()).collect();
for result in results {
    match result {
        Ok((chunk_idx, chunk_results)) => {
            ordered_results[chunk_idx] = chunk_results;
        }
        Err(e) => return Err(e),
    }
```
this looks a bit weird, join_all already returns in order?
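For reference, a minimal std-thread illustration of why the manual chunk_idx bookkeeping is redundant: collecting handles and joining them in spawn order already preserves input order, which mirrors the ordering guarantee futures::future::join_all makes for futures:

```rust
use std::thread;

/// Spawn one worker per input, then join in spawn order: the output order
/// matches the input order no matter which worker finishes first, mirroring
/// futures::future::join_all's ordering guarantee.
fn spawn_and_join(inputs: Vec<i32>) -> Vec<i32> {
    let handles: Vec<_> = inputs
        .into_iter()
        .map(|i| thread::spawn(move || i * 10))
        .collect();
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}

fn main() {
    assert_eq!(spawn_and_join(vec![0, 1, 2, 3]), vec![0, 10, 20, 30]);
}
```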
mattsse
left a comment
last nit
crates/rpc/rpc/src/eth/filter.rs
Outdated
```rust
let mut header_chunks = Vec::new();
for chunk_start in (0..range_headers.len()).step_by(chunk_size) {
    let chunk_end = std::cmp::min(chunk_start + chunk_size, range_headers.len());
    header_chunks.push(range_headers[chunk_start..chunk_end].to_vec());
```
not a fan of using slice indexing here
this is basically just
```rust
use itertools::Itertools;

for chunk_headers in range_headers
    .into_iter()
    .chunks(chunk_size) {}
```
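For comparison, std's slice::chunks already expresses the same loop without index arithmetic (a sketch with plain u64 values standing in for headers):

```rust
/// Split headers into size-bounded chunks; the last chunk may be shorter.
/// slice::chunks does the bounds math that the step_by loop wrote by hand.
fn chunk_headers(headers: &[u64], chunk_size: usize) -> Vec<Vec<u64>> {
    headers.chunks(chunk_size).map(|c| c.to_vec()).collect()
}

fn main() {
    let range_headers: Vec<u64> = (0..10).collect();
    let header_chunks = chunk_headers(&range_headers, 4);
    assert_eq!(header_chunks.len(), 3);
    assert_eq!(header_chunks[2], vec![8, 9]); // trailing partial chunk
}
```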
What do the benchmarks say about the improvements from this PR, and for which workloads?
Here are the benchmark results for 10k blocks (bench run on our RISE devnet):

Before:
----- High Volume Test Statistics -----
Number of tests: 100
Min Latency: 1.052575305s
Mean Latency: 1.423230211s
Median Latency: 1.422722982s
Max Latency: 1.845138906s
Standard Deviation: 185.028061ms

After (@fgimenez's PR and this complement PR):
----- High Volume Test Statistics -----
Number of tests: 100
Min Latency: 273.061434ms
Mean Latency: 407.566407ms
Median Latency: 436.268471ms
Max Latency: 491.721147ms
Standard Deviation: 70.529748ms
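As a sanity check on the numbers above (not part of the PR), the mean-latency improvement works out to roughly 3.5x:

```rust
/// Speedup implied by two mean latencies, both in milliseconds.
fn speedup(before_ms: f64, after_ms: f64) -> f64 {
    before_ms / after_ms
}

fn main() {
    // 1.423230211s before vs 407.566407ms after, from the benchmark output.
    let s = speedup(1423.230211, 407.566407);
    assert!((3.4..3.6).contains(&s)); // ~3.5x faster on mean latency
}
```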
Description

The current eth_getLogs behavior supports two types of block filtering:
- AtBlockHash: gets logs for a specific block by hash
- Range: gets logs for a range of blocks

For range queries, it processes blocks in chunks of max_headers_range (1000 blocks) and uses bloom filters to quickly skip blocks that can't contain matching logs, then fetches receipts and logs only for blocks that might contain matches.
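The bloom-filter skip can be illustrated with a toy sketch (Ethereum's real logs bloom is 2048 bits; this uses a u64 and a trivial hash purely to show the shape of the check, and single-character strings stand in for addresses):

```rust
/// Toy bloom: map an item to a single bit in a u64 via a trivial hash.
/// (The real logs bloom sets three bits of a 2048-bit field per item.)
fn toy_bloom_bit(item: &str) -> u64 {
    let h = item
        .bytes()
        .fold(0u64, |acc, b| acc.wrapping_mul(31).wrapping_add(b as u64));
    1u64 << (h % 64)
}

/// If the filter item's bit is not set in the header's bloom, the block
/// cannot contain a matching log, so its receipts are never fetched.
fn might_match(header_bloom: u64, filter_item: &str) -> bool {
    let bit = toy_bloom_bit(filter_item);
    header_bloom & bit == bit
}

fn main() {
    let header_bloom = toy_bloom_bit("a"); // block emitted a log from "a"
    assert!(might_match(header_bloom, "a")); // maybe-match: fetch receipts
    assert!(!might_match(header_bloom, "b")); // definite miss: skip the block
}
```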
The current implementation of get_logs_in_block_range_inner() processes block ranges sequentially, which can lead to performance bottlenecks when querying logs across large block ranges. This is particularly noticeable in some scenarios.

Solution

This PR introduces parallel processing of block ranges while maintaining the correct ordering of results, and limits concurrency to a reasonable value (4) to avoid overwhelming system resources.
- adds a process_block_range() fn to encapsulate the logic for processing a single block range

Please help me review it. Thanks 🙏