66 changes: 24 additions & 42 deletions crates/storage/store_db/in_memory.rs
@@ -91,58 +91,40 @@ impl StoreEngine for Store {
let mut store = self.inner()?;

// Store trie updates
let mut trie = TrieLayerCache::clone(&store.trie_cache);
let parent = update_batch
.blocks
.first()
.ok_or(StoreError::UpdateBatchNoBlocks)?
.header
.parent_hash;

let pre_state_root = store
.headers
.get(&parent)
.map(|header| header.state_root)
.unwrap_or_default();

let last_state_root = update_batch
.blocks
.last()
.ok_or(StoreError::UpdateBatchNoBlocks)?
.header
.state_root;
let nodes = {
let trie = Arc::make_mut(&mut store.trie_cache);

let nodes = trie.commit();
trie.put_iter(
update_batch
.storage_updates
.into_iter()
.flat_map(|(account_hash, nodes)| {
nodes
.into_iter()
.map(move |(path, node)| (apply_prefix(Some(account_hash), path), node))
})
.chain(update_batch.account_updates)
.map(|(key, value)| (key.into_vec(), value)),
);

nodes
};

{
let mut state_trie = store
.state_trie_nodes
.lock()
.map_err(|_| StoreError::LockError)?;

if let Some(root) = trie.get_commitable(pre_state_root, COMMIT_THRESHOLD) {
let nodes = trie.commit(root).unwrap_or_default();
for (key, value) in nodes {
if value.is_empty() {
state_trie.remove(&key);
} else {
state_trie.insert(key, value);
}
for (key, value) in nodes {
if value.is_empty() {
state_trie.remove(&key);
} else {
state_trie.insert(key, value);
}
}
}

let key_values = update_batch
.storage_updates
.into_iter()
.flat_map(|(account_hash, nodes)| {
nodes
.into_iter()
.map(move |(path, node)| (apply_prefix(Some(account_hash), path), node))
})
.chain(update_batch.account_updates)
.collect();
trie.put_batch(pre_state_root, last_state_root, key_values);
store.trie_cache = Arc::new(trie);

for block in update_batch.blocks {
// store block
let number = block.header.number;
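The in-memory store now mutates the shared layer cache in place via `Arc::make_mut` instead of cloning the whole `TrieLayerCache`, calling `put_batch` with the parent/child state roots, and swapping the clone back in. The sketch below shows that copy-on-write pattern with a hypothetical, simplified `LayerCache` type and an inlined key-prefixing step; the real `TrieLayerCache`, `apply_prefix`, and `Nibbles` live in the ethrex crates and are not reproduced here.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Hypothetical stand-in for TrieLayerCache: a flat map of encoded trie
/// paths to node bytes, plus a buffer of nodes ready to be flushed to disk.
#[derive(Clone, Default)]
struct LayerCache {
    layers: HashMap<Vec<u8>, Vec<u8>>,
    committed: Vec<(Vec<u8>, Vec<u8>)>,
}

impl LayerCache {
    /// Drain whatever is ready to go to the persistent store.
    fn commit(&mut self) -> Vec<(Vec<u8>, Vec<u8>)> {
        std::mem::take(&mut self.committed)
    }

    /// Insert a batch of (path, node) pairs into the top layer.
    fn put_iter(&mut self, nodes: impl IntoIterator<Item = (Vec<u8>, Vec<u8>)>) {
        self.layers.extend(nodes);
    }
}

fn apply_updates(
    cache: &mut Arc<LayerCache>,
    account_updates: Vec<(Vec<u8>, Vec<u8>)>,
    storage_updates: Vec<([u8; 32], Vec<(Vec<u8>, Vec<u8>)>)>,
) -> Vec<(Vec<u8>, Vec<u8>)> {
    // Copy-on-write: clones the cache only if another reader still holds it,
    // otherwise mutates it in place.
    let trie = Arc::make_mut(cache);

    // Take the nodes that are ready for disk *before* layering the new batch.
    let nodes = trie.commit();

    // Prefix storage paths with their account hash so they share one keyspace
    // with the account trie nodes, then insert everything in one pass.
    trie.put_iter(
        storage_updates
            .into_iter()
            .flat_map(|(account_hash, nodes)| {
                nodes.into_iter().map(move |(path, node)| {
                    let mut key = account_hash.to_vec();
                    key.extend_from_slice(&path);
                    (key, node)
                })
            })
            .chain(account_updates),
    );

    nodes
}

fn main() {
    let mut cache = Arc::new(LayerCache::default());
    let flushed = apply_updates(
        &mut cache,
        vec![(b"account-node-path".to_vec(), b"rlp".to_vec())],
        vec![([0xaa; 32], vec![(b"storage-path".to_vec(), b"rlp".to_vec())])],
    );
    assert!(flushed.is_empty()); // nothing was staged for disk yet
}
```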
65 changes: 26 additions & 39 deletions crates/storage/store_db/rocksdb.rs
@@ -42,9 +42,6 @@ use ethrex_rlp::{
};
use std::fmt::Debug;

// TODO: use finalized hash to determine when to commit
const COMMIT_THRESHOLD: usize = 128;

/// Canonical block hashes column family: [`u8;_`] => [`Vec<u8>`]
/// - [`u8;_`] = `block_number.to_le_bytes()`
/// - [`Vec<u8>`] = `BlockHashRLP::from(block_hash).bytes().clone()`
@@ -447,20 +444,14 @@ impl Store {
match rx.recv() {
Ok((
notify,
parent_state_root,
child_state_root,
_parent_state_root,
_child_state_root,
account_updates,
storage_updates,
)) => {
// FIXME: what should we do on error?
let _ = store_clone
.apply_trie_updates(
notify,
parent_state_root,
child_state_root,
account_updates,
storage_updates,
)
.apply_trie_updates(notify, account_updates, storage_updates)
.inspect_err(|err| error!("apply_trie_updates failed: {err}"));
}
Err(err) => error!("Error while reading diff layer: {err}"),
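The background worker still receives batched updates over a channel and answers on a per-batch notify sender once the in-memory layers are consistent; only the parent/child state roots are now ignored. Below is a rough sketch of that handshake with simplified payload types (the `Batch` alias, `String` error type, and `spawn_worker` helper are assumptions, not the exact ethrex code).

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::thread;

type Nodes = Vec<(Vec<u8>, Vec<u8>)>;
// (notify, account_updates, storage_updates) -- state roots no longer travel with the batch.
type Batch = (SyncSender<Result<(), String>>, Nodes, Nodes);

fn spawn_worker(rx: Receiver<Batch>) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        while let Ok((notify, account_updates, storage_updates)) = rx.recv() {
            // apply_trie_updates sends on `notify` as soon as the in-memory
            // layers are updated, so block production can continue while the
            // disk write finishes.
            if let Err(err) = apply_trie_updates(notify, account_updates, storage_updates) {
                eprintln!("apply_trie_updates failed: {err}");
            }
        }
    })
}

fn apply_trie_updates(
    notify: SyncSender<Result<(), String>>,
    _account_updates: Nodes,
    _storage_updates: Nodes,
) -> Result<(), String> {
    // ... phase 1: update the in-memory diff layers here ...
    notify.send(Ok(())).map_err(|e| e.to_string())?;
    // ... phase 2: flush committed nodes to disk here ...
    Ok(())
}

fn main() {
    let (tx, rx) = sync_channel::<Batch>(8);
    let worker = spawn_worker(rx);

    // Producer side: send a batch and wait until phase 1 is done.
    let (notify_tx, notify_rx) = sync_channel(1);
    tx.send((notify_tx, Vec::new(), Vec::new())).unwrap();
    notify_rx.recv().unwrap().unwrap();

    drop(tx); // closing the channel lets the worker exit
    worker.join().unwrap();
}
```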
@@ -743,8 +734,6 @@ impl Store {
fn apply_trie_updates(
&self,
notify: SyncSender<Result<(), StoreError>>,
parent_state_root: H256,
child_state_root: H256,
account_updates: Vec<(Nibbles, Vec<u8>)>,
storage_updates: StorageUpdates,
) -> Result<(), StoreError> {
@@ -753,38 +742,38 @@
let trie_cache = &self.trie_cache;

// Phase 1: update the in-memory diff-layers only, then notify block production.
let new_layer = storage_updates
.into_iter()
.flat_map(|(account_hash, nodes)| {
nodes
let commit_data;
{
let mut lock = trie_cache.lock().map_err(|_| StoreError::LockError)?;
let trie = Arc::make_mut(&mut *lock);
commit_data = trie.commit();

trie.put_iter(
storage_updates
.into_iter()
.map(move |(path, node)| (apply_prefix(Some(account_hash), path), node))
})
.chain(account_updates)
.collect();
// Read-Copy-Update the trie cache with a new layer.
let trie = trie_cache
.lock()
.map_err(|_| StoreError::LockError)?
.clone();
let mut trie_mut = (*trie).clone();
trie_mut.put_batch(parent_state_root, child_state_root, new_layer);
let trie = Arc::new(trie_mut);
*trie_cache.lock().map_err(|_| StoreError::LockError)? = trie.clone();
.flat_map(|(account_hash, nodes)| {
nodes
.into_iter()
.map(move |(path, node)| (apply_prefix(Some(account_hash), path), node))
})
.chain(account_updates)
.map(|(key, value)| (key.into_vec(), value)),
);
}

// Update finished, signal block processing.
notify.send(Ok(())).map_err(|_| StoreError::LockError)?;

// Phase 2: update disk layer.
let Some(root) = trie.get_commitable(parent_state_root, COMMIT_THRESHOLD) else {
// Nothing to commit to disk, move on.
if commit_data.is_empty() {
return Ok(());
};
}

// Stop the flat-key-value generator thread, as the underlying trie is about to change.
// Ignore the error, if the channel is closed it means there is no worker to notify.
let _ = fkv_ctl.send(FKVGeneratorControlMessage::Stop);

// RCU to remove the bottom layer: update step needs to happen after disk layer is updated.
let mut trie_mut = (*trie).clone();
let mut batch = WriteBatch::default();
let [
cf_accounts_trie_nodes,
Expand All @@ -809,8 +798,7 @@ impl Store {
// the account address (32 bytes) + storage path (up to 32 bytes).

// Commit removes the bottom layer and returns it, this is the mutation step.
let nodes = trie_mut.commit(root).unwrap_or_default();
for (key, value) in nodes {
for (key, value) in commit_data {
let is_leaf = key.len() == 65 || key.len() == 131;
let is_account = key.len() <= 65;

Expand Down Expand Up @@ -838,8 +826,7 @@ impl Store {
// We want to send this message even if there was an error during the batch write
let _ = fkv_ctl.send(FKVGeneratorControlMessage::Continue);
result?;
// Phase 3: update diff layers with the removal of bottom layer.
*trie_cache.lock().map_err(|_| StoreError::LockError)? = Arc::new(trie_mut);

Ok(())
}

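After the notify, phase 2 persists whatever `commit()` returned in phase 1. The sketch below mirrors that write loop against an in-memory stand-in for the RocksDB write batch: empty values mean deletions, and the key length routes a node to the account or storage keyspace and flags leaves (65 or 131 bytes, the same thresholds as in the diff). Column families, `apply_prefix`, and the flat-key-value control channel are omitted; `DiskBatch` and `flush_commit_data` are hypothetical names.

```rust
use std::collections::BTreeMap;

/// Stand-in for the two RocksDB column families touched in phase 2.
#[derive(Default)]
struct DiskBatch {
    account_nodes: BTreeMap<Vec<u8>, Vec<u8>>,
    storage_nodes: BTreeMap<Vec<u8>, Vec<u8>>,
}

/// Phase 2: persist the nodes that `commit()` popped off the bottom layer.
fn flush_commit_data(batch: &mut DiskBatch, commit_data: Vec<(Vec<u8>, Vec<u8>)>) {
    for (key, value) in commit_data {
        // Same thresholds as the diff: full leaf paths are exactly 65 bytes
        // (account) or 131 bytes (account prefix + storage path); anything up
        // to 65 bytes belongs to the account trie.
        let _is_leaf = key.len() == 65 || key.len() == 131;
        let is_account = key.len() <= 65;

        let cf = if is_account {
            &mut batch.account_nodes
        } else {
            &mut batch.storage_nodes
        };

        if value.is_empty() {
            // Empty node bytes mean the node was removed in this layer.
            cf.remove(&key);
        } else {
            cf.insert(key, value);
        }
    }
}

fn main() {
    let mut batch = DiskBatch::default();
    flush_commit_data(
        &mut batch,
        vec![
            (vec![0u8; 65], b"account leaf".to_vec()), // routed to account_nodes
            (vec![1u8; 131], Vec::new()),              // storage leaf deletion
        ],
    );
    assert_eq!(batch.account_nodes.len(), 1);
}
```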