Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into alexey/exex-rollup-blob
Browse files Browse the repository at this point in the history
  • Loading branch information
shekhirin committed May 1, 2024
2 parents 7825a23 + 2334317 commit f70880f
Show file tree
Hide file tree
Showing 22 changed files with 476 additions and 515 deletions.
6 changes: 5 additions & 1 deletion bin/reth/src/cli/mod.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
//! CLI definition and entrypoint to executable

#[cfg(feature = "optimism")]
use crate::commands::import_op;
use crate::{
args::{
utils::{chain_help, genesis_value_parser, SUPPORTED_CHAINS},
LogArgs,
},
commands::{
config_cmd, db, debug_cmd, dump_genesis, import, import_op, init_cmd, init_state,
config_cmd, db, debug_cmd, dump_genesis, import, init_cmd, init_state,
node::{self, NoArgs},
p2p, recover, stage, test_vectors,
},
Expand Down Expand Up @@ -148,6 +150,7 @@ impl<Ext: clap::Args + fmt::Debug> Cli<Ext> {
Commands::Init(command) => runner.run_blocking_until_ctrl_c(command.execute()),
Commands::InitState(command) => runner.run_blocking_until_ctrl_c(command.execute()),
Commands::Import(command) => runner.run_blocking_until_ctrl_c(command.execute()),
#[cfg(feature = "optimism")]
Commands::ImportOp(command) => runner.run_blocking_until_ctrl_c(command.execute()),
Commands::DumpGenesis(command) => runner.run_blocking_until_ctrl_c(command.execute()),
Commands::Db(command) => runner.run_blocking_until_ctrl_c(command.execute()),
Expand Down Expand Up @@ -186,6 +189,7 @@ pub enum Commands<Ext: clap::Args + fmt::Debug = NoArgs> {
#[command(name = "import")]
Import(import::ImportCommand),
/// This syncs RLP encoded OP blocks below Bedrock from a file, without executing.
#[cfg(feature = "optimism")]
#[command(name = "import-op")]
ImportOp(import_op::ImportOpCommand),
/// Dumps genesis block JSON configuration to stdout.
Expand Down
149 changes: 59 additions & 90 deletions crates/consensus/beacon/src/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ use std::{
use tokio::sync::{
mpsc,
mpsc::{UnboundedReceiver, UnboundedSender},
oneshot,
};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::*;
Expand Down Expand Up @@ -381,40 +380,6 @@ where
None
}

/// Called to resolve chain forks and ensure that the Execution layer is working with the latest
/// valid chain.
///
/// These responses should adhere to the [Engine API Spec for
/// `engine_forkchoiceUpdated`](https://github.com/ethereum/execution-apis/blob/main/src/engine/paris.md#specification-1).
///
/// Returns an error if an internal error occurred like a database error.
fn forkchoice_updated(
&mut self,
state: ForkchoiceState,
attrs: Option<EngineT::PayloadAttributes>,
) -> Result<OnForkChoiceUpdated, CanonicalError> {
trace!(target: "consensus::engine", ?state, "Received new forkchoice state update");

// Pre-validate forkchoice state update and return if it's invalid or
// cannot be processed at the moment.
if let Some(on_updated) = self.pre_validate_forkchoice_update(state) {
return Ok(on_updated)
}

let start = Instant::now();
let make_canonical_result = self.blockchain.make_canonical(state.head_block_hash);
let elapsed = self.record_make_canonical_latency(start, &make_canonical_result);

let status = self.on_forkchoice_updated_make_canonical_result(
state,
attrs,
make_canonical_result,
elapsed,
)?;
trace!(target: "consensus::engine", ?status, ?state, "Returning forkchoice status");
Ok(status)
}

/// Process the result of attempting to make forkchoice state head hash canonical.
///
/// # Returns
Expand Down Expand Up @@ -519,56 +484,54 @@ where
false
}

/// Invoked when we receive a new forkchoice update message.
/// Invoked when we receive a new forkchoice update message. Calls into the blockchain tree
/// to resolve chain forks and ensure that the Execution Layer is working with the latest valid
/// chain.
///
/// These responses should adhere to the [Engine API Spec for
/// `engine_forkchoiceUpdated`](https://github.com/ethereum/execution-apis/blob/main/src/engine/paris.md#specification-1).
///
/// Returns `true` if the engine now reached its maximum block number, See
/// [EngineSyncController::has_reached_max_block].
/// Returns an error if an internal error occurred like a database error.
fn on_forkchoice_updated(
&mut self,
state: ForkchoiceState,
attrs: Option<EngineT::PayloadAttributes>,
tx: oneshot::Sender<Result<OnForkChoiceUpdated, RethError>>,
) -> Result<OnForkchoiceUpdateOutcome, CanonicalError> {
) -> Result<OnForkChoiceUpdated, CanonicalError> {
self.metrics.forkchoice_updated_messages.increment(1);
self.blockchain.on_forkchoice_update_received(&state);
trace!(target: "consensus::engine", ?state, "Received new forkchoice state update");

let on_updated = match self.forkchoice_updated(state, attrs) {
Ok(response) => response,
Err(error) => {
if error.is_fatal() {
// FCU resulted in a fatal error from which we can't recover
let err = error.clone();
let _ = tx.send(Err(RethError::Canonical(error)));
return Err(err)
}
let _ = tx.send(Err(RethError::Canonical(error)));
return Ok(OnForkchoiceUpdateOutcome::Processed)
}
};

let fcu_status = on_updated.forkchoice_status();
// Pre-validate forkchoice state update and return if it's invalid or
// cannot be processed at the moment.
if let Some(on_updated) = self.pre_validate_forkchoice_update(state) {
return Ok(on_updated)
}

// update the forkchoice state tracker
self.forkchoice_state_tracker.set_latest(state, fcu_status);
let start = Instant::now();
let make_canonical_result = self.blockchain.make_canonical(state.head_block_hash);
let elapsed = self.record_make_canonical_latency(start, &make_canonical_result);

// send the response to the CL ASAP
let _ = tx.send(Ok(on_updated));
let status = self.on_forkchoice_updated_make_canonical_result(
state,
attrs,
make_canonical_result,
elapsed,
)?;
trace!(target: "consensus::engine", ?status, ?state, "Returning forkchoice status");
Ok(status)
}

match fcu_status {
/// Called after the forkchoice update status has been resolved.
/// Depending on the outcome, the method updates the sync state and notifies the listeners
/// about new processed FCU.
fn on_forkchoice_updated_status(&mut self, state: ForkchoiceState, status: ForkchoiceStatus) {
match status {
ForkchoiceStatus::Invalid => {}
ForkchoiceStatus::Valid => {
// FCU head is valid, we're no longer syncing
self.sync_state_updater.update_sync_state(SyncState::Idle);
// node's fully synced, clear active download requests
self.sync.clear_block_download_requests();

// check if we reached the maximum configured block
let tip_number = self.blockchain.canonical_tip().number;
if self.sync.has_reached_max_block(tip_number) {
// Terminate the sync early if it's reached the maximum user
// configured block.
return Ok(OnForkchoiceUpdateOutcome::ReachedMaxBlock)
}
}
ForkchoiceStatus::Syncing => {
// we're syncing
Expand All @@ -577,9 +540,7 @@ where
}

// notify listeners about new processed FCU
self.listeners.notify(BeaconConsensusEngineEvent::ForkchoiceUpdated(state, fcu_status));

Ok(OnForkchoiceUpdateOutcome::Processed)
self.listeners.notify(BeaconConsensusEngineEvent::ForkchoiceUpdated(state, status));
}

/// Check if the pipeline is consistent (all stages have the checkpoint block numbers no less
Expand Down Expand Up @@ -966,7 +927,7 @@ where
///
/// If the newest head is not invalid, then this will trigger a new pipeline run to sync the gap
///
/// See [Self::forkchoice_updated] and [BlockchainTreeEngine::make_canonical].
/// See [Self::on_forkchoice_updated] and [BlockchainTreeEngine::make_canonical].
fn on_failed_canonical_forkchoice_update(
&mut self,
state: &ForkchoiceState,
Expand Down Expand Up @@ -1758,17 +1719,34 @@ where
if let Poll::Ready(Some(msg)) = this.engine_message_rx.poll_next_unpin(cx) {
match msg {
BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx } => {
match this.on_forkchoice_updated(state, payload_attrs, tx) {
Ok(OnForkchoiceUpdateOutcome::Processed) => {}
Ok(OnForkchoiceUpdateOutcome::ReachedMaxBlock) => {
// reached the max block, we can terminate the future
return Poll::Ready(Ok(()))
match this.on_forkchoice_updated(state, payload_attrs) {
Ok(on_updated) => {
let fcu_status = on_updated.forkchoice_status();
// update the forkchoice state tracker
this.forkchoice_state_tracker.set_latest(state, fcu_status);
// send the response to the CL ASAP
let _ = tx.send(Ok(on_updated));

if fcu_status.is_valid() {
let tip_number = this.blockchain.canonical_tip().number;
if this.sync.has_reached_max_block(tip_number) {
// Terminate the sync early if it's reached the
// maximum user configured block.
return Poll::Ready(Ok(()))
}
}

this.on_forkchoice_updated_status(state, fcu_status);
}
Err(err) => {
// fatal error, we can terminate the future
return Poll::Ready(Err(RethError::Canonical(err).into()))
Err(error) => {
if error.is_fatal() {
// fatal error, we can terminate the future
let _ = tx.send(Err(RethError::Canonical(error.clone())));
return Poll::Ready(Err(RethError::Canonical(error).into()))
}
let _ = tx.send(Err(RethError::Canonical(error)));
}
}
};
}
BeaconEngineMessage::NewPayload { payload, cancun_fields, tx } => {
this.metrics.new_payload_messages.increment(1);
Expand Down Expand Up @@ -1828,15 +1806,6 @@ where
}
}

/// Represents all outcomes of an applied fork choice update.
#[derive(Debug)]
enum OnForkchoiceUpdateOutcome {
/// FCU was processed successfully.
Processed,
/// FCU was processed successfully and reached max block.
ReachedMaxBlock,
}

/// Represents outcomes of processing a sync event
#[derive(Debug)]
enum SyncEventOutcome {
Expand Down
5 changes: 3 additions & 2 deletions crates/e2e-test-utils/src/engine_api.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::traits::PayloadEnvelopeExt;
use jsonrpsee::http_client::HttpClient;
use jsonrpsee::http_client::{transport::HttpBackend, HttpClient};
use reth::{
api::{EngineTypes, PayloadBuilderAttributes},
providers::CanonStateNotificationStream,
Expand All @@ -10,12 +10,13 @@ use reth::{
};
use reth_payload_builder::PayloadId;
use reth_primitives::B256;
use reth_rpc::AuthClientService;
use std::marker::PhantomData;

/// Helper for engine api operations
pub struct EngineApiTestContext<E> {
pub canonical_stream: CanonStateNotificationStream,
pub engine_api_client: HttpClient,
pub engine_api_client: HttpClient<AuthClientService<HttpBackend>>,
pub _marker: PhantomData<E>,
}

Expand Down
5 changes: 2 additions & 3 deletions crates/node-core/src/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -479,8 +479,8 @@ mod tests {
DatabaseEnv,
};
use reth_primitives::{
Chain, ForkTimestamps, Genesis, IntegerList, GOERLI, GOERLI_GENESIS_HASH, MAINNET,
MAINNET_GENESIS_HASH, SEPOLIA, SEPOLIA_GENESIS_HASH,
Chain, Genesis, IntegerList, GOERLI, GOERLI_GENESIS_HASH, MAINNET, MAINNET_GENESIS_HASH,
SEPOLIA, SEPOLIA_GENESIS_HASH,
};
use reth_provider::test_utils::create_test_provider_factory_with_chain_spec;

Expand Down Expand Up @@ -570,7 +570,6 @@ mod tests {
..Default::default()
},
hardforks: BTreeMap::default(),
fork_timestamps: ForkTimestamps::default(),
genesis_hash: None,
paris_block_and_final_difficulty: None,
deposit_contract: None,
Expand Down
12 changes: 8 additions & 4 deletions crates/payload/builder/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,13 @@ impl CachedReads {
}
}

/// A [Database] that caches reads inside [CachedReads].
#[derive(Debug)]
struct CachedReadsDbMut<'a, DB> {
cached: &'a mut CachedReads,
db: DB,
pub struct CachedReadsDbMut<'a, DB> {
/// The cache of reads.
pub cached: &'a mut CachedReads,
/// The underlying database.
pub db: DB,
}

impl<'a, DB: DatabaseRef> Database for CachedReadsDbMut<'a, DB> {
Expand Down Expand Up @@ -126,7 +129,8 @@ impl<'a, DB: DatabaseRef> Database for CachedReadsDbMut<'a, DB> {
/// `revm::db::State` for repeated payload build jobs.
#[derive(Debug)]
pub struct CachedReadsDBRef<'a, DB> {
inner: RefCell<CachedReadsDbMut<'a, DB>>,
/// The inner cache reads db mut.
pub inner: RefCell<CachedReadsDbMut<'a, DB>>,
}

impl<'a, DB: DatabaseRef> DatabaseRef for CachedReadsDBRef<'a, DB> {
Expand Down
Loading

0 comments on commit f70880f

Please sign in to comment.