Skip to content

Commit 32ea549

Browse files
authored
Merge f5790e7 into 855a97f
2 parents 855a97f + f5790e7 commit 32ea549

File tree

10 files changed

+257
-23
lines changed

10 files changed

+257
-23
lines changed

crates/blockchain/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ ethrex-metrics = { path = "./metrics", default-features = false }
2525
[dev-dependencies]
2626
serde_json.workspace = true
2727
hex = "0.4.3"
28-
tokio.workspace = true
28+
tokio = { workspace = true, features = [] }
2929

3030
[lib]
3131
path = "./blockchain.rs"

crates/blockchain/blockchain.rs

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,7 @@ use ethrex_common::types::{
2121
};
2222
use ethrex_common::types::{BlobsBundle, ELASTICITY_MULTIPLIER};
2323
use ethrex_common::{Address, H256};
24-
use ethrex_storage::error::StoreError;
25-
use ethrex_storage::Store;
24+
use ethrex_storage::{error::StoreError, query_plan::QueryPlan, Store};
2625
use ethrex_vm::{BlockExecutionResult, Evm, EvmEngine};
2726
use mempool::Mempool;
2827
use std::collections::HashMap;
@@ -120,6 +119,33 @@ impl Blockchain {
120119
Ok(execution_result)
121120
}
122121

