perf(l1): optimize snap sync insertion and healing write paths#6159
pablodeymo wants to merge 9 commits into main
Conversation
- Fix `BackendTrieDB::put_batch` to call `tx.put()` directly instead of `tx.put_batch(vec![...])` per key, eliminating ~20k `Vec` allocations per flush
- Override `put_batch_no_alloc` on `BackendTrieDB` to encode and write nodes in a single pass without intermediate `Vec` collection
- Merge two-pass iteration in `insert_accounts` into a single pass, collecting code hashes during the trie-building `.inspect()` callback
- Add `write_storage_trie_nodes_batch_sync` to `Store` to eliminate the `spawn_blocking` -> `block_on` -> `spawn_blocking` double thread hop in storage healing
- Pipeline healing writes by allowing 2 tasks in flight (state and storage) to overlap encoding with DB commit
- Use `BTreeMap` entry API in both state and storage healing to prevent empty placeholders from overwriting real node data
- Replace hardcoded 12-thread pool with `std::thread::available_parallelism`
- Add `tracing::debug` timing instrumentation to key write paths
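The `put_batch` change above can be sketched with toy types (`MockTx`, `write_allocating`, and `write_direct` are illustrative stand-ins, not ethrex APIs): wrapping each key/value pair in its own `Vec` just to satisfy a batch signature costs one heap allocation per key, while writing pairs directly on the transaction produces identical output without the wrappers.

```rust
// Hypothetical sketch of the put_batch change. Types are stand-ins.
struct MockTx {
    writes: Vec<(Vec<u8>, Vec<u8>)>,
}

impl MockTx {
    fn new() -> Self {
        MockTx { writes: Vec::new() }
    }
    // Direct single-pair write: no intermediate collection needed.
    fn put(&mut self, key: Vec<u8>, value: Vec<u8>) {
        self.writes.push((key, value));
    }
    // Batch API: forces the caller to allocate a Vec even for one pair.
    fn put_batch(&mut self, pairs: Vec<(Vec<u8>, Vec<u8>)>) {
        self.writes.extend(pairs);
    }
}

// Before: one wrapper Vec allocation per key just to satisfy the batch signature.
fn write_allocating(tx: &mut MockTx, nodes: &[(Vec<u8>, Vec<u8>)]) {
    for (k, v) in nodes {
        tx.put_batch(vec![(k.clone(), v.clone())]);
    }
}

// After: the same writes, one call per key, zero wrapper Vecs.
fn write_direct(tx: &mut MockTx, nodes: &[(Vec<u8>, Vec<u8>)]) {
    for (k, v) in nodes {
        tx.put(k.clone(), v.clone());
    }
}

fn main() {
    let nodes = vec![
        (b"k1".to_vec(), b"v1".to_vec()),
        (b"k2".to_vec(), b"v2".to_vec()),
    ];
    let mut a = MockTx::new();
    let mut b = MockTx::new();
    write_allocating(&mut a, &nodes);
    write_direct(&mut b, &nodes);
    // Identical output, fewer allocations on the direct path.
    assert_eq!(a.writes, b.writes);
}
```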
🤖 Kimi Code Review

**Review Summary**

This PR contains several performance optimizations and logging improvements. While most changes are reasonable, there are a few issues that need attention.

**Critical Issues**

1. **Race Condition in State Healing (state.rs:286-287)**: The change from single-task to dual-task execution introduces a potential race condition. The comment about avoiding out-of-order deletes is being ignored. When multiple tasks run concurrently, there's no guarantee that trie node deletions and insertions will be applied in the correct order, which could corrupt the state trie. Recommendation: revert to single-task execution or implement proper ordering guarantees.

2. **Storage Healing Race Condition (storage.rs:215-216)**: Same issue as above - allowing 2 concurrent tasks could lead to storage trie corruption.

**Performance Issues**

3. **Inefficient Encoding in put_batch_no_alloc (trie.rs:134-144)**: The … Recommendation:

    // Use a pre-allocated buffer or implement a proper zero-copy approach
    // Consider batching multiple nodes into a single transaction

**Code Quality Issues**

4. **Redundant Work in Storage Healing (storage.rs:220-230)**: The code is rebuilding the BTreeMap for each account, but the input … Recommendation:

    // Skip the BTreeMap creation if nodes are already sorted
    let account_nodes: Vec<_> = nodes.into_iter().collect();

5. **Magic Numbers**

6. **Error Handling Inconsistency**

**Minor Issues**

7. **Tracing Level**: Consider using …

8. **Code Duplication**: The encoding logic in state.rs and storage.rs is nearly identical and could be extracted into a shared function.

**Positive Changes**

✅ Thread count based on available parallelism - good use of system resources

Automated review by Kimi (Moonshot AI) · custom prompt
🤖 Codex Code Review

**Findings**
Notes / Suggestions
If you want, I can scan for overlaps in healing batch key ranges to confirm whether the concurrency change is safe.

Automated review by OpenAI Codex · custom prompt
🤖 Claude Code Review

**PR Review: Optimize snap sync insertion and healing write paths**

**Summary**

This PR addresses performance bottlenecks in snap sync by optimizing hot paths in trie node writes and healing operations. The optimizations are well-researched, clearly documented, and demonstrate good understanding of the codebase. Overall, this is high-quality work with thoughtful performance improvements.

**Critical Issues**

1. **Storage healing: Potential data race with concurrent writes**
Greptile Overview

**Greptile Summary**

This PR optimizes snap sync’s hottest write paths by removing per-key allocations in trie batch writes, adding a streaming …

Key integration points: the trie builder (…

Issues to fix before merge: (1) …

**Confidence Score: 2/5**
| Filename | Overview |
|---|---|
| crates/common/trie/trie_sorted.rs | Adds debug timing around flush_nodes_to_write and uses available_parallelism() to size the thread pool dynamically; no functional changes observed. |
| crates/networking/p2p/sync/healing/state.rs | Pipelines DB writes (2 in-flight) and avoids overwriting encoded nodes via BTreeMap::entry; however, current JoinSet draining logic does not surface task panics, risking silent DB write failures. |
| crates/networking/p2p/sync/healing/storage.rs | Pipelines storage healing DB writes and switches to sync Store::write_storage_trie_nodes_batch_sync in blocking tasks; uses BTreeMap::entry to avoid overwrites; changes look correct. |
| crates/networking/p2p/sync/snap_sync.rs | Merges the temp-DB scan into the trie build pass and adds timing logs, but now buffers all code hashes in-memory until after trie build, which can cause large memory growth/OOM on big snapshots. |
| crates/storage/store.rs | Extracts shared storage trie write logic and adds a sync batch writer to avoid async/blocking thread hops; behavior remains equivalent (commit at end). |
| crates/storage/trie.rs | Optimizes put_batch to avoid per-key Vec allocation and adds a streaming put_batch_no_alloc override; also adds debug timing. Main behavior is preserved. |
Sequence Diagram
```mermaid
sequenceDiagram
    participant Snap as snap_sync::insert_accounts
    participant TmpDB as RocksDB(temp accounts)
    participant Trie as trie_sorted::trie_from_sorted_accounts_wrap
    participant TrieDB as BackendTrieDB(TrieDB)
    participant HealS as healing::state::heal_state_trie
    participant HealSt as healing::storage::heal_storage_trie
    participant Store as storage::Store
    Snap->>TmpDB: ingest_external_file(SSTs)
    Snap->>TmpDB: full_iterator(Start)
    Snap->>Trie: build trie from sorted iterator
    Trie->>TrieDB: flush_nodes_to_write()
    TrieDB->>TrieDB: put_batch_no_alloc(nodes)
    TrieDB->>TrieDB: begin_write
    loop for each node
        TrieDB->>TrieDB: encode_to_vec
        TrieDB->>TrieDB: tx.put(cf, key, value)
    end
    TrieDB->>TrieDB: tx.commit
    Snap->>Snap: collect code_hashes during inspect()
    Snap->>Store: code_hash_collector.add/flush (after trie build)
    HealS->>HealS: collect nodes_to_write
    HealS->>HealS: spawn_blocking (encode + db write)
    HealS->>Store: open_direct_state_trie
    Store->>TrieDB: db.put_batch(encoded)
    HealSt->>HealSt: drain nodes_to_write per account
    HealSt->>HealSt: spawn_blocking (encode + db write)
    HealSt->>Store: write_storage_trie_nodes_batch_sync(updates)
    Store->>Store: begin_write
    Store->>Store: txn.put/delete (per node)
    Store->>Store: txn.commit
```
```diff
@@ -976,6 +969,9 @@ async fn insert_accounts(
         .account_tries_inserted
         .fetch_add(1, Ordering::Relaxed);
     let account_state = AccountState::decode(v).expect("We should have accounts here");
+    if account_state.code_hash != *EMPTY_KECCACK_HASH {
+        code_hashes.push(account_state.code_hash);
+    }
```
Unbounded code hash buffer
insert_accounts now collects all non-empty code_hash values into code_hashes: Vec<H256> and only flushes after trie_from_sorted_accounts_wrap finishes. On large snapshots this can grow to millions of hashes and OOM before the trie build completes. This used to be bounded by flush_if_needed() during the scan. Consider flushing incrementally during the iterator pass (e.g., push+flush when the collector is full) or using a bounded buffer.
Fixed in 08ba6f92c. Removed the intermediate Vec<H256> entirely — code hashes are now added directly into CodeHashCollector's internal HashSet during the .inspect() callback. The HashSet deduplicates, so memory is bounded by unique contract accounts (~5M on mainnet at 32 bytes each = ~160MB) instead of growing unboundedly with duplicates.
flush_if_needed() is called once after the trie build completes. We can't call it during the iterator because trie_from_sorted_accounts_wrap is synchronous and flush_if_needed() is async.
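The dedup-bounds-memory argument can be illustrated with a plain `HashSet` (the `CodeHash` alias and the counts here are made up for the sketch; the real collector is ethrex's `CodeHashCollector`): a `Vec` grows with every account, while a set is bounded by the number of unique hashes.

```rust
use std::collections::HashSet;

// Simplified stand-in for a 32-byte code hash.
type CodeHash = [u8; 32];

fn main() {
    // Simulate many accounts sharing a handful of contract codes.
    let unique: Vec<CodeHash> = (0u8..10).map(|i| [i; 32]).collect();
    let mut vec_buffer: Vec<CodeHash> = Vec::new();
    let mut set_buffer: HashSet<CodeHash> = HashSet::new();
    for n in 0..1_000 {
        let h = unique[n % unique.len()];
        vec_buffer.push(h); // grows with every account seen
        set_buffer.insert(h); // bounded by unique hashes
    }
    assert_eq!(vec_buffer.len(), 1_000);
    assert_eq!(set_buffer.len(), 10);
}
```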
```diff
 let store = store.clone();
-// NOTE: we keep only a single task in the background to avoid out of order deletes
-if !db_joinset.is_empty() {
+// Allow up to 2 tasks in flight to overlap encoding with DB commit
+while db_joinset.len() >= 2 {
     db_joinset
         .join_next()
         .await
-        .expect("we just checked joinset is not empty")?;
+        .expect("joinset is not empty")?;
 }
```
JoinSet errors dropped
This loop uses join_next().await.expect(...)?; but the spawned closure returns (), so task panics/errors are effectively ignored (and ? here can only be applying to the Option, not the task result). That means DB write panics can be silently swallowed and healing can proceed as if writes succeeded. You likely want to match join_next().await and handle Err(JoinError) explicitly (and propagate/abort on panic).
I think the suggestion from the bot is wrong, but I would check the types of the task to make it return errors.
You're correct — my comment was wrong.
The spawned closure does return Result<(), SyncError> (you can see Ok(()) at the end of the closure). The JoinSet type is JoinSet<Result<(), SyncError>>, so join_next().await returns:

- `Some(Ok(Ok(())))` — task succeeded, closure returned Ok
- `Some(Ok(Err(e)))` — task succeeded, closure returned Err
- `Some(Err(JoinError))` — task panicked/cancelled
The PR properly handles all three cases via the wait_for_pending_task() helper added in healing/mod.rs (lines 22-28 of the diff), which matches on the result and propagates errors correctly. This is actually a fix — before the PR, the closures used .expect() which would panic instead of returning errors.
The error handling is correct as-is. My apologies for the noise!
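The three-way outcome above is not tokio-specific; a minimal `std::thread` analogy shows the same nesting, where `JoinHandle::join` plays the role of `join_next` (Err on panic) and the closure's own `Result` sits inside `Ok`. The `run` helper and its return strings are illustrative only.

```rust
use std::thread;

// join() returns Err on panic (analogous to JoinError); the closure's own
// Result<(), String> is nested inside the Ok case.
fn run(task: fn() -> Result<(), String>) -> &'static str {
    match thread::spawn(task).join() {
        Ok(Ok(())) => "task ok",    // closure returned Ok
        Ok(Err(_)) => "task error", // closure returned Err
        Err(_) => "task panicked",  // panic surfaced as a join error
    }
}

fn main() {
    assert_eq!(run(|| Ok(())), "task ok");
    assert_eq!(run(|| Err("db write failed".into())), "task error");
    assert_eq!(run(|| panic!("boom")), "task panicked");
}
```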
Pull request overview
Optimizes snap-sync and healing write hot paths to reduce allocations, redundant iteration, and thread scheduling overhead while rebuilding / repairing the Ethereum state trie backed by RocksDB.
Changes:
- Eliminates per-key `Vec` allocations in trie batch writes and adds a non-allocating `put_batch_no_alloc` fast path for `BackendTrieDB`.
- Collapses `insert_accounts` from a two-pass temp-DB scan into a single pass during trie building, with added timing instrumentation.
- Improves healing write throughput by allowing up to 2 in-flight DB write tasks, deduping parent-delete markers via `BTreeMap::entry`, and removing async `block_on` bridging for storage healing writes.
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| crates/storage/trie.rs | Removes per-key batch allocations and adds put_batch_no_alloc override; adds timing logs. |
| crates/storage/store.rs | Adds sync storage-trie batch write path and shared inner implementation to avoid extra thread hops. |
| crates/common/trie/trie_sorted.rs | Adds flush timing and uses available_parallelism() for thread pool sizing. |
| crates/networking/p2p/sync/snap_sync.rs | Single-pass account insertion (collect code hashes during trie build) + timing logs. |
| crates/networking/p2p/sync/healing/state.rs | Allows 2 in-flight write tasks, fixes parent-delete overwrites via entry, adds timing logs. |
| crates/networking/p2p/sync/healing/storage.rs | Allows 2 in-flight write tasks, fixes parent-delete overwrites via entry, switches to sync DB write, adds timing logs. |
```rust
let start = std::time::Instant::now();
let mut code_hashes: Vec<H256> = Vec::new();
let iter = db.full_iterator(rocksdb::IteratorMode::Start);
```
code_hashes collects every non-empty account code hash into a Vec during the trie-build pass. On large networks this can grow very large (and includes duplicates), increasing peak RSS and risking OOM compared to the previous streaming flush.
Consider avoiding the intermediate Vec by either (a) adding hashes directly into CodeHashCollector during .inspect() (no await), and flushing/spawning file dumps in a bounded way after the trie build, or (b) sending hashes to a background writer via a channel so the buffer can be flushed incrementally while the trie is being built.
```rust
if !db_joinset.is_empty() {
    // Allow up to 2 tasks in flight to overlap encoding with DB commit
    while db_joinset.len() >= 2 {
        db_joinset.join_next().await;
```
The join_next().await result is ignored here. If a background DB write task panics or returns an error, this will be silently dropped and healing may continue/finish without persisting data.
Handle the returned Option<Result<_, JoinError>> (and propagate/log failures) when draining the JoinSet, similar to how state healing uses expect(...) + ? on join_next().
```diff
-db_joinset.join_next().await;
+match db_joinset.join_next().await {
+    Some(Ok(_)) => {
+        // successfully completed background DB write task
+    }
+    Some(Err(e)) => {
+        error!(?e, "background DB write task failed during storage healing");
+    }
+    None => {
+        // no more tasks to join
+        break;
+    }
+}
```
Fixed in 08ba6f92c. The join_next() result is now checked and JoinError is propagated as SyncError::JoinHandle(e), which will surface task panics instead of silently dropping them.
Re: 8. Code Duplication: The shared part between …

Extracting a shared function for a 4-line loop that operates on different input/output types would add a function boundary and import without meaningful deduplication. Keeping them inline is simpler and more readable.
The original single-task constraint exists to prevent out-of-order deletes: batch N generates empty markers for ancestor paths of its committed nodes, and batch N+1 may write real node data at those same ancestor paths. If RocksDB serializes batch N's commit after batch N+1's, the empty marker overwrites the valid branch node, corrupting the trie.
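Within a single batch, the `BTreeMap` entry API mentioned in the PR description guards against the related overwrite hazard. This is a hypothetical sketch (simplified key/value types, not the real trie node encoding) where an empty parent-delete marker must not clobber real node bytes already queued at the same path:

```rust
use std::collections::BTreeMap;

// or_insert_with only writes the empty marker if nothing is queued yet,
// so real node data already at this path is preserved.
fn queue_marker(batch: &mut BTreeMap<Vec<u8>, Vec<u8>>, path: Vec<u8>) {
    batch.entry(path).or_insert_with(Vec::new);
}

fn main() {
    let mut batch = BTreeMap::new();
    batch.insert(b"ab".to_vec(), b"real node data".to_vec());
    queue_marker(&mut batch, b"ab".to_vec()); // no-op: real data wins
    queue_marker(&mut batch, b"cd".to_vec()); // inserts empty marker
    assert_eq!(batch[&b"ab".to_vec()], b"real node data".to_vec());
    assert!(batch[&b"cd".to_vec()].is_empty());
}
```

A plain `insert` at the marker site would have replaced the real node bytes with the empty placeholder, which is the bug the entry API avoids.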
Re: 1. Storage healing: Potential data race with concurrent writes

Good catch — this is a real race condition. Here's the concrete scenario: …

The original comment ("avoid out of order deletes") was there for exactly this reason. Reverted to single-task writes in …
…rror handling

- Remove intermediate Vec<H256> for code hashes in insert_accounts. Instead, add hashes directly into CodeHashCollector's HashSet during the .inspect() callback. The HashSet deduplicates, bounding memory to unique contract accounts (~5M on mainnet = ~160MB) instead of growing with duplicates. flush_if_needed() is called once after the sync trie build completes.
- Handle JoinSet errors in storage healing: propagate JoinError from db_joinset.join_next() instead of silently dropping task panics.
@greptile-apps review again
Greptile Overview

**Greptile Summary**

This PR focuses on reducing snap-sync/healing write-path overhead by removing avoidable allocations and iterations, and adding lightweight timing instrumentation. Key changes:
Issue to fix before merge:
Confidence Score: 4/5
| Filename | Overview |
|---|---|
| crates/common/trie/trie_sorted.rs | Adds timing debug logs to flushes and uses available_parallelism() for thread pool size; no functional changes found. |
| crates/networking/p2p/sync/healing/state.rs | Fixes parent-path deletion markers using BTreeMap::entry, adds timing, but still drops JoinSet task errors on join_all() at completion/stale exits. |
| crates/networking/p2p/sync/healing/storage.rs | Switches storage batch encoding to dedup via BTreeMap and uses sync DB write; however db_joinset.join_all().await ignores possible JoinErrors from DB tasks. |
| crates/networking/p2p/sync/snap_sync.rs | Merges account scan passes and adds timing; code hash collection now happens during trie build with a single flush afterward (per prior thread fix). |
| crates/storage/store.rs | Extracts shared storage trie node write logic and adds a synchronous variant to avoid block_on inside spawn_blocking; no correctness issues found. |
| crates/storage/trie.rs | Optimizes BackendTrieDB::put_batch to avoid per-key Vec alloc and adds a non-allocating batch write override with timing logs; behavior preserved. |
Sequence Diagram
```mermaid
sequenceDiagram
    participant HealState as heal_state_trie/heal_storage_trie
    participant JS as db_joinset
    participant Block as spawn_blocking
    participant Store as Store
    participant DB as TrieDB/BackendTrieDB
    loop Healing loop
        HealState->>HealState: accumulate nodes_to_write
        alt flush threshold OR done OR stale
            opt previous task exists
                HealState->>JS: join_next().await
                JS-->>HealState: Result<(), JoinError>
            end
            HealState->>Block: spawn_blocking(encode + write)
            Block->>Store: open trie / begin_write
            Store->>DB: put_batch / write_storage_trie_nodes_batch_sync
            DB-->>Store: commit
            Store-->>Block: Ok
            Block-->>JS: task completes
        end
    end
    alt exit (done/stale)
        HealState->>JS: join_all().await
        JS-->>HealState: "JoinErrors currently ignored"
    end
```
Additional Comments (2)
**Errors from DB tasks ignored** (crates/networking/p2p/sync/healing/storage.rs:250-252)

`db_joinset.join_all().await;` (and the equivalent on the stale path) drops `JoinError`s from the `spawn_blocking` DB write tasks, so a panic in the write closure can be silently ignored and healing can return `Ok(true/false)` as if data was committed. This should propagate/abort on `JoinError` (at least panic) similarly to the earlier `join_next()` handling.

**JoinSet results are dropped** (crates/networking/p2p/sync/healing/state.rs:323-326)

`db_joinset.join_all().await;` discards `JoinError`s from the `spawn_blocking` batch write tasks. If any DB write panics, state healing can still complete and return `Ok(paths.is_empty())` with missing data. Consider iterating `join_next().await` and returning an error on `Err(JoinError)` (and/or logging panics) so write failures don’t get swallowed.
…in healing state.rs used .expect().? while storage.rs used pattern matching — now both use a shared helper that eliminates the .expect() panic path.
…awn_blocking closures

- Closures now return Result<(), SyncError> instead of panicking on DB errors
- Replace join_all() with drain_pending_tasks() that checks each task result
- Skip spawning no-op tasks when the write buffer is empty
Addressed the remaining concern about
Commits: …
Re: Greptile's two additional comments about … Both are fixed in commits …
@greptile-apps review again
Greptile Overview

**Greptile Summary**

This PR optimizes snap-sync and healing write hot paths by (1) removing per-key Vec allocations in …

Overall the changes fit cleanly into the existing storage/trie abstractions and the healing pipeline (single in-flight DB write), and the added tracing helps validate perf wins in production runs. The main merge blockers are remaining …

**Confidence Score: 3/5**
| Filename | Overview |
|---|---|
| crates/common/trie/trie_sorted.rs | Adds debug timing around flush_nodes_to_write and switches trie build ThreadPool size to available_parallelism(). |
| crates/networking/p2p/sync/healing/mod.rs | Introduces JoinSet helpers wait_for_pending_task/drain_pending_tasks for propagating DB write task errors. |
| crates/networking/p2p/sync/healing/state.rs | Changes healing DB write batching to use BTreeMap entry markers, adds instrumentation, and moves JoinSet handling to shared helpers; still does sync decoding inside iterator with panics/ignores. |
| crates/networking/p2p/sync/healing/storage.rs | Fixes parent-marker overwrite via BTreeMap entry API, switches to sync Store write inside spawn_blocking, and adopts JoinSet helpers for error propagation. |
| crates/networking/p2p/sync/snap_sync.rs | Merges account scan passes by collecting code hashes during trie build; introduces debug timing but adds unconditional expect()s on RocksDB and RLP decode. |
| crates/storage/store.rs | Extracts storage trie node write inner loop and adds a synchronous write_storage_trie_nodes_batch_sync variant used by healing. |
| crates/storage/trie.rs | Optimizes BackendTrieDB put_batch to avoid per-key Vec allocations and adds put_batch_no_alloc override with debug timing; still clones Nibbles per node. |
Sequence Diagram
```mermaid
sequenceDiagram
    participant Snap as snap_sync::insert_accounts
    participant TempDB as RocksDB temp accounts DB
    participant Trie as trie_from_sorted_accounts_wrap
    participant Collector as CodeHashCollector
    participant StateDB as BackendTrieDB
    participant Heal as healing::{state,storage}
    participant JS as JoinSet
    Snap->>TempDB: ingest_external_file(snapshots)
    Snap->>TempDB: full_iterator(Start)
    Snap->>Trie: build trie from sorted accounts
    Trie-->>Snap: iter.inspect(decode AccountState)
    Snap->>Collector: add(code_hash)
    Snap->>StateDB: put_batch_no_alloc(nodes)
    Snap->>Collector: flush_if_needed() (after trie build)
    Heal->>JS: spawn_blocking(DB write task)
    Heal->>JS: wait_for_pending_task(join_next)
    Heal->>StateDB: put_batch(encoded nodes)
    Heal->>JS: drain_pending_tasks() on done/stale
```
```rust
    .account_tries_inserted
    .fetch_add(1, Ordering::Relaxed);
let account_state = AccountState::decode(v).expect("We should have accounts here");
```
Panics on bad input
insert_accounts now hard-panics on any temp RocksDB iterator error (.expect(...)) and on any malformed account value (AccountState::decode(v).expect(...)). These are external inputs (snapshot files / DB iterator) and can be corrupted; panicking will crash the sync process instead of returning SyncError like the rest of this function does.
Consider propagating these as SyncError::RocksDBError / SyncError::Rlp instead of using expect inside the iterator (map_err(...)?).
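A hedged sketch of the suggested expect-to-Result conversion (`SyncErr` and `decode_account` are illustrative stand-ins, not the real `SyncError` or RLP decoder): collecting an iterator of `Result`s into `Result<Vec<_>, _>` propagates the first failure instead of panicking mid-iteration.

```rust
#[derive(Debug)]
enum SyncErr {
    Decode(String),
}

// Toy decoder: treats an 8-byte big-endian buffer as a valid "account".
fn decode_account(raw: &[u8]) -> Result<u64, SyncErr> {
    if raw.len() == 8 {
        Ok(u64::from_be_bytes(raw.try_into().unwrap()))
    } else {
        Err(SyncErr::Decode("bad account encoding".into()))
    }
}

// collect::<Result<Vec<_>, _>> stops at the first error and returns it,
// instead of crashing the whole pipeline with an expect() panic.
fn decode_all(items: &[Vec<u8>]) -> Result<Vec<u64>, SyncErr> {
    items.iter().map(|v| decode_account(v)).collect()
}

fn main() {
    let good = vec![1u64.to_be_bytes().to_vec(), 2u64.to_be_bytes().to_vec()];
    assert_eq!(decode_all(&good).unwrap(), vec![1, 2]);
    let bad = vec![vec![0u8; 3]];
    assert!(decode_all(&bad).is_err());
}
```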
This absolutely shouldn't happen. The only scenario is where the user has a faulty file system.
```rust
// Re-export shared types for external use
#[allow(unused_imports)]
pub use types::{HealingQueueEntry, StateHealingQueue};

/// Waits for a pending task in the JoinSet to complete, propagating any error.
/// Used to ensure only a single background DB write is in flight at a time.
async fn wait_for_pending_task(
```
Incorrect JoinSet None case
In wait_for_pending_task, the None arm is treated as success (Some(Ok(Ok(()))) | None => {}), but None from join_next().await can only happen when the JoinSet is empty. Since this function only calls join_next() after checking !joinset.is_empty(), hitting None indicates a logic invariant violation and would silently drop a pending write.
It’s safer to handle None as unreachable!()/error (or remove it) so a broken invariant doesn’t get masked.
join_next() cannot return None after an is_empty() check, so treat it as an invariant violation rather than silently ignoring it.
Re: Greptile's two latest comments:

1. Panics on bad input in …: These are pre-existing — line 969 already has a …
2. Incorrect JoinSet …: Valid point. Fixed in commit …
@greptile-apps review it again
Greptile Overview

**Greptile Summary**

This PR optimizes snap sync’s hottest write paths by (1) removing per-key Vec allocations in …

The changes fit into the codebase’s existing architecture by keeping trie insertion logic within the …

**Confidence Score: 3/5**
| Filename | Overview |
|---|---|
| crates/networking/p2p/sync/snap_sync.rs | Merged temp-DB scan into trie-build pass and added timings; still contains expect() panics on RocksDB iterator errors and RLP decode in the new iterator pipeline. |
| crates/storage/trie.rs | Optimized trie batch write path by removing per-key Vec allocation, added put_batch_no_alloc override, and added debug timing; change is localized and preserves commit semantics. |
| crates/storage/store.rs | Extracted shared storage-trie write inner loop and added synchronous wrapper for use inside spawn_blocking, reducing thread-hopping without altering write/delete behavior. |
| crates/networking/p2p/sync/healing/mod.rs | Introduced JoinSet helpers to wait/drain background DB tasks with proper error propagation; None case is now treated as unreachable after is_empty check. |
| crates/networking/p2p/sync/healing/state.rs | Fixed parent-marker overwrite bug using entry().or_insert, avoided spawning no-op DB tasks, added timing, and switched to helper-based JoinSet error handling. |
| crates/networking/p2p/sync/healing/storage.rs | Applied same parent-marker overwrite fix via BTreeMap entry, switched to synchronous store write inside spawn_blocking, and adopted shared JoinSet draining with error propagation. |
| crates/common/trie/trie_sorted.rs | Added debug timing for flush_nodes_to_write and replaced hardcoded thread count with available_parallelism() fallback; no functional trie logic changes. |
| CHANGELOG.md | Added perf changelog entry describing snap sync insertion/healing write-path optimizations. |
Sequence Diagram
```mermaid
sequenceDiagram
    participant Snap as snap_sync::insert_accounts
    participant Rocks as Temp RocksDB iterator
    participant Trie as trie_sorted::trie_from_sorted_accounts_wrap
    participant DB as BackendTrieDB/Store
    participant HealS as healing::state
    participant HealSt as healing::storage
    participant JS as JoinSet<Result<(), SyncError>>
    Snap->>Rocks: full_iterator(Start)
    Snap->>Trie: trie_from_sorted_accounts_wrap(db, iter.map/inspect)
    loop for each (k,v)
        Rocks-->>Trie: (key,value)
        Trie-->>Snap: inspect(k,v)
        Snap->>Snap: AccountState::decode(v)
        Snap->>Snap: code_hash_collector.add(code_hash)
        Snap->>Snap: storage_accounts insert(storage_root)
    end
    Trie-->>Snap: compute_state_root
    Snap->>Snap: code_hash_collector.flush_if_needed().await
    par Healing state writes
        HealS->>JS: wait_for_pending_task(join_next)
        HealS->>JS: spawn_blocking(encode + db.put_batch)
        JS->>DB: open_direct_state_trie + put_batch + commit
        HealS->>JS: drain_pending_tasks()
    and Healing storage writes
        HealSt->>JS: wait_for_pending_task(join_next)
        HealSt->>JS: spawn_blocking(encode + write_storage_trie_nodes_batch_sync)
        JS->>DB: begin_write + put/delete + commit
        HealSt->>JS: drain_pending_tasks()
    end
```
```diff
@@ -976,6 +972,9 @@ async fn insert_accounts(
         .account_tries_inserted
         .fetch_add(1, Ordering::Relaxed);
     let account_state = AccountState::decode(v).expect("We should have accounts here");
```
Panics on iterator/rlp decode
insert_accounts still has expect(...) calls inside the RocksDB iterator (.map(|k| k.expect(...)) and AccountState::decode(v).expect(...)). These are external inputs (temp RocksDB iterator + snapshot content), so a single iterator error or malformed RLP will crash the whole sync instead of returning a SyncError like the rest of the function does. This should be converted to map_err(...)? / Result propagation so snap sync can fail gracefully with a useful error.
Benchmark Block Execution Results: Comparison Against Main
```diff
 }
 scope(|s| {
-    let pool = ThreadPool::new(12, s);
+    let thread_count = std::thread::available_parallelism()
```
This should be at least more than 2. If there is only a single thread it will wait on the write buffers getting freed.
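A minimal sketch of the suggested floor (the fallback of 12 mirrors the old hardcoded pool size; the exact shape of the real ethrex code is an assumption):

```rust
use std::thread;

// Size the pool from available_parallelism with a floor of 2, per the review
// note: a single worker would stall waiting on write buffers getting freed.
fn pool_size() -> usize {
    thread::available_parallelism()
        .map(|n| n.get())
        .unwrap_or(12) // fallback mirroring the old hardcoded pool size
        .max(2) // never drop below two workers
}

fn main() {
    assert!(pool_size() >= 2);
}
```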
```rust
/// Drains all pending tasks from the JoinSet, propagating the first error encountered.
async fn drain_pending_tasks(
    joinset: &mut JoinSet<Result<(), SyncError>>,
```
When drain_pending_tasks encounters the first error, it returns immediately and remaining tasks stay in the JoinSet. On drop, those tasks get cancelled mid-execution. Since WriteBatch::commit is atomic at the RocksDB level, partial writes shouldn't happen — but any tasks that haven't called commit yet will lose their work silently.
Is that acceptable here? The caller (healing loop) presumably retries from scratch on error, so losing in-flight work is fine. Just want to confirm that's the intent, since the old join_all would have waited for all tasks to complete (though it didn't propagate errors).
```rust
let account_count = to_write.len();
let mut encoded_to_write = vec![];
for (hashed_account, nodes) in to_write {
    let mut account_nodes = std::collections::BTreeMap::new();
```
nit: `std::collections::BTreeMap::new()` — this could use a `use std::collections::BTreeMap;` import at the top of the closure or file, matching the pattern in state.rs where `BTreeMap` is already imported.
```rust
let start = std::time::Instant::now();
// We collect code hashes directly into the collector's HashSet during the trie
// build pass. The collector deduplicates, so memory is bounded by unique contract
// accounts (~5M on mainnet = ~160MB). We can't call flush_if_needed() here because
```
The comment says ~5M entries = ~160MB, but HashSet<H256> has per-entry overhead beyond the raw 32-byte key — typically ~72 bytes/entry on 64-bit (key + hash + bucket metadata + load factor headroom). At 5M entries that's closer to 350-400MB peak during the synchronous trie build.
This might be fine for the target hardware running snap sync, but the comment understates actual memory usage. Worth either correcting the estimate or noting that it's the raw key size only.
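The two estimates come down to quick arithmetic. A sketch, where the 72 bytes/entry figure is the reviewer's assumed per-entry `HashSet<H256>` overhead, not a measured value:

```rust
// Integer-MiB estimate for a HashSet holding `entries` items at an assumed
// `bytes_per_entry` cost (raw key size, or key plus hash/bucket overhead).
fn hashset_mib(entries: u64, bytes_per_entry: u64) -> u64 {
    entries * bytes_per_entry / (1024 * 1024)
}
```

With 5M entries, raw 32-byte keys give about 152 MiB (the comment's "~160MB"), while the assumed 72 bytes/entry gives about 343 MiB, in line with the 350-400MB peak mentioned above.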
Motivation
During snap sync, the node downloads the entire Ethereum state (accounts, storage slots, bytecodes) and rebuilds the state trie locally. This process involves writing tens of millions of trie nodes to RocksDB. Profiling revealed several bottlenecks in the write path that caused unnecessary memory allocations, redundant iterations, and thread scheduling overhead. These optimizations target the hot paths to reduce wall-clock time for both initial state insertion and the subsequent healing phase.
Bottleneck Summary
| Hot path | Bottleneck |
| --- | --- |
| `BackendTrieDB::put_batch` | Allocates a `Vec` per key (~20k allocs per flush) |
| `insert_accounts` (rocksdb) | Iterates the temp DB twice, decoding every account in both passes |
| `put_batch_no_alloc` default impl | Collects all encoded nodes into an intermediate `Vec` |
| Healing parent markers | `insert` overwrites real node data with empty markers |
| Storage healing writes | `block_on` double thread hop |
| Healing background tasks | `.expect()` panics instead of returning errors; `join_all()` swallows task failures |
| `trie_from_sorted_accounts_wrap` | Hardcoded 12-thread pool |
Phase 0: Instrumentation
Added `tracing::debug!` timing to key code paths for ongoing performance monitoring. No correctness changes.

- Step 0.1 — Time `BackendTrieDB::put_batch` and `put_batch_no_alloc` (`crates/storage/trie.rs`)
- Step 0.2 — Time `flush_nodes_to_write` (`crates/common/trie/trie_sorted.rs`)
- Step 0.3 — Time healing batch writes (`crates/networking/p2p/sync/healing/state.rs`, `crates/networking/p2p/sync/healing/storage.rs`), including the `spawn_blocking` portion
- Step 0.4 — Time `insert_accounts` (`crates/networking/p2p/sync/snap_sync.rs`)

Phase 1: High-Impact Fixes
Step 1.1 — Fix `BackendTrieDB::put_batch` per-key allocation (`crates/storage/trie.rs`)

- Before: each key-value pair was written via `tx.put_batch(table, vec![(key, value)])`, allocating a new `Vec` for every single key-value pair. With ~20,000 nodes per flush, this meant ~20,000 unnecessary heap allocations per flush cycle.
- After: the loop calls `tx.put(table, &key, &value)` directly, which calls `batch.put_cf` on the RocksDB `WriteBatch` without any intermediate allocation. The `StorageWriteBatch` trait already had a `put` method, and `RocksDBWriteTx` already overrides it to call `self.batch.put_cf` directly.
- Impact: eliminates ~20k `Vec` allocations per flush. With multiple flushes per sync, this removes millions of allocations over a full sync.
Step 1.2 — Override `put_batch_no_alloc` on `BackendTrieDB` (`crates/storage/trie.rs`)

- Before: the default implementation in the `TrieDB` trait (`crates/common/trie/db.rs`) encodes all nodes via `.map(|node| (path.clone(), node.encode_to_vec())).collect()`, building a full `Vec<(Nibbles, Vec<u8>)>` in memory, then passes it to `put_batch`, which previously allocated another `Vec` per key.
- After: an override on `BackendTrieDB` opens a single write transaction, iterates over the `&[(Nibbles, Node)]` slice, encodes each node and writes it directly via `tx.put()` in one pass, then commits once. No intermediate collection.
- Impact: avoids the intermediate `Vec` allocation (which at 20k nodes * ~100 bytes = ~2MB per flush) and replaces the encode-then-collect-then-write pipeline with encode-and-write-immediately.
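The difference between the two write paths can be sketched with a mock batch standing in for RocksDB's `WriteBatch` (all types here are illustrative, not the real `RocksDBWriteTx`/`StorageWriteBatch`):

```rust
// Stand-in for a RocksDB WriteBatch: appends (key, value) pairs.
struct MockBatch {
    writes: Vec<(Vec<u8>, Vec<u8>)>,
}

impl MockBatch {
    fn new() -> Self {
        MockBatch { writes: Vec::new() }
    }
    // Equivalent of batch.put_cf: no intermediate allocation.
    fn put(&mut self, key: &[u8], value: &[u8]) {
        self.writes.push((key.to_vec(), value.to_vec()));
    }
    // Old entry point: a wrapper Vec was allocated per key just to call this.
    fn put_batch(&mut self, pairs: Vec<(Vec<u8>, Vec<u8>)>) {
        for (k, v) in pairs {
            self.writes.push((k, v));
        }
    }
}

// Old path: one Vec allocation per key-value pair (~20k per flush).
fn flush_old(batch: &mut MockBatch, nodes: &[(Vec<u8>, Vec<u8>)]) {
    for (k, v) in nodes {
        batch.put_batch(vec![(k.clone(), v.clone())]);
    }
}

// New path: write each pair directly, no wrapper Vec.
fn flush_new(batch: &mut MockBatch, nodes: &[(Vec<u8>, Vec<u8>)]) {
    for (k, v) in nodes {
        batch.put(k, v);
    }
}
```

Both paths produce identical writes; only the per-key allocation disappears.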
Step 1.3 — Merge two-pass iteration in `insert_accounts` (`crates/networking/p2p/sync/snap_sync.rs`)

- Before: the `insert_accounts` function iterated over all accounts in the temporary RocksDB twice: once to collect code hashes, and again to build the trie. Both passes decoded `AccountState` from RLP. For mainnet with ~250M+ accounts, this doubled the I/O and CPU cost.
- After: code hashes are collected into the `CodeHashCollector`'s internal `HashSet` during the `.inspect()` callback of the trie build pass, which already decodes `AccountState`. The `HashSet` deduplicates, so memory is bounded by unique contract accounts (~5M on mainnet = ~160MB). After `trie_from_sorted_accounts_wrap` returns, remaining code hashes are flushed via `flush_if_needed()`.
- Trade-off: code hashes are held in memory for the duration of the trie build (we can't call `flush_if_needed()` inside the synchronous iterator). This slightly delays bytecode download start but eliminates a full scan of the temp DB (~250M+ reads).
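The single-pass shape can be illustrated with `Iterator::inspect`: the callback side-collects code hashes while the same iteration drives the trie build. Everything here is a toy stand-in — accounts are `(address_hash, code_hash)` pairs and the "trie build" is a fold, whereas the real code uses `AccountState` and `CodeHashCollector`:

```rust
use std::collections::HashSet;

// One pass over sorted accounts: inspect() side-collects code hashes while
// the same iteration feeds the trie build (a toy XOR fold here).
fn build_trie_single_pass(accounts: &[(u64, u64)]) -> (u64, HashSet<u64>) {
    let mut code_hashes = HashSet::new();
    let trie_root = accounts
        .iter()
        .inspect(|(_, code_hash)| {
            // The set deduplicates, so memory is bounded by unique hashes.
            code_hashes.insert(*code_hash);
        })
        .fold(0u64, |acc, (addr, _)| acc ^ addr);
    (trie_root, code_hashes)
}
```

One iteration, one decode per account, and the collector fills up as a side effect of the build pass.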
Phase 2: Medium-Impact Fixes

Step 2.1 — Remove `block_on` bridging in storage healing (`crates/storage/store.rs`, `crates/networking/p2p/sync/healing/storage.rs`)

- Before: the write path was `spawn_blocking` closure → `spawned_rt::tasks::block_on` → `async write_storage_trie_nodes_batch` → `tokio::task::spawn_blocking` (internally). This created a double thread hop: the outer `spawn_blocking` moves work to a blocking thread, then `block_on` parks that thread waiting for the async function, which itself spawns another blocking task. Two context switches for no benefit.
- After: added `write_storage_trie_nodes_batch_sync` to `Store`, which shares the same inner logic (extracted to `write_storage_trie_nodes_inner`) but runs entirely synchronously. The healing code now calls this sync version directly from its `spawn_blocking` closure.
Step 2.2 — Use the `entry` API to fix a healing data-loss bug (`crates/networking/p2p/sync/healing/state.rs`, `crates/networking/p2p/sync/healing/storage.rs`)

- Before: `insert(path.slice(0, i), vec![])` was used to mark parent nodes for deletion. However, `BTreeMap::insert` always overwrites. If a real node at path P was processed before a longer path P' (where P is an ancestor prefix of P'), the parent-marker loop would overwrite the real node data at P with an empty `vec![]`, silently deleting a valid node. This caused extra healing cycles to re-download those nodes.
- After: `entry(path.slice(0, i)).or_insert(vec![])` only inserts the empty placeholder if no entry exists yet, preserving any real node data already stored for that path. In storage healing, the intermediate `Vec` of tuples was replaced with a `BTreeMap` to enable the same deduplication.
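The fix in isolation, on a plain `BTreeMap` with paths shortened to byte vectors for the sketch (the real code keys on `Nibbles`):

```rust
use std::collections::BTreeMap;

// Mark every ancestor prefix of `path` for deletion, without clobbering
// real node data already stored at one of those prefixes.
fn mark_parents(nodes: &mut BTreeMap<Vec<u8>, Vec<u8>>, path: &[u8]) {
    for i in 0..path.len() {
        // Was: nodes.insert(path[..i].to_vec(), vec![]);  // overwrites real data!
        nodes.entry(path[..i].to_vec()).or_insert_with(Vec::new);
    }
}
```

A node stored at an ancestor prefix before the longer path arrives now survives the marker loop instead of being replaced by an empty placeholder.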
Step 2.3 — Proper error propagation in healing background tasks (`crates/networking/p2p/sync/healing/mod.rs`, `crates/networking/p2p/sync/healing/state.rs`, `crates/networking/p2p/sync/healing/storage.rs`)

- Before: the `spawn_blocking` closures used `.expect()` for DB errors, which panics instead of returning a recoverable error, so a transient DB failure would crash the entire sync process. `join_all()` was used to drain remaining tasks at loop exit; if a task panicked (from `.expect()`), `join_all()` propagates the panic rather than returning a `Result`, with no way to handle the error gracefully. state.rs used `.expect("...")` while storage.rs used pattern matching.
- After: closures return `Result<(), SyncError>` with `?` instead of `.expect()`. Both `StoreError` and `TrieError` convert to `SyncError` via existing `From` impls. Added to `healing/mod.rs`: `wait_for_pending_task()` (waits for one in-flight task before spawning the next) and `drain_pending_tasks()` (drains all remaining tasks at loop exit); both propagate task errors and `JoinError` as `Result`. Also added an `if !to_write.is_empty()` guard to skip spawning no-op tasks when the buffer is empty (happens when `is_done`/`is_stale` triggers a flush with nothing to write).
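The drain pattern can be sketched with std threads instead of tokio (the real helper is async over a `JoinSet` and maps `JoinError`; this analogue maps thread panics, and `SyncError` is a simplified stand-in):

```rust
use std::thread::JoinHandle;

// Simplified stand-in for the real SyncError.
#[derive(Debug, PartialEq)]
enum SyncError {
    Db(String),
    TaskFailed,
}

// Drain every pending task, propagating the first error encountered.
// Remaining handles are dropped once an error is returned, mirroring how
// the async version leaves in-flight tasks to be cancelled on drop.
fn drain_pending_tasks(
    handles: Vec<JoinHandle<Result<(), SyncError>>>,
) -> Result<(), SyncError> {
    for handle in handles {
        // Outer `?` maps a panicked task; inner `?` propagates the task's own error.
        handle.join().map_err(|_| SyncError::TaskFailed)??;
    }
    Ok(())
}
```

Unlike `join_all()`, the caller gets a `Result` it can handle instead of a propagated panic.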
Phase 3: Lower-Impact Fixes

Step 3.1 — Use `available_parallelism` in `trie_from_sorted_accounts_wrap` (`crates/common/trie/trie_sorted.rs`)

- Before: `ThreadPool::new(12, s)` hardcoded 12 threads for the trie-building thread pool, regardless of the machine's actual CPU count.
- After: the pool is sized via `std::thread::available_parallelism().map(|n| n.get()).unwrap_or(8)`. On machines with fewer cores (e.g., 4-core CI runners), this avoids oversubscription; on machines with more cores (e.g., 32-core servers), it takes full advantage of available parallelism.
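A sketch of the sizing logic. The `.max(2)` floor reflects the review note above that a single-thread pool would stall waiting on write buffers; it is a suggested addition, not necessarily what the PR currently does:

```rust
// Size the trie-build pool from the machine instead of a hardcoded 12.
fn trie_pool_threads() -> usize {
    std::thread::available_parallelism()
        .map(|n| n.get())
        .unwrap_or(8) // fallback when the parallelism can't be queried
        .max(2) // floor so one thread is never stuck waiting on write buffers
}
```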
Files Modified

| File | Changes |
| --- | --- |
| `crates/storage/trie.rs` | Fixed `put_batch` per-key allocation; added `put_batch_no_alloc` override; added timing |
| `crates/storage/store.rs` | Added `write_storage_trie_nodes_batch_sync`; extracted shared `write_storage_trie_nodes_inner` |
| `crates/common/trie/trie_sorted.rs` | Thread pool sized via `available_parallelism` |
| `crates/networking/p2p/sync/snap_sync.rs` | Single-pass `insert_accounts` collecting code hashes into the `HashSet` directly; added timing |
| `crates/networking/p2p/sync/healing/mod.rs` | Added `wait_for_pending_task` and `drain_pending_tasks` shared helpers |
| `crates/networking/p2p/sync/healing/state.rs` | `entry` API fix; `Result` return from closures; `drain_pending_tasks`; empty-batch guard; timing |
| `crates/networking/p2p/sync/healing/storage.rs` | `entry` API fix; sync DB write; `Result` return from closures; `drain_pending_tasks`; empty-batch guard; timing |

How to Test
Automated Tests (all passing)
- `cargo test -p ethrex-trie` — 55/55 pass
- `cargo test -p ethrex-p2p --features rocksdb` — 38/38 pass
- `cargo test -p ethrex-test snap_server` — 12/12 pass
- `cargo clippy -p ethrex-trie -p ethrex-storage -p ethrex-p2p --features rocksdb` — 0 warnings
To fully validate, the changes require a live snap sync run:
- Healing completes (`healing_done = true`) with no extra cycles
- `put_batch` → `put`: same `batch.put_cf` call, just without the `Vec` wrapper
- `put_batch_no_alloc` override: encode-and-write in one pass, single commit
- Code hashes held in a `HashSet`, flushed after the trie build
- `entry` API: differs from the previous `insert` behavior only when an entry already exists
- `drain_pending_tasks`: replaces `join_all()` with explicit error checking per task