From f35873d0947230b9f544ad94121030e2b05aea37 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Thu, 12 Oct 2023 10:13:01 +0300 Subject: [PATCH] refactor(prune): impl `Segment` for other prune segments (#4899) --- bin/reth/src/node/events.rs | 4 +- crates/primitives/src/prune/mod.rs | 5 - crates/prune/src/event.rs | 2 +- crates/prune/src/pruner.rs | 1523 ++--------------- crates/prune/src/segments/account_history.rs | 226 +++ crates/prune/src/segments/headers.rs | 26 +- crates/prune/src/segments/history.rs | 112 ++ crates/prune/src/segments/mod.rs | 53 +- crates/prune/src/segments/receipts.rs | 31 +- crates/prune/src/segments/receipts_by_logs.rs | 305 ++++ crates/prune/src/segments/sender_recovery.rs | 183 ++ crates/prune/src/segments/storage_history.rs | 232 +++ .../prune/src/segments/transaction_lookup.rs | 202 +++ crates/prune/src/segments/transactions.rs | 25 +- 14 files changed, 1459 insertions(+), 1470 deletions(-) create mode 100644 crates/prune/src/segments/account_history.rs create mode 100644 crates/prune/src/segments/history.rs create mode 100644 crates/prune/src/segments/receipts_by_logs.rs create mode 100644 crates/prune/src/segments/sender_recovery.rs create mode 100644 crates/prune/src/segments/storage_history.rs create mode 100644 crates/prune/src/segments/transaction_lookup.rs diff --git a/bin/reth/src/node/events.rs b/bin/reth/src/node/events.rs index c340546fce10..21e2cd76c0e9 100644 --- a/bin/reth/src/node/events.rs +++ b/bin/reth/src/node/events.rs @@ -163,8 +163,8 @@ impl NodeState { fn handle_pruner_event(&self, event: PrunerEvent) { match event { - PrunerEvent::Finished { tip_block_number, elapsed, segments } => { - info!(tip_block_number, ?elapsed, ?segments, "Pruner finished"); + PrunerEvent::Finished { tip_block_number, elapsed, stats } => { + info!(tip_block_number, ?elapsed, ?stats, "Pruner finished"); } } } diff --git a/crates/primitives/src/prune/mod.rs b/crates/primitives/src/prune/mod.rs index e4ef81a58b7c..f02fa64a6acb 100644 --- a/crates/primitives/src/prune/mod.rs +++ b/crates/primitives/src/prune/mod.rs @@ -110,9 +110,4 @@ impl PruneProgress { Self::HasMoreData } } - - /// Returns `true` if pruning has been finished. 
- pub fn is_finished(&self) -> bool { - matches!(self, Self::Finished) - } } diff --git a/crates/prune/src/event.rs b/crates/prune/src/event.rs index c384dacf8fc9..7599b809b608 100644 --- a/crates/prune/src/event.rs +++ b/crates/prune/src/event.rs @@ -8,6 +8,6 @@ pub enum PrunerEvent { Finished { tip_block_number: BlockNumber, elapsed: Duration, - segments: BTreeMap, + stats: BTreeMap, }, } diff --git a/crates/prune/src/pruner.rs b/crates/prune/src/pruner.rs index 3feedd08a2b8..e67b5bc5df4d 100644 --- a/crates/prune/src/pruner.rs +++ b/crates/prune/src/pruner.rs @@ -5,39 +5,44 @@ use crate::{ segments::{PruneInput, Segment}, Metrics, PrunerError, PrunerEvent, }; -use rayon::prelude::*; -use reth_db::{ - abstraction::cursor::{DbCursorRO, DbCursorRW}, - database::Database, - models::{storage_sharded_key::StorageShardedKey, BlockNumberAddress, ShardedKey}, - table::Table, - tables, - transaction::DbTxMut, - BlockNumberList, -}; -use reth_interfaces::RethResult; +use reth_db::database::Database; use reth_primitives::{ - listener::EventListeners, BlockNumber, ChainSpec, PruneCheckpoint, PruneMode, PruneModes, - PruneProgress, PruneSegment, TxNumber, MINIMUM_PRUNING_DISTANCE, -}; -use reth_provider::{ - BlockReader, DatabaseProviderRW, ProviderFactory, PruneCheckpointReader, PruneCheckpointWriter, - TransactionsProvider, + listener::EventListeners, BlockNumber, ChainSpec, PruneMode, PruneModes, PruneProgress, + PruneSegment, PruneSegmentError, }; +use reth_provider::{ProviderFactory, PruneCheckpointReader}; use reth_snapshot::HighestSnapshotsTracker; -use std::{collections::BTreeMap, ops::RangeInclusive, sync::Arc, time::Instant}; +use std::{collections::BTreeMap, sync::Arc, time::Instant}; use tokio_stream::wrappers::UnboundedReceiverStream; -use tracing::{debug, error, instrument, trace}; +use tracing::{debug, trace}; /// Result of [Pruner::run] execution. pub type PrunerResult = Result; -/// Result of segment pruning. -type PruneSegmentResult = Result<(PruneProgress, usize), PrunerError>; - /// The pruner type itself with the result of [Pruner::run] pub type PrunerWithResult = (Pruner, PrunerResult); +type RunnableSegmentGetPruneTargetBlockResult = + Result, PruneSegmentError>; + +struct PrunableSegment( + Box>, + #[allow(clippy::type_complexity)] + Box RunnableSegmentGetPruneTargetBlockResult>, +); + +impl PrunableSegment { + fn new< + S: Segment + 'static, + F: Fn(&PruneModes, BlockNumber) -> RunnableSegmentGetPruneTargetBlockResult + 'static, + >( + segment: S, + f: F, + ) -> Self { + Self(Box::new(segment), Box::new(f)) + } +} + /// Pruning routine. Main pruning logic happens in [Pruner::run]. 
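// A minimal, self-contained sketch of the dispatch pattern behind `PrunableSegment`
// above: each entry pairs a boxed segment with the accessor that resolves its prune
// target block from the configured modes. `DemoSegment`, `DemoReceipts`, `DemoModes`
// and the literal numbers are illustrative stand-ins, not the real reth types.
trait DemoSegment {
    fn name(&self) -> &'static str;
}

struct DemoReceipts;
impl DemoSegment for DemoReceipts {
    fn name(&self) -> &'static str {
        "Receipts"
    }
}

struct DemoModes {
    receipts_distance: Option<u64>,
}

impl DemoModes {
    // Distance-based target block, in the spirit of `prune_target_block_receipts`.
    fn receipts_target(&self, tip: u64) -> Option<u64> {
        self.receipts_distance.map(|distance| tip.saturating_sub(distance))
    }
}

// Couples a boxed segment with the closure that resolves its target block.
struct Prunable(Box<dyn DemoSegment>, Box<dyn Fn(&DemoModes, u64) -> Option<u64>>);

fn main() {
    let modes = DemoModes { receipts_distance: Some(128) };
    let segments = [Prunable(Box::new(DemoReceipts), Box::new(DemoModes::receipts_target))];
    for Prunable(segment, target_block) in segments {
        match target_block(&modes, 10_000) {
            Some(to_block) => println!("{}: prune up to block {to_block}", segment.name()),
            None => println!("{}: nothing to prune", segment.name()),
        }
    }
}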
#[derive(Debug)] pub struct Pruner { @@ -100,7 +105,7 @@ impl Pruner { let provider = self.provider_factory.provider_rw()?; let mut done = true; - let mut segments = BTreeMap::new(); + let mut stats = BTreeMap::new(); // TODO(alexey): prune snapshotted segments of data (headers, transactions) let highest_snapshots = *self.highest_snapshots_tracker.borrow(); @@ -114,172 +119,92 @@ impl Pruner { .map_or(1, |previous_tip_block_number| tip_block_number - previous_tip_block_number) as usize; - if let (Some((to_block, prune_mode)), true) = - (self.modes.prune_target_block_receipts(tip_block_number)?, delete_limit > 0) - { - trace!( - target: "pruner", - segment = ?PruneSegment::Receipts, - %to_block, - ?prune_mode, - "Got target block to prune" - ); - - let segment_start = Instant::now(); - let segment = segments::Receipts::default(); - let output = segment.prune(&provider, PruneInput { to_block, delete_limit })?; - if let Some(checkpoint) = output.checkpoint { - segment.save_checkpoint(&provider, checkpoint.as_prune_checkpoint(prune_mode))?; + // TODO(alexey): this is cursed, refactor + let segments: [PrunableSegment; 5] = [ + PrunableSegment::new( + segments::Receipts::default(), + PruneModes::prune_target_block_receipts, + ), + PrunableSegment::new( + segments::TransactionLookup::default(), + PruneModes::prune_target_block_transaction_lookup, + ), + PrunableSegment::new( + segments::SenderRecovery::default(), + PruneModes::prune_target_block_sender_recovery, + ), + PrunableSegment::new( + segments::AccountHistory::default(), + PruneModes::prune_target_block_account_history, + ), + PrunableSegment::new( + segments::StorageHistory::default(), + PruneModes::prune_target_block_storage_history, + ), + ]; + + for PrunableSegment(segment, get_prune_target_block) in segments { + if delete_limit == 0 { + break } - self.metrics - .get_prune_segment_metrics(PruneSegment::Receipts) - .duration_seconds - .record(segment_start.elapsed()); - - done = done && output.done; - delete_limit = delete_limit.saturating_sub(output.pruned); - segments.insert( - PruneSegment::Receipts, - (PruneProgress::from_done(output.done), output.pruned), - ); - } else { - trace!(target: "pruner", prune_segment = ?PruneSegment::Receipts, "No target block to prune"); - } - - if !self.modes.receipts_log_filter.is_empty() { - let segment_start = Instant::now(); - let (segment_progress, deleted) = - self.prune_receipts_by_logs(&provider, tip_block_number, delete_limit)?; - self.metrics - .get_prune_segment_metrics(PruneSegment::ContractLogs) - .duration_seconds - .record(segment_start.elapsed()); - - done = done && segment_progress.is_finished(); - delete_limit = delete_limit.saturating_sub(deleted); - segments.insert(PruneSegment::ContractLogs, (segment_progress, deleted)); - } else { - trace!(target: "pruner", prune_segment = ?PruneSegment::ContractLogs, "No filter to prune"); - } - - if let (Some((to_block, prune_mode)), true) = - (self.modes.prune_target_block_transaction_lookup(tip_block_number)?, delete_limit > 0) - { - trace!( - target: "pruner", - prune_segment = ?PruneSegment::TransactionLookup, - %to_block, - ?prune_mode, - "Got target block to prune" - ); - - let segment_start = Instant::now(); - let (segment_progress, deleted) = - self.prune_transaction_lookup(&provider, to_block, prune_mode, delete_limit)?; - self.metrics - .get_prune_segment_metrics(PruneSegment::TransactionLookup) - .duration_seconds - .record(segment_start.elapsed()); - - done = done && segment_progress.is_finished(); - delete_limit = 
delete_limit.saturating_sub(deleted); - segments.insert(PruneSegment::TransactionLookup, (segment_progress, deleted)); - } else { - trace!( - target: "pruner", - prune_segment = ?PruneSegment::TransactionLookup, - "No target block to prune" - ); - } - if let (Some((to_block, prune_mode)), true) = - (self.modes.prune_target_block_sender_recovery(tip_block_number)?, delete_limit > 0) - { - trace!( - target: "pruner", - prune_segment = ?PruneSegment::SenderRecovery, - %to_block, - ?prune_mode, - "Got target block to prune" - ); + if let Some((to_block, prune_mode)) = + get_prune_target_block(&self.modes, tip_block_number)? + { + trace!( + target: "pruner", + segment = ?segment.segment(), + %to_block, + ?prune_mode, + "Got target block to prune" + ); - let segment_start = Instant::now(); - let (segment_progress, deleted) = - self.prune_transaction_senders(&provider, to_block, prune_mode, delete_limit)?; - self.metrics - .get_prune_segment_metrics(PruneSegment::SenderRecovery) - .duration_seconds - .record(segment_start.elapsed()); + let segment_start = Instant::now(); + let previous_checkpoint = provider.get_prune_checkpoint(segment.segment())?; + let output = segment + .prune(&provider, PruneInput { previous_checkpoint, to_block, delete_limit })?; + if let Some(checkpoint) = output.checkpoint { + segment + .save_checkpoint(&provider, checkpoint.as_prune_checkpoint(prune_mode))?; + } + self.metrics + .get_prune_segment_metrics(segment.segment()) + .duration_seconds + .record(segment_start.elapsed()); - done = done && segment_progress.is_finished(); - delete_limit = delete_limit.saturating_sub(deleted); - segments.insert(PruneSegment::SenderRecovery, (segment_progress, deleted)); - } else { - trace!( - target: "pruner", - prune_segment = ?PruneSegment::SenderRecovery, - "No target block to prune" - ); + done = done && output.done; + delete_limit = delete_limit.saturating_sub(output.pruned); + stats.insert( + segment.segment(), + (PruneProgress::from_done(output.done), output.pruned), + ); + } else { + trace!(target: "pruner", segment = ?segment.segment(), "No target block to prune"); + } } - if let (Some((to_block, prune_mode)), true) = - (self.modes.prune_target_block_account_history(tip_block_number)?, delete_limit > 0) - { - trace!( - target: "pruner", - prune_segment = ?PruneSegment::AccountHistory, - %to_block, - ?prune_mode, - "Got target block to prune" - ); - + // TODO(alexey): make it not a special case + if !self.modes.receipts_log_filter.is_empty() && delete_limit > 0 { let segment_start = Instant::now(); - let (segment_progress, deleted) = - self.prune_account_history(&provider, to_block, prune_mode, delete_limit)?; + let output = segments::ReceiptsByLogs::default().prune( + &provider, + &self.modes.receipts_log_filter, + tip_block_number, + delete_limit, + )?; self.metrics - .get_prune_segment_metrics(PruneSegment::AccountHistory) + .get_prune_segment_metrics(PruneSegment::ContractLogs) .duration_seconds .record(segment_start.elapsed()); - done = done && segment_progress.is_finished(); - delete_limit = delete_limit.saturating_sub(deleted); - segments.insert(PruneSegment::AccountHistory, (segment_progress, deleted)); - } else { - trace!( - target: "pruner", - prune_segment = ?PruneSegment::AccountHistory, - "No target block to prune" - ); - } - - if let (Some((to_block, prune_mode)), true) = - (self.modes.prune_target_block_storage_history(tip_block_number)?, delete_limit > 0) - { - trace!( - target: "pruner", - prune_segment = ?PruneSegment::StorageHistory, - %to_block, - 
?prune_mode, - "Got target block to prune" + done = done && output.done; + delete_limit = delete_limit.saturating_sub(output.pruned); + stats.insert( + PruneSegment::ContractLogs, + (PruneProgress::from_done(output.done), output.pruned), ); - - let segment_start = Instant::now(); - let (segment_progress, deleted) = - self.prune_storage_history(&provider, to_block, prune_mode, delete_limit)?; - self.metrics - .get_prune_segment_metrics(PruneSegment::StorageHistory) - .duration_seconds - .record(segment_start.elapsed()); - - done = done && segment_progress.is_finished(); - delete_limit = delete_limit.saturating_sub(deleted); - segments.insert(PruneSegment::StorageHistory, (segment_progress, deleted)); } else { - trace!( - target: "pruner", - prune_segment = ?PruneSegment::StorageHistory, - "No target block to prune" - ); + trace!(target: "pruner", segment = ?PruneSegment::ContractLogs, "No filter to prune"); } if let Some(snapshots) = highest_snapshots { @@ -295,7 +220,9 @@ impl Pruner { let segment_start = Instant::now(); let segment = segments::Headers::default(); - let output = segment.prune(&provider, PruneInput { to_block, delete_limit })?; + let previous_checkpoint = provider.get_prune_checkpoint(PruneSegment::Headers)?; + let output = segment + .prune(&provider, PruneInput { previous_checkpoint, to_block, delete_limit })?; if let Some(checkpoint) = output.checkpoint { segment .save_checkpoint(&provider, checkpoint.as_prune_checkpoint(prune_mode))?; @@ -307,7 +234,7 @@ impl Pruner { done = done && output.done; delete_limit = delete_limit.saturating_sub(output.pruned); - segments.insert( + stats.insert( PruneSegment::Headers, (PruneProgress::from_done(output.done), output.pruned), ); @@ -325,7 +252,9 @@ impl Pruner { let segment_start = Instant::now(); let segment = segments::Transactions::default(); - let output = segment.prune(&provider, PruneInput { to_block, delete_limit })?; + let previous_checkpoint = provider.get_prune_checkpoint(PruneSegment::Headers)?; + let output = segment + .prune(&provider, PruneInput { previous_checkpoint, to_block, delete_limit })?; if let Some(checkpoint) = output.checkpoint { segment .save_checkpoint(&provider, checkpoint.as_prune_checkpoint(prune_mode))?; @@ -337,7 +266,7 @@ impl Pruner { done = done && output.done; delete_limit = delete_limit.saturating_sub(output.pruned); - segments.insert( + stats.insert( PruneSegment::Transactions, (PruneProgress::from_done(output.done), output.pruned), ); @@ -356,11 +285,11 @@ impl Pruner { ?elapsed, %delete_limit, %done, - ?segments, + ?stats, "Pruner finished" ); - self.listeners.notify(PrunerEvent::Finished { tip_block_number, elapsed, segments }); + self.listeners.notify(PrunerEvent::Finished { tip_block_number, elapsed, stats }); Ok(PruneProgress::from_done(done)) } @@ -386,650 +315,13 @@ impl Pruner { false } } - - /// Get next inclusive block range to prune according to the checkpoint, `to_block` block - /// number and `limit`. - /// - /// To get the range start (`from_block`): - /// 1. If checkpoint exists, use next block. - /// 2. If checkpoint doesn't exist, use block 0. - /// - /// To get the range end: use block `to_block`. - fn get_next_block_range_from_checkpoint( - &self, - provider: &DatabaseProviderRW<'_, DB>, - prune_segment: PruneSegment, - to_block: BlockNumber, - ) -> RethResult>> { - let from_block = provider - .get_prune_checkpoint(prune_segment)? 
- .and_then(|checkpoint| checkpoint.block_number) - // Checkpoint exists, prune from the next block after the highest pruned one - .map(|block_number| block_number + 1) - // No checkpoint exists, prune from genesis - .unwrap_or(0); - - let range = from_block..=to_block; - if range.is_empty() { - return Ok(None) - } - - Ok(Some(range)) - } - - /// Get next inclusive tx number range to prune according to the checkpoint and `to_block` block - /// number. - /// - /// To get the range start: - /// 1. If checkpoint exists, get next block body and return its first tx number. - /// 2. If checkpoint doesn't exist, return 0. - /// - /// To get the range end: get last tx number for the provided `to_block`. - fn get_next_tx_num_range_from_checkpoint( - &self, - provider: &DatabaseProviderRW<'_, DB>, - prune_segment: PruneSegment, - to_block: BlockNumber, - ) -> RethResult>> { - let from_tx_number = provider - .get_prune_checkpoint(prune_segment)? - // Checkpoint exists, prune from the next transaction after the highest pruned one - .and_then(|checkpoint| match checkpoint.tx_number { - Some(tx_number) => Some(tx_number + 1), - _ => { - error!(target: "pruner", %prune_segment, ?checkpoint, "Expected transaction number in prune checkpoint, found None"); - None - }, - }) - // No checkpoint exists, prune from genesis - .unwrap_or(0); - - let to_tx_number = match provider.block_body_indices(to_block)? { - Some(body) => body, - None => return Ok(None), - } - .last_tx_num(); - - let range = from_tx_number..=to_tx_number; - if range.is_empty() { - return Ok(None) - } - - Ok(Some(range)) - } - - /// Prune receipts up to the provided block, inclusive, by filtering logs. Works as in inclusion - /// list, and removes every receipt not belonging to it. Respects the batch size. - #[instrument(level = "trace", skip(self, provider), target = "pruner")] - fn prune_receipts_by_logs( - &self, - provider: &DatabaseProviderRW<'_, DB>, - tip_block_number: BlockNumber, - delete_limit: usize, - ) -> PruneSegmentResult { - // Contract log filtering removes every receipt possible except the ones in the list. So, - // for the other receipts it's as if they had a `PruneMode::Distance()` of - // `MINIMUM_PRUNING_DISTANCE`. - let to_block = PruneMode::Distance(MINIMUM_PRUNING_DISTANCE) - .prune_target_block( - tip_block_number, - MINIMUM_PRUNING_DISTANCE, - PruneSegment::ContractLogs, - )? - .map(|(bn, _)| bn) - .unwrap_or_default(); - - // Get status checkpoint from latest run - let mut last_pruned_block = provider - .get_prune_checkpoint(PruneSegment::ContractLogs)? - .and_then(|checkpoint| checkpoint.block_number); - - let initial_last_pruned_block = last_pruned_block; - - let mut from_tx_number = match initial_last_pruned_block { - Some(block) => provider - .block_body_indices(block)? - .map(|block| block.last_tx_num() + 1) - .unwrap_or(0), - None => 0, - }; - - // Figure out what receipts have already been pruned, so we can have an accurate - // `address_filter` - let address_filter = - self.modes.receipts_log_filter.group_by_block(tip_block_number, last_pruned_block)?; - - // Splits all transactions in different block ranges. 
Each block range will have its own - // filter address list and will check it while going through the table - // - // Example: - // For an `address_filter` such as: - // { block9: [a1, a2], block20: [a3, a4, a5] } - // - // The following structures will be created in the exact order as showed: - // `block_ranges`: [ - // (block0, block8, 0 addresses), - // (block9, block19, 2 addresses), - // (block20, to_block, 5 addresses) - // ] - // `filtered_addresses`: [a1, a2, a3, a4, a5] - // - // The first range will delete all receipts between block0 - block8 - // The second range will delete all receipts between block9 - 19, except the ones with - // emitter logs from these addresses: [a1, a2]. - // The third range will delete all receipts between block20 - to_block, except the ones with - // emitter logs from these addresses: [a1, a2, a3, a4, a5] - let mut block_ranges = vec![]; - let mut blocks_iter = address_filter.iter().peekable(); - let mut filtered_addresses = vec![]; - - while let Some((start_block, addresses)) = blocks_iter.next() { - filtered_addresses.extend_from_slice(addresses); - - // This will clear all receipts before the first appearance of a contract log or since - // the block after the last pruned one. - if block_ranges.is_empty() { - let init = last_pruned_block.map(|b| b + 1).unwrap_or_default(); - if init < *start_block { - block_ranges.push((init, *start_block - 1, 0)); - } - } - - let end_block = - blocks_iter.peek().map(|(next_block, _)| *next_block - 1).unwrap_or(to_block); - - // Addresses in lower block ranges, are still included in the inclusion list for future - // ranges. - block_ranges.push((*start_block, end_block, filtered_addresses.len())); - } - - trace!( - target: "pruner", - ?block_ranges, - ?filtered_addresses, - "Calculated block ranges and filtered addresses", - ); - - let mut limit = delete_limit; - let mut done = true; - let mut last_pruned_transaction = None; - for (start_block, end_block, num_addresses) in block_ranges { - let block_range = start_block..=end_block; - - // Calculate the transaction range from this block range - let tx_range_end = match provider.block_body_indices(end_block)? { - Some(body) => body.last_tx_num(), - None => { - trace!( - target: "pruner", - ?block_range, - "No receipts to prune." - ); - continue - } - }; - let tx_range = from_tx_number..=tx_range_end; - - // Delete receipts, except the ones in the inclusion list - let mut last_skipped_transaction = 0; - let deleted; - (deleted, done) = provider.prune_table_with_range::( - tx_range, - limit, - |(tx_num, receipt)| { - let skip = num_addresses > 0 && - receipt.logs.iter().any(|log| { - filtered_addresses[..num_addresses].contains(&&log.address) - }); - - if skip { - last_skipped_transaction = *tx_num; - } - skip - }, - |row| last_pruned_transaction = Some(row.0), - )?; - trace!(target: "pruner", %deleted, %done, ?block_range, "Pruned receipts"); - - limit = limit.saturating_sub(deleted); - - // For accurate checkpoints we need to know that we have checked every transaction. - // Example: we reached the end of the range, and the last receipt is supposed to skip - // its deletion. - last_pruned_transaction = - Some(last_pruned_transaction.unwrap_or_default().max(last_skipped_transaction)); - last_pruned_block = Some( - provider - .transaction_block(last_pruned_transaction.expect("qed"))? - .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))? 
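// A standalone sketch of the block-range construction walked through in the comment
// above (the block9/block20 worked example): given an address filter grouped by block
// and the last pruned block, it yields the ranges together with the size of the growing
// inclusion list. `Address` and the inputs in `main` are simplified stand-ins for the
// reth types, not the actual implementation.
use std::collections::BTreeMap;

type BlockNumber = u64;
type Address = &'static str;

fn block_ranges(
    address_filter: &BTreeMap<BlockNumber, Vec<Address>>,
    last_pruned_block: Option<BlockNumber>,
    to_block: BlockNumber,
) -> (Vec<(BlockNumber, BlockNumber, usize)>, Vec<Address>) {
    let mut ranges = Vec::new();
    let mut filtered_addresses = Vec::new();
    let mut iter = address_filter.iter().peekable();

    while let Some((start_block, addresses)) = iter.next() {
        filtered_addresses.extend_from_slice(addresses);

        // Everything before the first filtered block (or after the last pruned block)
        // has no addresses to keep, so it forms its own range with an empty filter.
        if ranges.is_empty() {
            let init = last_pruned_block.map(|b| b + 1).unwrap_or_default();
            if init < *start_block {
                ranges.push((init, *start_block - 1, 0));
            }
        }

        let end_block = iter.peek().map(|(next_block, _)| *next_block - 1).unwrap_or(to_block);
        // Addresses from earlier ranges stay in the inclusion list for later ranges.
        ranges.push((*start_block, end_block, filtered_addresses.len()));
    }
    (ranges, filtered_addresses)
}

fn main() {
    let filter = BTreeMap::from([(9, vec!["a1", "a2"]), (20, vec!["a3", "a4", "a5"])]);
    let (ranges, addresses) = block_ranges(&filter, None, 100);
    assert_eq!(ranges, vec![(0, 8, 0), (9, 19, 2), (20, 100, 5)]);
    assert_eq!(addresses.len(), 5);
}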
- // If there's more receipts to prune, set the checkpoint block number to - // previous, so we could finish pruning its receipts on the - // next run. - .saturating_sub(if done { 0 } else { 1 }), - ); - - if limit == 0 { - done &= end_block == to_block; - break - } - - from_tx_number = last_pruned_transaction.expect("qed") + 1; - } - - // If there are contracts using `PruneMode::Distance(_)` there will be receipts before - // `to_block` that become eligible to be pruned in future runs. Therefore, our checkpoint is - // not actually `to_block`, but the `lowest_block_with_distance` from any contract. - // This ensures that in future pruner runs we can prune all these receipts between the - // previous `lowest_block_with_distance` and the new one using - // `get_next_tx_num_range_from_checkpoint`. - // - // Only applies if we were able to prune everything intended for this run, otherwise the - // checkpoing is the `last_pruned_block`. - let prune_mode_block = self - .modes - .receipts_log_filter - .lowest_block_with_distance(tip_block_number, initial_last_pruned_block)? - .unwrap_or(to_block); - - provider.save_prune_checkpoint( - PruneSegment::ContractLogs, - PruneCheckpoint { - block_number: Some(prune_mode_block.min(last_pruned_block.unwrap_or(u64::MAX))), - tx_number: last_pruned_transaction, - prune_mode: PruneMode::Before(prune_mode_block), - }, - )?; - Ok((PruneProgress::from_done(done), delete_limit - limit)) - } - - /// Prune transaction lookup entries up to the provided block, inclusive, respecting the batch - /// size. - #[instrument(level = "trace", skip(self, provider), target = "pruner")] - fn prune_transaction_lookup( - &self, - provider: &DatabaseProviderRW<'_, DB>, - to_block: BlockNumber, - prune_mode: PruneMode, - delete_limit: usize, - ) -> PruneSegmentResult { - let (start, end) = match self.get_next_tx_num_range_from_checkpoint( - provider, - PruneSegment::TransactionLookup, - to_block, - )? { - Some(range) => range, - None => { - trace!(target: "pruner", "No transaction lookup entries to prune"); - return Ok((PruneProgress::Finished, 0)) - } - } - .into_inner(); - let tx_range = start..=(end.min(start + delete_limit as u64 - 1)); - let tx_range_end = *tx_range.end(); - - // Retrieve transactions in the range and calculate their hashes in parallel - let hashes = provider - .transactions_by_tx_range(tx_range.clone())? - .into_par_iter() - .map(|transaction| transaction.hash()) - .collect::>(); - - // Number of transactions retrieved from the database should match the tx range count - let tx_count = tx_range.count(); - if hashes.len() != tx_count { - return Err(PrunerError::InconsistentData( - "Unexpected number of transaction hashes retrieved by transaction number range", - )) - } - - let mut last_pruned_transaction = None; - let (deleted, _) = provider.prune_table_with_iterator::( - hashes, - delete_limit, - |row| { - last_pruned_transaction = Some(last_pruned_transaction.unwrap_or(row.1).max(row.1)) - }, - )?; - let done = tx_range_end == end; - trace!(target: "pruner", %deleted, %done, "Pruned transaction lookup"); - - let last_pruned_transaction = last_pruned_transaction.unwrap_or(tx_range_end); - - let last_pruned_block = provider - .transaction_block(last_pruned_transaction)? - .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))? - // If there's more transaction lookup entries to prune, set the checkpoint block number - // to previous, so we could finish pruning its transaction lookup entries on the next - // run. 
- .checked_sub(if done { 0 } else { 1 }); - - provider.save_prune_checkpoint( - PruneSegment::TransactionLookup, - PruneCheckpoint { - block_number: last_pruned_block, - tx_number: Some(last_pruned_transaction), - prune_mode, - }, - )?; - - Ok((PruneProgress::from_done(done), deleted)) - } - - /// Prune transaction senders up to the provided block, inclusive. - #[instrument(level = "trace", skip(self, provider), target = "pruner")] - fn prune_transaction_senders( - &self, - provider: &DatabaseProviderRW<'_, DB>, - to_block: BlockNumber, - prune_mode: PruneMode, - delete_limit: usize, - ) -> PruneSegmentResult { - let tx_range = match self.get_next_tx_num_range_from_checkpoint( - provider, - PruneSegment::SenderRecovery, - to_block, - )? { - Some(range) => range, - None => { - trace!(target: "pruner", "No transaction senders to prune"); - return Ok((PruneProgress::Finished, 0)) - } - }; - let tx_range_end = *tx_range.end(); - - let mut last_pruned_transaction = tx_range_end; - let (deleted, done) = provider.prune_table_with_range::( - tx_range, - delete_limit, - |_| false, - |row| last_pruned_transaction = row.0, - )?; - trace!(target: "pruner", %deleted, %done, "Pruned transaction senders"); - - let last_pruned_block = provider - .transaction_block(last_pruned_transaction)? - .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))? - // If there's more transaction senders to prune, set the checkpoint block number to - // previous, so we could finish pruning its transaction senders on the next run. - .checked_sub(if done { 0 } else { 1 }); - - provider.save_prune_checkpoint( - PruneSegment::SenderRecovery, - PruneCheckpoint { - block_number: last_pruned_block, - tx_number: Some(last_pruned_transaction), - prune_mode, - }, - )?; - - Ok((PruneProgress::from_done(done), deleted)) - } - - /// Prune account history up to the provided block, inclusive. - #[instrument(level = "trace", skip(self, provider), target = "pruner")] - fn prune_account_history( - &self, - provider: &DatabaseProviderRW<'_, DB>, - to_block: BlockNumber, - prune_mode: PruneMode, - delete_limit: usize, - ) -> PruneSegmentResult { - let range = match self.get_next_block_range_from_checkpoint( - provider, - PruneSegment::AccountHistory, - to_block, - )? { - Some(range) => range, - None => { - trace!(target: "pruner", "No account history to prune"); - return Ok((PruneProgress::Finished, 0)) - } - }; - let range_end = *range.end(); - - // Half of delete limit rounded up for changesets, other half for indices - let delete_limit = (delete_limit + 1) / 2; - - let mut last_changeset_pruned_block = None; - let (deleted_changesets, done) = provider - .prune_table_with_range::( - range, - delete_limit, - |_| false, - |row| last_changeset_pruned_block = Some(row.0), - )?; - trace!(target: "pruner", deleted = %deleted_changesets, %done, "Pruned account history (changesets)"); - - let last_changeset_pruned_block = last_changeset_pruned_block - // If there's more account account changesets to prune, set the checkpoint block number - // to previous, so we could finish pruning its account changesets on the next run. 
- .map(|block_number| if done { Some(block_number) } else { block_number.checked_sub(1) }) - .unwrap_or(Some(range_end)); - - let mut deleted_indices = 0; - if let Some(last_changeset_pruned_block) = last_changeset_pruned_block { - let processed; - (processed, deleted_indices) = self - .prune_history_indices::( - provider, - last_changeset_pruned_block, - |a, b| a.key == b.key, - |key| ShardedKey::last(key.key), - )?; - trace!(target: "pruner", %processed, deleted = %deleted_indices, %done, "Pruned account history (history)" ); - } - - provider.save_prune_checkpoint( - PruneSegment::AccountHistory, - PruneCheckpoint { - block_number: last_changeset_pruned_block, - tx_number: None, - prune_mode, - }, - )?; - - Ok((PruneProgress::from_done(done), deleted_changesets + deleted_indices)) - } - - /// Prune storage history up to the provided block, inclusive. - #[instrument(level = "trace", skip(self, provider), target = "pruner")] - fn prune_storage_history( - &self, - provider: &DatabaseProviderRW<'_, DB>, - to_block: BlockNumber, - prune_mode: PruneMode, - delete_limit: usize, - ) -> PruneSegmentResult { - let range = match self.get_next_block_range_from_checkpoint( - provider, - PruneSegment::StorageHistory, - to_block, - )? { - Some(range) => range, - None => { - trace!(target: "pruner", "No storage history to prune"); - return Ok((PruneProgress::Finished, 0)) - } - }; - let range_end = *range.end(); - - // Half of delete limit rounded up for changesets, other half for indices - let delete_limit = (delete_limit + 1) / 2; - - let mut last_changeset_pruned_block = None; - let (deleted_changesets, done) = provider - .prune_table_with_range::( - BlockNumberAddress::range(range), - delete_limit, - |_| false, - |row| last_changeset_pruned_block = Some(row.0.block_number()), - )?; - trace!(target: "pruner", deleted = %deleted_changesets, %done, "Pruned storage history (changesets)"); - - let last_changeset_pruned_block = last_changeset_pruned_block - // If there's more account storage changesets to prune, set the checkpoint block number - // to previous, so we could finish pruning its storage changesets on the next run. - .map(|block_number| if done { Some(block_number) } else { block_number.checked_sub(1) }) - .unwrap_or(Some(range_end)); - - let mut deleted_indices = 0; - if let Some(last_changeset_pruned_block) = last_changeset_pruned_block { - let processed; - (processed, deleted_indices) = self - .prune_history_indices::( - provider, - last_changeset_pruned_block, - |a, b| a.address == b.address && a.sharded_key.key == b.sharded_key.key, - |key| StorageShardedKey::last(key.address, key.sharded_key.key), - )?; - trace!(target: "pruner", %processed, deleted = %deleted_indices, %done, "Pruned storage history (history)" ); - } - - provider.save_prune_checkpoint( - PruneSegment::StorageHistory, - PruneCheckpoint { - block_number: last_changeset_pruned_block, - tx_number: None, - prune_mode, - }, - )?; - - Ok((PruneProgress::from_done(done), deleted_changesets + deleted_indices)) - } - - /// Prune history indices up to the provided block, inclusive. - /// - /// Returns total number of processed (walked) and deleted entities. 
- fn prune_history_indices( - &self, - provider: &DatabaseProviderRW<'_, DB>, - to_block: BlockNumber, - key_matches: impl Fn(&T::Key, &T::Key) -> bool, - last_key: impl Fn(&T::Key) -> T::Key, - ) -> Result<(usize, usize), PrunerError> - where - T: Table, - T::Key: AsRef>, - { - let mut processed = 0; - let mut deleted = 0; - let mut cursor = provider.tx_ref().cursor_write::()?; - - // Prune history table: - // 1. If the shard has `highest_block_number` less than or equal to the target block number - // for pruning, delete the shard completely. - // 2. If the shard has `highest_block_number` greater than the target block number for - // pruning, filter block numbers inside the shard which are less than the target - // block number for pruning. - while let Some(result) = cursor.next()? { - let (key, blocks): (T::Key, BlockNumberList) = result; - - // If shard consists only of block numbers less than the target one, delete shard - // completely. - if key.as_ref().highest_block_number <= to_block { - cursor.delete_current()?; - deleted += 1; - if key.as_ref().highest_block_number == to_block { - // Shard contains only block numbers up to the target one, so we can skip to - // the last shard for this key. It is guaranteed that further shards for this - // sharded key will not contain the target block number, as it's in this shard. - cursor.seek_exact(last_key(&key))?; - } - } - // Shard contains block numbers that are higher than the target one, so we need to - // filter it. It is guaranteed that further shards for this sharded key will not - // contain the target block number, as it's in this shard. - else { - let new_blocks = blocks - .iter(0) - .skip_while(|block| *block <= to_block as usize) - .collect::>(); - - // If there were blocks less than or equal to the target one - // (so the shard has changed), update the shard. - if blocks.len() != new_blocks.len() { - // If there are no more blocks in this shard, we need to remove it, as empty - // shards are not allowed. - if new_blocks.is_empty() { - if key.as_ref().highest_block_number == u64::MAX { - let prev_row = cursor.prev()?; - match prev_row { - // If current shard is the last shard for the sharded key that - // has previous shards, replace it with the previous shard. - Some((prev_key, prev_value)) if key_matches(&prev_key, &key) => { - cursor.delete_current()?; - deleted += 1; - // Upsert will replace the last shard for this sharded key with - // the previous value. - cursor.upsert(key.clone(), prev_value)?; - } - // If there's no previous shard for this sharded key, - // just delete last shard completely. - _ => { - // If we successfully moved the cursor to a previous row, - // jump to the original last shard. - if prev_row.is_some() { - cursor.next()?; - } - // Delete shard. - cursor.delete_current()?; - deleted += 1; - } - } - } - // If current shard is not the last shard for this sharded key, - // just delete it. - else { - cursor.delete_current()?; - deleted += 1; - } - } else { - cursor.upsert(key.clone(), BlockNumberList::new_pre_sorted(new_blocks))?; - } - } - - // Jump to the last shard for this key, if current key isn't already the last shard. 
- if key.as_ref().highest_block_number != u64::MAX { - cursor.seek_exact(last_key(&key))?; - } - } - - processed += 1; - } - - Ok((processed, deleted)) - } } #[cfg(test)] mod tests { use crate::Pruner; - use assert_matches::assert_matches; - use itertools::{ - FoldWhile::{Continue, Done}, - Itertools, - }; - use reth_db::{ - cursor::DbCursorRO, tables, test_utils::create_test_rw_db, transaction::DbTx, - BlockNumberList, - }; - use reth_interfaces::test_utils::{ - generators, - generators::{ - random_block_range, random_changeset_range, random_eoa_account, - random_eoa_account_range, random_log, random_receipt, - }, - }; - use reth_primitives::{ - BlockNumber, PruneCheckpoint, PruneMode, PruneModes, PruneProgress, PruneSegment, - ReceiptsLogPruneConfig, TxNumber, B256, MAINNET, - }; - use reth_provider::{PruneCheckpointReader, TransactionsProvider}; - use reth_stages::test_utils::TestTransaction; - use std::{ - collections::BTreeMap, - ops::{AddAssign, Sub}, - }; + use reth_db::test_utils::create_test_rw_db; + use reth_primitives::{PruneModes, MAINNET}; use tokio::sync::watch; #[test] @@ -1052,593 +344,4 @@ mod tests { let third_block_number = second_block_number; assert!(!pruner.is_pruning_needed(third_block_number)); } - - #[test] - fn prune_transaction_lookup() { - let tx = TestTransaction::default(); - let mut rng = generators::rng(); - - let blocks = random_block_range(&mut rng, 1..=10, B256::ZERO, 2..3); - tx.insert_blocks(blocks.iter(), None).expect("insert blocks"); - - let mut tx_hash_numbers = Vec::new(); - for block in &blocks { - for transaction in &block.body { - tx_hash_numbers.push((transaction.hash, tx_hash_numbers.len() as u64)); - } - } - tx.insert_tx_hash_numbers(tx_hash_numbers.clone()).expect("insert tx hash numbers"); - - assert_eq!( - tx.table::().unwrap().len(), - blocks.iter().map(|block| block.body.len()).sum::() - ); - assert_eq!( - tx.table::().unwrap().len(), - tx.table::().unwrap().len() - ); - - let test_prune = |to_block: BlockNumber, expected_result: (PruneProgress, usize)| { - let prune_mode = PruneMode::Before(to_block); - let pruner = Pruner::new( - tx.inner_raw(), - MAINNET.clone(), - 1, - PruneModes { transaction_lookup: Some(prune_mode), ..Default::default() }, - // Less than total amount of transaction lookup entries to prune to test the - // batching logic - 10, - watch::channel(None).1, - ); - - let next_tx_number_to_prune = tx - .inner() - .get_prune_checkpoint(PruneSegment::TransactionLookup) - .unwrap() - .and_then(|checkpoint| checkpoint.tx_number) - .map(|tx_number| tx_number + 1) - .unwrap_or_default(); - - let last_pruned_tx_number = blocks - .iter() - .take(to_block as usize) - .map(|block| block.body.len()) - .sum::() - .min(next_tx_number_to_prune as usize + pruner.delete_limit) - .sub(1); - - let last_pruned_block_number = blocks - .iter() - .fold_while((0, 0), |(_, mut tx_count), block| { - tx_count += block.body.len(); - - if tx_count > last_pruned_tx_number { - Done((block.number, tx_count)) - } else { - Continue((block.number, tx_count)) - } - }) - .into_inner() - .0; - - let provider = tx.inner_rw(); - let result = pruner.prune_transaction_lookup( - &provider, - to_block, - prune_mode, - pruner.delete_limit, - ); - provider.commit().expect("commit"); - - assert_matches!(result, Ok(_)); - let result = result.unwrap(); - assert_eq!(result, expected_result); - - let last_pruned_block_number = - last_pruned_block_number.checked_sub(if result.0.is_finished() { 0 } else { 1 }); - - assert_eq!( - tx.table::().unwrap().len(), - 
tx_hash_numbers.len() - (last_pruned_tx_number + 1) - ); - assert_eq!( - tx.inner().get_prune_checkpoint(PruneSegment::TransactionLookup).unwrap(), - Some(PruneCheckpoint { - block_number: last_pruned_block_number, - tx_number: Some(last_pruned_tx_number as TxNumber), - prune_mode - }) - ); - }; - - test_prune(6, (PruneProgress::HasMoreData, 10)); - test_prune(6, (PruneProgress::Finished, 2)); - test_prune(10, (PruneProgress::Finished, 8)); - } - - #[test] - fn prune_transaction_senders() { - let tx = TestTransaction::default(); - let mut rng = generators::rng(); - - let blocks = random_block_range(&mut rng, 1..=10, B256::ZERO, 2..3); - tx.insert_blocks(blocks.iter(), None).expect("insert blocks"); - - let mut transaction_senders = Vec::new(); - for block in &blocks { - for transaction in &block.body { - transaction_senders.push(( - transaction_senders.len() as u64, - transaction.recover_signer().expect("recover signer"), - )); - } - } - tx.insert_transaction_senders(transaction_senders.clone()) - .expect("insert transaction senders"); - - assert_eq!( - tx.table::().unwrap().len(), - blocks.iter().map(|block| block.body.len()).sum::() - ); - assert_eq!( - tx.table::().unwrap().len(), - tx.table::().unwrap().len() - ); - - let test_prune = |to_block: BlockNumber, expected_result: (PruneProgress, usize)| { - let prune_mode = PruneMode::Before(to_block); - let pruner = Pruner::new( - tx.inner_raw(), - MAINNET.clone(), - 1, - PruneModes { sender_recovery: Some(prune_mode), ..Default::default() }, - // Less than total amount of transaction senders to prune to test the batching - // logic - 10, - watch::channel(None).1, - ); - - let next_tx_number_to_prune = tx - .inner() - .get_prune_checkpoint(PruneSegment::SenderRecovery) - .unwrap() - .and_then(|checkpoint| checkpoint.tx_number) - .map(|tx_number| tx_number + 1) - .unwrap_or_default(); - - let last_pruned_tx_number = blocks - .iter() - .take(to_block as usize) - .map(|block| block.body.len()) - .sum::() - .min(next_tx_number_to_prune as usize + pruner.delete_limit) - .sub(1); - - let last_pruned_block_number = blocks - .iter() - .fold_while((0, 0), |(_, mut tx_count), block| { - tx_count += block.body.len(); - - if tx_count > last_pruned_tx_number { - Done((block.number, tx_count)) - } else { - Continue((block.number, tx_count)) - } - }) - .into_inner() - .0; - - let provider = tx.inner_rw(); - let result = pruner.prune_transaction_senders( - &provider, - to_block, - prune_mode, - pruner.delete_limit, - ); - provider.commit().expect("commit"); - - assert_matches!(result, Ok(_)); - let result = result.unwrap(); - assert_eq!(result, expected_result); - - let last_pruned_block_number = - last_pruned_block_number.checked_sub(if result.0.is_finished() { 0 } else { 1 }); - - assert_eq!( - tx.table::().unwrap().len(), - transaction_senders.len() - (last_pruned_tx_number + 1) - ); - assert_eq!( - tx.inner().get_prune_checkpoint(PruneSegment::SenderRecovery).unwrap(), - Some(PruneCheckpoint { - block_number: last_pruned_block_number, - tx_number: Some(last_pruned_tx_number as TxNumber), - prune_mode - }) - ); - }; - - test_prune(6, (PruneProgress::HasMoreData, 10)); - test_prune(6, (PruneProgress::Finished, 2)); - test_prune(10, (PruneProgress::Finished, 8)); - } - - #[test] - fn prune_account_history() { - let tx = TestTransaction::default(); - let mut rng = generators::rng(); - - let blocks = random_block_range(&mut rng, 1..=5000, B256::ZERO, 0..1); - tx.insert_blocks(blocks.iter(), None).expect("insert blocks"); - - let accounts = - 
random_eoa_account_range(&mut rng, 0..2).into_iter().collect::>(); - - let (changesets, _) = random_changeset_range( - &mut rng, - blocks.iter(), - accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))), - 0..0, - 0..0, - ); - tx.insert_changesets(changesets.clone(), None).expect("insert changesets"); - tx.insert_history(changesets.clone(), None).expect("insert history"); - - let account_occurrences = tx.table::().unwrap().into_iter().fold( - BTreeMap::<_, usize>::new(), - |mut map, (key, _)| { - map.entry(key.key).or_default().add_assign(1); - map - }, - ); - assert!(account_occurrences.into_iter().any(|(_, occurrences)| occurrences > 1)); - - assert_eq!( - tx.table::().unwrap().len(), - changesets.iter().flatten().count() - ); - - let original_shards = tx.table::().unwrap(); - - let test_prune = |to_block: BlockNumber, - run: usize, - expected_result: (PruneProgress, usize)| { - let prune_mode = PruneMode::Before(to_block); - let pruner = Pruner::new( - tx.inner_raw(), - MAINNET.clone(), - 1, - PruneModes { account_history: Some(prune_mode), ..Default::default() }, - // Less than total amount of entries to prune to test the batching logic - 2000, - watch::channel(None).1, - ); - - let provider = tx.inner_rw(); - let result = - pruner.prune_account_history(&provider, to_block, prune_mode, pruner.delete_limit); - provider.commit().expect("commit"); - - assert_matches!(result, Ok(_)); - let result = result.unwrap(); - assert_eq!(result, expected_result); - - let changesets = changesets - .iter() - .enumerate() - .flat_map(|(block_number, changeset)| { - changeset.iter().map(move |change| (block_number, change)) - }) - .collect::>(); - - #[allow(clippy::skip_while_next)] - let pruned = changesets - .iter() - .enumerate() - .skip_while(|(i, (block_number, _))| { - *i < pruner.delete_limit / 2 * run && *block_number <= to_block as usize - }) - .next() - .map(|(i, _)| i) - .unwrap_or_default(); - - let mut pruned_changesets = changesets - .iter() - // Skip what we've pruned so far, subtracting one to get last pruned block number - // further down - .skip(pruned.saturating_sub(1)); - - let last_pruned_block_number = pruned_changesets - .next() - .map(|(block_number, _)| if result.0.is_finished() { - *block_number - } else { - block_number.saturating_sub(1) - } as BlockNumber) - .unwrap_or(to_block); - - let pruned_changesets = pruned_changesets.fold( - BTreeMap::<_, Vec<_>>::new(), - |mut acc, (block_number, change)| { - acc.entry(block_number).or_default().push(change); - acc - }, - ); - - assert_eq!( - tx.table::().unwrap().len(), - pruned_changesets.values().flatten().count() - ); - - let actual_shards = tx.table::().unwrap(); - - let expected_shards = original_shards - .iter() - .filter(|(key, _)| key.highest_block_number > last_pruned_block_number) - .map(|(key, blocks)| { - let new_blocks = blocks - .iter(0) - .skip_while(|block| *block <= last_pruned_block_number as usize) - .collect::>(); - (key.clone(), BlockNumberList::new_pre_sorted(new_blocks)) - }) - .collect::>(); - - assert_eq!(actual_shards, expected_shards); - - assert_eq!( - tx.inner().get_prune_checkpoint(PruneSegment::AccountHistory).unwrap(), - Some(PruneCheckpoint { - block_number: Some(last_pruned_block_number), - tx_number: None, - prune_mode - }) - ); - }; - - test_prune(998, 1, (PruneProgress::HasMoreData, 1000)); - test_prune(998, 2, (PruneProgress::Finished, 998)); - test_prune(1400, 3, (PruneProgress::Finished, 804)); - } - - #[test] - fn prune_storage_history() { - let tx = TestTransaction::default(); - 
let mut rng = generators::rng(); - - let blocks = random_block_range(&mut rng, 0..=5000, B256::ZERO, 0..1); - tx.insert_blocks(blocks.iter(), None).expect("insert blocks"); - - let accounts = - random_eoa_account_range(&mut rng, 0..2).into_iter().collect::>(); - - let (changesets, _) = random_changeset_range( - &mut rng, - blocks.iter(), - accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))), - 2..3, - 1..2, - ); - tx.insert_changesets(changesets.clone(), None).expect("insert changesets"); - tx.insert_history(changesets.clone(), None).expect("insert history"); - - let storage_occurences = tx.table::().unwrap().into_iter().fold( - BTreeMap::<_, usize>::new(), - |mut map, (key, _)| { - map.entry((key.address, key.sharded_key.key)).or_default().add_assign(1); - map - }, - ); - assert!(storage_occurences.into_iter().any(|(_, occurrences)| occurrences > 1)); - - assert_eq!( - tx.table::().unwrap().len(), - changesets.iter().flatten().flat_map(|(_, _, entries)| entries).count() - ); - - let original_shards = tx.table::().unwrap(); - - let test_prune = |to_block: BlockNumber, - run: usize, - expected_result: (PruneProgress, usize)| { - let prune_mode = PruneMode::Before(to_block); - let pruner = Pruner::new( - tx.inner_raw(), - MAINNET.clone(), - 1, - PruneModes { storage_history: Some(prune_mode), ..Default::default() }, - // Less than total amount of entries to prune to test the batching logic - 2000, - watch::channel(None).1, - ); - - let provider = tx.inner_rw(); - let result = - pruner.prune_storage_history(&provider, to_block, prune_mode, pruner.delete_limit); - provider.commit().expect("commit"); - - assert_matches!(result, Ok(_)); - let result = result.unwrap(); - assert_eq!(result, expected_result); - - let changesets = changesets - .iter() - .enumerate() - .flat_map(|(block_number, changeset)| { - changeset.iter().flat_map(move |(address, _, entries)| { - entries.iter().map(move |entry| (block_number, address, entry)) - }) - }) - .collect::>(); - - #[allow(clippy::skip_while_next)] - let pruned = changesets - .iter() - .enumerate() - .skip_while(|(i, (block_number, _, _))| { - *i < pruner.delete_limit / 2 * run && *block_number <= to_block as usize - }) - .next() - .map(|(i, _)| i) - .unwrap_or_default(); - - let mut pruned_changesets = changesets - .iter() - // Skip what we've pruned so far, subtracting one to get last pruned block number - // further down - .skip(pruned.saturating_sub(1)); - - let last_pruned_block_number = pruned_changesets - .next() - .map(|(block_number, _, _)| if result.0.is_finished() { - *block_number - } else { - block_number.saturating_sub(1) - } as BlockNumber) - .unwrap_or(to_block); - - let pruned_changesets = pruned_changesets.fold( - BTreeMap::<_, Vec<_>>::new(), - |mut acc, (block_number, address, entry)| { - acc.entry((block_number, address)).or_default().push(entry); - acc - }, - ); - - assert_eq!( - tx.table::().unwrap().len(), - pruned_changesets.values().flatten().count() - ); - - let actual_shards = tx.table::().unwrap(); - - let expected_shards = original_shards - .iter() - .filter(|(key, _)| key.sharded_key.highest_block_number > last_pruned_block_number) - .map(|(key, blocks)| { - let new_blocks = blocks - .iter(0) - .skip_while(|block| *block <= last_pruned_block_number as usize) - .collect::>(); - (key.clone(), BlockNumberList::new_pre_sorted(new_blocks)) - }) - .collect::>(); - - assert_eq!(actual_shards, expected_shards); - - assert_eq!( - tx.inner().get_prune_checkpoint(PruneSegment::StorageHistory).unwrap(), - 
Some(PruneCheckpoint { - block_number: Some(last_pruned_block_number), - tx_number: None, - prune_mode - }) - ); - }; - - test_prune(998, 1, (PruneProgress::HasMoreData, 1000)); - test_prune(998, 2, (PruneProgress::Finished, 998)); - test_prune(1400, 3, (PruneProgress::Finished, 804)); - } - - #[test] - fn prune_receipts_by_logs() { - let tx = TestTransaction::default(); - let mut rng = generators::rng(); - - let tip = 20000; - let blocks = [ - random_block_range(&mut rng, 0..=100, B256::ZERO, 1..5), - random_block_range(&mut rng, (100 + 1)..=(tip - 100), B256::ZERO, 0..1), - random_block_range(&mut rng, (tip - 100 + 1)..=tip, B256::ZERO, 1..5), - ] - .concat(); - tx.insert_blocks(blocks.iter(), None).expect("insert blocks"); - - let mut receipts = Vec::new(); - - let (deposit_contract_addr, _) = random_eoa_account(&mut rng); - for block in &blocks { - for (txi, transaction) in block.body.iter().enumerate() { - let mut receipt = random_receipt(&mut rng, transaction, Some(1)); - receipt.logs.push(random_log( - &mut rng, - if txi == (block.body.len() - 1) { Some(deposit_contract_addr) } else { None }, - Some(1), - )); - receipts.push((receipts.len() as u64, receipt)); - } - } - tx.insert_receipts(receipts).expect("insert receipts"); - - assert_eq!( - tx.table::().unwrap().len(), - blocks.iter().map(|block| block.body.len()).sum::() - ); - assert_eq!( - tx.table::().unwrap().len(), - tx.table::().unwrap().len() - ); - - let run_prune = || { - let provider = tx.inner_rw(); - - let prune_before_block: usize = 20; - let prune_mode = PruneMode::Before(prune_before_block as u64); - let receipts_log_filter = - ReceiptsLogPruneConfig(BTreeMap::from([(deposit_contract_addr, prune_mode)])); - let pruner = Pruner::new( - tx.inner_raw(), - MAINNET.clone(), - 5, - PruneModes { - receipts_log_filter: receipts_log_filter.clone(), - ..Default::default() - }, - // Less than total amount of receipts to prune to test the batching logic - 10, - watch::channel(None).1, - ); - - let result = pruner.prune_receipts_by_logs(&provider, tip, pruner.delete_limit); - provider.commit().expect("commit"); - - assert_matches!(result, Ok(_)); - let result = result.unwrap(); - - let (pruned_block, pruned_tx) = tx - .inner() - .get_prune_checkpoint(PruneSegment::ContractLogs) - .unwrap() - .map(|checkpoint| (checkpoint.block_number.unwrap(), checkpoint.tx_number.unwrap())) - .unwrap_or_default(); - - // All receipts are in the end of the block - let unprunable = pruned_block.saturating_sub(prune_before_block as u64 - 1); - - assert_eq!( - tx.table::().unwrap().len(), - blocks.iter().map(|block| block.body.len()).sum::() - - ((pruned_tx + 1) - unprunable) as usize - ); - - result.0.is_finished() - }; - - while !run_prune() {} - - let provider = tx.inner(); - let mut cursor = provider.tx_ref().cursor_read::().unwrap(); - let walker = cursor.walk(None).unwrap(); - for receipt in walker { - let (tx_num, receipt) = receipt.unwrap(); - - // Either we only find our contract, or the receipt is part of the unprunable receipts - // set by tip - 128 - assert!( - receipt.logs.iter().any(|l| l.address == deposit_contract_addr) || - provider.transaction_block(tx_num).unwrap().unwrap() > tip - 128, - ); - } - } } diff --git a/crates/prune/src/segments/account_history.rs b/crates/prune/src/segments/account_history.rs new file mode 100644 index 000000000000..ba8ffb59e722 --- /dev/null +++ b/crates/prune/src/segments/account_history.rs @@ -0,0 +1,226 @@ +use crate::{ + segments::{ + history::prune_history_indices, PruneInput, PruneOutput, 
PruneOutputCheckpoint, Segment, + }, + PrunerError, +}; +use reth_db::{database::Database, models::ShardedKey, tables}; +use reth_primitives::PruneSegment; +use reth_provider::DatabaseProviderRW; +use tracing::{instrument, trace}; + +#[derive(Default)] +#[non_exhaustive] +pub(crate) struct AccountHistory; + +impl Segment for AccountHistory { + fn segment(&self) -> PruneSegment { + PruneSegment::AccountHistory + } + + #[instrument(level = "trace", target = "pruner", skip(self, provider), ret)] + fn prune( + &self, + provider: &DatabaseProviderRW<'_, DB>, + input: PruneInput, + ) -> Result { + let range = match input.get_next_block_range() { + Some(range) => range, + None => { + trace!(target: "pruner", "No account history to prune"); + return Ok(PruneOutput::done()) + } + }; + let range_end = *range.end(); + + let mut last_changeset_pruned_block = None; + let (pruned_changesets, done) = provider + .prune_table_with_range::( + range, + input.delete_limit / 2, + |_| false, + |row| last_changeset_pruned_block = Some(row.0), + )?; + trace!(target: "pruner", pruned = %pruned_changesets, %done, "Pruned account history (changesets)"); + + let last_changeset_pruned_block = last_changeset_pruned_block + // If there's more account account changesets to prune, set the checkpoint block number + // to previous, so we could finish pruning its account changesets on the next run. + .map(|block_number| if done { block_number } else { block_number.saturating_sub(1) }) + .unwrap_or(range_end); + + let (processed, pruned_indices) = prune_history_indices::( + provider, + last_changeset_pruned_block, + |a, b| a.key == b.key, + |key| ShardedKey::last(key.key), + )?; + trace!(target: "pruner", %processed, pruned = %pruned_indices, %done, "Pruned account history (history)" ); + + Ok(PruneOutput { + done, + pruned: pruned_changesets + pruned_indices, + checkpoint: Some(PruneOutputCheckpoint { + block_number: Some(last_changeset_pruned_block), + tx_number: None, + }), + }) + } +} + +#[cfg(test)] +mod tests { + use crate::segments::{AccountHistory, PruneInput, PruneOutput, Segment}; + use assert_matches::assert_matches; + use reth_db::{tables, BlockNumberList}; + use reth_interfaces::test_utils::{ + generators, + generators::{random_block_range, random_changeset_range, random_eoa_account_range}, + }; + use reth_primitives::{BlockNumber, PruneCheckpoint, PruneMode, PruneSegment, B256}; + use reth_provider::PruneCheckpointReader; + use reth_stages::test_utils::TestTransaction; + use std::{collections::BTreeMap, ops::AddAssign}; + + #[test] + fn prune() { + let tx = TestTransaction::default(); + let mut rng = generators::rng(); + + let blocks = random_block_range(&mut rng, 1..=5000, B256::ZERO, 0..1); + tx.insert_blocks(blocks.iter(), None).expect("insert blocks"); + + let accounts = + random_eoa_account_range(&mut rng, 0..2).into_iter().collect::>(); + + let (changesets, _) = random_changeset_range( + &mut rng, + blocks.iter(), + accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))), + 0..0, + 0..0, + ); + tx.insert_changesets(changesets.clone(), None).expect("insert changesets"); + tx.insert_history(changesets.clone(), None).expect("insert history"); + + let account_occurrences = tx.table::().unwrap().into_iter().fold( + BTreeMap::<_, usize>::new(), + |mut map, (key, _)| { + map.entry(key.key).or_default().add_assign(1); + map + }, + ); + assert!(account_occurrences.into_iter().any(|(_, occurrences)| occurrences > 1)); + + assert_eq!( + tx.table::().unwrap().len(), + changesets.iter().flatten().count() + ); 
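// A simplified sketch of the checkpoint derivation used in `AccountHistory::prune`
// above: when the run was cut short by the delete limit (`done == false`), the
// checkpoint steps back one block so the partially pruned block is revisited on the
// next run; with no pruned rows it falls back to the end of the requested range.
// The function and parameter names here are illustrative, not part of the patch.
fn checkpoint_block(last_pruned: Option<u64>, range_end: u64, done: bool) -> u64 {
    last_pruned
        .map(|block| if done { block } else { block.saturating_sub(1) })
        .unwrap_or(range_end)
}

fn main() {
    assert_eq!(checkpoint_block(Some(100), 120, true), 100); // finished: keep the block
    assert_eq!(checkpoint_block(Some(100), 120, false), 99); // limited: revisit block 100
    assert_eq!(checkpoint_block(None, 120, true), 120); // nothing pruned: use range end
}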
+ + let original_shards = tx.table::().unwrap(); + + let test_prune = |to_block: BlockNumber, run: usize, expected_result: (bool, usize)| { + let prune_mode = PruneMode::Before(to_block); + let input = PruneInput { + previous_checkpoint: tx + .inner() + .get_prune_checkpoint(PruneSegment::AccountHistory) + .unwrap(), + to_block, + delete_limit: 2000, + }; + let segment = AccountHistory::default(); + + let provider = tx.inner_rw(); + let result = segment.prune(&provider, input).unwrap(); + assert_matches!( + result, + PruneOutput {done, pruned, checkpoint: Some(_)} + if (done, pruned) == expected_result + ); + segment + .save_checkpoint( + &provider, + result.checkpoint.unwrap().as_prune_checkpoint(prune_mode), + ) + .unwrap(); + provider.commit().expect("commit"); + + let changesets = changesets + .iter() + .enumerate() + .flat_map(|(block_number, changeset)| { + changeset.iter().map(move |change| (block_number, change)) + }) + .collect::>(); + + #[allow(clippy::skip_while_next)] + let pruned = changesets + .iter() + .enumerate() + .skip_while(|(i, (block_number, _))| { + *i < input.delete_limit / 2 * run && *block_number <= to_block as usize + }) + .next() + .map(|(i, _)| i) + .unwrap_or_default(); + + let mut pruned_changesets = changesets + .iter() + // Skip what we've pruned so far, subtracting one to get last pruned block number + // further down + .skip(pruned.saturating_sub(1)); + + let last_pruned_block_number = pruned_changesets + .next() + .map(|(block_number, _)| if result.done { + *block_number + } else { + block_number.saturating_sub(1) + } as BlockNumber) + .unwrap_or(to_block); + + let pruned_changesets = pruned_changesets.fold( + BTreeMap::<_, Vec<_>>::new(), + |mut acc, (block_number, change)| { + acc.entry(block_number).or_default().push(change); + acc + }, + ); + + assert_eq!( + tx.table::().unwrap().len(), + pruned_changesets.values().flatten().count() + ); + + let actual_shards = tx.table::().unwrap(); + + let expected_shards = original_shards + .iter() + .filter(|(key, _)| key.highest_block_number > last_pruned_block_number) + .map(|(key, blocks)| { + let new_blocks = blocks + .iter(0) + .skip_while(|block| *block <= last_pruned_block_number as usize) + .collect::>(); + (key.clone(), BlockNumberList::new_pre_sorted(new_blocks)) + }) + .collect::>(); + + assert_eq!(actual_shards, expected_shards); + + assert_eq!( + tx.inner().get_prune_checkpoint(PruneSegment::AccountHistory).unwrap(), + Some(PruneCheckpoint { + block_number: Some(last_pruned_block_number), + tx_number: None, + prune_mode + }) + ); + }; + + test_prune(998, 1, (false, 1000)); + test_prune(998, 2, (true, 998)); + test_prune(1400, 3, (true, 804)); + } +} diff --git a/crates/prune/src/segments/headers.rs b/crates/prune/src/segments/headers.rs index 91ae3fa81d4d..3ede8ff5842e 100644 --- a/crates/prune/src/segments/headers.rs +++ b/crates/prune/src/segments/headers.rs @@ -14,16 +14,18 @@ use tracing::{instrument, trace}; #[non_exhaustive] pub(crate) struct Headers; -impl Segment for Headers { - const SEGMENT: PruneSegment = PruneSegment::Headers; +impl Segment for Headers { + fn segment(&self) -> PruneSegment { + PruneSegment::Headers + } #[instrument(level = "trace", target = "pruner", skip(self, provider), ret)] - fn prune( + fn prune( &self, provider: &DatabaseProviderRW<'_, DB>, input: PruneInput, ) -> Result { - let block_range = match input.get_next_block_range(provider, Self::SEGMENT)? 
{ + let block_range = match input.get_next_block_range() { Some(range) => range, None => { trace!(target: "pruner", "No headers to prune"); @@ -101,7 +103,7 @@ mod tests { use assert_matches::assert_matches; use reth_db::tables; use reth_interfaces::test_utils::{generators, generators::random_header_range}; - use reth_primitives::{BlockNumber, PruneCheckpoint, PruneMode, B256}; + use reth_primitives::{BlockNumber, PruneCheckpoint, PruneMode, PruneSegment, B256}; use reth_provider::PruneCheckpointReader; use reth_stages::test_utils::TestTransaction; @@ -119,12 +121,19 @@ mod tests { let test_prune = |to_block: BlockNumber, expected_result: (bool, usize)| { let prune_mode = PruneMode::Before(to_block); - let input = PruneInput { to_block, delete_limit: 10 }; + let input = PruneInput { + previous_checkpoint: tx + .inner() + .get_prune_checkpoint(PruneSegment::Headers) + .unwrap(), + to_block, + delete_limit: 10, + }; let segment = Headers::default(); let next_block_number_to_prune = tx .inner() - .get_prune_checkpoint(Headers::SEGMENT) + .get_prune_checkpoint(PruneSegment::Headers) .unwrap() .and_then(|checkpoint| checkpoint.block_number) .map(|block_number| block_number + 1) @@ -161,7 +170,7 @@ mod tests { headers.len() - (last_pruned_block_number + 1) as usize ); assert_eq!( - tx.inner().get_prune_checkpoint(Headers::SEGMENT).unwrap(), + tx.inner().get_prune_checkpoint(PruneSegment::Headers).unwrap(), Some(PruneCheckpoint { block_number: Some(last_pruned_block_number), tx_number: None, @@ -179,6 +188,7 @@ mod tests { let tx = TestTransaction::default(); let input = PruneInput { + previous_checkpoint: None, to_block: 1, // Less than total number of tables for `Headers` segment delete_limit: 2, diff --git a/crates/prune/src/segments/history.rs b/crates/prune/src/segments/history.rs new file mode 100644 index 000000000000..bb3352a396a0 --- /dev/null +++ b/crates/prune/src/segments/history.rs @@ -0,0 +1,112 @@ +use reth_db::{ + cursor::{DbCursorRO, DbCursorRW}, + database::Database, + models::ShardedKey, + table::Table, + transaction::DbTxMut, + BlockNumberList, +}; +use reth_interfaces::db::DatabaseError; +use reth_primitives::BlockNumber; +use reth_provider::DatabaseProviderRW; + +/// Prune history indices up to the provided block, inclusive. +/// +/// Returns total number of processed (walked) and deleted entities. +pub(crate) fn prune_history_indices( + provider: &DatabaseProviderRW<'_, DB>, + to_block: BlockNumber, + key_matches: impl Fn(&T::Key, &T::Key) -> bool, + last_key: impl Fn(&T::Key) -> T::Key, +) -> Result<(usize, usize), DatabaseError> +where + DB: Database, + T: Table, + T::Key: AsRef>, +{ + let mut processed = 0; + let mut deleted = 0; + let mut cursor = provider.tx_ref().cursor_write::()?; + + // Prune history table: + // 1. If the shard has `highest_block_number` less than or equal to the target block number + // for pruning, delete the shard completely. + // 2. If the shard has `highest_block_number` greater than the target block number for + // pruning, filter block numbers inside the shard which are less than the target + // block number for pruning. + while let Some(result) = cursor.next()? { + let (key, blocks): (T::Key, BlockNumberList) = result; + + // If shard consists only of block numbers less than the target one, delete shard + // completely. 
+ if key.as_ref().highest_block_number <= to_block { + cursor.delete_current()?; + deleted += 1; + if key.as_ref().highest_block_number == to_block { + // Shard contains only block numbers up to the target one, so we can skip to + // the last shard for this key. It is guaranteed that further shards for this + // sharded key will not contain the target block number, as it's in this shard. + cursor.seek_exact(last_key(&key))?; + } + } + // Shard contains block numbers that are higher than the target one, so we need to + // filter it. It is guaranteed that further shards for this sharded key will not + // contain the target block number, as it's in this shard. + else { + let new_blocks = + blocks.iter(0).skip_while(|block| *block <= to_block as usize).collect::>(); + + // If there were blocks less than or equal to the target one + // (so the shard has changed), update the shard. + if blocks.len() != new_blocks.len() { + // If there are no more blocks in this shard, we need to remove it, as empty + // shards are not allowed. + if new_blocks.is_empty() { + if key.as_ref().highest_block_number == u64::MAX { + let prev_row = cursor.prev()?; + match prev_row { + // If current shard is the last shard for the sharded key that + // has previous shards, replace it with the previous shard. + Some((prev_key, prev_value)) if key_matches(&prev_key, &key) => { + cursor.delete_current()?; + deleted += 1; + // Upsert will replace the last shard for this sharded key with + // the previous value. + cursor.upsert(key.clone(), prev_value)?; + } + // If there's no previous shard for this sharded key, + // just delete last shard completely. + _ => { + // If we successfully moved the cursor to a previous row, + // jump to the original last shard. + if prev_row.is_some() { + cursor.next()?; + } + // Delete shard. + cursor.delete_current()?; + deleted += 1; + } + } + } + // If current shard is not the last shard for this sharded key, + // just delete it. + else { + cursor.delete_current()?; + deleted += 1; + } + } else { + cursor.upsert(key.clone(), BlockNumberList::new_pre_sorted(new_blocks))?; + } + } + + // Jump to the last shard for this key, if current key isn't already the last shard. 
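The branch above reduces to a per-shard decision: drop the shard outright when its `highest_block_number` is at or below the prune target, otherwise strip the pruned block numbers and either rewrite the shard or, if nothing is left, remove it (with the extra fix-up for the `u64::MAX` last shard handled above). A rough sketch of that decision over plain `Vec<u64>` data; `ShardAction` and `shard_action` are illustrative names only, since the real code walks a write cursor over the sharded tables:

    // Illustrative only: the per-shard pruning decision, detached from the database.
    enum ShardAction {
        Delete,            // every block in the shard is <= the prune target
        Replace(Vec<u64>), // some blocks were pruned; rewrite the shard with the rest
        Keep,              // nothing in the shard falls below the prune target
    }

    fn shard_action(blocks: &[u64], to_block: u64) -> ShardAction {
        let kept: Vec<u64> = blocks.iter().copied().filter(|block| *block > to_block).collect();
        if kept.is_empty() {
            ShardAction::Delete
        } else if kept.len() == blocks.len() {
            ShardAction::Keep
        } else {
            ShardAction::Replace(kept)
        }
    }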
+ if key.as_ref().highest_block_number != u64::MAX { + cursor.seek_exact(last_key(&key))?; + } + } + + processed += 1; + } + + Ok((processed, deleted)) +} diff --git a/crates/prune/src/segments/mod.rs b/crates/prune/src/segments/mod.rs index 1446351aea32..06b4c6988892 100644 --- a/crates/prune/src/segments/mod.rs +++ b/crates/prune/src/segments/mod.rs @@ -1,18 +1,27 @@ +mod account_history; mod headers; +mod history; mod receipts; +mod receipts_by_logs; +mod sender_recovery; +mod storage_history; +mod transaction_lookup; mod transactions; +pub(crate) use account_history::AccountHistory; pub(crate) use headers::Headers; pub(crate) use receipts::Receipts; +pub(crate) use receipts_by_logs::ReceiptsByLogs; +pub(crate) use sender_recovery::SenderRecovery; +pub(crate) use storage_history::StorageHistory; +pub(crate) use transaction_lookup::TransactionLookup; pub(crate) use transactions::Transactions; use crate::PrunerError; use reth_db::database::Database; use reth_interfaces::RethResult; use reth_primitives::{BlockNumber, PruneCheckpoint, PruneMode, PruneSegment, TxNumber}; -use reth_provider::{ - BlockReader, DatabaseProviderRW, PruneCheckpointReader, PruneCheckpointWriter, -}; +use reth_provider::{BlockReader, DatabaseProviderRW, PruneCheckpointWriter}; use std::ops::RangeInclusive; use tracing::error; @@ -23,30 +32,30 @@ use tracing::error; /// 2. If [Segment::prune] returned a [Some] in `checkpoint` of [PruneOutput], call /// [Segment::save_checkpoint]. /// 3. Subtract `pruned` of [PruneOutput] from `delete_limit` of next [PruneInput]. -pub(crate) trait Segment { - /// Segment of the data that's pruned. - const SEGMENT: PruneSegment; +pub(crate) trait Segment { + fn segment(&self) -> PruneSegment; - /// Prune data for [Self::SEGMENT] using the provided input. - fn prune( + /// Prune data for [Self::segment] using the provided input. + fn prune( &self, provider: &DatabaseProviderRW<'_, DB>, input: PruneInput, ) -> Result; - /// Save checkpoint for [Self::SEGMENT] to the database. - fn save_checkpoint( + /// Save checkpoint for [Self::segment] to the database. + fn save_checkpoint( &self, provider: &DatabaseProviderRW<'_, DB>, checkpoint: PruneCheckpoint, ) -> RethResult<()> { - provider.save_prune_checkpoint(Self::SEGMENT, checkpoint) + provider.save_prune_checkpoint(self.segment(), checkpoint) } } /// Segment pruning input, see [Segment::prune]. #[derive(Debug, Clone, Copy)] pub(crate) struct PruneInput { + pub(crate) previous_checkpoint: Option, /// Target block up to which the pruning needs to be done, inclusive. pub(crate) to_block: BlockNumber, /// Maximum entries to delete from the database. @@ -62,18 +71,16 @@ impl PruneInput { /// 2. If checkpoint doesn't exist, return 0. /// /// To get the range end: get last tx number for `to_block`. - pub(crate) fn get_next_tx_num_range_from_checkpoint( + pub(crate) fn get_next_tx_num_range( &self, provider: &DatabaseProviderRW<'_, DB>, - segment: PruneSegment, ) -> RethResult>> { - let from_tx_number = provider - .get_prune_checkpoint(segment)? + let from_tx_number = self.previous_checkpoint // Checkpoint exists, prune from the next transaction after the highest pruned one .and_then(|checkpoint| match checkpoint.tx_number { Some(tx_number) => Some(tx_number + 1), _ => { - error!(target: "pruner", %segment, ?checkpoint, "Expected transaction number in prune checkpoint, found None"); + error!(target: "pruner", ?checkpoint, "Expected transaction number in prune checkpoint, found None"); None }, }) @@ -102,13 +109,9 @@ impl PruneInput { /// 2. 
If checkpoint doesn't exist, use block 0. /// /// To get the range end: use block `to_block`. - pub(crate) fn get_next_block_range( - &self, - provider: &DatabaseProviderRW<'_, DB>, - segment: PruneSegment, - ) -> RethResult>> { - let from_block = provider - .get_prune_checkpoint(segment)? + pub(crate) fn get_next_block_range(&self) -> Option> { + let from_block = self + .previous_checkpoint .and_then(|checkpoint| checkpoint.block_number) // Checkpoint exists, prune from the next block after the highest pruned one .map(|block_number| block_number + 1) @@ -117,10 +120,10 @@ impl PruneInput { let range = from_block..=self.to_block; if range.is_empty() { - return Ok(None) + return None } - Ok(Some(range)) + Some(range) } } diff --git a/crates/prune/src/segments/receipts.rs b/crates/prune/src/segments/receipts.rs index ee63a08c6f72..33f1824b1133 100644 --- a/crates/prune/src/segments/receipts.rs +++ b/crates/prune/src/segments/receipts.rs @@ -12,16 +12,18 @@ use tracing::{instrument, trace}; #[non_exhaustive] pub(crate) struct Receipts; -impl Segment for Receipts { - const SEGMENT: PruneSegment = PruneSegment::Receipts; +impl Segment for Receipts { + fn segment(&self) -> PruneSegment { + PruneSegment::Receipts + } #[instrument(level = "trace", target = "pruner", skip(self, provider), ret)] - fn prune( + fn prune( &self, provider: &DatabaseProviderRW<'_, DB>, input: PruneInput, ) -> Result { - let tx_range = match input.get_next_tx_num_range_from_checkpoint(provider, Self::SEGMENT)? { + let tx_range = match input.get_next_tx_num_range(provider)? { Some(range) => range, None => { trace!(target: "pruner", "No receipts to prune"); @@ -56,14 +58,14 @@ impl Segment for Receipts { }) } - fn save_checkpoint( + fn save_checkpoint( &self, provider: &DatabaseProviderRW<'_, DB>, checkpoint: PruneCheckpoint, ) -> RethResult<()> { - provider.save_prune_checkpoint(Self::SEGMENT, checkpoint)?; + provider.save_prune_checkpoint(PruneSegment::Receipts, checkpoint)?; - // `PruneSegment::Receipts` overrides `PruneSegmnt::ContractLogs`, so we can preemptively + // `PruneSegment::Receipts` overrides `PruneSegment::ContractLogs`, so we can preemptively // limit their pruning start point. 
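Stepping back to the `PruneInput` change in the `mod.rs` hunk above: both range helpers now derive their lower bound from the `previous_checkpoint` carried in the input instead of re-reading the checkpoint from the provider. A reduced sketch of the block-range derivation, as a plain function over an optional block number rather than the provider-backed API (`next_block_range` is a hypothetical name):

    use std::ops::RangeInclusive;

    // Resume right after the highest pruned block, or start from block 0 if there is
    // no previous checkpoint; an empty range means there is nothing left to prune.
    fn next_block_range(previously_pruned: Option<u64>, to_block: u64) -> Option<RangeInclusive<u64>> {
        let from_block = previously_pruned.map(|block| block + 1).unwrap_or_default();
        let range = from_block..=to_block;
        if range.is_empty() {
            return None
        }
        Some(range)
    }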
provider.save_prune_checkpoint(PruneSegment::ContractLogs, checkpoint)?; @@ -84,7 +86,7 @@ mod tests { generators, generators::{random_block_range, random_receipt}, }; - use reth_primitives::{BlockNumber, PruneCheckpoint, PruneMode, TxNumber, B256}; + use reth_primitives::{BlockNumber, PruneCheckpoint, PruneMode, PruneSegment, TxNumber, B256}; use reth_provider::PruneCheckpointReader; use reth_stages::test_utils::TestTransaction; use std::ops::Sub; @@ -117,12 +119,19 @@ mod tests { let test_prune = |to_block: BlockNumber, expected_result: (bool, usize)| { let prune_mode = PruneMode::Before(to_block); - let input = PruneInput { to_block, delete_limit: 10 }; + let input = PruneInput { + previous_checkpoint: tx + .inner() + .get_prune_checkpoint(PruneSegment::Receipts) + .unwrap(), + to_block, + delete_limit: 10, + }; let segment = Receipts::default(); let next_tx_number_to_prune = tx .inner() - .get_prune_checkpoint(Receipts::SEGMENT) + .get_prune_checkpoint(PruneSegment::Receipts) .unwrap() .and_then(|checkpoint| checkpoint.tx_number) .map(|tx_number| tx_number + 1) @@ -171,7 +180,7 @@ mod tests { receipts.len() - (last_pruned_tx_number + 1) ); assert_eq!( - tx.inner().get_prune_checkpoint(Receipts::SEGMENT).unwrap(), + tx.inner().get_prune_checkpoint(PruneSegment::Receipts).unwrap(), Some(PruneCheckpoint { block_number: last_pruned_block_number, tx_number: Some(last_pruned_tx_number as TxNumber), diff --git a/crates/prune/src/segments/receipts_by_logs.rs b/crates/prune/src/segments/receipts_by_logs.rs new file mode 100644 index 000000000000..93c775c02991 --- /dev/null +++ b/crates/prune/src/segments/receipts_by_logs.rs @@ -0,0 +1,305 @@ +use crate::{segments::PruneOutput, PrunerError}; +use reth_db::{database::Database, tables}; +use reth_primitives::{ + BlockNumber, PruneCheckpoint, PruneMode, PruneSegment, ReceiptsLogPruneConfig, + MINIMUM_PRUNING_DISTANCE, +}; +use reth_provider::{ + BlockReader, DatabaseProviderRW, PruneCheckpointReader, PruneCheckpointWriter, + TransactionsProvider, +}; +use tracing::{instrument, trace}; + +#[derive(Default)] +#[non_exhaustive] +pub(crate) struct ReceiptsByLogs; + +impl ReceiptsByLogs { + /// Prune receipts up to the provided block, inclusive, by filtering logs. Works as in inclusion + /// list, and removes every receipt not belonging to it. Respects the batch size. + #[instrument(level = "trace", skip(self, provider), target = "pruner")] + pub(crate) fn prune( + &self, + provider: &DatabaseProviderRW<'_, DB>, + receipts_log_filter: &ReceiptsLogPruneConfig, + tip_block_number: BlockNumber, + delete_limit: usize, + ) -> Result { + // Contract log filtering removes every receipt possible except the ones in the list. So, + // for the other receipts it's as if they had a `PruneMode::Distance()` of + // `MINIMUM_PRUNING_DISTANCE`. + let to_block = PruneMode::Distance(MINIMUM_PRUNING_DISTANCE) + .prune_target_block( + tip_block_number, + MINIMUM_PRUNING_DISTANCE, + PruneSegment::ContractLogs, + )? + .map(|(bn, _)| bn) + .unwrap_or_default(); + + // Get status checkpoint from latest run + let mut last_pruned_block = provider + .get_prune_checkpoint(PruneSegment::ContractLogs)? + .and_then(|checkpoint| checkpoint.block_number); + + let initial_last_pruned_block = last_pruned_block; + + let mut from_tx_number = match initial_last_pruned_block { + Some(block) => provider + .block_body_indices(block)? 
+ .map(|block| block.last_tx_num() + 1) + .unwrap_or(0), + None => 0, + }; + + // Figure out what receipts have already been pruned, so we can have an accurate + // `address_filter` + let address_filter = + receipts_log_filter.group_by_block(tip_block_number, last_pruned_block)?; + + // Splits all transactions in different block ranges. Each block range will have its own + // filter address list and will check it while going through the table + // + // Example: + // For an `address_filter` such as: + // { block9: [a1, a2], block20: [a3, a4, a5] } + // + // The following structures will be created in the exact order as showed: + // `block_ranges`: [ + // (block0, block8, 0 addresses), + // (block9, block19, 2 addresses), + // (block20, to_block, 5 addresses) + // ] + // `filtered_addresses`: [a1, a2, a3, a4, a5] + // + // The first range will delete all receipts between block0 - block8 + // The second range will delete all receipts between block9 - 19, except the ones with + // emitter logs from these addresses: [a1, a2]. + // The third range will delete all receipts between block20 - to_block, except the ones with + // emitter logs from these addresses: [a1, a2, a3, a4, a5] + let mut block_ranges = vec![]; + let mut blocks_iter = address_filter.iter().peekable(); + let mut filtered_addresses = vec![]; + + while let Some((start_block, addresses)) = blocks_iter.next() { + filtered_addresses.extend_from_slice(addresses); + + // This will clear all receipts before the first appearance of a contract log or since + // the block after the last pruned one. + if block_ranges.is_empty() { + let init = last_pruned_block.map(|b| b + 1).unwrap_or_default(); + if init < *start_block { + block_ranges.push((init, *start_block - 1, 0)); + } + } + + let end_block = + blocks_iter.peek().map(|(next_block, _)| *next_block - 1).unwrap_or(to_block); + + // Addresses in lower block ranges, are still included in the inclusion list for future + // ranges. + block_ranges.push((*start_block, end_block, filtered_addresses.len())); + } + + trace!( + target: "pruner", + ?block_ranges, + ?filtered_addresses, + "Calculated block ranges and filtered addresses", + ); + + let mut limit = delete_limit; + let mut done = true; + let mut last_pruned_transaction = None; + for (start_block, end_block, num_addresses) in block_ranges { + let block_range = start_block..=end_block; + + // Calculate the transaction range from this block range + let tx_range_end = match provider.block_body_indices(end_block)? { + Some(body) => body.last_tx_num(), + None => { + trace!( + target: "pruner", + ?block_range, + "No receipts to prune." + ); + continue + } + }; + let tx_range = from_tx_number..=tx_range_end; + + // Delete receipts, except the ones in the inclusion list + let mut last_skipped_transaction = 0; + let deleted; + (deleted, done) = provider.prune_table_with_range::( + tx_range, + limit, + |(tx_num, receipt)| { + let skip = num_addresses > 0 && + receipt.logs.iter().any(|log| { + filtered_addresses[..num_addresses].contains(&&log.address) + }); + + if skip { + last_skipped_transaction = *tx_num; + } + skip + }, + |row| last_pruned_transaction = Some(row.0), + )?; + trace!(target: "pruner", %deleted, %done, ?block_range, "Pruned receipts"); + + limit = limit.saturating_sub(deleted); + + // For accurate checkpoints we need to know that we have checked every transaction. + // Example: we reached the end of the range, and the last receipt is supposed to skip + // its deletion. 
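Before the checkpoint bookkeeping continues below, the `block_ranges` construction from the top of this loop (the `block9`/`block20` example above) may be easier to follow in isolation. A stand-alone sketch over plain data, where `Address` is shortened to `u64` and `block_ranges` is a hypothetical helper; the real code additionally resolves transaction numbers per range:

    use std::collections::BTreeMap;

    type Address = u64; // stand-in for a real address type

    // For each first-appearance block in the address filter, emit (start, end, number of
    // addresses whitelisted so far); an initial range with an empty inclusion list covers
    // everything before the first filtered contract.
    fn block_ranges(
        address_filter: &BTreeMap<u64, Vec<Address>>,
        last_pruned_block: Option<u64>,
        to_block: u64,
    ) -> (Vec<(u64, u64, usize)>, Vec<Address>) {
        let mut ranges = Vec::new();
        let mut filtered = Vec::new();
        let mut iter = address_filter.iter().peekable();
        while let Some((start, addresses)) = iter.next() {
            filtered.extend_from_slice(addresses);
            if ranges.is_empty() {
                let init = last_pruned_block.map(|block| block + 1).unwrap_or_default();
                if init < *start {
                    ranges.push((init, *start - 1, 0));
                }
            }
            let end = iter.peek().map(|(next, _)| **next - 1).unwrap_or(to_block);
            // Addresses from earlier ranges stay in the inclusion list for later ranges.
            ranges.push((*start, end, filtered.len()));
        }
        (ranges, filtered)
    }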
+ last_pruned_transaction = + Some(last_pruned_transaction.unwrap_or_default().max(last_skipped_transaction)); + last_pruned_block = Some( + provider + .transaction_block(last_pruned_transaction.expect("qed"))? + .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))? + // If there's more receipts to prune, set the checkpoint block number to + // previous, so we could finish pruning its receipts on the + // next run. + .saturating_sub(if done { 0 } else { 1 }), + ); + + if limit == 0 { + done &= end_block == to_block; + break + } + + from_tx_number = last_pruned_transaction.expect("qed") + 1; + } + + // If there are contracts using `PruneMode::Distance(_)` there will be receipts before + // `to_block` that become eligible to be pruned in future runs. Therefore, our checkpoint is + // not actually `to_block`, but the `lowest_block_with_distance` from any contract. + // This ensures that in future pruner runs we can prune all these receipts between the + // previous `lowest_block_with_distance` and the new one using + // `get_next_tx_num_range_from_checkpoint`. + // + // Only applies if we were able to prune everything intended for this run, otherwise the + // checkpoint is the `last_pruned_block`. + let prune_mode_block = receipts_log_filter + .lowest_block_with_distance(tip_block_number, initial_last_pruned_block)? + .unwrap_or(to_block); + + provider.save_prune_checkpoint( + PruneSegment::ContractLogs, + PruneCheckpoint { + block_number: Some(prune_mode_block.min(last_pruned_block.unwrap_or(u64::MAX))), + tx_number: last_pruned_transaction, + prune_mode: PruneMode::Before(prune_mode_block), + }, + )?; + + Ok(PruneOutput { done, pruned: delete_limit - limit, checkpoint: None }) + } +} + +#[cfg(test)] +mod tests { + use crate::segments::receipts_by_logs::ReceiptsByLogs; + use assert_matches::assert_matches; + use reth_db::{cursor::DbCursorRO, tables, transaction::DbTx}; + use reth_interfaces::test_utils::{ + generators, + generators::{random_block_range, random_eoa_account, random_log, random_receipt}, + }; + use reth_primitives::{PruneMode, PruneSegment, ReceiptsLogPruneConfig, B256}; + use reth_provider::{PruneCheckpointReader, TransactionsProvider}; + use reth_stages::test_utils::TestTransaction; + use std::collections::BTreeMap; + + #[test] + fn prune_receipts_by_logs() { + let tx = TestTransaction::default(); + let mut rng = generators::rng(); + + let tip = 20000; + let blocks = [ + random_block_range(&mut rng, 0..=100, B256::ZERO, 1..5), + random_block_range(&mut rng, (100 + 1)..=(tip - 100), B256::ZERO, 0..1), + random_block_range(&mut rng, (tip - 100 + 1)..=tip, B256::ZERO, 1..5), + ] + .concat(); + tx.insert_blocks(blocks.iter(), None).expect("insert blocks"); + + let mut receipts = Vec::new(); + + let (deposit_contract_addr, _) = random_eoa_account(&mut rng); + for block in &blocks { + for (txi, transaction) in block.body.iter().enumerate() { + let mut receipt = random_receipt(&mut rng, transaction, Some(1)); + receipt.logs.push(random_log( + &mut rng, + if txi == (block.body.len() - 1) { Some(deposit_contract_addr) } else { None }, + Some(1), + )); + receipts.push((receipts.len() as u64, receipt)); + } + } + tx.insert_receipts(receipts).expect("insert receipts"); + + assert_eq!( + tx.table::().unwrap().len(), + blocks.iter().map(|block| block.body.len()).sum::() + ); + assert_eq!( + tx.table::().unwrap().len(), + tx.table::().unwrap().len() + ); + + let run_prune = || { + let provider = tx.inner_rw(); + + let prune_before_block: usize = 20; + let prune_mode = 
PruneMode::Before(prune_before_block as u64); + let receipts_log_filter = + ReceiptsLogPruneConfig(BTreeMap::from([(deposit_contract_addr, prune_mode)])); + + let result = ReceiptsByLogs::default().prune(&provider, &receipts_log_filter, tip, 10); + provider.commit().expect("commit"); + + assert_matches!(result, Ok(_)); + let output = result.unwrap(); + + let (pruned_block, pruned_tx) = tx + .inner() + .get_prune_checkpoint(PruneSegment::ContractLogs) + .unwrap() + .map(|checkpoint| (checkpoint.block_number.unwrap(), checkpoint.tx_number.unwrap())) + .unwrap_or_default(); + + // All receipts are in the end of the block + let unprunable = pruned_block.saturating_sub(prune_before_block as u64 - 1); + + assert_eq!( + tx.table::().unwrap().len(), + blocks.iter().map(|block| block.body.len()).sum::() - + ((pruned_tx + 1) - unprunable) as usize + ); + + output.done + }; + + while !run_prune() {} + + let provider = tx.inner(); + let mut cursor = provider.tx_ref().cursor_read::().unwrap(); + let walker = cursor.walk(None).unwrap(); + for receipt in walker { + let (tx_num, receipt) = receipt.unwrap(); + + // Either we only find our contract, or the receipt is part of the unprunable receipts + // set by tip - 128 + assert!( + receipt.logs.iter().any(|l| l.address == deposit_contract_addr) || + provider.transaction_block(tx_num).unwrap().unwrap() > tip - 128, + ); + } + } +} diff --git a/crates/prune/src/segments/sender_recovery.rs b/crates/prune/src/segments/sender_recovery.rs new file mode 100644 index 000000000000..830f9fd84adb --- /dev/null +++ b/crates/prune/src/segments/sender_recovery.rs @@ -0,0 +1,183 @@ +use crate::{ + segments::{PruneInput, PruneOutput, PruneOutputCheckpoint, Segment}, + PrunerError, +}; +use reth_db::{database::Database, tables}; +use reth_primitives::PruneSegment; +use reth_provider::{DatabaseProviderRW, TransactionsProvider}; +use tracing::{instrument, trace}; + +#[derive(Default)] +#[non_exhaustive] +pub(crate) struct SenderRecovery; + +impl Segment for SenderRecovery { + fn segment(&self) -> PruneSegment { + PruneSegment::SenderRecovery + } + + #[instrument(level = "trace", target = "pruner", skip(self, provider), ret)] + fn prune( + &self, + provider: &DatabaseProviderRW<'_, DB>, + input: PruneInput, + ) -> Result { + let tx_range = match input.get_next_tx_num_range(provider)? { + Some(range) => range, + None => { + trace!(target: "pruner", "No transaction senders to prune"); + return Ok(PruneOutput::done()) + } + }; + let tx_range_end = *tx_range.end(); + + let mut last_pruned_transaction = tx_range_end; + let (pruned, done) = provider.prune_table_with_range::( + tx_range, + input.delete_limit, + |_| false, + |row| last_pruned_transaction = row.0, + )?; + trace!(target: "pruner", %pruned, %done, "Pruned transaction senders"); + + let last_pruned_block = provider + .transaction_block(last_pruned_transaction)? + .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))? + // If there's more transaction senders to prune, set the checkpoint block number to + // previous, so we could finish pruning its transaction senders on the next run. 
+ .checked_sub(if done { 0 } else { 1 }); + + Ok(PruneOutput { + done, + pruned, + checkpoint: Some(PruneOutputCheckpoint { + block_number: last_pruned_block, + tx_number: Some(last_pruned_transaction), + }), + }) + } +} + +#[cfg(test)] +mod tests { + use crate::segments::{PruneInput, PruneOutput, Segment, SenderRecovery}; + use assert_matches::assert_matches; + use itertools::{ + FoldWhile::{Continue, Done}, + Itertools, + }; + use reth_db::tables; + use reth_interfaces::test_utils::{generators, generators::random_block_range}; + use reth_primitives::{BlockNumber, PruneCheckpoint, PruneMode, PruneSegment, TxNumber, B256}; + use reth_provider::PruneCheckpointReader; + use reth_stages::test_utils::TestTransaction; + use std::ops::Sub; + + #[test] + fn prune() { + let tx = TestTransaction::default(); + let mut rng = generators::rng(); + + let blocks = random_block_range(&mut rng, 1..=10, B256::ZERO, 2..3); + tx.insert_blocks(blocks.iter(), None).expect("insert blocks"); + + let mut transaction_senders = Vec::new(); + for block in &blocks { + for transaction in &block.body { + transaction_senders.push(( + transaction_senders.len() as u64, + transaction.recover_signer().expect("recover signer"), + )); + } + } + tx.insert_transaction_senders(transaction_senders.clone()) + .expect("insert transaction senders"); + + assert_eq!( + tx.table::().unwrap().len(), + blocks.iter().map(|block| block.body.len()).sum::() + ); + assert_eq!( + tx.table::().unwrap().len(), + tx.table::().unwrap().len() + ); + + let test_prune = |to_block: BlockNumber, expected_result: (bool, usize)| { + let prune_mode = PruneMode::Before(to_block); + let input = PruneInput { + previous_checkpoint: tx + .inner() + .get_prune_checkpoint(PruneSegment::SenderRecovery) + .unwrap(), + to_block, + delete_limit: 10, + }; + let segment = SenderRecovery::default(); + + let next_tx_number_to_prune = tx + .inner() + .get_prune_checkpoint(PruneSegment::SenderRecovery) + .unwrap() + .and_then(|checkpoint| checkpoint.tx_number) + .map(|tx_number| tx_number + 1) + .unwrap_or_default(); + + let last_pruned_tx_number = blocks + .iter() + .take(to_block as usize) + .map(|block| block.body.len()) + .sum::() + .min(next_tx_number_to_prune as usize + input.delete_limit) + .sub(1); + + let last_pruned_block_number = blocks + .iter() + .fold_while((0, 0), |(_, mut tx_count), block| { + tx_count += block.body.len(); + + if tx_count > last_pruned_tx_number { + Done((block.number, tx_count)) + } else { + Continue((block.number, tx_count)) + } + }) + .into_inner() + .0; + + let provider = tx.inner_rw(); + let result = segment.prune(&provider, input).unwrap(); + assert_matches!( + result, + PruneOutput {done, pruned, checkpoint: Some(_)} + if (done, pruned) == expected_result + ); + segment + .save_checkpoint( + &provider, + result.checkpoint.unwrap().as_prune_checkpoint(prune_mode), + ) + .unwrap(); + provider.commit().expect("commit"); + + let last_pruned_block_number = + last_pruned_block_number.checked_sub(if result.done { 0 } else { 1 }); + + assert_eq!( + tx.table::().unwrap().len(), + transaction_senders.len() - (last_pruned_tx_number + 1) + ); + assert_eq!( + tx.inner().get_prune_checkpoint(PruneSegment::SenderRecovery).unwrap(), + Some(PruneCheckpoint { + block_number: last_pruned_block_number, + tx_number: Some(last_pruned_tx_number as TxNumber), + prune_mode + }) + ); + }; + + test_prune(6, (false, 10)); + test_prune(6, (true, 2)); + test_prune(10, (true, 8)); + } +} diff --git a/crates/prune/src/segments/storage_history.rs 
b/crates/prune/src/segments/storage_history.rs new file mode 100644 index 000000000000..d836da69f449 --- /dev/null +++ b/crates/prune/src/segments/storage_history.rs @@ -0,0 +1,232 @@ +use crate::{ + segments::{ + history::prune_history_indices, PruneInput, PruneOutput, PruneOutputCheckpoint, Segment, + }, + PrunerError, +}; +use reth_db::{ + database::Database, + models::{storage_sharded_key::StorageShardedKey, BlockNumberAddress}, + tables, +}; +use reth_primitives::PruneSegment; +use reth_provider::DatabaseProviderRW; +use tracing::{instrument, trace}; + +#[derive(Default)] +#[non_exhaustive] +pub(crate) struct StorageHistory; + +impl Segment for StorageHistory { + fn segment(&self) -> PruneSegment { + PruneSegment::StorageHistory + } + + #[instrument(level = "trace", target = "pruner", skip(self, provider), ret)] + fn prune( + &self, + provider: &DatabaseProviderRW<'_, DB>, + input: PruneInput, + ) -> Result { + let range = match input.get_next_block_range() { + Some(range) => range, + None => { + trace!(target: "pruner", "No storage history to prune"); + return Ok(PruneOutput::done()) + } + }; + let range_end = *range.end(); + + let mut last_changeset_pruned_block = None; + let (pruned_changesets, done) = provider + .prune_table_with_range::( + BlockNumberAddress::range(range), + input.delete_limit / 2, + |_| false, + |row| last_changeset_pruned_block = Some(row.0.block_number()), + )?; + trace!(target: "pruner", deleted = %pruned_changesets, %done, "Pruned storage history (changesets)"); + + let last_changeset_pruned_block = last_changeset_pruned_block + // If there's more storage changesets to prune, set the checkpoint block number + // to previous, so we could finish pruning its storage changesets on the next run. + .map(|block_number| if done { block_number } else { block_number.saturating_sub(1) }) + .unwrap_or(range_end); + + let (processed, pruned_indices) = prune_history_indices::( + provider, + last_changeset_pruned_block, + |a, b| a.address == b.address && a.sharded_key.key == b.sharded_key.key, + |key| StorageShardedKey::last(key.address, key.sharded_key.key), + )?; + trace!(target: "pruner", %processed, deleted = %pruned_indices, %done, "Pruned storage history (history)" ); + + Ok(PruneOutput { + done, + pruned: pruned_changesets + pruned_indices, + checkpoint: Some(PruneOutputCheckpoint { + block_number: Some(last_changeset_pruned_block), + tx_number: None, + }), + }) + } +} + +#[cfg(test)] +mod tests { + use crate::segments::{PruneInput, PruneOutput, Segment, StorageHistory}; + use assert_matches::assert_matches; + use reth_db::{tables, BlockNumberList}; + use reth_interfaces::test_utils::{ + generators, + generators::{random_block_range, random_changeset_range, random_eoa_account_range}, + }; + use reth_primitives::{BlockNumber, PruneCheckpoint, PruneMode, PruneSegment, B256}; + use reth_provider::PruneCheckpointReader; + use reth_stages::test_utils::TestTransaction; + use std::{collections::BTreeMap, ops::AddAssign}; + + #[test] + fn prune() { + let tx = TestTransaction::default(); + let mut rng = generators::rng(); + + let blocks = random_block_range(&mut rng, 0..=5000, B256::ZERO, 0..1); + tx.insert_blocks(blocks.iter(), None).expect("insert blocks"); + + let accounts = + random_eoa_account_range(&mut rng, 0..2).into_iter().collect::>(); + + let (changesets, _) = random_changeset_range( + &mut rng, + blocks.iter(), + accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))), + 2..3, + 1..2, + ); + tx.insert_changesets(changesets.clone(),
None).expect("insert changesets"); + tx.insert_history(changesets.clone(), None).expect("insert history"); + + let storage_occurrences = tx.table::().unwrap().into_iter().fold( + BTreeMap::<_, usize>::new(), + |mut map, (key, _)| { + map.entry((key.address, key.sharded_key.key)).or_default().add_assign(1); + map + }, + ); + assert!(storage_occurrences.into_iter().any(|(_, occurrences)| occurrences > 1)); + + assert_eq!( + tx.table::().unwrap().len(), + changesets.iter().flatten().flat_map(|(_, _, entries)| entries).count() + ); + + let original_shards = tx.table::().unwrap(); + + let test_prune = |to_block: BlockNumber, run: usize, expected_result: (bool, usize)| { + let prune_mode = PruneMode::Before(to_block); + let input = PruneInput { + previous_checkpoint: tx + .inner() + .get_prune_checkpoint(PruneSegment::StorageHistory) + .unwrap(), + to_block, + delete_limit: 2000, + }; + let segment = StorageHistory::default(); + + let provider = tx.inner_rw(); + let result = segment.prune(&provider, input).unwrap(); + assert_matches!( + result, + PruneOutput {done, pruned, checkpoint: Some(_)} + if (done, pruned) == expected_result + ); + segment + .save_checkpoint( + &provider, + result.checkpoint.unwrap().as_prune_checkpoint(prune_mode), + ) + .unwrap(); + provider.commit().expect("commit"); + + let changesets = changesets + .iter() + .enumerate() + .flat_map(|(block_number, changeset)| { + changeset.iter().flat_map(move |(address, _, entries)| { + entries.iter().map(move |entry| (block_number, address, entry)) + }) + }) + .collect::>(); + + #[allow(clippy::skip_while_next)] + let pruned = changesets + .iter() + .enumerate() + .skip_while(|(i, (block_number, _, _))| { + *i < input.delete_limit / 2 * run && *block_number <= to_block as usize + }) + .next() + .map(|(i, _)| i) + .unwrap_or_default(); + + let mut pruned_changesets = changesets + .iter() + // Skip what we've pruned so far, subtracting one to get last pruned block number + // further down + .skip(pruned.saturating_sub(1)); + + let last_pruned_block_number = pruned_changesets + .next() + .map(|(block_number, _, _)| if result.done { + *block_number + } else { + block_number.saturating_sub(1) + } as BlockNumber) + .unwrap_or(to_block); + + let pruned_changesets = pruned_changesets.fold( + BTreeMap::<_, Vec<_>>::new(), + |mut acc, (block_number, address, entry)| { + acc.entry((block_number, address)).or_default().push(entry); + acc + }, + ); + + assert_eq!( + tx.table::().unwrap().len(), + pruned_changesets.values().flatten().count() + ); + + let actual_shards = tx.table::().unwrap(); + + let expected_shards = original_shards + .iter() + .filter(|(key, _)| key.sharded_key.highest_block_number > last_pruned_block_number) + .map(|(key, blocks)| { + let new_blocks = blocks + .iter(0) + .skip_while(|block| *block <= last_pruned_block_number as usize) + .collect::>(); + (key.clone(), BlockNumberList::new_pre_sorted(new_blocks)) + }) + .collect::>(); + + assert_eq!(actual_shards, expected_shards); + + assert_eq!( + tx.inner().get_prune_checkpoint(PruneSegment::StorageHistory).unwrap(), + Some(PruneCheckpoint { + block_number: Some(last_pruned_block_number), + tx_number: None, + prune_mode + }) + ); + }; + + test_prune(998, 1, (false, 1000)); + test_prune(998, 2, (true, 998)); + test_prune(1400, 3, (true, 804)); + } +} diff --git a/crates/prune/src/segments/transaction_lookup.rs b/crates/prune/src/segments/transaction_lookup.rs new file mode 100644 index 000000000000..4a9ca5982753 --- /dev/null +++ 
b/crates/prune/src/segments/transaction_lookup.rs @@ -0,0 +1,202 @@ +use crate::{ + segments::{PruneInput, PruneOutput, PruneOutputCheckpoint, Segment}, + PrunerError, +}; +use rayon::prelude::*; +use reth_db::{database::Database, tables}; +use reth_primitives::PruneSegment; +use reth_provider::{DatabaseProviderRW, TransactionsProvider}; +use tracing::{instrument, trace}; + +#[derive(Default)] +#[non_exhaustive] +pub(crate) struct TransactionLookup; + +impl Segment for TransactionLookup { + fn segment(&self) -> PruneSegment { + PruneSegment::TransactionLookup + } + + #[instrument(level = "trace", target = "pruner", skip(self, provider), ret)] + fn prune( + &self, + provider: &DatabaseProviderRW<'_, DB>, + input: PruneInput, + ) -> Result { + let (start, end) = match input.get_next_tx_num_range(provider)? { + Some(range) => range, + None => { + trace!(target: "pruner", "No transaction lookup entries to prune"); + return Ok(PruneOutput::done()) + } + } + .into_inner(); + let tx_range = start..=(end.min(start + input.delete_limit as u64 - 1)); + let tx_range_end = *tx_range.end(); + + // Retrieve transactions in the range and calculate their hashes in parallel + let hashes = provider + .transactions_by_tx_range(tx_range.clone())? + .into_par_iter() + .map(|transaction| transaction.hash()) + .collect::>(); + + // Number of transactions retrieved from the database should match the tx range count + let tx_count = tx_range.count(); + if hashes.len() != tx_count { + return Err(PrunerError::InconsistentData( + "Unexpected number of transaction hashes retrieved by transaction number range", + )) + } + + let mut last_pruned_transaction = None; + let (pruned, _) = provider.prune_table_with_iterator::( + hashes, + input.delete_limit, + |row| { + last_pruned_transaction = Some(last_pruned_transaction.unwrap_or(row.1).max(row.1)) + }, + )?; + let done = tx_range_end == end; + trace!(target: "pruner", %pruned, %done, "Pruned transaction lookup"); + + let last_pruned_transaction = last_pruned_transaction.unwrap_or(tx_range_end); + + let last_pruned_block = provider + .transaction_block(last_pruned_transaction)? + .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))? + // If there's more transaction lookup entries to prune, set the checkpoint block number + // to previous, so we could finish pruning its transaction lookup entries on the next + // run. 
+ .checked_sub(if done { 0 } else { 1 }); + + Ok(PruneOutput { + done, + pruned, + checkpoint: Some(PruneOutputCheckpoint { + block_number: last_pruned_block, + tx_number: Some(last_pruned_transaction), + }), + }) + } +} + +#[cfg(test)] +mod tests { + use crate::segments::{PruneInput, PruneOutput, Segment, TransactionLookup}; + use assert_matches::assert_matches; + use itertools::{ + FoldWhile::{Continue, Done}, + Itertools, + }; + use reth_db::tables; + use reth_interfaces::test_utils::{generators, generators::random_block_range}; + use reth_primitives::{BlockNumber, PruneCheckpoint, PruneMode, PruneSegment, TxNumber, B256}; + use reth_provider::PruneCheckpointReader; + use reth_stages::test_utils::TestTransaction; + use std::ops::Sub; + + #[test] + fn prune() { + let tx = TestTransaction::default(); + let mut rng = generators::rng(); + + let blocks = random_block_range(&mut rng, 1..=10, B256::ZERO, 2..3); + tx.insert_blocks(blocks.iter(), None).expect("insert blocks"); + + let mut tx_hash_numbers = Vec::new(); + for block in &blocks { + for transaction in &block.body { + tx_hash_numbers.push((transaction.hash, tx_hash_numbers.len() as u64)); + } + } + tx.insert_tx_hash_numbers(tx_hash_numbers.clone()).expect("insert tx hash numbers"); + + assert_eq!( + tx.table::().unwrap().len(), + blocks.iter().map(|block| block.body.len()).sum::() + ); + assert_eq!( + tx.table::().unwrap().len(), + tx.table::().unwrap().len() + ); + + let test_prune = |to_block: BlockNumber, expected_result: (bool, usize)| { + let prune_mode = PruneMode::Before(to_block); + let input = PruneInput { + previous_checkpoint: tx + .inner() + .get_prune_checkpoint(PruneSegment::TransactionLookup) + .unwrap(), + to_block, + delete_limit: 10, + }; + let segment = TransactionLookup::default(); + + let next_tx_number_to_prune = tx + .inner() + .get_prune_checkpoint(PruneSegment::TransactionLookup) + .unwrap() + .and_then(|checkpoint| checkpoint.tx_number) + .map(|tx_number| tx_number + 1) + .unwrap_or_default(); + + let last_pruned_tx_number = blocks + .iter() + .take(to_block as usize) + .map(|block| block.body.len()) + .sum::() + .min(next_tx_number_to_prune as usize + input.delete_limit) + .sub(1); + + let last_pruned_block_number = blocks + .iter() + .fold_while((0, 0), |(_, mut tx_count), block| { + tx_count += block.body.len(); + + if tx_count > last_pruned_tx_number { + Done((block.number, tx_count)) + } else { + Continue((block.number, tx_count)) + } + }) + .into_inner() + .0; + + let provider = tx.inner_rw(); + let result = segment.prune(&provider, input).unwrap(); + assert_matches!( + result, + PruneOutput {done, pruned, checkpoint: Some(_)} + if (done, pruned) == expected_result + ); + segment + .save_checkpoint( + &provider, + result.checkpoint.unwrap().as_prune_checkpoint(prune_mode), + ) + .unwrap(); + provider.commit().expect("commit"); + + let last_pruned_block_number = + last_pruned_block_number.checked_sub(if result.done { 0 } else { 1 }); + + assert_eq!( + tx.table::().unwrap().len(), + tx_hash_numbers.len() - (last_pruned_tx_number + 1) + ); + assert_eq!( + tx.inner().get_prune_checkpoint(PruneSegment::TransactionLookup).unwrap(), + Some(PruneCheckpoint { + block_number: last_pruned_block_number, + tx_number: Some(last_pruned_tx_number as TxNumber), + prune_mode + }) + ); + }; + + test_prune(6, (false, 10)); + test_prune(6, (true, 2)); + test_prune(10, (true, 8)); + } +} diff --git a/crates/prune/src/segments/transactions.rs b/crates/prune/src/segments/transactions.rs index ea8b072523f0..2dcfc28b248c 100644 
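One detail worth noting in the `TransactionLookup` segment above: because it deletes by transaction hash rather than by a contiguous key range, it caps the transaction-number range to `delete_limit` entries up front and reports `done` only when the capped end coincides with the real end of the range. Roughly, with `capped_tx_range` as a hypothetical helper:

    use std::ops::RangeInclusive;

    // Cap the inclusive tx range to at most `delete_limit` entries; the run is "done"
    // only if nothing beyond the cap remains. Assumes `delete_limit > 0`, as in the
    // segment above.
    fn capped_tx_range(start: u64, end: u64, delete_limit: usize) -> (RangeInclusive<u64>, bool) {
        let capped_end = end.min(start + delete_limit as u64 - 1);
        (start..=capped_end, capped_end == end)
    }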
--- a/crates/prune/src/segments/transactions.rs +++ b/crates/prune/src/segments/transactions.rs @@ -11,16 +11,18 @@ use tracing::{instrument, trace}; #[non_exhaustive] pub(crate) struct Transactions; -impl Segment for Transactions { - const SEGMENT: PruneSegment = PruneSegment::Transactions; +impl Segment for Transactions { + fn segment(&self) -> PruneSegment { + PruneSegment::Transactions + } #[instrument(level = "trace", target = "pruner", skip(self, provider), ret)] - fn prune( + fn prune( &self, provider: &DatabaseProviderRW<'_, DB>, input: PruneInput, ) -> Result { - let tx_range = match input.get_next_tx_num_range_from_checkpoint(provider, Self::SEGMENT)? { + let tx_range = match input.get_next_tx_num_range(provider)? { Some(range) => range, None => { trace!(target: "pruner", "No transactions to prune"); @@ -65,7 +67,7 @@ mod tests { }; use reth_db::tables; use reth_interfaces::test_utils::{generators, generators::random_block_range}; - use reth_primitives::{BlockNumber, PruneCheckpoint, PruneMode, TxNumber, B256}; + use reth_primitives::{BlockNumber, PruneCheckpoint, PruneMode, PruneSegment, TxNumber, B256}; use reth_provider::PruneCheckpointReader; use reth_stages::test_utils::TestTransaction; use std::ops::Sub; @@ -84,12 +86,19 @@ mod tests { let test_prune = |to_block: BlockNumber, expected_result: (bool, usize)| { let prune_mode = PruneMode::Before(to_block); - let input = PruneInput { to_block, delete_limit: 10 }; + let input = PruneInput { + previous_checkpoint: tx + .inner() + .get_prune_checkpoint(PruneSegment::Transactions) + .unwrap(), + to_block, + delete_limit: 10, + }; let segment = Transactions::default(); let next_tx_number_to_prune = tx .inner() - .get_prune_checkpoint(Transactions::SEGMENT) + .get_prune_checkpoint(PruneSegment::Transactions) .unwrap() .and_then(|checkpoint| checkpoint.tx_number) .map(|tx_number| tx_number + 1) @@ -138,7 +147,7 @@ mod tests { transactions.len() - (last_pruned_tx_number + 1) ); assert_eq!( - tx.inner().get_prune_checkpoint(Transactions::SEGMENT).unwrap(), + tx.inner().get_prune_checkpoint(PruneSegment::Transactions).unwrap(), Some(PruneCheckpoint { block_number: last_pruned_block_number, tx_number: Some(last_pruned_tx_number as TxNumber),
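Taken together, the segments above follow the three-step contract documented on the `Segment` trait: call `prune`, persist any returned checkpoint, and carry the remaining delete budget into the next segment's `PruneInput`. A condensed sketch of such a driver loop over simplified stand-in types (`Input`, `Output`, `SimpleSegment`, and `run_segments` are illustrative only, not the pruner's actual types):

    // Simplified stand-ins for PruneInput / PruneOutput.
    struct Input {
        previous_checkpoint: Option<u64>,
        to_block: u64,
        delete_limit: usize,
    }

    struct Output {
        done: bool,
        pruned: usize,
        checkpoint: Option<u64>,
    }

    trait SimpleSegment {
        fn prune(&self, input: &Input) -> Output;
        fn save_checkpoint(&self, block: u64);
    }

    fn run_segments(segments: &[Box<dyn SimpleSegment>], to_block: u64, mut delete_limit: usize) {
        for segment in segments {
            if delete_limit == 0 {
                break
            }
            // Step 1: prune with whatever budget is left.
            let output = segment.prune(&Input {
                previous_checkpoint: None, // would be loaded from the stored checkpoints
                to_block,
                delete_limit,
            });
            // Step 2: persist the checkpoint if the segment produced one.
            if let Some(block) = output.checkpoint {
                segment.save_checkpoint(block);
            }
            // Step 3: the next segment only gets the remaining budget.
            delete_limit = delete_limit.saturating_sub(output.pruned);
        }
    }

A segment that reports `done == false` simply resumes from its saved checkpoint on the next pruner run, which is why the history segments rewind their checkpoint block when they stop early.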