Skip to content

Commit

Permalink
feat(tree): schedule block removal on disk reorgs (#10603)
Browse files Browse the repository at this point in the history
  • Loading branch information
Rjected authored Sep 4, 2024
1 parent 22e9c1d commit 98b214f
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 14 deletions.
20 changes: 12 additions & 8 deletions crates/engine/tree/src/persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ use reth_chain_state::ExecutedBlock;
use reth_db::Database;
use reth_errors::ProviderError;
use reth_primitives::B256;
use reth_provider::{writer::UnifiedStorageWriter, ProviderFactory, StaticFileProviderFactory};
use reth_provider::{
writer::UnifiedStorageWriter, BlockHashReader, ProviderFactory, StaticFileProviderFactory,
};
use reth_prune::{Pruner, PrunerError, PrunerOutput};
use std::{
sync::mpsc::{Receiver, SendError, Sender},
Expand Down Expand Up @@ -67,9 +69,9 @@ where
while let Ok(action) = self.incoming.recv() {
match action {
PersistenceAction::RemoveBlocksAbove(new_tip_num, sender) => {
self.on_remove_blocks_above(new_tip_num)?;
let result = self.on_remove_blocks_above(new_tip_num)?;
// we ignore the error because the caller may or may not care about the result
let _ = sender.send(());
let _ = sender.send(result);
}
PersistenceAction::SaveBlocks(blocks, sender) => {
let result = self.on_save_blocks(blocks)?;
Expand All @@ -87,17 +89,18 @@ where
Ok(())
}

fn on_remove_blocks_above(&self, new_tip_num: u64) -> Result<(), PersistenceError> {
fn on_remove_blocks_above(&self, new_tip_num: u64) -> Result<Option<B256>, PersistenceError> {
debug!(target: "tree::persistence", ?new_tip_num, "Removing blocks");
let start_time = Instant::now();
let provider_rw = self.provider.provider_rw()?;
let sf_provider = self.provider.static_file_provider();

let new_tip_hash = provider_rw.block_hash(new_tip_num)?;
UnifiedStorageWriter::from(&provider_rw, &sf_provider).remove_blocks_above(new_tip_num)?;
UnifiedStorageWriter::commit_unwind(provider_rw, sf_provider)?;

self.metrics.remove_blocks_above_duration_seconds.record(start_time.elapsed());
Ok(())
Ok(new_tip_hash)
}

fn on_save_blocks(&self, blocks: Vec<ExecutedBlock>) -> Result<Option<B256>, PersistenceError> {
Expand Down Expand Up @@ -143,7 +146,7 @@ pub enum PersistenceAction {
///
/// This will first update checkpoints from the database, then remove actual block data from
/// static files.
RemoveBlocksAbove(u64, oneshot::Sender<()>),
RemoveBlocksAbove(u64, oneshot::Sender<Option<B256>>),

/// Prune associated block data before the given block number, according to already-configured
/// prune modes.
Expand Down Expand Up @@ -216,11 +219,12 @@ impl PersistenceHandle {
/// Tells the persistence service to remove blocks above a certain block number. The removed
/// blocks are returned by the service.
///
/// When the operation completes, `()` is returned in the receiver end of the sender argument.
/// When the operation completes, the new tip hash is returned in the receiver end of the sender
/// argument.
pub fn remove_blocks_above(
&self,
block_num: u64,
tx: oneshot::Sender<()>,
tx: oneshot::Sender<Option<B256>>,
) -> Result<(), SendError<PersistenceAction>> {
self.send_action(PersistenceAction::RemoveBlocksAbove(block_num, tx))
}
Expand Down
72 changes: 66 additions & 6 deletions crates/engine/tree/src/tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use reth_rpc_types::{
use reth_stages_api::ControlFlow;
use reth_trie::{updates::TrieUpdates, HashedPostState};
use std::{
cmp::Ordering,
collections::{btree_map, hash_map, BTreeMap, HashMap, HashSet, VecDeque},
fmt::Debug,
ops::Bound,
Expand Down Expand Up @@ -530,6 +531,7 @@ where
last_persisted_block_hash: header.hash(),
last_persisted_block_number: best_block_number,
rx: None,
remove_above_state: None,
};

let (tx, outgoing) = tokio::sync::mpsc::unbounded_channel();
Expand Down Expand Up @@ -1050,14 +1052,21 @@ where
/// If we're currently awaiting a response this will try to receive the response (non-blocking)
/// or send a new persistence action if necessary.
fn advance_persistence(&mut self) -> Result<(), TryRecvError> {
if self.should_persist() && !self.persistence_state.in_progress() {
let blocks_to_persist = self.get_canonical_blocks_to_persist();
if blocks_to_persist.is_empty() {
debug!(target: "engine", "Returned empty set of blocks to persist");
} else {
if !self.persistence_state.in_progress() {
if let Some(new_tip_num) = self.persistence_state.remove_above_state.take() {
debug!(target: "engine", ?new_tip_num, "Removing blocks using persistence task");
let (tx, rx) = oneshot::channel();
let _ = self.persistence.save_blocks(blocks_to_persist, tx);
let _ = self.persistence.remove_blocks_above(new_tip_num, tx);
self.persistence_state.start(rx);
} else if self.should_persist() {
let blocks_to_persist = self.get_canonical_blocks_to_persist();
if blocks_to_persist.is_empty() {
debug!(target: "engine", "Returned empty set of blocks to persist");
} else {
let (tx, rx) = oneshot::channel();
let _ = self.persistence.save_blocks(blocks_to_persist, tx);
self.persistence_state.start(rx);
}
}
}

Expand Down Expand Up @@ -1794,13 +1803,55 @@ where
None
}

/// This determines whether or not we should remove blocks from the chain, based on a canonical
/// chain update.
///
/// If the chain update is a reorg:
/// * is the new chain behind the last persisted block, or
/// * if the root of the new chain is at the same height as the last persisted block, is it a
/// different block
///
/// If either of these are true, then this returns the height of the first block. Otherwise,
/// this returns [`None`]. This should be used to check whether or not we should be sending a
/// remove command to the persistence task.
fn find_disk_reorg(&self, chain_update: &NewCanonicalChain) -> Option<u64> {
let NewCanonicalChain::Reorg { new, old: _ } = chain_update else { return None };

let BlockNumHash { number: new_num, hash: new_hash } =
new.first().map(|block| block.block.num_hash())?;

match new_num.cmp(&self.persistence_state.last_persisted_block_number) {
Ordering::Greater => {
// new number is above the last persisted block so the reorg can be performed
// entirely in memory
None
}
Ordering::Equal => {
// new number is the same, if the hash is the same then we should not need to remove
// any blocks
(self.persistence_state.last_persisted_block_hash != new_hash).then_some(new_num)
}
Ordering::Less => {
// this means we are below the last persisted block and must remove on disk blocks
Some(new_num)
}
}
}

/// Invoked when we the canonical chain has been updated.
///
/// This is invoked on a valid forkchoice update, or if we can make the target block canonical.
fn on_canonical_chain_update(&mut self, chain_update: NewCanonicalChain) {
trace!(target: "engine", new_blocks = %chain_update.new_block_count(), reorged_blocks = %chain_update.reorged_block_count(), "applying new chain update");
let start = Instant::now();

// schedule a remove_above call if we have an on-disk reorg
if let Some(height) = self.find_disk_reorg(&chain_update) {
// calculate the new tip by subtracting one from the lowest part of the chain
let new_tip_num = height.saturating_sub(1);
self.persistence_state.schedule_removal(new_tip_num);
}

// update the tracked canonical head
self.state.tree_state.set_canonical_head(chain_update.tip().num_hash());

Expand Down Expand Up @@ -2308,6 +2359,9 @@ pub struct PersistenceState {
///
/// This tracks the chain height that is persisted on disk
last_persisted_block_number: u64,
/// The block above which blocks should be removed from disk, because there has been an on disk
/// reorg.
remove_above_state: Option<u64>,
}

impl PersistenceState {
Expand All @@ -2322,6 +2376,12 @@ impl PersistenceState {
self.rx = Some((rx, Instant::now()));
}

/// Sets the `remove_above_state`, to the new tip number specified.
fn schedule_removal(&mut self, new_tip_num: u64) {
// TODO: what about multiple on-disk reorgs in a row?
self.remove_above_state = Some(new_tip_num);
}

/// Sets state for a finished persistence task.
fn finish(&mut self, last_persisted_block_hash: B256, last_persisted_block_number: u64) {
trace!(target: "engine", block= %last_persisted_block_number, hash=%last_persisted_block_hash, "updating persistence state");
Expand Down

0 comments on commit 98b214f

Please sign in to comment.