122+
async fn store_block_with_query_plan(
123+
&self,
124+
block: &Block,
125+
execution_result: BlockExecutionResult,
126+
account_updates: &[AccountUpdate],
127+
) -> Result<(), ChainError> {
128+
// Apply the account updates over the last block's state and compute the new state root
129+
let (new_state_root, state_query_plan, accounts_query_plan) = self
130+
.storage
131+
.apply_account_updates_batch(block.header.parent_hash, account_updates)
132+
.await?
133+
.ok_or(ChainError::ParentStateNotFound)?;
134+
135+
// Check state root matches the one in block header
136+
validate_state_root(&block.header, new_state_root)?;
137+
138+
let query_plan = QueryPlan {
139+
account_updates: (state_query_plan, accounts_query_plan),
140+
block: block.clone(),
141+
receipts: (block.hash(), execution_result.receipts),
142+
};
143+
144+
query_plan.apply_to_store(self.storage.clone()).await?;
145+
146+
Ok(())
147+
}
148+
123149
pub async fn store_block(
124150
&self,
125151
block: &Block,
@@ -150,7 +176,7 @@ impl Blockchain {
150176
let since = Instant::now();
151177
let (res, updates) = self.execute_block(block).await?;
152178
let executed = Instant::now();
153-
let result = self.store_block(block, res, &updates).await;
179+
let result = self.store_block_with_query_plan(block, res, &updates).await;
154180
let stored = Instant::now();
155181
Self::print_add_block_logs(block, since, executed, stored);
156182
result

crates/common/trie/trie.rs

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -112,19 +112,17 @@ impl Trie {
112112
/// Remove a value from the trie given its RLP-encoded path.
113113
/// Returns the value if it was succesfully removed or None if it wasn't part of the trie
114114
pub fn remove(&mut self, path: PathRLP) -> Result<Option<ValueRLP>, TrieError> {
115-
let value;
116-
(self.root, value) = if self.root.is_valid() {
117-
// If the trie is not empty, call the root node's removal logic.
118-
let (node, value) = self
119-
.root
120-
.get_node(self.db.as_ref())?
121-
.ok_or(TrieError::InconsistentTree)?
122-
.remove(self.db.as_ref(), Nibbles::from_bytes(&path))?;
115+
if !self.root.is_valid() {
116+
return Ok(None);
117+
}
123118

124-
(node.map(Into::into).unwrap_or_default(), value)
125-
} else {
126-
(NodeRef::default(), None)
127-
};
119+
// If the trie is not empty, call the root node's removal logic.
120+
let (node, value) = self
121+
.root
122+
.get_node(self.db.as_ref())?
123+
.ok_or(TrieError::InconsistentTree)?
124+
.remove(self.db.as_ref(), Nibbles::from_bytes(&path))?;
125+
self.root = node.map(Into::into).unwrap_or_default();
128126

129127
Ok(value)
130128
}
@@ -147,6 +145,12 @@ impl Trie {
147145
}
148146
}
149147

148+
pub fn hash_prepare_batch(&mut self) -> (H256, Vec<(NodeHash, Vec<u8>)>) {
149+
let query_plan = self.commit_vec_aka_state_diff();
150+
let ret_hash = self.hash_no_commit();
151+
(ret_hash, query_plan)
152+
}
153+
150154
/// Compute the hash of the root node and flush any changes into the database.
151155
///
152156
/// This method will also compute the hash of all internal nodes indirectly. It will not clear
@@ -155,12 +159,21 @@ impl Trie {
155159
if self.root.is_valid() {
156160
let mut acc = Vec::new();
157161
self.root.commit(&mut acc);
158-
self.db.put_batch(acc)?;
162+
self.db.put_batch(acc)?; // we'll try to avoid calling this for every commit
159163
}
160164

161165
Ok(())
162166
}
163167

168+
pub fn commit_vec_aka_state_diff(&mut self) -> Vec<(NodeHash, Vec<u8>)> {
169+
let mut acc = Vec::new();
170+
if self.root.is_valid() {
171+
self.root.commit(&mut acc);
172+
}
173+
174+
acc
175+
}
176+
164177
/// Obtain a merkle proof for the given path.
165178
/// The proof will contain all the encoded nodes traversed until reaching the node where the path is stored (including this last node).
166179
/// The proof will still be constructed even if the path is not stored in the trie, proving its absence.

crates/storage/api.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,15 @@ use ethrex_common::types::{
66
};
77
use std::{collections::HashMap, fmt::Debug, panic::RefUnwindSafe};
88

9-
use crate::{error::StoreError, store::STATE_TRIE_SEGMENTS};
9+
use crate::{error::StoreError, query_plan::QueryPlan, store::STATE_TRIE_SEGMENTS};
1010
use ethrex_trie::{Nibbles, Trie};
1111

1212
// We need async_trait because the stabilized feature lacks support for object safety
1313
// (i.e. dyn StoreEngine)
1414
#[async_trait::async_trait]
1515
pub trait StoreEngine: Debug + Send + Sync + RefUnwindSafe {
16+
/// Store changes in a batch
17+
async fn store_changes(&self, query_plan: QueryPlan) -> Result<(), StoreError>;
1618
/// Add a batch of blocks in a single transaction.
1719
/// This will store -> BlockHeader, BlockBody, BlockTransactions, BlockNumber.
1820
async fn add_blocks(&self, blocks: Vec<Block>) -> Result<(), StoreError>;

crates/storage/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
mod api;
22

3+
pub mod query_plan;
34
#[cfg(any(feature = "libmdbx", feature = "redb"))]
45
mod rlp;
56
mod store;

crates/storage/query_plan.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
use crate::{error::StoreError, Store};
2+
use ethrex_common::{
3+
types::{Block, Receipt},
4+
H256,
5+
};
6+
use ethrex_trie::NodeHash;
7+
8+
pub struct QueryPlan {
9+
pub account_updates: (
10+
Vec<(NodeHash, Vec<u8>)>, // vec<(node_hash, node_data)>
11+
Vec<(Vec<u8>, Vec<(NodeHash, Vec<u8>)>)>, // hashed_address, vec<(node_hash, node_data)>
12+
),
13+
pub block: Block,
14+
pub receipts: (H256, Vec<Receipt>),
15+
}
16+
17+
impl QueryPlan {
18+
pub async fn apply_to_store(self, store: Store) -> Result<(), StoreError> {
19+
store.store_changes(self).await?;
20+
Ok(())
21+
}
22+
}

crates/storage/store.rs

Lines changed: 91 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use crate::api::StoreEngine;
22
use crate::error::StoreError;
3+
use crate::query_plan::QueryPlan;
34
use crate::store_db::in_memory::Store as InMemoryStore;
45
#[cfg(feature = "libmdbx")]
56
use crate::store_db::libmdbx::Store as LibmdbxStore;
@@ -15,7 +16,7 @@ use ethrex_common::types::{
1516
};
1617
use ethrex_rlp::decode::RLPDecode;
1718
use ethrex_rlp::encode::RLPEncode;
18-
use ethrex_trie::{Nibbles, Trie};
19+
use ethrex_trie::{Nibbles, NodeHash, Trie};
1920
use sha3::{Digest as _, Keccak256};
2021
use std::collections::{BTreeMap, HashMap};
2122
use std::fmt::Debug;
@@ -44,6 +45,10 @@ pub enum EngineType {
4445
}
4546

4647
impl Store {
48+
pub async fn store_changes(&self, query_plan: QueryPlan) -> Result<(), StoreError> {
49+
self.engine.store_changes(query_plan).await
50+
}
51+
4752
pub fn new(_path: &str, engine_type: EngineType) -> Result<Self, StoreError> {
4853
info!("Starting storage engine ({engine_type:?})");
4954
let store = match engine_type {
@@ -312,6 +317,91 @@ impl Store {
312317
Ok(Some(account_state.nonce))
313318
}
314319

320+
/// Applies account updates based on the block's latest storage state
321+
/// and returns the new state root after the updates have been applied.
322+
pub async fn apply_account_updates_batch(
323+
&self,
324+
block_hash: BlockHash,
325+
account_updates: &[AccountUpdate],
326+
) -> Result<
327+
Option<(
328+
H256,
329+
Vec<(NodeHash, Vec<u8>)>,
330+
Vec<(Vec<u8>, Vec<(NodeHash, Vec<u8>)>)>,
331+
)>,
332+
StoreError,
333+
> {
334+
let Some(state_trie) = self.state_trie(block_hash)? else {
335+
return Ok(None);
336+
};
337+
338+
let (state_trie_hash, state_query_plan, query_plan) = self
339+
.apply_account_updates_from_trie_batch(state_trie, account_updates)
340+
.await?;
341+
342+
Ok(Some((state_trie_hash, state_query_plan, query_plan)))
343+
}
344+
345+
pub async fn apply_account_updates_from_trie_batch(
346+
&self,
347+
mut state_trie: Trie,
348+
account_updates: &[AccountUpdate],
349+
) -> Result<
350+
(
351+
H256,
352+
Vec<(NodeHash, Vec<u8>)>,
353+
Vec<(Vec<u8>, Vec<(NodeHash, Vec<u8>)>)>,
354+
),
355+
StoreError,
356+
> {
357+
let mut ret_vec_account = Vec::new();
358+
for update in account_updates.iter() {
359+
let hashed_address = hash_address(&update.address);
360+
if update.removed {
361+
// Remove account from trie
362+
state_trie.remove(hashed_address)?;
363+
continue;
364+
}
365+
// Add or update AccountState in the trie
366+
// Fetch current state or create a new state to be inserted
367+
let mut account_state = match state_trie.get(&hashed_address)? {
368+
Some(encoded_state) => AccountState::decode(&encoded_state)?,
369+
None => AccountState::default(),
370+
};
371+
if let Some(info) = &update.info {
372+
account_state.nonce = info.nonce;
373+
account_state.balance = info.balance;
374+
account_state.code_hash = info.code_hash;
375+
// Store updated code in DB
376+
if let Some(code) = &update.code {
377+
self.add_account_code(info.code_hash, code.clone()).await?;
378+
}
379+
}
380+
// Store the added storage in the account's storage trie and compute its new root
381+
if !update.added_storage.is_empty() {
382+
let mut storage_trie = self.engine.open_storage_trie(
383+
H256::from_slice(&hashed_address),
384+
account_state.storage_root,
385+
);
386+
for (storage_key, storage_value) in &update.added_storage {
387+
let hashed_key = hash_key(storage_key);
388+
if storage_value.is_zero() {
389+
storage_trie.remove(hashed_key)?;
390+
} else {
391+
storage_trie.insert(hashed_key, storage_value.encode_to_vec())?;
392+
}
393+
}
394+
let (storage_hash, storage_query_plan) = storage_trie.hash_prepare_batch();
395+
account_state.storage_root = storage_hash;
396+
ret_vec_account.push((hashed_address.clone(), storage_query_plan));
397+
}
398+
state_trie.insert(hashed_address, account_state.encode_to_vec())?;
399+
}
400+
let (state_hash, state_query_plan) = state_trie.hash_prepare_batch();
401+
402+
Ok((state_hash, state_query_plan, ret_vec_account))
403+
}
404+
315405
/// Applies account updates based on the block's latest storage state
316406
/// and returns the new state root after the updates have been applied.
317407
pub async fn apply_account_updates(

crates/storage/store_db/in_memory.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
use crate::{
2-
api::StoreEngine,
3-
error::StoreError,
4-
store::{MAX_SNAPSHOT_READS, STATE_TRIE_SEGMENTS},
2+
api::StoreEngine, error::StoreError, query_plan::QueryPlan, store::{MAX_SNAPSHOT_READS, STATE_TRIE_SEGMENTS}
53
};
64
use bytes::Bytes;
75
use ethereum_types::{H256, U256};
@@ -87,6 +85,10 @@ impl Store {
8785

8886
#[async_trait::async_trait]
8987
impl StoreEngine for Store {
88+
async fn store_changes(&self, _query_plan: QueryPlan) -> Result<(), StoreError> {
89+
todo!()
90+
}
91+
9092
fn get_block_header(&self, block_number: u64) -> Result<Option<BlockHeader>, StoreError> {
9193
let store = self.inner();
9294
if let Some(hash) = store.canonical_hashes.get(&block_number) {

0 commit comments

Comments
 (0)