Skip to content

Commit

Permalink
perf: add ETL to init_from_state_dump (#8022)
Browse files Browse the repository at this point in the history
  • Loading branch information
joshieDo authored May 2, 2024
1 parent 7428573 commit aba48a5
Show file tree
Hide file tree
Showing 9 changed files with 251 additions and 135 deletions.
127 changes: 65 additions & 62 deletions Cargo.lock

Large diffs are not rendered by default.

10 changes: 8 additions & 2 deletions bin/reth/src/commands/init_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::{
dirs::{DataDirPath, MaybePlatformPath},
};
use clap::Parser;
use reth_config::config::EtlConfig;
use reth_db::{database::Database, init_db};
use reth_node_core::init::{init_from_state_dump, init_genesis};
use reth_primitives::{ChainSpec, B256};
Expand Down Expand Up @@ -78,11 +79,15 @@ impl InitStateCommand {
info!(target: "reth::cli", "Database opened");

let provider_factory = ProviderFactory::new(db, self.chain, data_dir.static_files())?;
let etl_config = EtlConfig::new(
Some(EtlConfig::from_datadir(data_dir.data_dir())),
EtlConfig::default_file_size(),
);

info!(target: "reth::cli", "Writing genesis block");

let hash = match self.state {
Some(path) => init_at_state(path, provider_factory)?,
Some(path) => init_at_state(path, provider_factory, etl_config)?,
None => init_genesis(provider_factory)?,
};

Expand All @@ -95,6 +100,7 @@ impl InitStateCommand {
pub fn init_at_state<DB: Database>(
state_dump_path: PathBuf,
factory: ProviderFactory<DB>,
etl_config: EtlConfig,
) -> eyre::Result<B256> {
info!(target: "reth::cli",
path=?state_dump_path,
Expand All @@ -103,5 +109,5 @@ pub fn init_at_state<DB: Database>(
let file = File::open(state_dump_path)?;
let reader = BufReader::new(file);

init_from_state_dump(reader, factory)
init_from_state_dump(reader, factory, etl_config)
}
2 changes: 2 additions & 0 deletions crates/node-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ reth-tasks.workspace = true
reth-trie.workspace = true
reth-consensus-common.workspace = true
reth-beacon-consensus.workspace = true
reth-etl.workspace = true
reth-codecs.workspace = true

# ethereum
discv5.workspace = true
Expand Down
148 changes: 98 additions & 50 deletions crates/node-core/src/init.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
//! Reth genesis initialization utility functions.

use reth_codecs::Compact;
use reth_config::config::EtlConfig;
use reth_db::{
database::Database,
tables,
transaction::{DbTx, DbTxMut},
};
use reth_etl::Collector;
use reth_interfaces::{db::DatabaseError, provider::ProviderResult};
use reth_primitives::{
stage::StageId, Account, Address, Bytecode, ChainSpec, GenesisAccount, Receipts,
Expand Down Expand Up @@ -293,10 +296,16 @@ pub fn insert_genesis_header<DB: Database>(
Ok(())
}

/// Initialize chain with state at specific block, from reader of state dump.
/// Reads account state from a [`BufRead`] reader and initializes it at the highest block that can
/// be found in the database.
///
/// It's similar to [`init_genesis`] but supports importing state too big to fit in memory, and can
/// be set to the highest block present. One practical use case is to import OP mainnet state at
/// the bedrock transition block.
pub fn init_from_state_dump<DB: Database>(
mut reader: impl BufRead,
factory: ProviderFactory<DB>,
etl_config: EtlConfig,
) -> eyre::Result<B256> {
let block = factory.last_block_number()?;
let hash = factory.block_hash(block)?.unwrap();
Expand All @@ -307,47 +316,115 @@ pub fn init_from_state_dump<DB: Database>(
"Initializing state at block"
);

let mut total_inserted_accounts = 0;
let mut accounts = Vec::with_capacity(AVERAGE_COUNT_ACCOUNTS_PER_GB_STATE_DUMP);
let mut chunk_total_byte_len = 0;
let mut line = String::new();

// first line can be state root, then it can be used for verifying against computed state root
let expected_state_root = parse_state_root(&mut reader)?;

// remaining lines are accounts
let collector = parse_accounts(&mut reader, etl_config)?;

// write state to db
let mut provider_rw = factory.provider_rw()?;
dump_state(collector, &mut provider_rw, block)?;

// compute and compare state root. this advances the stage checkpoints.
let computed_state_root = compute_state_root(&provider_rw)?;
if computed_state_root != expected_state_root {
error!(target: "reth::cli",
?computed_state_root,
?expected_state_root,
"Computed state root does not match state root in state dump"
);

Err(InitDatabaseError::SateRootMismatch { expected_state_root, computed_state_root })?
} else {
info!(target: "reth::cli",
?computed_state_root,
"Computed state root matches state root in state dump"
);
}

provider_rw.commit()?;

Ok(hash)
}

/// Parses and returns the expected state root.
///
/// Reads exactly one line from `reader` and deserializes it as a JSON-encoded `StateRoot`,
/// returning its `root` field.
///
/// # Errors
///
/// Returns an error if reading the line fails or if the line cannot be deserialized.
fn parse_state_root(reader: &mut impl BufRead) -> eyre::Result<B256> {
    let mut line = String::new();
    reader.read_line(&mut line)?;

    // Deserialize the line exactly once; parsing it a second time only repeats the work.
    let expected_state_root = serde_json::from_str::<StateRoot>(&line)?.root;
    trace!(target: "reth::cli",
        root=%expected_state_root,
        "Read state root from file"
    );
    Ok(expected_state_root)
}

line.clear();
/// Parses accounts and pushes them to a [`Collector`].
fn parse_accounts(
mut reader: impl BufRead,
etl_config: EtlConfig,
) -> Result<Collector<Address, GenesisAccount>, eyre::Error> {
let mut line = String::new();
let mut collector = Collector::new(etl_config.file_size, etl_config.dir);

// remaining lines are accounts
let mut provider_rw = factory.provider_rw()?;
while let Ok(n) = reader.read_line(&mut line) {
chunk_total_byte_len += n;
if DEFAULT_SOFT_LIMIT_BYTE_LEN_ACCOUNTS_CHUNK <= chunk_total_byte_len || n == 0 {
// acc
if n == 0 {
break;
}

let GenesisAccountWithAddress { genesis_account, address } = serde_json::from_str(&line)?;
collector.insert(address, genesis_account)?;

if !collector.is_empty() && collector.len() % AVERAGE_COUNT_ACCOUNTS_PER_GB_STATE_DUMP == 0
{
info!(target: "reth::cli",
parsed_new_accounts=collector.len(),
);
}

line.clear();
}

Ok(collector)
}

/// Takes a [`Collector`] and processes all accounts.
fn dump_state<DB: Database>(
mut collector: Collector<Address, GenesisAccount>,
provider_rw: &mut DatabaseProviderRW<DB>,
block: u64,
) -> Result<(), eyre::Error> {
let accounts_len = collector.len();
let mut accounts = Vec::with_capacity(AVERAGE_COUNT_ACCOUNTS_PER_GB_STATE_DUMP);
let mut total_inserted_accounts = 0;

for (index, entry) in collector.iter()?.enumerate() {
let (address, account) = entry?;
let (address, _) = Address::from_compact(address.as_slice(), address.len());
let (account, _) = GenesisAccount::from_compact(account.as_slice(), account.len());

accounts.push((address, account));

if (index > 0 && index % AVERAGE_COUNT_ACCOUNTS_PER_GB_STATE_DUMP == 0) ||
index == accounts_len - 1
{
total_inserted_accounts += accounts.len();

info!(target: "reth::cli",
chunk_total_byte_len,
parsed_new_accounts=accounts.len(),
total_inserted_accounts,
"Writing accounts to db"
);

// reset
chunk_total_byte_len = 0;

// use transaction to insert genesis header
insert_genesis_hashes(
&provider_rw,
provider_rw,
accounts.iter().map(|(address, account)| (address, account)),
)?;

insert_history(
&provider_rw,
provider_rw,
accounts.iter().map(|(address, account)| (address, account)),
block,
)?;
Expand All @@ -363,37 +440,8 @@ pub fn init_from_state_dump<DB: Database>(

accounts.clear();
}

if n == 0 {
break;
}

let GenesisAccountWithAddress { genesis_account, address } = serde_json::from_str(&line)?;
accounts.push((address, genesis_account));

line.clear();
}

// compute and compare state root. this advances the stage checkpoints.
let computed_state_root = compute_state_root(&provider_rw)?;
if computed_state_root != expected_state_root {
error!(target: "reth::cli",
?computed_state_root,
?expected_state_root,
"Computed state root does not match state root in state dump"
);

Err(InitDatabaseError::SateRootMismatch { expected_state_root, computed_state_root })?
} else {
info!(target: "reth::cli",
?computed_state_root,
"Computed state root matches state root in state dump"
);
}

provider_rw.commit()?;

Ok(hash)
Ok(())
}

/// Computes the state root (from scratch) based on the accounts and storages present in the
Expand Down
3 changes: 2 additions & 1 deletion crates/storage/codecs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ reth-codecs-derive = { path = "./derive", default-features = false }

# eth
alloy-eips = { workspace = true, optional = true }
alloy-genesis = { workspace = true, optional = true }
alloy-primitives.workspace = true

# misc
Expand All @@ -36,5 +37,5 @@ proptest-derive.workspace = true
[features]
default = ["std", "alloy"]
std = ["alloy-primitives/std", "bytes/std"]
alloy = ["dep:alloy-eips", "dep:modular-bitfield"]
alloy = ["dep:alloy-eips", "dep:alloy-genesis", "dep:modular-bitfield"]
optimism = ["reth-codecs-derive/optimism"]
67 changes: 67 additions & 0 deletions crates/storage/codecs/src/alloy/genesis_account.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
use crate::Compact;
use alloy_genesis::GenesisAccount as AlloyGenesisAccount;
use alloy_primitives::{Bytes, B256, U256};
use reth_codecs_derive::main_codec;

/// Bridge type that simplifies the `Compact` implementation for `AlloyGenesisAccount`.
///
/// `AlloyGenesisAccount` is converted into this struct before encoding and rebuilt from it after
/// decoding, so the encoding logic lives on a locally defined type.
///
/// Notice: Make sure this struct is 1:1 with `alloy_genesis::GenesisAccount`.
// NOTE(review): the codec is presumably generated by `#[main_codec]`, in which case reordering or
// retyping fields would change the encoded layout — confirm before editing.
#[main_codec]
#[derive(Debug, Clone, PartialEq, Eq, Default)]
struct GenesisAccount {
/// The nonce of the account at genesis.
nonce: Option<u64>,
/// The balance of the account at genesis.
balance: U256,
/// The account's bytecode at genesis.
code: Option<Bytes>,
/// The account's storage at genesis, held as a list of key/value entries.
storage: Option<StorageEntries>,
/// The account's private key. Should only be used for testing.
private_key: Option<B256>,
}

/// Wrapper around the list of storage entries of a [`GenesisAccount`].
///
/// Used as the payload of the optional `storage` field.
#[main_codec]
#[derive(Debug, Clone, PartialEq, Eq, Default)]
struct StorageEntries {
/// All storage slots of the account, one entry per key/value pair.
entries: Vec<StorageEntry>,
}

/// A single key/value pair of a genesis account's storage.
#[main_codec]
#[derive(Debug, Clone, PartialEq, Eq, Default)]
struct StorageEntry {
/// The storage key.
key: B256,
/// The value stored under `key`.
value: B256,
}

impl Compact for AlloyGenesisAccount {
    /// Encodes the account by converting it into the local [`GenesisAccount`] bridge type and
    /// delegating to that type's `to_compact`.
    ///
    /// The storage, if present, is flattened into a list of key/value entries. Returns whatever
    /// the bridge type's `to_compact` returns.
    fn to_compact<B>(self, buf: &mut B) -> usize
    where
        B: bytes::BufMut + AsMut<[u8]>,
    {
        let account = GenesisAccount {
            nonce: self.nonce,
            balance: self.balance,
            code: self.code,
            storage: self.storage.map(|s| StorageEntries {
                entries: s.into_iter().map(|(key, value)| StorageEntry { key, value }).collect(),
            }),
            private_key: self.private_key,
        };
        account.to_compact(buf)
    }

    /// Decodes the local [`GenesisAccount`] bridge type from `buf` and converts it back into an
    /// `AlloyGenesisAccount`.
    ///
    /// Returns the decoded account together with the remainder of the buffer reported by the
    /// bridge type's `from_compact`.
    fn from_compact(buf: &[u8], len: usize) -> (Self, &[u8]) {
        // Keep the remainder returned by the inner decode. Handing back the original `buf` would
        // make a caller that decodes a following value re-read the bytes already consumed here.
        let (account, buf) = GenesisAccount::from_compact(buf, len);
        let alloy_account = AlloyGenesisAccount {
            nonce: account.nonce,
            balance: account.balance,
            code: account.code,
            storage: account
                .storage
                .map(|s| s.entries.into_iter().map(|entry| (entry.key, entry.value)).collect()),
            private_key: account.private_key,
        };
        (alloy_account, buf)
    }
}
1 change: 1 addition & 0 deletions crates/storage/codecs/src/alloy/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
mod access_list;
mod genesis_account;
mod log;
mod txkind;
mod withdrawal;
4 changes: 3 additions & 1 deletion crates/storage/db/src/tables/codecs/compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ impl_compression_for_compact!(
CompactU256,
StageCheckpoint,
PruneCheckpoint,
ClientVersion
ClientVersion,
// Non-DB
GenesisAccount
);

macro_rules! impl_compression_fixed_compact {
Expand Down
24 changes: 5 additions & 19 deletions crates/storage/provider/src/bundle_state/state_reverts.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use rayon::slice::ParallelSliceMut;
use reth_db::{
cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW},
cursor::{DbCursorRO, DbDupCursorRO, DbDupCursorRW},
models::{AccountBeforeTx, BlockNumberAddress},
tables,
transaction::{DbTx, DbTxMut},
Expand Down Expand Up @@ -75,30 +75,16 @@ impl StateReverts {
tracing::trace!(target: "provider::reverts", "Writing account changes");
let mut account_changeset_cursor = tx.cursor_dup_write::<tables::AccountChangeSets>()?;

// append entries if key is new
let should_append_accounts =
account_changeset_cursor.last()?.map_or(true, |(block_number, _)| {
block_number < first_block || block_number == first_block && block_number == 0
});
for (block_index, mut account_block_reverts) in self.0.accounts.into_iter().enumerate() {
let block_number = first_block + block_index as BlockNumber;
// Sort accounts by address.
account_block_reverts.par_sort_by_key(|a| a.0);

for (address, info) in account_block_reverts {
if should_append_accounts {
account_changeset_cursor.append_dup(
block_number,
AccountBeforeTx { address, info: info.map(into_reth_acc) },
)?;
} else {
// upsert on dupsort tables will append to subkey. see implementation of
// DbCursorRW::upsert for reth_db::implementation::mdbx::cursor::Cursor<RW, _>
account_changeset_cursor.upsert(
block_number,
AccountBeforeTx { address, info: info.map(into_reth_acc) },
)?;
}
account_changeset_cursor.append_dup(
block_number,
AccountBeforeTx { address, info: info.map(into_reth_acc) },
)?;
}
}

Expand Down

0 comments on commit aba48a5

Please sign in to comment.