diff --git a/beacon_node/beacon_chain/src/attestation_verification.rs b/beacon_node/beacon_chain/src/attestation_verification.rs index 9c3e31b5ec4..ce4a9b442c4 100644 --- a/beacon_node/beacon_chain/src/attestation_verification.rs +++ b/beacon_node/beacon_chain/src/attestation_verification.rs @@ -977,8 +977,6 @@ fn verify_head_block_is_known( ) -> Result { let block_opt = chain .canonical_head - .read() - .fork_choice .get_block(&attestation.data.beacon_block_root) .or_else(|| { chain @@ -1246,11 +1244,7 @@ where // processing an attestation that does not include our latest finalized block in its chain. // // We do not delay consideration for later, we simply drop the attestation. - if !chain - .canonical_head - .read() - .fork_choice - .contains_block(&target.root) + if !chain.canonical_head.contains_block(&target.root) && !chain.early_attester_cache.contains_block(target.root) { return Err(Error::UnknownTargetRoot(target.root)); diff --git a/beacon_node/beacon_chain/src/attestation_verification/batch.rs b/beacon_node/beacon_chain/src/attestation_verification/batch.rs index 395f6b244f3..83aaa5332d7 100644 --- a/beacon_node/beacon_chain/src/attestation_verification/batch.rs +++ b/beacon_node/beacon_chain/src/attestation_verification/batch.rs @@ -65,7 +65,7 @@ where .try_read_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT) .ok_or(BeaconChainError::ValidatorPubkeyCacheLockTimeout)?; - let fork = chain.canonical_head.read().head_fork(); + let fork = chain.canonical_head.cached_head_read_lock().head_fork(); let mut signature_sets = Vec::with_capacity(num_indexed * 3); @@ -169,7 +169,7 @@ where &metrics::ATTESTATION_PROCESSING_BATCH_UNAGG_SIGNATURE_SETUP_TIMES, ); - let fork = chain.canonical_head.read().head_fork(); + let fork = chain.canonical_head.cached_head_read_lock().head_fork(); let pubkey_cache = chain .validator_pubkey_cache diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 7dc234706bb..8264cc13814 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -62,7 +62,7 @@ use futures::channel::mpsc::Sender; use itertools::process_results; use itertools::Itertools; use operation_pool::{OperationPool, PersistedOperationPool}; -use parking_lot::{Mutex, RwLock, RwLockWriteGuard}; +use parking_lot::{Mutex, RwLock}; use safe_arith::SafeArith; use slasher::Slasher; use slog::{crit, debug, error, info, trace, warn, Logger}; @@ -92,9 +92,7 @@ use tree_hash::TreeHash; use types::beacon_state::CloneConfig; use types::*; -pub use crate::canonical_head::{ - CanonicalHead, CanonicalHeadRwLock, FastCanonicalHead, FastCanonicalHeadRwLock, -}; +pub use crate::canonical_head::{CanonicalHead, CanonicalHeadRwLock}; pub type ForkChoiceError = fork_choice::Error; @@ -322,13 +320,7 @@ pub struct BeaconChain { pub execution_layer: Option, /// Stores information about the canonical head and finalized/justified checkpoints of the /// chain. Also contains the fork choice struct, for computing the canonical head. - pub canonical_head: CanonicalHeadRwLock>, - /// A smaller version of `CanonicalHead` designed to have very little lock contention but with the - /// downside of sometimes being slightly behind the `CanonicalHead`. - /// - /// To help prevent deadlocks, do not make this field public and only access it via the - /// `BeaconChain::fast_canonical_head` function. - pub(crate) fast_canonical_head: FastCanonicalHeadRwLock, + pub canonical_head: CanonicalHead, /// The root of the genesis block. pub genesis_block_root: Hash256, /// The root of the genesis state. @@ -421,22 +413,6 @@ impl BeaconChain { .as_kv_store_op(BEACON_CHAIN_DB_KEY) } - /// Return a database operation for writing fork choice to disk. - pub fn persist_fork_choice_in_batch(&self) -> KeyValueStoreOp { - Self::persist_fork_choice_in_batch_standalone(&self.canonical_head.read().fork_choice) - } - - /// Return a database operation for writing fork choice to disk. - pub fn persist_fork_choice_in_batch_standalone( - fork_choice: &BeaconForkChoice, - ) -> KeyValueStoreOp { - let persisted_fork_choice = PersistedForkChoice { - fork_choice: fork_choice.to_persisted(), - fork_choice_store: fork_choice.fc_store().to_persisted(), - }; - persisted_fork_choice.as_kv_store_op(FORK_CHOICE_DB_KEY) - } - /// Load fork choice from disk, returning `None` if it isn't found. pub fn load_fork_choice( store: BeaconStore, @@ -1009,8 +985,8 @@ impl BeaconChain { where E: From, { - let head_lock = self.canonical_head.read(); - f(&head_lock.head_snapshot) + let head_lock = self.canonical_head.cached_head_read_lock(); + f(&head_lock.snapshot) } /// Returns the beacon block root at the head of the canonical chain. @@ -1218,7 +1194,7 @@ impl BeaconChain { /// Returns the slot of the highest block in the canonical chain. pub fn best_slot(&self) -> Slot { - self.canonical_head.read().head_snapshot.beacon_block.slot() + self.canonical_head.cached_head_read_lock().head_slot() } /// Returns the validator index (if any) for the given public key. @@ -1372,8 +1348,6 @@ impl BeaconChain { let execution_status = self .canonical_head - .read() - .fork_choice .get_block_execution_status(&head_block_root) .ok_or(Error::AttestationHeadNotInForkChoice(head_block_root))?; @@ -1426,8 +1400,6 @@ impl BeaconChain { let beacon_block_root = attestation.data.beacon_block_root; match self .canonical_head - .read() - .fork_choice .get_block_execution_status(&beacon_block_root) { // The attestation references a block that is not in fork choice, it must be @@ -1511,8 +1483,8 @@ impl BeaconChain { let beacon_block_root; let beacon_state_root; let head_timer = metrics::start_timer(&metrics::ATTESTATION_PRODUCTION_HEAD_SCRAPE_SECONDS); - let canonical_head_lock = self.canonical_head.read(); - let head = &canonical_head_lock.head_snapshot; + let cached_head_lock = self.canonical_head.cached_head_read_lock(); + let head = &cached_head_lock.snapshot; let head_state = &head.beacon_state; let head_state_slot = head_state.slot(); @@ -1588,9 +1560,12 @@ impl BeaconChain { AttesterCacheKey::new(request_epoch, head_state, beacon_block_root)?; drop(head_timer); + // Drop the head lock ASAP to prevent lock contention. + drop(cached_head_lock); + // Only attest to a block if it is fully verified (i.e. not optimistic or invalid). - match canonical_head_lock - .fork_choice + match self + .canonical_head .get_block_execution_status(&beacon_block_root) { Some(execution_status) if execution_status.is_valid_or_irrelevant() => (), @@ -1603,9 +1578,6 @@ impl BeaconChain { None => return Err(Error::HeadMissingFromForkChoice(beacon_block_root)), }; - // Drop the head lock ASAP to prevent lock contention. - drop(canonical_head_lock); - /* * Phase 2/2: * @@ -1798,8 +1770,6 @@ impl BeaconChain { let _timer = metrics::start_timer(&metrics::FORK_CHOICE_PROCESS_ATTESTATION_TIMES); self.canonical_head - .write() - .fork_choice .on_attestation( self.slot()?, verified.indexed_attestation(), @@ -1934,7 +1904,7 @@ impl BeaconChain { // If there's no eth1 chain then it's impossible to produce blocks and therefore // useless to put things in the op pool. if self.eth1_chain.is_some() { - let fork = self.canonical_head.read().head_fork(); + let fork = self.canonical_head.cached_head_read_lock().head_fork(); self.op_pool .insert_attestation( @@ -2039,15 +2009,9 @@ impl BeaconChain { // pivot block is the same as the current state's pivot block. If it is, then the // attestation's shuffling is the same as the current state's. // To account for skipped slots, find the first block at *or before* the pivot slot. - let canonical_head_lock = self.canonical_head.read(); - let pivot_block_root = canonical_head_lock - .fork_choice - .proto_array() - .core_proto_array() - .iter_block_roots(block_root) - .find(|(_, slot)| *slot <= pivot_slot) - .map(|(block_root, _)| block_root); - drop(canonical_head_lock); + let pivot_block_root = self + .canonical_head + .get_ancestor_at_or_below_slot(block_root, pivot_slot); match pivot_block_root { Some(root) => root == state_pivot_block_root, @@ -2133,8 +2097,10 @@ impl BeaconChain { attester_slashing: SigVerifiedOp>, ) { if self.eth1_chain.is_some() { - self.op_pool - .insert_attester_slashing(attester_slashing, self.canonical_head.read().head_fork()) + self.op_pool.insert_attester_slashing( + attester_slashing, + self.canonical_head.cached_head_read_lock().head_fork(), + ) } } @@ -2652,36 +2618,25 @@ impl BeaconChain { .map_err(BeaconChainError::from)?; } - // Read some information from the canonical head, without holding the read-lock. - let (current_head_finalized_checkpoint, parent_proto_block) = { - let canonical_head_read_lock = self.canonical_head.read(); - - // Do not import a block that doesn't descend from the finalized root. - check_block_is_finalized_descendant::( - &signed_block, - &canonical_head_read_lock.fork_choice, - &self.store, - )?; + // Do not import a block that doesn't descend from the finalized root. + check_block_is_finalized_descendant(self, &signed_block)?; - // Load the parent proto block for later use. - let parent_proto_block = canonical_head_read_lock - .fork_choice - .get_block(&signed_block.parent_root()) - .ok_or_else(|| BlockError::ParentUnknown(signed_block.clone()))?; + // Load the parent proto block for later use. + let parent_proto_block = self + .canonical_head + .get_block(&signed_block.parent_root()) + .ok_or_else(|| BlockError::ParentUnknown(signed_block.clone()))?; - // Note: we're using the finalized checkpoint from the head state, rather than fork - // choice. - // - // We are doing this to ensure that we detect changes in finalization. It's possible - // that fork choice has already been updated to the finalized checkpoint in the block - // we're importing. - let current_head_finalized_checkpoint = canonical_head_read_lock - .head_snapshot - .beacon_state - .finalized_checkpoint(); - - (current_head_finalized_checkpoint, parent_proto_block) - }; + // Note: we're using the finalized checkpoint from the head state, rather than fork + // choice. + // + // We are doing this to ensure that we detect changes in finalization. It's possible + // that fork choice has already been updated to the finalized checkpoint in the block + // we're importing. + let current_head_finalized_checkpoint = self + .canonical_head + .cached_head_read_lock() + .finalized_checkpoint(); let block = signed_block.message(); @@ -2813,17 +2768,16 @@ impl BeaconChain { } } - // Take a write-lock on the canonical head so we can add the block and attestations to fork + // Take a write-lock on fork choice so we can add the block and attestations to fork // choice. // - // Later, we will downgrade this lock to a read-lock and hold it whilst we write the block - // and states to disk. - // // # WARNING // - // It is important to avoid interleaving this canonical head write-lock with any other - // locks. - let mut canonical_head_write_lock = self.canonical_head.write(); + // It is important to avoid interleaving this write-lock with any other locks, *especially* + // the `canonical_head.cached_head` lock. + let mut fork_choice_write_lock = self + .canonical_head + .block_processing_fork_choice_write_lock(); // Register the new block with the fork choice service. { @@ -2834,8 +2788,7 @@ impl BeaconChain { .seconds_from_current_slot_start(self.spec.seconds_per_slot) .ok_or(Error::UnableToComputeTimeAtSlot)?; - canonical_head_write_lock - .fork_choice + fork_choice_write_lock .on_block( current_slot, block, @@ -2849,19 +2802,17 @@ impl BeaconChain { } // Register each indexed attestation in the block with the fork choice service. - for indexed_attestation in indexed_attestations { - match canonical_head_write_lock.fork_choice.on_attestation( - current_slot, - &indexed_attestation, - AttestationFromBlock::True, - ) { - Ok(()) => Ok(()), - // Ignore invalid attestations whilst importing attestations from a block. The - // block might be very old and therefore the attestations useless to fork choice. - Err(ForkChoiceError::InvalidAttestation(_)) => Ok(()), - Err(e) => Err(BlockError::BeaconChainError(e.into())), - }?; - } + match fork_choice_write_lock.on_attestations( + current_slot, + &indexed_attestations, + AttestationFromBlock::True, + ) { + Ok(()) => Ok(()), + // Ignore invalid attestations whilst importing attestations from a block. The + // block might be very old and therefore the attestations useless to fork choice. + Err(ForkChoiceError::InvalidAttestation(_)) => Ok(()), + Err(e) => Err(BlockError::BeaconChainError(e.into())), + }?; // If the block is recent enough and it was not optimistically imported, check to see if it // becomes the head block. If so, apply it to the early attester cache. This will allow @@ -2877,15 +2828,12 @@ impl BeaconChain { if !payload_verification_status.is_optimistic() && block.slot() + EARLY_ATTESTER_CACHE_HISTORIC_SLOTS >= current_slot { - let new_head_root = canonical_head_write_lock - .fork_choice + let new_head_root = fork_choice_write_lock .get_head(current_slot, &self.spec) .map_err(BeaconChainError::from)?; if new_head_root == block_root { - if let Some(proto_block) = - canonical_head_write_lock.fork_choice.get_block(&block_root) - { + if let Some(proto_block) = fork_choice_write_lock.get_block(&block_root) { if let Err(e) = self.early_attester_cache.add_head_block( block_root, signed_block.clone(), @@ -2914,12 +2862,6 @@ impl BeaconChain { { let _db_write_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_DB_WRITE); - // Downgrade the canonical head lock and hold it whilst we write to the database. This - // ensures that there can be no modifications to the canonical head until the database has - // been updated. This helps to prevent inconsistencies between fork choice and the on-disk - // database. - let canonical_head_read_lock = RwLockWriteGuard::downgrade(canonical_head_write_lock); - // Store the block and its state, and execute the confirmation batch for the intermediate // states, which will delete their temporary flags. // If the write fails, revert fork choice to the version from disk, else we can @@ -2944,32 +2886,30 @@ impl BeaconChain { // Since the write failed, try to revert the canonical head back to what was stored // in the database. This attempts to prevent inconsistency between the database and // fork choice. - match CanonicalHead::load_from_store(&self.store, &self.spec) { - Ok(past_canonical_head) => { - // Drop the transaction lock, it's no longer required and a deadlock risk since we - // may interact with the canonical_head lock again in this code path. - drop(txn_lock); - - // Drop the read-lock on the head and then take a write-lock. - // - // We don't care if someone mutates the head between dropping the read-lock and - // grabbing the write-lock since we're going to override it anyway. - drop(canonical_head_read_lock); - *self.canonical_head.write() = past_canonical_head - } - Err(e) => { - crit!( - self.log, - "No stored fork choice found to restore from"; - "error" => ?e, - "warning" => "The database is likely corrupt now, consider --purge-db" - ); - return Err(BlockError::BeaconChainError(e)); - } + if let Err(e) = self.canonical_head.restore_from_store( + fork_choice_write_lock, + &self.store, + &self.spec, + ) { + crit!( + self.log, + "No stored fork choice found to restore from"; + "error" => ?e, + "warning" => "The database is likely corrupt now, consider --purge-db" + ); + return Err(BlockError::BeaconChainError(e)); } + + return Err(e.into()); } + + drop(txn_lock); } + // The fork choice write-lock is dropped *after* the on-disk database has been updated. This + // prevents inconsistency between the two at the expense of concurrency. + drop(fork_choice_write_lock); + // We're declaring the block "imported" at this point, since fork choice and the DB know // about it. let block_time_imported = timestamp_now(); @@ -3182,7 +3122,7 @@ impl BeaconChain { // Atomically read some values from the head whilst avoiding holding the read-lock any // longer than necessary. let (head_slot, head_block_root) = { - let head = self.canonical_head.read(); + let head = self.canonical_head.cached_head_read_lock(); (head.head_slot(), head.head_block_root()) }; let (state, state_root_opt) = if head_slot < slot { @@ -3617,13 +3557,7 @@ impl BeaconChain { let inner_op = op.clone(); let fork_choice_result = self .spawn_blocking_handle( - move || { - chain - .canonical_head - .write() - .fork_choice - .on_invalid_execution_payload(&inner_op) - }, + move || chain.canonical_head.on_invalid_execution_payload(&inner_op), "invalid_payload_fork_choice_update", ) .await?; @@ -3659,13 +3593,7 @@ impl BeaconChain { let chain = self.clone(); let justified_block = self .spawn_blocking_handle( - move || { - chain - .canonical_head - .read() - .fork_choice - .get_justified_block() - }, + move || chain.canonical_head.get_justified_block(), "invalid_payload_fork_choice_get_justified", ) .await??; @@ -3701,7 +3629,7 @@ impl BeaconChain { } pub fn block_is_known_to_fork_choice(&self, root: &Hash256) -> bool { - self.canonical_head.read().fork_choice.contains_block(root) + self.canonical_head.contains_block(root) } /// Determines the beacon proposer for the next slot. If that proposer is registered in the @@ -3748,21 +3676,18 @@ impl BeaconChain { let (head_slot, head_root, head_decision_root, head_random, forkchoice_update_params) = self.spawn_blocking_handle( move || { - let canonical_head = chain.canonical_head.read(); - let head_block_root = canonical_head.head_block_root(); - let decision_root = canonical_head - .head_snapshot + let cached_head = chain.canonical_head.cached_head_read_lock(); + let head_block_root = cached_head.head_block_root(); + let decision_root = cached_head + .snapshot .beacon_state .proposer_shuffling_decision_root(head_block_root)?; Ok::<_, Error>(( - canonical_head.head_slot(), + cached_head.head_slot(), head_block_root, decision_root, - canonical_head.head_random()?, - canonical_head - .fork_choice - .get_forkchoice_update_parameters() - .ok_or(Error::ForkchoiceUpdateParamsMissing)?, + cached_head.head_random()?, + cached_head.forkchoice_update_parameters(), )) }, "prepare_beacon_proposer_fork_choice_read", @@ -4055,8 +3980,6 @@ impl BeaconChain { move || { chain .canonical_head - .write() - .fork_choice .on_valid_execution_payload(head_block_root) }, "update_execution_engine_invalid_payload", @@ -4153,8 +4076,6 @@ impl BeaconChain { Ok(false) } else { self.canonical_head - .read() - .fork_choice .is_optimistic_block(&block.canonical_root()) .map_err(BeaconChainError::ForkChoiceError) } @@ -4180,8 +4101,6 @@ impl BeaconChain { Ok(false) } else { self.canonical_head - .read() - .fork_choice .is_optimistic_block_no_fallback(&head_block.canonical_root()) .map_err(BeaconChainError::ForkChoiceError) } @@ -4196,17 +4115,9 @@ impl BeaconChain { /// There is a potential race condition when syncing where the block root of `head_info` could /// be pruned from the fork choice store before being read. pub fn is_optimistic_head(&self) -> Result { - let canonical_head_lock = self.canonical_head.read(); - let head = &canonical_head_lock.head_snapshot; - - if self.slot_is_prior_to_bellatrix(head.beacon_block.slot()) { - Ok(false) - } else { - canonical_head_lock - .fork_choice - .is_optimistic_block_no_fallback(&head.beacon_block_root) - .map_err(BeaconChainError::ForkChoiceError) - } + self.canonical_head + .head_execution_status() + .map(|status| status.is_optimistic()) } pub fn is_optimistic_block_root( @@ -4219,8 +4130,6 @@ impl BeaconChain { Ok(false) } else { self.canonical_head - .read() - .fork_choice .is_optimistic_block_no_fallback(block_root) .map_err(BeaconChainError::ForkChoiceError) } @@ -4362,8 +4271,6 @@ impl BeaconChain { { let head_block = self .canonical_head - .read() - .fork_choice .get_block(&head_block_root) .ok_or(Error::MissingBeaconBlock(head_block_root))?; @@ -4509,11 +4416,11 @@ impl BeaconChain { let mut dump = vec![]; let mut last_slot = { - let head = self.canonical_head.read(); + let head = self.canonical_head.cached_head_read_lock(); BeaconSnapshot { - beacon_block: Arc::new(head.head_snapshot.beacon_block.clone_as_blinded()), - beacon_block_root: head.head_snapshot.beacon_block_root, - beacon_state: head.head_snapshot.beacon_state.clone(), + beacon_block: Arc::new(head.snapshot.beacon_block.clone_as_blinded()), + beacon_block_root: head.snapshot.beacon_block_root, + beacon_state: head.snapshot.beacon_state.clone(), } }; @@ -4579,7 +4486,10 @@ impl BeaconChain { } pub fn dump_as_dot(&self, output: &mut W) { - let canonical_head_hash = self.canonical_head.read().head_snapshot.beacon_block_root; + let canonical_head_hash = self + .canonical_head + .cached_head_read_lock() + .head_block_root(); let mut visited: HashSet = HashSet::new(); let mut finalized_blocks: HashSet = HashSet::new(); let mut justified_blocks: HashSet = HashSet::new(); diff --git a/beacon_node/beacon_chain/src/beacon_proposer_cache.rs b/beacon_node/beacon_chain/src/beacon_proposer_cache.rs index 3231a1e8092..146d2da3dd7 100644 --- a/beacon_node/beacon_chain/src/beacon_proposer_cache.rs +++ b/beacon_node/beacon_chain/src/beacon_proposer_cache.rs @@ -140,22 +140,23 @@ pub fn compute_proposer_duties_from_head( ) -> Result<(Vec, Hash256, ExecutionStatus, Fork), BeaconChainError> { // Atomically collect information about the head whilst hogging the `canonical_head_lock` as // little as possible. - let (mut state, head_state_root, execution_status) = { - let head = chain.canonical_head.read(); + let (mut state, head_state_root, head_block_root) = { + let head = chain.canonical_head.cached_head_read_lock(); // Take a copy of the head state. let head_state = head - .head_snapshot + .snapshot .beacon_state .clone_with(CloneConfig::committee_caches_only()); let head_state_root = head.head_state_root(); let head_block_root = head.head_block_root(); - let execution_status = head - .fork_choice - .get_block_execution_status(&head_block_root) - .ok_or(BeaconChainError::HeadMissingFromForkChoice(head_block_root))?; - (head_state, head_state_root, execution_status) + (head_state, head_state_root, head_block_root) }; + let execution_status = chain + .canonical_head + .get_block_execution_status(&head_block_root) + .ok_or(BeaconChainError::HeadMissingFromForkChoice(head_block_root))?; + // Advance the state into the requested epoch. ensure_state_is_in_epoch(&mut state, head_state_root, current_epoch, &chain.spec)?; diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 92deb262109..f8dd2ee0105 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -58,7 +58,7 @@ use crate::{ use derivative::Derivative; use eth2::types::EventKind; use execution_layer::PayloadStatus; -use fork_choice::{ForkChoice, ForkChoiceStore, PayloadVerificationStatus}; +use fork_choice::PayloadVerificationStatus; use parking_lot::RwLockReadGuard; use proto_array::Block as ProtoBlock; use safe_arith::ArithError; @@ -77,7 +77,7 @@ use std::fs; use std::io::Write; use std::sync::Arc; use std::time::Duration; -use store::{Error as DBError, HotColdDB, HotStateSummary, KeyValueStore, StoreOp}; +use store::{Error as DBError, HotStateSummary, KeyValueStore, StoreOp}; use task_executor::JoinHandle; use tree_hash::TreeHash; use types::{ @@ -672,12 +672,7 @@ impl GossipVerifiedBlock { // reboot if the `observed_block_producers` cache is empty. In that case, without this // check, we will load the parent and state from disk only to find out later that we // already know this block. - if chain - .canonical_head - .read() - .fork_choice - .contains_block(&block_root) - { + if chain.canonical_head.contains_block(&block_root) { return Err(BlockError::BlockIsAlreadyKnown); } @@ -697,11 +692,7 @@ impl GossipVerifiedBlock { // Do not process a block that doesn't descend from the finalized root. // // We check this *before* we load the parent so that we can return a more detailed error. - check_block_is_finalized_descendant::( - &block, - &chain.canonical_head.read().fork_choice, - &chain.store, - )?; + check_block_is_finalized_descendant(chain, &block)?; let block_epoch = block.slot().epoch(T::EthSpec::slots_per_epoch()); let (parent_block, block) = verify_parent_block_is_known(chain, block)?; @@ -1028,12 +1019,7 @@ impl ExecutionPendingBlock { parent: PreProcessingSnapshot, chain: &Arc>, ) -> Result> { - if let Some(parent) = chain - .canonical_head - .read() - .fork_choice - .get_block(&block.parent_root()) - { + if let Some(parent) = chain.canonical_head.get_block(&block.parent_root()) { // Reject any block where the parent has an invalid payload. It's impossible for a valid // block to descend from an invalid parent. if parent.execution_status.is_invalid() { @@ -1214,16 +1200,12 @@ impl ExecutionPendingBlock { let is_optimistic_candidate = chain .spawn_blocking_handle( move || { - inner_chain - .canonical_head - .read() - .fork_choice - .is_optimistic_candidate_block( - current_slot, - block_slot, - &block_parent_root, - &inner_chain.spec, - ) + inner_chain.canonical_head.is_optimistic_candidate_block( + current_slot, + block_slot, + &block_parent_root, + &inner_chain.spec, + ) }, "validate_merge_block_optimistic_candidate", ) @@ -1441,7 +1423,7 @@ fn check_block_against_finalized_slot( ) -> Result<(), BlockError> { let finalized_slot = chain .canonical_head - .read() + .cached_head_read_lock() .finalized_checkpoint() .epoch .start_slot(T::EthSpec::slots_per_epoch()); @@ -1458,12 +1440,14 @@ fn check_block_against_finalized_slot( } /// Returns `Ok(block)` if the block descends from the finalized root. -pub fn check_block_is_finalized_descendant>( +pub fn check_block_is_finalized_descendant( + chain: &BeaconChain, block: &Arc>, - fork_choice: &ForkChoice, - store: &HotColdDB, ) -> Result<(), BlockError> { - if fork_choice.is_descendant_of_finalized(block.parent_root()) { + if chain + .canonical_head + .is_descendant_of_finalized(block.parent_root()) + { Ok(()) } else { // If fork choice does *not* consider the parent to be a descendant of the finalized block, @@ -1474,7 +1458,8 @@ pub fn check_block_is_finalized_descendant( // Check if the block is already known. We know it is post-finalization, so it is // sufficient to check the fork choice. - if chain - .canonical_head - .read() - .fork_choice - .contains_block(&block_root) - { + if chain.canonical_head.contains_block(&block_root) { return Err(BlockError::BlockIsAlreadyKnown); } @@ -1561,8 +1541,6 @@ fn verify_parent_block_is_known( ) -> Result<(ProtoBlock, Arc>), BlockError> { if let Some(proto_block) = chain .canonical_head - .read() - .fork_choice .get_block(&block.message().parent_root()) { Ok((proto_block, block)) @@ -1598,12 +1576,7 @@ fn load_parent( // because it will revert finalization. Note that the finalized block is stored in fork // choice, so we will not reject any child of the finalized block (this is relevant during // genesis). - if !chain - .canonical_head - .read() - .fork_choice - .contains_block(&block.parent_root()) - { + if !chain.canonical_head.contains_block(&block.parent_root()) { return Err(BlockError::ParentUnknown(block)); } @@ -1799,7 +1772,7 @@ fn verify_header_signature( .get(header.message.proposer_index as usize) .cloned() .ok_or(BlockError::UnknownValidator(header.message.proposer_index))?; - let head_fork = chain.canonical_head.read().head_fork(); + let head_fork = chain.canonical_head.cached_head_read_lock().head_fork(); if header.verify_signature::( &proposer_pubkey, diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 599ceb11d98..35f57022467 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -1,6 +1,4 @@ -use crate::beacon_chain::{ - CanonicalHead, FastCanonicalHead, BEACON_CHAIN_DB_KEY, ETH1_CACHE_DB_KEY, OP_POOL_DB_KEY, -}; +use crate::beacon_chain::{CanonicalHead, BEACON_CHAIN_DB_KEY, ETH1_CACHE_DB_KEY, OP_POOL_DB_KEY}; use crate::eth1_chain::{CachingEth1Backend, SszEth1}; use crate::fork_choice_signal::ForkChoiceSignalTx; use crate::fork_revert::{reset_fork_choice_to_finalization, revert_to_fork_boundary}; @@ -730,10 +728,6 @@ where let genesis_validators_root = head_snapshot.beacon_state.genesis_validators_root(); let genesis_time = head_snapshot.beacon_state.genesis_time(); let head_for_snapshot_cache = head_snapshot.clone(); - let fast_canonical_head = FastCanonicalHead::new::< - Witness, - >(&fork_choice, &head_snapshot, &self.spec) - .map_err(|e| format!("Error creating fast canonical head: {:?}", e))?; let canonical_head = CanonicalHead::new(fork_choice, head_snapshot) .map_err(|e| format!("Error creating canonical head: {:?}", e))?; @@ -775,8 +769,7 @@ where execution_layer: self.execution_layer, genesis_validators_root, genesis_time, - canonical_head: RwLock::new(canonical_head).into(), - fast_canonical_head: RwLock::new(fast_canonical_head).into(), + canonical_head, genesis_block_root, genesis_state_root, fork_choice_signal_tx, diff --git a/beacon_node/beacon_chain/src/canonical_head.rs b/beacon_node/beacon_chain/src/canonical_head.rs index e77747be40b..2211afd5a91 100644 --- a/beacon_node/beacon_chain/src/canonical_head.rs +++ b/beacon_node/beacon_chain/src/canonical_head.rs @@ -1,21 +1,27 @@ +use crate::persisted_fork_choice::PersistedForkChoice; use crate::{ - beacon_chain::{BeaconForkChoice, BeaconStore, BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT}, + beacon_chain::{ + BeaconForkChoice, BeaconStore, BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT, FORK_CHOICE_DB_KEY, + }, block_times_cache::BlockTimesCache, events::ServerSentEventHandler, metrics, validator_monitor::{get_slot_delay_ms, timestamp_now}, - BeaconChain, BeaconChainError as Error, BeaconChainTypes, BeaconSnapshot, + BeaconChain, BeaconChainError as Error, BeaconChainTypes, BeaconSnapshot, ForkChoiceError, }; use eth2::types::{EventKind, SseChainReorg, SseFinalizedCheckpoint, SseHead, SseLateHead}; -use fork_choice::{ExecutionStatus, ForkChoiceView, ForkchoiceUpdateParameters, ProtoBlock}; +use fork_choice::{ + AttestationFromBlock, ExecutionStatus, ForkChoiceView, ForkchoiceUpdateParameters, + InvalidationOperation, PayloadVerificationStatus, ProtoBlock, +}; use itertools::process_results; -use parking_lot::{RwLock, RwLockReadGuard, RwLockUpgradableReadGuard, RwLockWriteGuard}; +use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard}; use slog::{crit, debug, error, warn, Logger}; use slot_clock::SlotClock; use std::mem; use std::sync::Arc; use std::time::Duration; -use store::iter::StateRootsIterator; +use store::{iter::StateRootsIterator, KeyValueStoreOp, StoreItem}; use task_executor::{JoinHandle, ShutdownReason}; use types::*; @@ -30,7 +36,12 @@ impl From> for CanonicalHeadRwLock { } } +// TODO(paul): make all these functions private. impl CanonicalHeadRwLock { + fn new(item: T) -> Self { + Self::from(RwLock::new(item)) + } + pub fn read(&self) -> RwLockReadGuard { self.0.read() } @@ -44,61 +55,145 @@ impl CanonicalHeadRwLock { } } -/// A simple wrapper around an `RwLock` to prevent access to the lock from anywhere else other than -/// this file. -pub struct FastCanonicalHeadRwLock(RwLock); +pub struct CachedHead { + /// Provides the head block and state from the last time the head was updated. + pub snapshot: BeaconSnapshot, + /// The justified checkpoint as per `self.fork_choice`. + /// + /// This value may be distinct to the `self.head_snapshot.beacon_state.justified_checkpoint`. + /// This value should be used over the beacon state value in practically all circumstances. + justified_checkpoint: Checkpoint, + /// The finalized checkpoint as per `self.fork_choice`. + /// + /// This value may be distinct to the `self.head_snapshot.beacon_state.finalized_checkpoint`. + /// This value should be used over the beacon state value in practically all circumstances. + finalized_checkpoint: Checkpoint, + /// The `execution_payload.block_hash` of the block at the head of the chain. Set to `None` + /// before Bellatrix. + head_hash: Option, + /// The `execution_payload.block_hash` of the finalized block. Set to `None` before Bellatrix. + finalized_hash: Option, +} -impl From> for FastCanonicalHeadRwLock { - fn from(rw_lock: RwLock) -> Self { - Self(rw_lock) +impl CachedHead { + /// Returns root of the block at the head of the beacon chain. + pub fn head_block_root(&self) -> Hash256 { + self.snapshot.beacon_block_root } -} -impl FastCanonicalHeadRwLock { - /// Do not make this function public without considering the risk of deadlocks when interacting - /// with the `canonical_head` lock. - fn read(&self) -> RwLockReadGuard { - self.0.read() + /// Returns root of the `BeaconState` at the head of the beacon chain. + /// + /// ## Note + /// + /// This `BeaconState` has *not* been advanced to the current slot, it has the same slot as the + /// head block. + pub fn head_state_root(&self) -> Hash256 { + self.snapshot.beacon_state_root() } - /// Do not make this function public without considering the risk of deadlocks when interacting - /// with the `canonical_head` lock. - fn write(&self) -> RwLockWriteGuard { - self.0.write() + /// Returns slot of the block at the head of the beacon chain. + /// + /// ## Notes + /// + /// This is *not* the current slot as per the system clock. + pub fn head_slot(&self) -> Slot { + self.snapshot.beacon_block.slot() + } + + /// Returns the `Fork` from the `BeaconState` at the head of the chain. + pub fn head_fork(&self) -> Fork { + self.snapshot.beacon_state.fork() + } + + /// Returns the randao mix for the block at the head of the chain. + pub fn head_random(&self) -> Result { + let state = &self.snapshot.beacon_state; + let root = *state.get_randao_mix(state.current_epoch())?; + Ok(root) + } + + /// Returns the finalized checkpoint, as determined by fork choice. + /// + /// ## Note + /// + /// This is *not* the finalized checkpoint of the `head_snapshot.beacon_state`, rather it is the + /// best finalized checkpoint that has been observed by `self.fork_choice`. It is possible that + /// the `head_snapshot.beacon_state` finalized value is earlier than the one returned here. + pub fn finalized_checkpoint(&self) -> Checkpoint { + self.finalized_checkpoint + } + + /// Returns the justified checkpoint, as determined by fork choice. + /// + /// ## Note + /// + /// This is *not* the "current justified checkpoint" of the `head_snapshot.beacon_state`, rather + /// it is the justified checkpoint in the view of `self.fork_choice`. It is possible that the + /// `head_snapshot.beacon_state` justified value is different to, but not conflicting with, the + /// one returned here. + pub fn justified_checkpoint(&self) -> Checkpoint { + self.justified_checkpoint + } + + pub fn forkchoice_update_parameters(&self) -> ForkchoiceUpdateParameters { + ForkchoiceUpdateParameters { + head_root: self.snapshot.beacon_block_root, + head_hash: self.head_hash, + finalized_hash: self.finalized_hash, + } } } -/// A smaller version of `CanonicalHead` designed to have very little lock contention but with the -/// downside of sometimes being slightly behind the `CanonicalHead`. -#[derive(Clone, Debug, PartialEq)] -pub struct FastCanonicalHead { - pub head_block_root: Hash256, - pub head_block_slot: Slot, - pub justified_checkpoint: Checkpoint, - pub finalized_checkpoint: Checkpoint, - pub active_validator_count: usize, +pub struct BlockProcessingForkChoiceWriteLock<'a, T: BeaconChainTypes> { + fork_choice: RwLockWriteGuard<'a, BeaconForkChoice>, } -impl FastCanonicalHead { - pub fn new( - fork_choice: &BeaconForkChoice, - head_snapshot: &BeaconSnapshot, +impl<'a, T: BeaconChainTypes> BlockProcessingForkChoiceWriteLock<'a, T> { + pub fn get_block(&self, block_root: &Hash256) -> Option { + self.fork_choice.get_block(block_root) + } + + #[allow(clippy::too_many_arguments)] + pub fn on_block>( + &mut self, + current_slot: Slot, + block: BeaconBlockRef, + block_root: Hash256, + block_delay: Duration, + state: &BeaconState, + payload_verification_status: PayloadVerificationStatus, spec: &ChainSpec, - ) -> Result { - let state = &head_snapshot.beacon_state; - let fork_choice_view = fork_choice.cached_fork_choice_view(); + ) -> Result<(), ForkChoiceError> { + self.fork_choice.on_block( + current_slot, + block, + block_root, + block_delay, + state, + payload_verification_status, + spec, + ) + } - let active_validator_count = state - .get_active_validator_indices(state.current_epoch(), spec)? - .len(); + pub fn on_attestations( + &mut self, + current_slot: Slot, + attestations: &[IndexedAttestation], + is_from_block: AttestationFromBlock, + ) -> Result<(), ForkChoiceError> { + for indexed_attestation in attestations { + self.fork_choice + .on_attestation(current_slot, indexed_attestation, is_from_block)? + } + Ok(()) + } - Ok(Self { - head_block_root: head_snapshot.beacon_block_root, - head_block_slot: head_snapshot.beacon_block.slot(), - justified_checkpoint: fork_choice_view.justified_checkpoint, - finalized_checkpoint: fork_choice_view.finalized_checkpoint, - active_validator_count, - }) + pub fn get_head( + &mut self, + current_slot: Slot, + spec: &ChainSpec, + ) -> Result { + self.fork_choice.get_head(current_slot, spec) } } @@ -115,19 +210,17 @@ impl FastCanonicalHead { pub struct CanonicalHead { /// Provides an in-memory representation of the non-finalized block tree and is used to run the /// fork choice algorithm and determine the canonical head. - pub fork_choice: BeaconForkChoice, - /// The justified checkpoint as per `self.fork_choice`. + pub fork_choice: CanonicalHeadRwLock>, + /// Provides values cached from a previous execution of `self.fork_choice.get_head`. /// - /// This value may be distinct to the `self.head_snapshot.beacon_state.justified_checkpoint`. - /// This value should be used over the beacon state value in practically all circumstances. - justified_checkpoint: Checkpoint, - /// The finalized checkpoint as per `self.fork_choice`. + /// Although `self.fork_choice` might be slightly more advanced that this value, it is safe to + /// consider that these values represent the "canonical head" of the beacon chain. + pub cached_head: CanonicalHeadRwLock>, + /// A lock used to prevent concurrent runs of `BeaconChain::recompute_head`. /// - /// This value may be distinct to the `self.head_snapshot.beacon_state.finalized_checkpoint`. - /// This value should be used over the beacon state value in practically all circumstances. - finalized_checkpoint: Checkpoint, - /// Provides the head block and state from the last time the head was updated. - pub head_snapshot: BeaconSnapshot, + /// This lock *should not* be made public, it should only be accessed via designated getter + /// methods. + recompute_head_lock: Mutex<()>, } impl CanonicalHead { @@ -137,20 +230,36 @@ impl CanonicalHead { /// `head_snapshot`. pub fn new( fork_choice: BeaconForkChoice, - head_snapshot: BeaconSnapshot, + snapshot: BeaconSnapshot, ) -> Result { let fork_choice_view = fork_choice.cached_fork_choice_view(); - - Ok(Self { - fork_choice, + let forkchoice_update_params = fork_choice.get_forkchoice_update_parameters(); + let cached_head = CachedHead { + snapshot, justified_checkpoint: fork_choice_view.justified_checkpoint, finalized_checkpoint: fork_choice_view.finalized_checkpoint, - head_snapshot, + head_hash: forkchoice_update_params.head_hash, + finalized_hash: forkchoice_update_params.finalized_hash, + }; + + Ok(Self { + fork_choice: CanonicalHeadRwLock::new(fork_choice), + cached_head: CanonicalHeadRwLock::new(cached_head), + recompute_head_lock: Mutex::new(()), }) } - /// Instantiate `Self`, loading the latest persisted `fork_choice` from the `store`. - pub fn load_from_store(store: &BeaconStore, spec: &ChainSpec) -> Result { + pub(crate) fn restore_from_store( + &self, + block_processing_guard: BlockProcessingForkChoiceWriteLock, + store: &BeaconStore, + spec: &ChainSpec, + ) -> Result<(), Error> { + // We don't actually *need* the block processing guard, but we just pass it because the only + // place that calls this function is block processing and we'll get a deadlock if this guard + // isn't dropped. + drop(block_processing_guard); + let fork_choice = >::load_fork_choice(store.clone(), spec)? .ok_or(Error::MissingPersistedForkChoice)?; let fork_choice_view = fork_choice.cached_fork_choice_view(); @@ -163,42 +272,34 @@ impl CanonicalHead { .get_state(&beacon_state_root, Some(beacon_block.slot()))? .ok_or(Error::MissingBeaconState(beacon_state_root))?; - let head_snapshot = BeaconSnapshot { + let snapshot = BeaconSnapshot { beacon_block_root, beacon_block: Arc::new(beacon_block), beacon_state, }; - Self::new(fork_choice, head_snapshot) - } - - /// Returns root of the block at the head of the beacon chain. - pub fn head_block_root(&self) -> Hash256 { - self.head_snapshot.beacon_block_root - } + let fork_choice_view = fork_choice.cached_fork_choice_view(); + let forkchoice_update_params = fork_choice.get_forkchoice_update_parameters(); + let cached_head = CachedHead { + snapshot, + justified_checkpoint: fork_choice_view.justified_checkpoint, + finalized_checkpoint: fork_choice_view.finalized_checkpoint, + head_hash: forkchoice_update_params.head_hash, + finalized_hash: forkchoice_update_params.finalized_hash, + }; - /// Returns root of the `BeaconState` at the head of the beacon chain. - /// - /// ## Note - /// - /// This `BeaconState` has *not* been advanced to the current slot, it has the same slot as the - /// head block. - pub fn head_state_root(&self) -> Hash256 { - self.head_snapshot.beacon_state_root() - } + *self.fork_choice.write() = fork_choice; + *self.cached_head.write() = cached_head; - /// Returns slot of the block at the head of the beacon chain. - /// - /// ## Notes - /// - /// This is *not* the current slot as per the system clock. - pub fn head_slot(&self) -> Slot { - self.head_snapshot.beacon_block.slot() + Ok(()) } - /// Returns the `Fork` from the `BeaconState` at the head of the chain. - pub fn head_fork(&self) -> Fork { - self.head_snapshot.beacon_state.fork() + pub(crate) fn block_processing_fork_choice_write_lock( + &self, + ) -> BlockProcessingForkChoiceWriteLock { + BlockProcessingForkChoiceWriteLock { + fork_choice: self.fork_choice_write_lock(), + } } /// Returns the execution status of the block at the head of the beacon chain. @@ -207,54 +308,116 @@ impl CanonicalHead { /// significantly past the cached `head_snapshot`. In such a scenario is it likely prudent to /// run `BeaconChain::recompute_head` to update the cached values. pub fn head_execution_status(&self) -> Result { - let head_block_root = self.head_block_root(); + let head_block_root = self.cached_head.read().head_block_root(); self.fork_choice + .read() .get_block_execution_status(&head_block_root) .ok_or(Error::HeadMissingFromForkChoice(head_block_root)) } - /// Returns the randao mix for the block at the head of the chain. - pub fn head_random(&self) -> Result { - let state = &self.head_snapshot.beacon_state; - let root = *state.get_randao_mix(state.current_epoch())?; - Ok(root) + pub fn cached_head_read_lock(&self) -> RwLockReadGuard> { + self.cached_head.read() } - /// Returns the finalized checkpoint, as determined by fork choice. - /// - /// ## Note - /// - /// This is *not* the finalized checkpoint of the `head_snapshot.beacon_state`, rather it is the - /// best finalized checkpoint that has been observed by `self.fork_choice`. It is possible that - /// the `head_snapshot.beacon_state` finalized value is earlier than the one returned here. - pub fn finalized_checkpoint(&self) -> Checkpoint { - self.finalized_checkpoint + fn cached_head_write_lock(&self) -> RwLockWriteGuard> { + self.cached_head.write() } - /// Returns the justified checkpoint, as determined by fork choice. - /// - /// ## Note - /// - /// This is *not* the "current justified checkpoint" of the `head_snapshot.beacon_state`, rather - /// it is the justified checkpoint in the view of `self.fork_choice`. It is possible that the - /// `head_snapshot.beacon_state` justified value is different to, but not conflicting with, the - /// one returned here. - pub fn justified_checkpoint(&self) -> Checkpoint { - self.justified_checkpoint + pub fn fork_choice_read_lock(&self) -> RwLockReadGuard> { + self.fork_choice.read() + } + + fn fork_choice_write_lock(&self) -> RwLockWriteGuard> { + self.fork_choice.write() + } + + pub fn on_valid_execution_payload(&self, block_root: Hash256) -> Result<(), ForkChoiceError> { + self.fork_choice_write_lock() + .on_valid_execution_payload(block_root) + } + + pub fn on_invalid_execution_payload( + &self, + op: &InvalidationOperation, + ) -> Result<(), ForkChoiceError> { + self.fork_choice_write_lock() + .on_invalid_execution_payload(op) + } + + pub fn contains_block(&self, block_root: &Hash256) -> bool { + self.fork_choice_read_lock().contains_block(block_root) + } + + pub fn get_block(&self, block_root: &Hash256) -> Option { + self.fork_choice_read_lock().get_block(block_root) + } + + pub fn get_block_execution_status(&self, block_root: &Hash256) -> Option { + self.fork_choice_read_lock() + .get_block_execution_status(block_root) + } + + pub fn get_justified_block(&self) -> Result { + self.fork_choice_read_lock().get_justified_block() + } + + pub fn is_optimistic_candidate_block( + &self, + current_slot: Slot, + block_slot: Slot, + block_parent_root: &Hash256, + spec: &ChainSpec, + ) -> Result { + self.fork_choice_read_lock().is_optimistic_candidate_block( + current_slot, + block_slot, + block_parent_root, + spec, + ) + } + + pub fn is_optimistic_block(&self, block_root: &Hash256) -> Result { + self.fork_choice_read_lock().is_optimistic_block(block_root) + } + + pub fn is_optimistic_block_no_fallback( + &self, + block_root: &Hash256, + ) -> Result { + self.fork_choice_read_lock() + .is_optimistic_block_no_fallback(block_root) + } + + pub fn is_descendant_of_finalized(&self, block_root: Hash256) -> bool { + self.fork_choice_read_lock() + .is_descendant_of_finalized(block_root) + } + + pub fn on_attestation( + &self, + current_slot: Slot, + attestation: &IndexedAttestation, + is_from_block: AttestationFromBlock, + ) -> Result<(), ForkChoiceError> { + self.fork_choice_write_lock() + .on_attestation(current_slot, attestation, is_from_block) + } + + pub fn get_ancestor_at_or_below_slot( + &self, + block_root: &Hash256, + target_slot: Slot, + ) -> Option { + self.fork_choice_read_lock() + .proto_array() + .core_proto_array() + .iter_block_roots(block_root) + .find(|(_, slot)| *slot <= target_slot) + .map(|(block_root, _)| block_root) } } impl BeaconChain { - /// Returns a summary of the `CanonicalHead`. It is "fast" since it lives behind it's own - /// `RwLock` which should have very little contention. The downsides are that it only has - /// limited information about the head and it might lag behind the `CanonicalHead` very slightly - /// (generally on the order of milliseconds). - /// - /// This method should be used by tasks which are very sensitive to delays caused by lock - /// contention, like the networking stack. - pub fn fast_canonical_head(&self) -> FastCanonicalHead { - self.fast_canonical_head.read().clone() - } /// Execute the fork choice algorithm and enthrone the result as the canonical head. /// /// This method replaces the old `BeaconChain::fork_choice` method. @@ -326,35 +489,38 @@ impl BeaconChain { self: &Arc, current_slot: Slot, ) -> Result>>, Error> { - let mut canonical_head_write_lock = self.canonical_head.write(); + let recompute_head_lock = self.canonical_head.recompute_head_lock.lock(); - // Take note of the last-known head and finalization values. + // Atomically read the cached head and FFG checkpoints. // - // It is important to read the `fork_choice_view` from the canonical head rather than from + // It is important to read the `fork_choice_view` from the cached head rather than from // fork choice, since the fork choice value might have changed between calls to this // function. We are interested in the changes since we last cached the head values, not // since fork choice was last run. - let old_view = ForkChoiceView { - head_block_root: canonical_head_write_lock.head_block_root(), - finalized_checkpoint: canonical_head_write_lock.finalized_checkpoint(), - justified_checkpoint: canonical_head_write_lock.justified_checkpoint(), + let old_view = { + let cached_head = self.canonical_head.cached_head_read_lock(); + ForkChoiceView { + head_block_root: cached_head.head_block_root(), + justified_checkpoint: cached_head.justified_checkpoint(), + finalized_checkpoint: cached_head.finalized_checkpoint(), + } }; + let mut fork_choice_write_lock = self.canonical_head.fork_choice_write_lock(); + // Recompute the current head via the fork choice algorithm. - canonical_head_write_lock - .fork_choice - .get_head(current_slot, &self.spec)?; + fork_choice_write_lock.get_head(current_slot, &self.spec)?; // Read the current head value from the fork choice algorithm. - let new_view = canonical_head_write_lock - .fork_choice - .cached_fork_choice_view(); + let new_view = fork_choice_write_lock.cached_fork_choice_view(); + + // Downgrade the fork choice write-lock to a read lock, without allowing access to any + // other writers. + let fork_choice_read_lock = RwLockWriteGuard::downgrade(fork_choice_write_lock); // Check to ensure that the finalized block hasn't been marked as invalid. If it has, // shut down Lighthouse. - let finalized_proto_block = canonical_head_write_lock - .fork_choice - .get_finalized_block()?; + let finalized_proto_block = fork_choice_read_lock.get_finalized_block()?; check_finalized_payload_validity(self, &finalized_proto_block)?; // Sanity check the finalized checkpoint. @@ -363,8 +529,7 @@ impl BeaconChain { // finalized checkpoint. check_against_finality_reversion(&old_view, &new_view)?; - let new_head_proto_block = canonical_head_write_lock - .fork_choice + let new_head_proto_block = fork_choice_read_lock .get_block(&new_view.head_block_root) .ok_or(Error::HeadBlockMissingFromForkChoice( new_view.head_block_root, @@ -400,29 +565,21 @@ impl BeaconChain { return Ok(None); } - perform_debug_logging::( - &old_view, - &new_view, - &canonical_head_write_lock.fork_choice, - &self.log, - ); + // Get the parameters to update the execution layer since either the head or some finality + // parameters have changed. + let forkchoice_update_parameters = fork_choice_read_lock.get_forkchoice_update_parameters(); + + perform_debug_logging::(&old_view, &new_view, &fork_choice_read_lock, &self.log); + + // Drop the read lock, it's no longer required and holding it any longer than necessary + // will just cause lock contention. + drop(fork_choice_read_lock); // If the head has changed, update `self.canonical_head`. - let (mut canonical_head_write_lock, head_update_params) = + let (cached_head_read_lock, head_update_params) = if new_view.head_block_root != old_view.head_block_root { metrics::inc_counter(&metrics::FORK_CHOICE_CHANGED_HEAD); - // Downgrade the write-lock to a read lock to avoid preventing all access to the head - // whilst the head snapshot is loaded. The docs note: - // - // > Note that if there are any writers currently waiting to take the lock then other - // > readers may not be able to acquire the lock even if it was downgraded. - // - // This means that other readers are not *guaranteed* access during this period, but - // there's a decent chance that there are no other writers and they'll be able to read. - let canonical_head_read_lock = - RwLockWriteGuard::downgrade_to_upgradable(canonical_head_write_lock); - // Try and obtain the snapshot for `beacon_block_root` from the snapshot cache, falling // back to a database read if that fails. // @@ -469,63 +626,42 @@ impl BeaconChain { .map(|()| snapshot) })?; - // Upgrade the read lock to a write lock, without allowing any other writers access in - // the meantime. - let mut canonical_head_write_lock = - RwLockUpgradableReadGuard::upgrade(canonical_head_read_lock); + // Now the new snapshot has been obtained, take a write-lock on the cached head so + // we can update it quickly. + let mut cached_head_write_lock = self.canonical_head.cached_head_write_lock(); + + // Enshrine the new snapshot as the head, keeping the old snapshot for later reference. + let old_head = mem::replace(&mut cached_head_write_lock.snapshot, new_head); + // Update the FFG values, since they might have also changed. + cached_head_write_lock.justified_checkpoint = new_view.justified_checkpoint; + cached_head_write_lock.finalized_checkpoint = new_view.finalized_checkpoint; - // Enshrine the new value as the head. - let old_head = mem::replace(&mut canonical_head_write_lock.head_snapshot, new_head); + // Downgrade the cached head write-lock to a read-lock. + let cached_head_read_lock = RwLockWriteGuard::downgrade(cached_head_write_lock); // Clear the early attester cache in case it conflicts with `self.canonical_head`. self.early_attester_cache.clear(); ( - canonical_head_write_lock, + cached_head_read_lock, Some((old_head, new_head_proto_block)), ) } else { - (canonical_head_write_lock, None) - }; + let mut cached_head_write_lock = self.canonical_head.cached_head_write_lock(); - // Update the FFG checkpoints on the `canonical_head`. - canonical_head_write_lock.justified_checkpoint = new_view.justified_checkpoint; - canonical_head_write_lock.finalized_checkpoint = new_view.finalized_checkpoint; + // Update the FFG values, since at least one of them changed if we're at this point + // in the function and the head didn't change. + cached_head_write_lock.justified_checkpoint = new_view.justified_checkpoint; + cached_head_write_lock.finalized_checkpoint = new_view.finalized_checkpoint; - // Downgrade the write-lock to a read-lock, without allowing any other writers access - // during the process. - // - // Holding the write-lock any longer than is required creates the risk of contention and - // deadlocks. This is especially relevant since later parts of this function will interact - // with other locks and potentially perform long-running operations. - // - // The `parking_lot` docs have this to say about downgraded write-locks: - // - // > Note that if there are any writers currently waiting to take the lock then other > - // readers may not be able to acquire the lock even if it was downgraded. - // - // This means that it's dangerous to take another read-lock on the `canonical_head` whilst - // we're holding this read-lock. - let canonical_head_read_lock = RwLockWriteGuard::downgrade(canonical_head_write_lock); + // Downgrade the cached head write-lock to a read-lock. + let cached_head_read_lock = RwLockWriteGuard::downgrade(cached_head_write_lock); - // Alias for readability. - let new_head = &canonical_head_read_lock.head_snapshot; + (cached_head_read_lock, None) + }; - // Update the fast canonical head, whilst holding the lock on the canonical head. - // - // Doing it whilst holding the read-lock ensures that the `canonical_head` and - // `fast_canonical_head` stay consistent (although the fast head might lag slightly behind - // the canonical head). - *self.fast_canonical_head.write() = FastCanonicalHead { - head_block_root: new_view.head_block_root, - head_block_slot: new_head.beacon_block.slot(), - justified_checkpoint: new_view.justified_checkpoint, - finalized_checkpoint: new_view.finalized_checkpoint, - active_validator_count: new_head - .beacon_state - .get_cached_active_validator_indices(RelativeEpoch::Current)? - .len(), - }; + // Alias for readability. + let new_head = &cached_head_read_lock.snapshot; // If the head changed, perform some updates. if let Some((old_head, new_head_proto_block)) = head_update_params { @@ -578,21 +714,18 @@ impl BeaconChain { } } - // Get the parameters toupdate the execution layer since either the head or some finality - // parameters have changed. - let forkchoice_update_parameters = canonical_head_read_lock - .fork_choice - .get_forkchoice_update_parameters() - .ok_or(Error::ForkchoiceUpdateParamsMissing)?; - - // The read-lock on the canonical head *MUST* be dropped before spawning the execution - // layer update tasks since they might try to take a write-lock on the canonical head. - drop(canonical_head_read_lock); + // The read-lock on the cached head *MUST* be dropped before spawning the execution layer + // update tasks since they might try to take a write-lock on the cached head. + drop(cached_head_read_lock); - // The read-lock on the canonical head *MUST* be dropped before this call since it might try to take a write-lock on the canonical head. + // The read-lock on the cached head *MUST* be dropped before this call since it might try to take a write-lock on the canonical head. let el_update_handle = spawn_execution_layer_updates(self.clone(), forkchoice_update_parameters)?; + // We have completed recomputing the head and it's now valid for another process to do the + // same. + drop(recompute_head_lock); + Ok(Some(el_update_handle)) } @@ -836,6 +969,22 @@ impl BeaconChain { Ok(()) } + + /// Return a database operation for writing fork choice to disk. + pub fn persist_fork_choice_in_batch(&self) -> KeyValueStoreOp { + Self::persist_fork_choice_in_batch_standalone(&self.canonical_head.fork_choice_read_lock()) + } + + /// Return a database operation for writing fork choice to disk. + pub fn persist_fork_choice_in_batch_standalone( + fork_choice: &BeaconForkChoice, + ) -> KeyValueStoreOp { + let persisted_fork_choice = PersistedForkChoice { + fork_choice: fork_choice.to_persisted(), + fork_choice_store: fork_choice.fc_store().to_persisted(), + }; + persisted_fork_choice.as_kv_store_op(FORK_CHOICE_DB_KEY) + } } /// Check to see if the `finalized_proto_block` has an invalid execution payload. If so, shut down diff --git a/beacon_node/beacon_chain/src/execution_payload.rs b/beacon_node/beacon_chain/src/execution_payload.rs index 3b801999cf3..6bc056f1071 100644 --- a/beacon_node/beacon_chain/src/execution_payload.rs +++ b/beacon_node/beacon_chain/src/execution_payload.rs @@ -14,6 +14,7 @@ use crate::{ use execution_layer::PayloadStatus; use fork_choice::{InvalidationOperation, PayloadVerificationStatus}; use proto_array::{Block as ProtoBlock, ExecutionStatus}; +use slog::debug; use slot_clock::SlotClock; use state_processing::per_block_processing::{ compute_timestamp_at_slot, is_execution_enabled, is_merge_transition_complete, @@ -186,9 +187,44 @@ pub async fn validate_merge_block<'a, T: BeaconChainTypes>( parent_hash: execution_payload.parent_hash(), } .into()), - // Allow optimistic blocks here, the caller must ensure that the block is an optimistic - // candidate. - None => Ok(()), + None => { + let current_slot = chain + .slot_clock + .now() + .ok_or(BeaconChainError::UnableToReadSlot)?; + // Use a blocking task to check if the block is an optimistic candidate. Interacting + // with the `canonical_head` lock in an async task can block the core executor. + let inner_chain = chain.clone(); + let block_parent_root = block.parent_root(); + let block_slot = block.slot(); + let is_optimistic_candidate = chain + .spawn_blocking_handle( + move || { + inner_chain.canonical_head.is_optimistic_candidate_block( + current_slot, + block_slot, + &block_parent_root, + &inner_chain.spec, + ) + }, + "validate_merge_block_optimistic_candidate", + ) + .await + .map_err(BeaconChainError::from)? + .map_err(BeaconChainError::from)?; + + if is_optimistic_candidate { + debug!( + chain.log, + "Optimistically accepting terminal block"; + "block_hash" => ?execution_payload.parent_hash(), + "msg" => "the terminal block/parent was unavailable" + ); + Ok(()) + } else { + Err(ExecutionPayloadError::UnverifiedNonOptimisticCandidate.into()) + } + } } } @@ -372,8 +408,6 @@ where move || { inner_chain .canonical_head - .read() - .fork_choice .get_block(&finalized_checkpoint.root) }, "prepare_execution_payload_finalized_hash", diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 9100a52d323..3460a3f7085 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -53,7 +53,7 @@ pub use self::historical_blocks::HistoricalBlockError; pub use attestation_verification::Error as AttestationError; pub use beacon_fork_choice_store::{BeaconForkChoiceStore, Error as ForkChoiceStoreError}; pub use block_verification::{BlockError, ExecutionPayloadError, GossipVerifiedBlock}; -pub use canonical_head::{CanonicalHead, CanonicalHeadRwLock, FastCanonicalHead}; +pub use canonical_head::{CanonicalHead, CanonicalHeadRwLock}; pub use eth1_chain::{Eth1Chain, Eth1ChainBackend}; pub use events::ServerSentEventHandler; pub use fork_choice::ExecutionStatus; diff --git a/beacon_node/beacon_chain/src/metrics.rs b/beacon_node/beacon_chain/src/metrics.rs index 99b06c46971..4d0f63674ae 100644 --- a/beacon_node/beacon_chain/src/metrics.rs +++ b/beacon_node/beacon_chain/src/metrics.rs @@ -1,6 +1,6 @@ use crate::observed_attesters::SlotSubcommitteeIndex; use crate::types::consts::altair::SYNC_COMMITTEE_SUBNET_COUNT; -use crate::{BeaconChain, BeaconChainTypes}; +use crate::{BeaconChain, BeaconChainError, BeaconChainTypes}; use lazy_static::lazy_static; pub use lighthouse_metrics::*; use slot_clock::SlotClock; @@ -10,9 +10,6 @@ use types::{BeaconState, Epoch, EthSpec, Hash256, Slot}; /// The maximum time to wait for the snapshot cache lock during a metrics scrape. const SNAPSHOT_CACHE_TIMEOUT: Duration = Duration::from_millis(100); -/// The time to wait for the canonical head lock before just proceeding with other metrics. -const CANONICAL_HEAD_LOCK_TIMEOUT: Duration = Duration::from_secs(1); - lazy_static! { /* * Block Processing @@ -930,13 +927,10 @@ lazy_static! { /// Scrape the `beacon_chain` for metrics that are not constantly updated (e.g., the present slot, /// head state info, etc) and update the Prometheus `DEFAULT_REGISTRY`. pub fn scrape_for_metrics(beacon_chain: &BeaconChain) { - if let Some(canonical_head) = beacon_chain - .canonical_head - .try_read_for(CANONICAL_HEAD_LOCK_TIMEOUT) - { - let head = &canonical_head.head_snapshot; + let _ = beacon_chain.with_head(|head| { scrape_head_state(&head.beacon_state, head.beacon_state_root()); - } + Ok::<_, BeaconChainError>(()) + }); if let Some(slot) = beacon_chain.slot_clock.now() { scrape_attestation_observation(slot, beacon_chain); diff --git a/beacon_node/beacon_chain/src/state_advance_timer.rs b/beacon_node/beacon_chain/src/state_advance_timer.rs index 0c5750f41d8..ee54c3f9ab5 100644 --- a/beacon_node/beacon_chain/src/state_advance_timer.rs +++ b/beacon_node/beacon_chain/src/state_advance_timer.rs @@ -258,7 +258,10 @@ fn advance_head( // // Fork-choice is not run *before* this function to avoid unnecessary calls whilst syncing. { - let head_slot = beacon_chain.canonical_head.read().head_slot(); + let head_slot = beacon_chain + .canonical_head + .cached_head_read_lock() + .head_slot(); // Don't run this when syncing or if lagging too far behind. if head_slot + MAX_ADVANCE_DISTANCE < current_slot { @@ -278,7 +281,10 @@ fn advance_head( // TODO(paul): try and re-enable this. // beacon_chain.fork_choice()?; - let head_root = beacon_chain.canonical_head.read().head_block_root(); + let head_root = beacon_chain + .canonical_head + .cached_head_read_lock() + .head_block_root(); let (head_slot, head_state_root, mut state) = match beacon_chain .snapshot_cache diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 8d60895de13..662f8c1e8fe 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -524,19 +524,31 @@ where } pub fn head_slot(&self) -> Slot { - self.chain.canonical_head.read().head_slot() + self.chain + .canonical_head + .cached_head_read_lock() + .head_slot() } pub fn head_block_root(&self) -> Hash256 { - self.chain.canonical_head.read().head_block_root() + self.chain + .canonical_head + .cached_head_read_lock() + .head_block_root() } pub fn finalized_checkpoint(&self) -> Checkpoint { - self.chain.canonical_head.read().finalized_checkpoint() + self.chain + .canonical_head + .cached_head_read_lock() + .finalized_checkpoint() } pub fn justified_checkpoint(&self) -> Checkpoint { - self.chain.canonical_head.read().justified_checkpoint() + self.chain + .canonical_head + .cached_head_read_lock() + .justified_checkpoint() } pub fn get_current_slot(&self) -> Slot { @@ -1115,7 +1127,11 @@ where let mut attestation_2 = attestation_1.clone(); attestation_2.data.index += 1; - let fork = self.chain.canonical_head.read().head_fork(); + let fork = self + .chain + .canonical_head + .cached_head_read_lock() + .head_fork(); for attestation in &mut [&mut attestation_1, &mut attestation_2] { for &i in &attestation.attesting_indices { let sk = &self.validator_keypairs[i as usize].sk; @@ -1173,7 +1189,11 @@ where attestation_2.data.index += 1; - let fork = self.chain.canonical_head.read().head_fork(); + let fork = self + .chain + .canonical_head + .cached_head_read_lock() + .head_fork(); for attestation in &mut [&mut attestation_1, &mut attestation_2] { for &i in &attestation.attesting_indices { let sk = &self.validator_keypairs[i as usize].sk; @@ -1211,7 +1231,11 @@ where block_header_2.state_root = Hash256::zero(); let sk = &self.validator_keypairs[validator_index as usize].sk; - let fork = self.chain.canonical_head.read().head_fork(); + let fork = self + .chain + .canonical_head + .cached_head_read_lock() + .head_fork(); let genesis_validators_root = self.chain.genesis_validators_root; let mut signed_block_headers = vec![block_header_1, block_header_2] @@ -1229,7 +1253,11 @@ where pub fn make_voluntary_exit(&self, validator_index: u64, epoch: Epoch) -> SignedVoluntaryExit { let sk = &self.validator_keypairs[validator_index as usize].sk; - let fork = self.chain.canonical_head.read().head_fork(); + let fork = self + .chain + .canonical_head + .cached_head_read_lock() + .head_fork(); let genesis_validators_root = self.chain.genesis_validators_root; VoluntaryExit { @@ -1629,7 +1657,13 @@ where /// Uses `Self::extend_chain` to build the chain out to the `target_slot`. pub async fn extend_to_slot(&self, target_slot: Slot) -> Hash256 { - if self.chain.slot().unwrap() == self.chain.canonical_head.read().head_slot() { + if self.chain.slot().unwrap() + == self + .chain + .canonical_head + .cached_head_read_lock() + .head_slot() + { self.advance_slot(); } @@ -1650,7 +1684,13 @@ where /// - BlockStrategy::OnCanonicalHead, /// - AttestationStrategy::AllValidators, pub async fn extend_slots(&self, num_slots: usize) -> Hash256 { - if self.chain.slot().unwrap() == self.chain.canonical_head.read().head_slot() { + if self.chain.slot().unwrap() + == self + .chain + .canonical_head + .cached_head_read_lock() + .head_slot() + { self.advance_slot(); } diff --git a/consensus/fork_choice/src/fork_choice.rs b/consensus/fork_choice/src/fork_choice.rs index 19963513c30..7390ce7f945 100644 --- a/consensus/fork_choice/src/fork_choice.rs +++ b/consensus/fork_choice/src/fork_choice.rs @@ -248,6 +248,7 @@ fn dequeue_attestations( /// Equivalent to the `is_from_block` `bool` in: /// /// https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/fork-choice.md#validate_on_attestation +#[derive(Clone, Copy)] pub enum AttestationFromBlock { True, False, @@ -286,7 +287,7 @@ pub struct ForkChoice { /// Attestations that arrived at the current slot and must be queued for later processing. queued_attestations: Vec, /// Stores a cache of the values required to be sent to the execution layer. - forkchoice_update_parameters: Option, + forkchoice_update_parameters: ForkchoiceUpdateParameters, /// The most recent result of running `Self::get_head`. head_block_root: Hash256, _phantom: PhantomData, @@ -368,7 +369,12 @@ where fc_store, proto_array, queued_attestations: vec![], - forkchoice_update_parameters: None, + // This will be updated during the next call to `Self::get_head`. + forkchoice_update_parameters: ForkchoiceUpdateParameters { + head_hash: None, + finalized_hash: None, + head_root: Hash256::zero(), + }, // This will be updated during the next call to `Self::get_head`. head_block_root: Hash256::zero(), _phantom: PhantomData, @@ -403,9 +409,8 @@ where /// Returns cached information that can be used to issue a `forkchoiceUpdated` message to an /// execution engine. /// - /// These values are updated each time `Self::get_head` is called. May return `None` if - /// `Self::get_head` has not yet been called. - pub fn get_forkchoice_update_parameters(&self) -> Option { + /// These values are updated each time `Self::get_head` is called. + pub fn get_forkchoice_update_parameters(&self) -> ForkchoiceUpdateParameters { self.forkchoice_update_parameters } @@ -488,11 +493,11 @@ where let finalized_hash = self .get_block(&finalized_root) .and_then(|b| b.execution_status.block_hash()); - self.forkchoice_update_parameters = Some(ForkchoiceUpdateParameters { + self.forkchoice_update_parameters = ForkchoiceUpdateParameters { head_root, head_hash, finalized_hash, - }); + }; Ok(head_root) } @@ -1203,7 +1208,12 @@ where fc_store, proto_array, queued_attestations: persisted.queued_attestations, - forkchoice_update_parameters: None, + // Will be updated in the following call to `Self::get_head`. + forkchoice_update_parameters: ForkchoiceUpdateParameters { + head_hash: None, + finalized_hash: None, + head_root: Hash256::zero(), + }, // Will be updated in the following call to `Self::get_head`. head_block_root: Hash256::zero(), _phantom: PhantomData,