Commit
fix: copy and prune data from database with move_to_static_files, before a pipeline run/unwind (#8127)
joshieDo authored May 9, 2024
1 parent ee3c939 commit ad54af8
Showing 9 changed files with 70 additions and 38 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default.

1 change: 0 additions & 1 deletion bin/reth/Cargo.toml
@@ -43,7 +43,6 @@ reth-payload-builder.workspace = true
reth-payload-validator.workspace = true
reth-basic-payload-builder.workspace = true
reth-discv4.workspace = true
reth-prune.workspace = true
reth-static-file = { workspace = true }
reth-trie = { workspace = true, features = ["metrics"] }
reth-nippy-jar.workspace = true
12 changes: 2 additions & 10 deletions bin/reth/src/commands/stage/unwind.rs
@@ -2,7 +2,7 @@

use clap::{Parser, Subcommand};
use reth_beacon_consensus::EthBeaconConsensus;
use reth_config::{Config, PruneConfig};
use reth_config::Config;
use reth_consensus::Consensus;
use reth_db::{database::Database, open_db};
use reth_downloaders::{bodies::noop::NoopBodiesDownloader, headers::noop::NoopHeaderDownloader};
@@ -13,7 +13,6 @@ use reth_provider::{
BlockExecutionWriter, BlockNumReader, ChainSpecProvider, HeaderSyncMode, ProviderFactory,
StaticFileProviderFactory,
};
use reth_prune::PrunerBuilder;
use reth_stages::{
sets::DefaultStages,
stages::{
@@ -107,14 +106,7 @@ impl Command {
let mut pipeline = self.build_pipeline(config, provider_factory.clone()).await?;

// Move all applicable data from database to static files.
pipeline.produce_static_files()?;

// Run the pruner so we don't potentially end up with higher height in the database vs
// static files.
let mut pruner = PrunerBuilder::new(PruneConfig::default())
.prune_delete_limit(usize::MAX)
.build(provider_factory);
pruner.run(*range.end())?;
pipeline.move_to_static_files()?;

pipeline.unwind((*range.start()).saturating_sub(1), None)?;
} else {
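
The unwind command no longer builds its own pruner: the copy-and-prune step now lives behind pipeline.move_to_static_files(), which the command calls once before unwinding. The unwind target itself is still computed as one block below the requested range, saturating at zero. A minimal, self-contained illustration of that target calculation (the function name here is hypothetical, not reth's API):

fn unwind_target(range_start: u64) -> u64 {
    // Unwind to the block just below the requested range; saturating_sub keeps
    // a range that starts at genesis (block 0) from underflowing.
    range_start.saturating_sub(1)
}

fn main() {
    assert_eq!(unwind_target(100), 99);
    assert_eq!(unwind_target(0), 0); // genesis: nothing below it to unwind to
}
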
1 change: 1 addition & 0 deletions crates/primitives/src/prune/mode.rs
@@ -36,6 +36,7 @@ impl PruneMode {
PruneMode::Distance(distance) if *distance >= segment.min_blocks(purpose) => {
Some((tip - distance, *self))
}
PruneMode::Before(n) if *n == tip + 1 && purpose.is_static_file() => Some((tip, *self)),
PruneMode::Before(n) if *n > tip => None, // Nothing to prune yet
PruneMode::Before(n) if tip - n >= segment.min_blocks(purpose) => Some((n - 1, *self)),
_ => return Err(PruneSegmentError::Configuration(segment)),
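
The new match arm lets the pruner treat PruneMode::Before(tip + 1) as "prune everything up to and including the tip", but only when the prune purpose is a static-file move; the usual min_blocks safeguard is bypassed because that data has already been copied. A simplified, self-contained model of the decision (the real prune_target_block also takes a segment and returns a Result; names below are illustrative):

#[derive(Clone, Copy, Debug, PartialEq)]
enum Mode {
    Distance(u64),
    Before(u64),
}

// Returns the highest block that may be pruned, or None if nothing should be
// pruned yet. The error case of the real method is modeled as None here.
fn prune_target(mode: Mode, tip: u64, min_blocks: u64, for_static_files: bool) -> Option<u64> {
    match mode {
        Mode::Distance(d) if d >= min_blocks && tip >= d => Some(tip - d),
        // New in this commit: during a static-file move, Before(tip + 1)
        // prunes up to and including the tip.
        Mode::Before(n) if n == tip + 1 && for_static_files => Some(tip),
        Mode::Before(n) if n > tip => None, // nothing to prune yet
        Mode::Before(n) if tip - n >= min_blocks => Some(n - 1),
        _ => None,
    }
}

fn main() {
    // Pipeline::move_to_static_files prunes with Before(tip + 1):
    assert_eq!(prune_target(Mode::Before(101), 100, 10_000, true), Some(100));
    // For any other purpose the same mode still means "nothing to prune yet":
    assert_eq!(prune_target(Mode::Before(101), 100, 10_000, false), None);
}
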
3 changes: 1 addition & 2 deletions crates/prune/src/builder.rs
@@ -1,10 +1,9 @@
use std::time::Duration;

use crate::{segments::SegmentSet, Pruner};
use reth_config::PruneConfig;
use reth_db::database::Database;
use reth_primitives::{FinishedExExHeight, PruneModes, MAINNET};
use reth_provider::ProviderFactory;
use std::time::Duration;
use tokio::sync::watch;

/// Contains the information required to build a pruner
13 changes: 13 additions & 0 deletions crates/prune/src/error.rs
@@ -21,3 +21,16 @@ pub enum PrunerError {
#[error(transparent)]
Provider(#[from] ProviderError),
}

impl From<PrunerError> for RethError {
fn from(err: PrunerError) -> Self {
match err {
PrunerError::PruneSegment(_) | PrunerError::InconsistentData(_) => {
RethError::Custom(err.to_string())
}
PrunerError::Interface(err) => err,
PrunerError::Database(err) => RethError::Database(err),
PrunerError::Provider(err) => RethError::Provider(err),
}
}
}
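
The new From<PrunerError> for RethError impl is what lets the pipeline call pruner.run(..)? inside a function returning RethResult: the ? operator applies the conversion automatically. A minimal, self-contained illustration of the pattern with stand-in types (not reth's real definitions):

#[derive(Debug)]
struct PrunerError(String);

#[derive(Debug)]
enum RethError {
    Custom(String),
}

impl From<PrunerError> for RethError {
    fn from(err: PrunerError) -> Self {
        RethError::Custom(err.0)
    }
}

fn run_pruner() -> Result<(), PrunerError> {
    Err(PrunerError("delete limit reached".into()))
}

fn move_to_static_files() -> Result<(), RethError> {
    // `?` calls From::from on the error, converting PrunerError into RethError.
    run_pruner()?;
    Ok(())
}

fn main() {
    println!("{:?}", move_to_static_files()); // Err(Custom("delete limit reached"))
}
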
1 change: 1 addition & 0 deletions crates/stages-api/Cargo.toml
@@ -19,6 +19,7 @@ reth-interfaces.workspace = true
reth-static-file.workspace = true
reth-tokio-util.workspace = true
reth-consensus.workspace = true
reth-prune.workspace = true

# metrics
reth-metrics.workspace = true
56 changes: 37 additions & 19 deletions crates/stages-api/src/pipeline/mod.rs
@@ -15,6 +15,7 @@ use reth_provider::{
providers::StaticFileWriter, ProviderFactory, StageCheckpointReader, StageCheckpointWriter,
StaticFileProviderFactory,
};
use reth_prune::PrunerBuilder;
use reth_static_file::StaticFileProducer;
use reth_tokio_util::EventListeners;
use std::pin::Pin;
@@ -140,7 +141,7 @@
match target {
PipelineTarget::Sync(tip) => self.set_tip(tip),
PipelineTarget::Unwind(target) => {
if let Err(err) = self.produce_static_files() {
if let Err(err) = self.move_to_static_files() {
return (self, Err(err.into()))
}
if let Err(err) = self.unwind(target, None) {
@@ -199,7 +200,7 @@
/// pipeline (for example the `Finish` stage). Or [ControlFlow::Unwind] of the stage that caused
/// the unwind.
pub async fn run_loop(&mut self) -> Result<ControlFlow, PipelineError> {
self.produce_static_files()?;
self.move_to_static_files()?;

let mut previous_stage = None;
for stage_index in 0..self.stages.len() {
@@ -236,9 +237,10 @@
Ok(self.progress.next_ctrl())
}

/// Run [static file producer](StaticFileProducer) and move all data from the database to static
/// files for corresponding [segments](reth_primitives::static_file::StaticFileSegment),
/// according to their [stage checkpoints](StageCheckpoint):
/// Run [static file producer](StaticFileProducer) and [pruner](reth_prune::Pruner) to **move**
/// all data from the database to static files for corresponding
/// [segments](reth_primitives::static_file::StaticFileSegment), according to their [stage
/// checkpoints](StageCheckpoint):
/// - [StaticFileSegment::Headers](reth_primitives::static_file::StaticFileSegment::Headers) ->
/// [StageId::Headers]
/// - [StaticFileSegment::Receipts](reth_primitives::static_file::StaticFileSegment::Receipts)
@@ -248,22 +250,38 @@
///
/// CAUTION: This method locks the static file producer Mutex, hence can block the thread if the
/// lock is occupied.
pub fn produce_static_files(&self) -> RethResult<()> {
pub fn move_to_static_files(&self) -> RethResult<()> {
let mut static_file_producer = self.static_file_producer.lock();

let provider = self.provider_factory.provider()?;
let targets = static_file_producer.get_static_file_targets(HighestStaticFiles {
headers: provider
.get_stage_checkpoint(StageId::Headers)?
.map(|checkpoint| checkpoint.block_number),
receipts: provider
.get_stage_checkpoint(StageId::Execution)?
.map(|checkpoint| checkpoint.block_number),
transactions: provider
.get_stage_checkpoint(StageId::Bodies)?
.map(|checkpoint| checkpoint.block_number),
})?;
static_file_producer.run(targets)?;
// Copies data from database to static files
let lowest_static_file_height = {
let provider = self.provider_factory.provider()?;
let stages_checkpoints = [StageId::Headers, StageId::Execution, StageId::Bodies]
.into_iter()
.map(|stage| {
provider.get_stage_checkpoint(stage).map(|c| c.map(|c| c.block_number))
})
.collect::<Result<Vec<_>, _>>()?;

let targets = static_file_producer.get_static_file_targets(HighestStaticFiles {
headers: stages_checkpoints[0],
receipts: stages_checkpoints[1],
transactions: stages_checkpoints[2],
})?;
static_file_producer.run(targets)?;
stages_checkpoints.into_iter().min().expect("exists")
};

// Deletes data which has been copied to static files.
if let Some(prune_tip) = lowest_static_file_height {
// Run the pruner so we don't potentially end up with higher height in the database vs
// static files during a pipeline unwind
let mut pruner = PrunerBuilder::new(Default::default())
.prune_delete_limit(usize::MAX)
.build(self.provider_factory.clone());

pruner.run(prune_tip)?;
}

Ok(())
}
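
The renamed move_to_static_files now performs both halves of the move: the static file producer copies data up to each stage checkpoint (Headers, Execution, Bodies), and the pruner then deletes the copied rows, but only up to the lowest of those checkpoints, since any higher block may not yet be covered by every segment. A small, self-contained sketch of that prune-tip selection (the helper name is illustrative, not reth's API):

fn prune_tip(checkpoints: [Option<u64>; 3]) -> Option<u64> {
    // Option orders None below Some(_), so a single missing checkpoint yields
    // None and disables pruning entirely — mirroring the diff's
    // `.min().expect("exists")` followed by `if let Some(prune_tip)`.
    checkpoints.into_iter().min().expect("array is non-empty")
}

fn main() {
    // Headers at 1000, Execution at 800, Bodies at 900 -> prune only up to 800.
    assert_eq!(prune_tip([Some(1_000), Some(800), Some(900)]), Some(800));
    // Execution has never run -> prune nothing.
    assert_eq!(prune_tip([Some(1_000), None, Some(900)]), None);
}
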
19 changes: 14 additions & 5 deletions crates/storage/provider/src/providers/database/mod.rs
@@ -34,10 +34,10 @@ use reth_db::mdbx::DatabaseArguments;
/// A common provider that fetches data from a database or static file.
///
/// This provider implements most provider or provider factory traits.
#[derive(Debug, Clone)]
#[derive(Debug)]
pub struct ProviderFactory<DB> {
/// Database
db: DB,
db: Arc<DB>,
/// Chain spec
chain_spec: Arc<ChainSpec>,
/// Static File Provider
@@ -52,7 +52,7 @@ impl<DB> ProviderFactory<DB> {
static_files_path: PathBuf,
) -> RethResult<ProviderFactory<DB>> {
Ok(Self {
db,
db: Arc::new(db),
chain_spec,
static_file_provider: StaticFileProvider::new(static_files_path)?,
})
@@ -71,7 +71,7 @@ impl<DB> ProviderFactory<DB> {

#[cfg(any(test, feature = "test-utils"))]
/// Consumes Self and returns DB
pub fn into_db(self) -> DB {
pub fn into_db(self) -> Arc<DB> {
self.db
}
}
@@ -86,7 +86,7 @@ impl ProviderFactory<DatabaseEnv> {
static_files_path: PathBuf,
) -> RethResult<Self> {
Ok(ProviderFactory::<DatabaseEnv> {
db: init_db(path, args).map_err(|e| RethError::Custom(e.to_string()))?,
db: Arc::new(init_db(path, args).map_err(|e| RethError::Custom(e.to_string()))?),
chain_spec,
static_file_provider: StaticFileProvider::new(static_files_path)?,
})
@@ -558,6 +558,15 @@ impl<DB: Database> PruneCheckpointReader for ProviderFactory<DB> {
}
}

impl<DB> Clone for ProviderFactory<DB> {
fn clone(&self) -> Self {
ProviderFactory {
db: Arc::clone(&self.db),
chain_spec: self.chain_spec.clone(),
static_file_provider: self.static_file_provider.clone(),
}
}
}
#[cfg(test)]
mod tests {
use super::ProviderFactory;
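
Wrapping the database handle in Arc and hand-writing Clone makes ProviderFactory cheap to clone without requiring DB: Clone, which move_to_static_files relies on when it passes self.provider_factory.clone() to the PrunerBuilder. A minimal illustration of why the manual impl is needed (stand-in types, not reth's): #[derive(Clone)] on a generic struct would add a DB: Clone bound, even though cloning an Arc<DB> only bumps a reference count and needs no bound on DB at all.

use std::sync::Arc;

struct NotCloneableDb; // stand-in for a handle that cannot (or should not) be cloned

struct Factory<DB> {
    db: Arc<DB>,
}

// Manual impl: only the Arc is cloned, so no DB: Clone bound is required.
impl<DB> Clone for Factory<DB> {
    fn clone(&self) -> Self {
        Factory { db: Arc::clone(&self.db) }
    }
}

fn main() {
    let factory = Factory { db: Arc::new(NotCloneableDb) };
    let _copy = factory.clone(); // cheap: both factories share the same database handle
}
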
