From 9f495e7f35184b421301cbc83b8dfc212c6eead1 Mon Sep 17 00:00:00 2001
From: Jimmy Chen
Date: Fri, 10 May 2024 13:09:40 +1000
Subject: [PATCH] Remove `BlobSidecar` construction and publish after PeerDAS
 activated (#5759)

* Avoid building and publishing blob sidecars after PeerDAS.

* Ignore gossip blobs with a slot at or after the PeerDAS activation epoch.

* Only attempt to verify blob count and import blobs before PeerDAS.
---
 beacon_node/beacon_chain/src/beacon_chain.rs  |  5 ++
 .../beacon_chain/src/blob_verification.rs     |  7 +-
 .../beacon_chain/src/block_verification.rs    | 82 +++++++++++--------
 .../src/data_availability_checker.rs          | 13 +--
 .../overflow_lru_cache.rs                     | 52 ++++++------
 .../state_lru_cache.rs                        |  5 ++
 beacon_node/http_api/src/publish_blocks.rs    |  4 +
 .../gossip_methods.rs                         | 10 +++
 consensus/types/src/chain_spec.rs             |  7 ++
 consensus/types/src/data_column_sidecar.rs    | 43 ++++------
 10 files changed, 130 insertions(+), 98 deletions(-)

diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs
index b9c1156c478..78d26a9fb5c 100644
--- a/beacon_node/beacon_chain/src/beacon_chain.rs
+++ b/beacon_node/beacon_chain/src/beacon_chain.rs
@@ -3006,6 +3006,11 @@ impl BeaconChain {
             return Err(BlockError::BlockIsAlreadyKnown(blob.block_root()));
         }

+        // No need to process or import blobs once the PeerDAS epoch is reached.
+        if self.spec.is_peer_das_enabled_for_epoch(blob.epoch()) {
+            return Err(BlockError::BlobNotRequired(blob.slot()));
+        }
+
         if let Some(event_handler) = self.event_handler.as_ref() {
             if event_handler.has_blob_sidecar_subscribers() {
                 event_handler.register(EventKind::BlobSidecar(SseBlobSidecar::from_blob_sidecar(
diff --git a/beacon_node/beacon_chain/src/blob_verification.rs b/beacon_node/beacon_chain/src/blob_verification.rs
index fdf8ee2b971..ba875867a00 100644
--- a/beacon_node/beacon_chain/src/blob_verification.rs
+++ b/beacon_node/beacon_chain/src/blob_verification.rs
@@ -17,7 +17,9 @@ use ssz_types::VariableList;
 use std::time::Duration;
 use tree_hash::TreeHash;
 use types::blob_sidecar::BlobIdentifier;
-use types::{BeaconStateError, BlobSidecar, EthSpec, Hash256, SignedBeaconBlockHeader, Slot};
+use types::{
+    BeaconStateError, BlobSidecar, Epoch, EthSpec, Hash256, SignedBeaconBlockHeader, Slot,
+};

 /// An error occurred while validating a gossip blob.
 #[derive(Debug)]
@@ -231,6 +233,9 @@ impl GossipVerifiedBlob {
     pub fn slot(&self) -> Slot {
         self.blob.blob.slot()
     }
+    pub fn epoch(&self) -> Epoch {
+        self.slot().epoch(T::EthSpec::slots_per_epoch())
+    }
     pub fn index(&self) -> u64 {
         self.blob.blob.index
     }
diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs
index 77b89b94554..0a49e107042 100644
--- a/beacon_node/beacon_chain/src/block_verification.rs
+++ b/beacon_node/beacon_chain/src/block_verification.rs
@@ -99,9 +99,10 @@ use task_executor::JoinHandle;
 use tree_hash::TreeHash;
 use types::data_column_sidecar::DataColumnSidecarError;
 use types::{
-    BeaconBlockRef, BeaconState, BeaconStateError, BlobSidecarList, ChainSpec, DataColumnSidecar,
+    BeaconBlockRef, BeaconState, BeaconStateError, BlobsList, ChainSpec, DataColumnSidecar,
     DataColumnSubnetId, Epoch, EthSpec, ExecutionBlockHash, FullPayload, Hash256, InconsistentFork,
-    PublicKey, PublicKeyBytes, RelativeEpoch, SignedBeaconBlock, SignedBeaconBlockHeader, Slot,
+    KzgProofs, PublicKey, PublicKeyBytes, RelativeEpoch, SignedBeaconBlock,
+    SignedBeaconBlockHeader, Slot,
 };
 use types::{BlobSidecar, ExecPayload};
@@ -308,6 +309,14 @@ pub enum BlockError {
     /// TODO: We may need to penalize the peer that gave us a potentially invalid rpc blob.
     /// https://github.com/sigp/lighthouse/issues/4546
     AvailabilityCheck(AvailabilityCheckError),
+    /// A blob with a slot after the PeerDAS activation epoch was received; it is not required
+    /// to be imported. This can happen because we remain subscribed to the blob subnets for
+    /// 2 epochs after PeerDAS is activated, since valid blobs from a Deneb epoch can still
+    /// arrive after the fork.
+    ///
+    /// ## Peer scoring
+    ///
+    /// This indicates the peer is sending an unexpected gossip blob and should be penalised.
+    BlobNotRequired(Slot),
 }

 impl From for BlockError {
@@ -717,36 +726,15 @@ impl IntoGossipVerifiedBlockContents for PublishBlockReq
         chain: &BeaconChain,
     ) -> Result, BlockContentsError> {
         let (block, blobs) = self.deconstruct();
+        let peer_das_enabled = chain.spec.is_peer_das_enabled_for_epoch(block.epoch());

-        let gossip_verified_blobs = blobs
-            .map(|(kzg_proofs, blobs)| {
-                let mut gossip_verified_blobs = vec![];
-                for (i, (kzg_proof, blob)) in kzg_proofs.iter().zip(blobs).enumerate() {
-                    let _timer =
-                        metrics::start_timer(&metrics::BLOB_SIDECAR_INCLUSION_PROOF_COMPUTATION);
-                    let blob = BlobSidecar::new(i, blob, &block, *kzg_proof)
-                        .map_err(BlockContentsError::BlobSidecarError)?;
-                    drop(_timer);
-                    let gossip_verified_blob =
-                        GossipVerifiedBlob::new(Arc::new(blob), i as u64, chain)?;
-                    gossip_verified_blobs.push(gossip_verified_blob);
-                }
-                let gossip_verified_blobs = VariableList::from(gossip_verified_blobs);
-                Ok::<_, BlockContentsError>(gossip_verified_blobs)
-            })
-            .transpose()?;
-
-        let peer_das_enabled = chain
-            .spec
-            .eip7594_fork_epoch
-            .map_or(false, |eip7594_fork_epoch| {
-                block.epoch() >= eip7594_fork_epoch
-            });
-
-        let gossip_verified_data_columns = if peer_das_enabled {
-            build_gossip_verified_data_columns(chain, &block, gossip_verified_blobs.as_ref())?
+ let (gossip_verified_blobs, gossip_verified_data_columns) = if peer_das_enabled { + let gossip_verified_data_columns = + build_gossip_verified_data_columns(chain, &block, blobs.map(|(_, blobs)| blobs))?; + (None, gossip_verified_data_columns) } else { - None + let gossip_verified_blobs = build_gossip_verified_blobs(chain, &block, blobs)?; + (gossip_verified_blobs, None) }; let gossip_verified_block = GossipVerifiedBlock::new(block, chain)?; @@ -763,12 +751,37 @@ impl IntoGossipVerifiedBlockContents for PublishBlockReq } } +#[allow(clippy::type_complexity)] +fn build_gossip_verified_blobs( + chain: &BeaconChain, + block: &Arc>>, + blobs: Option<(KzgProofs, BlobsList)>, +) -> Result>, BlockContentsError> { + blobs + .map(|(kzg_proofs, blobs)| { + let mut gossip_verified_blobs = vec![]; + for (i, (kzg_proof, blob)) in kzg_proofs.iter().zip(blobs).enumerate() { + let _timer = + metrics::start_timer(&metrics::BLOB_SIDECAR_INCLUSION_PROOF_COMPUTATION); + let blob = BlobSidecar::new(i, blob, block, *kzg_proof) + .map_err(BlockContentsError::BlobSidecarError)?; + drop(_timer); + let gossip_verified_blob = + GossipVerifiedBlob::new(Arc::new(blob), i as u64, chain)?; + gossip_verified_blobs.push(gossip_verified_blob); + } + let gossip_verified_blobs = VariableList::from(gossip_verified_blobs); + Ok::<_, BlockContentsError>(gossip_verified_blobs) + }) + .transpose() +} + fn build_gossip_verified_data_columns( chain: &BeaconChain, block: &SignedBeaconBlock>, - gossip_verified_blobs: Option<&GossipVerifiedBlobList>, + blobs: Option>, ) -> Result>, BlockContentsError> { - gossip_verified_blobs + blobs // Only attempt to build data columns if blobs is non empty to avoid skewing the metrics. .filter(|b| !b.is_empty()) .map(|blobs| { @@ -780,11 +793,8 @@ fn build_gossip_verified_data_columns( GossipDataColumnError::::KzgNotInitialized, ))?; - let blob_sidecar_list: Vec<_> = blobs.iter().map(|blob| blob.clone_blob()).collect(); - let blob_sidecar_list = BlobSidecarList::new(blob_sidecar_list) - .map_err(DataColumnSidecarError::SszError)?; let timer = metrics::start_timer(&metrics::DATA_COLUMN_SIDECAR_COMPUTATION); - let sidecars = DataColumnSidecar::build_sidecars(&blob_sidecar_list, block, kzg)?; + let sidecars = DataColumnSidecar::build_sidecars(&blobs, block, kzg)?; drop(timer); let mut gossip_verified_data_columns = vec![]; for sidecar in sidecars { diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index b7a56cac164..1b171508ba6 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -419,14 +419,14 @@ impl DataAvailabilityChecker { /// Determines the blob requirements for a block. If the block is pre-deneb, no blobs are required. /// If the epoch is from prior to the data availability boundary, no blobs are required. pub fn blobs_required_for_epoch(&self, epoch: Epoch) -> bool { - self.da_check_required_for_epoch(epoch) && !self.is_peer_das_enabled_for_epoch(epoch) + self.da_check_required_for_epoch(epoch) && !self.spec.is_peer_das_enabled_for_epoch(epoch) } /// Determines the data column requirements for an epoch. /// - If the epoch is pre-peerdas, no data columns are required. /// - If the epoch is from prior to the data availability boundary, no data columns are required. 
pub fn data_columns_required_for_epoch(&self, epoch: Epoch) -> bool { - self.da_check_required_for_epoch(epoch) && self.is_peer_das_enabled_for_epoch(epoch) + self.da_check_required_for_epoch(epoch) && self.spec.is_peer_das_enabled_for_epoch(epoch) } /// See `Self::blobs_required_for_epoch` @@ -439,15 +439,6 @@ impl DataAvailabilityChecker { block.num_expected_blobs() > 0 && self.data_columns_required_for_epoch(block.epoch()) } - /// Returns true if the given epoch is greater than or equal to the `EIP7594_FORK_EPOCH`. - fn is_peer_das_enabled_for_epoch(&self, block_epoch: Epoch) -> bool { - self.spec - .eip7594_fork_epoch - .map_or(false, |eip7594_fork_epoch| { - block_epoch >= eip7594_fork_epoch - }) - } - /// The epoch at which we require a data availability check in block processing. /// `None` if the `Deneb` fork is disabled. pub fn data_availability_boundary(&self) -> Option { diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs index 5947c521545..7e3e65d0790 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs @@ -39,7 +39,7 @@ use crate::store::{DBColumn, KeyValueStore}; use crate::BeaconChainTypes; use lru::LruCache; use parking_lot::{Mutex, RwLock, RwLockUpgradableReadGuard}; -use slog::{debug, trace, Logger}; +use slog::{trace, Logger}; use ssz::{Decode, Encode}; use ssz_derive::{Decode, Encode}; use ssz_types::{FixedVector, VariableList}; @@ -235,7 +235,7 @@ impl PendingComponents { ) -> bool { match block_import_requirement { BlockImportRequirement::AllBlobs => { - debug!( + trace!( log, "Checking block and blob importability"; "block_root" => %self.block_root, @@ -250,7 +250,6 @@ impl PendingComponents { } BlockImportRequirement::CustodyColumns(num_expected_columns) => { let num_received_data_columns = self.num_received_data_columns(); - trace!( log, "Checking block and data column importability"; @@ -284,7 +283,11 @@ impl PendingComponents { /// /// WARNING: This function can potentially take a lot of time if the state needs to be /// reconstructed from disk. Ensure you are not holding any write locks while calling this. - pub fn make_available(self, recover: R) -> Result, AvailabilityCheckError> + pub fn make_available( + self, + spec: &ChainSpec, + recover: R, + ) -> Result, AvailabilityCheckError> where R: FnOnce( DietAvailabilityPendingExecutedBlock, @@ -306,17 +309,23 @@ impl PendingComponents { let Some(diet_executed_block) = executed_block else { return Err(AvailabilityCheckError::Unexpected); }; - let num_blobs_expected = diet_executed_block.num_blobs_expected(); - let Some(verified_blobs) = verified_blobs - .into_iter() - .cloned() - .map(|b| b.map(|b| b.to_blob())) - .take(num_blobs_expected) - .collect::>>() - else { - return Err(AvailabilityCheckError::Unexpected); - }; - let verified_blobs = VariableList::new(verified_blobs)?; + + let is_before_peer_das = !spec.is_peer_das_enabled_for_epoch(diet_executed_block.epoch()); + let blobs = is_before_peer_das + .then(|| { + let num_blobs_expected = diet_executed_block.num_blobs_expected(); + let Some(verified_blobs) = verified_blobs + .into_iter() + .cloned() + .map(|b| b.map(|b| b.to_blob())) + .take(num_blobs_expected) + .collect::>>() + else { + return Err(AvailabilityCheckError::Unexpected); + }; + Ok(VariableList::new(verified_blobs)?) 
+            })
+            .transpose()?;

         let executed_block = recover(diet_executed_block)?;
@@ -329,7 +338,7 @@ impl PendingComponents {
         let available_block = AvailableBlock {
             block_root,
             block,
-            blobs: Some(verified_blobs),
+            blobs,
             blobs_available_timestamp,
             // TODO(das) Do we need a check here for number of expected custody columns?
             // TODO(das): Update store types to prevent this conversion
@@ -790,10 +799,7 @@ impl OverflowLRUCache {
             .epoch()
             .ok_or(AvailabilityCheckError::UnableToDetermineImportRequirement)?;

-        let peer_das_enabled = self
-            .spec
-            .eip7594_fork_epoch
-            .map_or(false, |eip7594_fork_epoch| epoch >= eip7594_fork_epoch);
+        let peer_das_enabled = self.spec.is_peer_das_enabled_for_epoch(epoch);
         if peer_das_enabled {
             Ok(BlockImportRequirement::CustodyColumns(
                 self.custody_column_count,
@@ -824,7 +830,7 @@ impl OverflowLRUCache {
         if pending_components.is_available(block_import_requirement, &self.log) {
             // No need to hold the write lock anymore
             drop(write_lock);
-            pending_components.make_available(|diet_block| {
+            pending_components.make_available(&self.spec, |diet_block| {
                 self.state_cache.recover_pending_executed_block(diet_block)
             })
         } else {
@@ -864,7 +870,7 @@ impl OverflowLRUCache {
         if pending_components.is_available(block_import_requirement, &self.log) {
             // No need to hold the write lock anymore
             drop(write_lock);
-            pending_components.make_available(|diet_block| {
+            pending_components.make_available(&self.spec, |diet_block| {
                 self.state_cache.recover_pending_executed_block(diet_block)
             })
         } else {
@@ -904,7 +910,7 @@ impl OverflowLRUCache {
         if pending_components.is_available(block_import_requirement, &self.log) {
             // No need to hold the write lock anymore
             drop(write_lock);
-            pending_components.make_available(|diet_block| {
+            pending_components.make_available(&self.spec, |diet_block| {
                 self.state_cache.recover_pending_executed_block(diet_block)
             })
         } else {
diff --git a/beacon_node/beacon_chain/src/data_availability_checker/state_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/state_lru_cache.rs
index f8a243bd9e8..71b81bcddc6 100644
--- a/beacon_node/beacon_chain/src/data_availability_checker/state_lru_cache.rs
+++ b/beacon_node/beacon_chain/src/data_availability_checker/state_lru_cache.rs
@@ -53,6 +53,11 @@ impl DietAvailabilityPendingExecutedBlock {
             .cloned()
             .unwrap_or_default()
     }
+
+    /// Returns the epoch corresponding to the block's slot.
+    pub fn epoch(&self) -> Epoch {
+        self.block.slot().epoch(E::slots_per_epoch())
+    }
 }

 /// This LRU cache holds BeaconStates used for block import. If the cache overflows,
diff --git a/beacon_node/http_api/src/publish_blocks.rs b/beacon_node/http_api/src/publish_blocks.rs
index 067136919e7..896448176a0 100644
--- a/beacon_node/http_api/src/publish_blocks.rs
+++ b/beacon_node/http_api/src/publish_blocks.rs
@@ -173,6 +173,10 @@ pub async fn publish_block
 NetworkBeaconProcessor {
             );
             return None;
         }
+        Err(e @ BlockError::BlobNotRequired(_)) => {
+            // TODO(das): penalty not implemented yet, as other clients may still send us blobs
+            // during the early stage of implementation.
+ debug!(self.log, "Received blobs for slot after PeerDAS epoch from peer"; + "error" => %e, + "peer_id" => %peer_id, + ); + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); + return None; + } }; metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_VERIFIED_TOTAL); diff --git a/consensus/types/src/chain_spec.rs b/consensus/types/src/chain_spec.rs index d9f9865927d..b166e547dbb 100644 --- a/consensus/types/src/chain_spec.rs +++ b/consensus/types/src/chain_spec.rs @@ -397,6 +397,13 @@ impl ChainSpec { } } + /// Returns true if the given epoch is greater than or equal to the `EIP7594_FORK_EPOCH`. + pub fn is_peer_das_enabled_for_epoch(&self, block_epoch: Epoch) -> bool { + self.eip7594_fork_epoch.map_or(false, |eip7594_fork_epoch| { + block_epoch >= eip7594_fork_epoch + }) + } + /// Returns a full `Fork` struct for a given epoch. pub fn fork_at_epoch(&self, epoch: Epoch) -> Fork { let current_fork_name = self.fork_name_at_epoch(epoch); diff --git a/consensus/types/src/data_column_sidecar.rs b/consensus/types/src/data_column_sidecar.rs index f92838ceabe..7924a32f7b5 100644 --- a/consensus/types/src/data_column_sidecar.rs +++ b/consensus/types/src/data_column_sidecar.rs @@ -1,10 +1,10 @@ use crate::beacon_block_body::KzgCommitments; use crate::test_utils::TestRandom; -use crate::BeaconStateError; use crate::{ - BeaconBlockHeader, BlobSidecarList, EthSpec, Hash256, KzgProofs, SignedBeaconBlock, - SignedBeaconBlockHeader, Slot, + BeaconBlockHeader, EthSpec, Hash256, KzgProofs, SignedBeaconBlock, SignedBeaconBlockHeader, + Slot, }; +use crate::{BeaconStateError, BlobsList}; use bls::Signature; use derivative::Derivative; #[cfg_attr(test, double)] @@ -83,7 +83,7 @@ impl DataColumnSidecar { } pub fn build_sidecars( - blobs: &BlobSidecarList, + blobs: &BlobsList, block: &SignedBeaconBlock, kzg: &Kzg, ) -> Result, DataColumnSidecarError> { @@ -106,7 +106,7 @@ impl DataColumnSidecar { // NOTE: assumes blob sidecars are ordered by index for blob in blobs { - let blob = KzgBlob::from_bytes(&blob.blob).map_err(KzgError::from)?; + let blob = KzgBlob::from_bytes(blob).map_err(KzgError::from)?; let (blob_cells, blob_cell_proofs) = kzg.compute_cells_and_proofs(&blob)?; // we iterate over each column, and we construct the column from "top to bottom", @@ -279,11 +279,11 @@ mod test { use crate::beacon_block_body::KzgCommitments; use crate::eth_spec::EthSpec; use crate::{ - BeaconBlock, BeaconBlockDeneb, Blob, BlobSidecar, BlobSidecarList, ChainSpec, - DataColumnSidecar, MainnetEthSpec, SignedBeaconBlock, + BeaconBlock, BeaconBlockDeneb, Blob, ChainSpec, DataColumnSidecar, MainnetEthSpec, + SignedBeaconBlock, }; use bls::Signature; - use kzg::{KzgCommitment, KzgProof}; + use kzg::KzgCommitment; use std::sync::Arc; #[test] @@ -291,8 +291,7 @@ mod test { type E = MainnetEthSpec; let num_of_blobs = 0; let spec = E::default_spec(); - let (signed_block, blob_sidecars) = - create_test_block_and_blob_sidecars::(num_of_blobs, &spec); + let (signed_block, blob_sidecars) = create_test_block_and_blobs::(num_of_blobs, &spec); let mock_kzg = Arc::new(Kzg::default()); let column_sidecars = @@ -306,8 +305,7 @@ mod test { type E = MainnetEthSpec; let num_of_blobs = 6; let spec = E::default_spec(); - let (signed_block, blob_sidecars) = - create_test_block_and_blob_sidecars::(num_of_blobs, &spec); + let (signed_block, blob_sidecars) = create_test_block_and_blobs::(num_of_blobs, &spec); let mut mock_kzg = Kzg::default(); mock_kzg @@ -345,10 +343,10 @@ mod test { } } - fn 
create_test_block_and_blob_sidecars( + fn create_test_block_and_blobs( num_of_blobs: usize, spec: &ChainSpec, - ) -> (SignedBeaconBlock, BlobSidecarList) { + ) -> (SignedBeaconBlock, BlobsList) { let mut block = BeaconBlock::Deneb(BeaconBlockDeneb::empty(spec)); let mut body = block.body_mut(); let blob_kzg_commitments = body.blob_kzg_commitments_mut().unwrap(); @@ -358,20 +356,11 @@ mod test { let signed_block = SignedBeaconBlock::from_block(block, Signature::empty()); - let sidecars = (0..num_of_blobs) - .map(|index| { - BlobSidecar::new( - index, - Blob::::default(), - &signed_block, - KzgProof::empty(), - ) - .map(Arc::new) - }) - .collect::, _>>() - .unwrap() + let blobs = (0..num_of_blobs) + .map(|_| Blob::::default()) + .collect::>() .into(); - (signed_block, sidecars) + (signed_block, blobs) } }
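
---

Note on the pattern this patch repeats across files: every hunk above hangs off a single fork gate, the new `ChainSpec::is_peer_das_enabled_for_epoch` helper, which decides whether a block's blobs are wrapped into `BlobSidecar`s (pre-PeerDAS) or erasure-coded into `DataColumnSidecar`s (post-PeerDAS). The sketch below is a minimal standalone model of that gate, not Lighthouse's actual types; `Sidecars`, `build_sidecars`, and the placeholder structs are hypothetical, and only the `Option`-based fork-epoch comparison mirrors the patch.

// Minimal sketch of the PeerDAS fork gate (placeholder types, not Lighthouse's).

#[derive(Clone, Copy, PartialEq, PartialOrd)]
struct Epoch(u64);

struct ChainSpec {
    /// `None` means the EIP-7594 (PeerDAS) fork is not scheduled.
    eip7594_fork_epoch: Option<Epoch>,
}

impl ChainSpec {
    /// True when `epoch` is at or after the PeerDAS fork epoch.
    fn is_peer_das_enabled_for_epoch(&self, epoch: Epoch) -> bool {
        self.eip7594_fork_epoch
            .map_or(false, |fork_epoch| epoch >= fork_epoch)
    }
}

/// Placeholders for the two sidecar flavours a proposer can publish.
struct BlobSidecars;
struct DataColumnSidecars;

enum Sidecars {
    /// Pre-PeerDAS: blob sidecars are built and gossiped individually.
    Blobs(BlobSidecars),
    /// Post-PeerDAS: raw blobs are extended into column sidecars instead,
    /// and no `BlobSidecar` is constructed or published.
    Columns(DataColumnSidecars),
}

fn build_sidecars(spec: &ChainSpec, block_epoch: Epoch) -> Sidecars {
    if spec.is_peer_das_enabled_for_epoch(block_epoch) {
        Sidecars::Columns(DataColumnSidecars)
    } else {
        Sidecars::Blobs(BlobSidecars)
    }
}

fn main() {
    let spec = ChainSpec { eip7594_fork_epoch: Some(Epoch(10)) };
    assert!(matches!(build_sidecars(&spec, Epoch(9)), Sidecars::Blobs(_)));
    assert!(matches!(build_sidecars(&spec, Epoch(10)), Sidecars::Columns(_)));

    // An unscheduled fork behaves as "disabled" for every epoch.
    let no_fork = ChainSpec { eip7594_fork_epoch: None };
    assert!(!no_fork.is_peer_das_enabled_for_epoch(Epoch(u64::MAX)));
}

Keeping the fork epoch as an `Option` means an unscheduled fork compares as disabled for every epoch, which is why the patch can delete the ad-hoc `eip7594_fork_epoch.map_or(...)` copies in `block_verification.rs`, `data_availability_checker.rs`, and `overflow_lru_cache.rs` in favour of the single `ChainSpec` helper.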