Skip to content

Commit

Permalink
feat(prune): headers segment triggered by snapshots (#4936)
Browse files Browse the repository at this point in the history
  • Loading branch information
shekhirin authored Oct 9, 2023
1 parent 670d459 commit 9ca0cdc
Show file tree
Hide file tree
Showing 7 changed files with 307 additions and 40 deletions.
6 changes: 4 additions & 2 deletions crates/primitives/src/prune/segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,16 @@ pub enum PruneSegment {
SenderRecovery,
/// Prune segment responsible for the `TxHashNumber` table.
TransactionLookup,
/// Prune segment responsible for all `Receipts`.
/// Prune segment responsible for all rows in `Receipts` table.
Receipts,
/// Prune segment responsible for some `Receipts` filtered by logs.
/// Prune segment responsible for some rows in `Receipts` table filtered by logs.
ContractLogs,
/// Prune segment responsible for the `AccountChangeSet` and `AccountHistory` tables.
AccountHistory,
/// Prune segment responsible for the `StorageChangeSet` and `StorageHistory` tables.
StorageHistory,
/// Prune segment responsible for the `CanonicalHeaders`, `Headers` and `HeaderTD` tables.
Headers,
}

/// PruneSegment error type.
Expand Down
34 changes: 33 additions & 1 deletion crates/prune/src/pruner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ impl<DB: Database> Pruner<DB> {
let mut segments = BTreeMap::new();

// TODO(alexey): prune snapshotted segments of data (headers, transactions)
// let highest_snapshots = *self.highest_snapshots_tracker.borrow();
let highest_snapshots = *self.highest_snapshots_tracker.borrow();

// Multiply `delete_limit` (number of row to delete per block) by number of blocks since
// last pruner run. `previous_tip_block_number` is close to `tip_block_number`, usually
Expand Down Expand Up @@ -282,6 +282,38 @@ impl<DB: Database> Pruner<DB> {
);
}

if let Some(snapshots) = highest_snapshots {
if let (Some(to_block), true) = (snapshots.headers, delete_limit > 0) {
let prune_mode = PruneMode::Before(to_block + 1);
trace!(
target: "pruner",
prune_segment = ?PruneSegment::Headers,
%to_block,
?prune_mode,
"Got target block to prune"
);

let segment_start = Instant::now();
let segment = segments::Headers::default();
let output = segment.prune(&provider, PruneInput { to_block, delete_limit })?;
if let Some(checkpoint) = output.checkpoint {
segment
.save_checkpoint(&provider, checkpoint.as_prune_checkpoint(prune_mode))?;
}
self.metrics
.get_prune_segment_metrics(PruneSegment::Headers)
.duration_seconds
.record(segment_start.elapsed());

done = done && output.done;
delete_limit = delete_limit.saturating_sub(output.pruned);
segments.insert(
PruneSegment::Headers,
(PruneProgress::from_done(output.done), output.pruned),
);
}
}

provider.commit()?;
self.previous_tip_block_number = Some(tip_block_number);

Expand Down
192 changes: 192 additions & 0 deletions crates/prune/src/segments/headers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
use crate::{
segments::{PruneInput, PruneOutput, PruneOutputCheckpoint, Segment},
PrunerError,
};
use itertools::Itertools;
use reth_db::{database::Database, table::Table, tables};
use reth_interfaces::RethResult;
use reth_primitives::{BlockNumber, PruneSegment};
use reth_provider::DatabaseProviderRW;
use std::ops::RangeInclusive;
use tracing::{instrument, trace};

#[derive(Default)]
#[non_exhaustive]
pub(crate) struct Headers;

impl Segment for Headers {
const SEGMENT: PruneSegment = PruneSegment::Headers;

#[instrument(level = "trace", target = "pruner", skip(self, provider), ret)]
fn prune<DB: Database>(
&self,
provider: &DatabaseProviderRW<'_, DB>,
input: PruneInput,
) -> Result<PruneOutput, PrunerError> {
let block_range = match input.get_next_block_range(provider, Self::SEGMENT)? {
Some(range) => range,
None => {
trace!(target: "pruner", "No headers to prune");
return Ok(PruneOutput::done())
}
};

let delete_limit = input.delete_limit / 3;
if delete_limit == 0 {
// Nothing to do, `input.delete_limit` is less than 3, so we can't prune all
// headers-related tables up to the same height
return Ok(PruneOutput::not_done())
}

let results = [
self.prune_table::<DB, tables::CanonicalHeaders>(
provider,
block_range.clone(),
delete_limit,
)?,
self.prune_table::<DB, tables::Headers>(provider, block_range.clone(), delete_limit)?,
self.prune_table::<DB, tables::HeaderTD>(provider, block_range, delete_limit)?,
];

if !results.iter().map(|(_, _, last_pruned_block)| last_pruned_block).all_equal() {
return Err(PrunerError::InconsistentData(
"All headers-related tables should be pruned up to the same height",
))
}

let (done, pruned, last_pruned_block) = results.into_iter().fold(
(true, 0, 0),
|(total_done, total_pruned, _), (done, pruned, last_pruned_block)| {
(total_done && done, total_pruned + pruned, last_pruned_block)
},
);

Ok(PruneOutput {
done,
pruned,
checkpoint: Some(PruneOutputCheckpoint {
block_number: Some(last_pruned_block),
tx_number: None,
}),
})
}
}

impl Headers {
/// Prune one headers-related table.
///
/// Returns `done`, number of pruned rows and last pruned block number.
fn prune_table<DB: Database, T: Table<Key = BlockNumber>>(
&self,
provider: &DatabaseProviderRW<'_, DB>,
range: RangeInclusive<BlockNumber>,
delete_limit: usize,
) -> RethResult<(bool, usize, BlockNumber)> {
let mut last_pruned_block = *range.end();
let (pruned, done) = provider.prune_table_with_range::<T>(
range,
delete_limit,
|_| false,
|row| last_pruned_block = row.0,
)?;
trace!(target: "pruner", %pruned, %done, table = %T::NAME, "Pruned headers");

Ok((done, pruned, last_pruned_block))
}
}

#[cfg(test)]
mod tests {
use crate::segments::{Headers, PruneInput, PruneOutput, Segment};
use assert_matches::assert_matches;
use reth_db::tables;
use reth_interfaces::test_utils::{generators, generators::random_header_range};
use reth_primitives::{BlockNumber, PruneCheckpoint, PruneMode, B256};
use reth_provider::PruneCheckpointReader;
use reth_stages::test_utils::TestTransaction;

#[test]
fn prune() {
let tx = TestTransaction::default();
let mut rng = generators::rng();

let headers = random_header_range(&mut rng, 0..100, B256::ZERO);
tx.insert_headers_with_td(headers.iter()).expect("insert headers");

assert_eq!(tx.table::<tables::CanonicalHeaders>().unwrap().len(), headers.len());
assert_eq!(tx.table::<tables::Headers>().unwrap().len(), headers.len());
assert_eq!(tx.table::<tables::HeaderTD>().unwrap().len(), headers.len());

let test_prune = |to_block: BlockNumber, expected_result: (bool, usize)| {
let prune_mode = PruneMode::Before(to_block);
let input = PruneInput { to_block, delete_limit: 10 };
let segment = Headers::default();

let next_block_number_to_prune = tx
.inner()
.get_prune_checkpoint(Headers::SEGMENT)
.unwrap()
.and_then(|checkpoint| checkpoint.block_number)
.map(|block_number| block_number + 1)
.unwrap_or_default();

let provider = tx.inner_rw();
let result = segment.prune(&provider, input).unwrap();
assert_matches!(
result,
PruneOutput {done, pruned, checkpoint: Some(_)}
if (done, pruned) == expected_result
);
segment
.save_checkpoint(
&provider,
result.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
)
.unwrap();
provider.commit().expect("commit");

let last_pruned_block_number = to_block
.min(next_block_number_to_prune + input.delete_limit as BlockNumber / 3 - 1);

assert_eq!(
tx.table::<tables::CanonicalHeaders>().unwrap().len(),
headers.len() - (last_pruned_block_number + 1) as usize
);
assert_eq!(
tx.table::<tables::Headers>().unwrap().len(),
headers.len() - (last_pruned_block_number + 1) as usize
);
assert_eq!(
tx.table::<tables::HeaderTD>().unwrap().len(),
headers.len() - (last_pruned_block_number + 1) as usize
);
assert_eq!(
tx.inner().get_prune_checkpoint(Headers::SEGMENT).unwrap(),
Some(PruneCheckpoint {
block_number: Some(last_pruned_block_number),
tx_number: None,
prune_mode
})
);
};

test_prune(3, (false, 9));
test_prune(3, (true, 3));
}

#[test]
fn prune_cannot_be_done() {
let tx = TestTransaction::default();

let input = PruneInput {
to_block: 1,
// Less than total number of tables for `Headers` segment
delete_limit: 2,
};
let segment = Headers::default();

let provider = tx.inner_rw();
let result = segment.prune(&provider, input).unwrap();
assert_eq!(result, PruneOutput::not_done());
}
}
44 changes: 41 additions & 3 deletions crates/prune/src/segments/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
mod headers;
mod receipts;

pub(crate) use headers::Headers;
pub(crate) use receipts::Receipts;

use crate::PrunerError;
Expand Down Expand Up @@ -88,10 +91,39 @@ impl PruneInput {

Ok(Some(range))
}

/// Get next inclusive block range to prune according to the checkpoint, `to_block` block
/// number and `limit`.
///
/// To get the range start (`from_block`):
/// 1. If checkpoint exists, use next block.
/// 2. If checkpoint doesn't exist, use block 0.
///
/// To get the range end: use block `to_block`.
pub(crate) fn get_next_block_range<DB: Database>(
&self,
provider: &DatabaseProviderRW<'_, DB>,
segment: PruneSegment,
) -> RethResult<Option<RangeInclusive<BlockNumber>>> {
let from_block = provider
.get_prune_checkpoint(segment)?
.and_then(|checkpoint| checkpoint.block_number)
// Checkpoint exists, prune from the next block after the highest pruned one
.map(|block_number| block_number + 1)
// No checkpoint exists, prune from genesis
.unwrap_or(0);

let range = from_block..=self.to_block;
if range.is_empty() {
return Ok(None)
}

Ok(Some(range))
}
}

/// Segment pruning output, see [Segment::prune].
#[derive(Debug, Clone, Copy)]
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub(crate) struct PruneOutput {
/// `true` if pruning has been completed up to the target block, and `false` if there's more
/// data to prune in further runs.
Expand All @@ -105,12 +137,18 @@ pub(crate) struct PruneOutput {
impl PruneOutput {
/// Returns a [PruneOutput] with `done = true`, `pruned = 0` and `checkpoint = None`.
/// Use when no pruning is needed.
pub(crate) fn done() -> Self {
pub(crate) const fn done() -> Self {
Self { done: true, pruned: 0, checkpoint: None }
}

/// Returns a [PruneOutput] with `done = false`, `pruned = 0` and `checkpoint = None`.
/// Use when pruning is needed but cannot be done.
pub(crate) const fn not_done() -> Self {
Self { done: false, pruned: 0, checkpoint: None }
}
}

#[derive(Debug, Clone, Copy)]
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub(crate) struct PruneOutputCheckpoint {
/// Highest pruned block number. If it's [None], the pruning for block `0` is not finished yet.
pub(crate) block_number: Option<BlockNumber>,
Expand Down
30 changes: 14 additions & 16 deletions crates/prune/src/segments/receipts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,20 +136,6 @@ mod tests {
.min(next_tx_number_to_prune as usize + input.delete_limit)
.sub(1);

let last_pruned_block_number = blocks
.iter()
.fold_while((0, 0), |(_, mut tx_count), block| {
tx_count += block.body.len();

if tx_count > last_pruned_tx_number {
Done((block.number, tx_count))
} else {
Continue((block.number, tx_count))
}
})
.into_inner()
.0;

let provider = tx.inner_rw();
let result = segment.prune(&provider, input).unwrap();
assert_matches!(
Expand All @@ -165,8 +151,20 @@ mod tests {
.unwrap();
provider.commit().expect("commit");

let last_pruned_block_number =
last_pruned_block_number.checked_sub(if result.done { 0 } else { 1 });
let last_pruned_block_number = blocks
.iter()
.fold_while((0, 0), |(_, mut tx_count), block| {
tx_count += block.body.len();

if tx_count > last_pruned_tx_number {
Done((block.number, tx_count))
} else {
Continue((block.number, tx_count))
}
})
.into_inner()
.0
.checked_sub(if result.done { 0 } else { 1 });

assert_eq!(
tx.table::<tables::Receipts>().unwrap().len(),
Expand Down
2 changes: 1 addition & 1 deletion crates/stages/src/test_utils/test_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ impl TestTransaction {
/// Inserts total difficulty of headers into the corresponding tables.
///
/// Superset functionality of [TestTransaction::insert_headers].
pub(crate) fn insert_headers_with_td<'a, I>(&self, headers: I) -> Result<(), DbError>
pub fn insert_headers_with_td<'a, I>(&self, headers: I) -> Result<(), DbError>
where
I: Iterator<Item = &'a SealedHeader>,
{
Expand Down
Loading

0 comments on commit 9ca0cdc

Please sign in to comment.