Skip to content

Commit

Permalink
Remove BlobSidecar construction and publish after PeerDAS activated (
Browse files Browse the repository at this point in the history
…#5759)

* Avoid building and publishing blob sidecars after PeerDAS.

* Ignore gossip blobs whose epoch is at or after the PeerDAS activation epoch.

* Only attempt to verify blob count and import blobs before PeerDAS.
  • Loading branch information
jimmygchen authored May 10, 2024
1 parent 09d217c commit 9f495e7
Show file tree
Hide file tree
Showing 10 changed files with 130 additions and 98 deletions.
5 changes: 5 additions & 0 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3006,6 +3006,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
return Err(BlockError::BlockIsAlreadyKnown(blob.block_root()));
}

// No need to process and import blobs beyond the PeerDAS epoch.
if self.spec.is_peer_das_enabled_for_epoch(blob.epoch()) {
return Err(BlockError::BlobNotRequired(blob.slot()));
}

if let Some(event_handler) = self.event_handler.as_ref() {
if event_handler.has_blob_sidecar_subscribers() {
event_handler.register(EventKind::BlobSidecar(SseBlobSidecar::from_blob_sidecar(
Expand Down
7 changes: 6 additions & 1 deletion beacon_node/beacon_chain/src/blob_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ use ssz_types::VariableList;
use std::time::Duration;
use tree_hash::TreeHash;
use types::blob_sidecar::BlobIdentifier;
use types::{BeaconStateError, BlobSidecar, EthSpec, Hash256, SignedBeaconBlockHeader, Slot};
use types::{
BeaconStateError, BlobSidecar, Epoch, EthSpec, Hash256, SignedBeaconBlockHeader, Slot,
};

/// An error occurred while validating a gossip blob.
#[derive(Debug)]
Expand Down Expand Up @@ -231,6 +233,9 @@ impl<T: BeaconChainTypes> GossipVerifiedBlob<T> {
/// Slot of the wrapped blob sidecar.
pub fn slot(&self) -> Slot {
    let sidecar = &self.blob.blob;
    sidecar.slot()
}
/// Epoch of the wrapped blob sidecar, derived from its slot.
pub fn epoch(&self) -> Epoch {
    let slot = self.slot();
    slot.epoch(T::EthSpec::slots_per_epoch())
}
/// Index of the wrapped blob sidecar within its block.
pub fn index(&self) -> u64 {
    let sidecar = &self.blob.blob;
    sidecar.index
}
Expand Down
82 changes: 46 additions & 36 deletions beacon_node/beacon_chain/src/block_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,10 @@ use task_executor::JoinHandle;
use tree_hash::TreeHash;
use types::data_column_sidecar::DataColumnSidecarError;
use types::{
BeaconBlockRef, BeaconState, BeaconStateError, BlobSidecarList, ChainSpec, DataColumnSidecar,
BeaconBlockRef, BeaconState, BeaconStateError, BlobsList, ChainSpec, DataColumnSidecar,
DataColumnSubnetId, Epoch, EthSpec, ExecutionBlockHash, FullPayload, Hash256, InconsistentFork,
PublicKey, PublicKeyBytes, RelativeEpoch, SignedBeaconBlock, SignedBeaconBlockHeader, Slot,
KzgProofs, PublicKey, PublicKeyBytes, RelativeEpoch, SignedBeaconBlock,
SignedBeaconBlockHeader, Slot,
};
use types::{BlobSidecar, ExecPayload};

Expand Down Expand Up @@ -308,6 +309,14 @@ pub enum BlockError<E: EthSpec> {
/// TODO: We may need to penalize the peer that gave us a potentially invalid rpc blob.
/// https://github.com/sigp/lighthouse/issues/4546
AvailabilityCheck(AvailabilityCheckError),
/// A blob with a slot at or after the PeerDAS activation epoch was received and is not
/// required to be imported. This can happen because we remain subscribed to the blob subnets
/// for 2 epochs after the fork, as we could still receive valid blobs from a Deneb epoch
/// after PeerDAS is activated.
///
/// ## Peer scoring
///
/// This indicates the peer is sending an unexpected gossip blob and should be penalised.
BlobNotRequired(Slot),
}

impl<E: EthSpec> From<AvailabilityCheckError> for BlockError<E> {
Expand Down Expand Up @@ -717,36 +726,15 @@ impl<T: BeaconChainTypes> IntoGossipVerifiedBlockContents<T> for PublishBlockReq
chain: &BeaconChain<T>,
) -> Result<GossipVerifiedBlockContents<T>, BlockContentsError<T::EthSpec>> {
let (block, blobs) = self.deconstruct();
let peer_das_enabled = chain.spec.is_peer_das_enabled_for_epoch(block.epoch());

let gossip_verified_blobs = blobs
.map(|(kzg_proofs, blobs)| {
let mut gossip_verified_blobs = vec![];
for (i, (kzg_proof, blob)) in kzg_proofs.iter().zip(blobs).enumerate() {
let _timer =
metrics::start_timer(&metrics::BLOB_SIDECAR_INCLUSION_PROOF_COMPUTATION);
let blob = BlobSidecar::new(i, blob, &block, *kzg_proof)
.map_err(BlockContentsError::BlobSidecarError)?;
drop(_timer);
let gossip_verified_blob =
GossipVerifiedBlob::new(Arc::new(blob), i as u64, chain)?;
gossip_verified_blobs.push(gossip_verified_blob);
}
let gossip_verified_blobs = VariableList::from(gossip_verified_blobs);
Ok::<_, BlockContentsError<T::EthSpec>>(gossip_verified_blobs)
})
.transpose()?;

let peer_das_enabled = chain
.spec
.eip7594_fork_epoch
.map_or(false, |eip7594_fork_epoch| {
block.epoch() >= eip7594_fork_epoch
});

let gossip_verified_data_columns = if peer_das_enabled {
build_gossip_verified_data_columns(chain, &block, gossip_verified_blobs.as_ref())?
let (gossip_verified_blobs, gossip_verified_data_columns) = if peer_das_enabled {
let gossip_verified_data_columns =
build_gossip_verified_data_columns(chain, &block, blobs.map(|(_, blobs)| blobs))?;
(None, gossip_verified_data_columns)
} else {
None
let gossip_verified_blobs = build_gossip_verified_blobs(chain, &block, blobs)?;
(gossip_verified_blobs, None)
};

let gossip_verified_block = GossipVerifiedBlock::new(block, chain)?;
Expand All @@ -763,12 +751,37 @@ impl<T: BeaconChainTypes> IntoGossipVerifiedBlockContents<T> for PublishBlockReq
}
}

/// Builds `BlobSidecar`s from the supplied KZG proofs and blobs, then gossip-verifies each
/// sidecar against the chain.
///
/// Returns `Ok(None)` when no blobs were supplied; otherwise returns the full list of
/// gossip-verified blobs, or the first construction/verification error encountered.
#[allow(clippy::type_complexity)]
fn build_gossip_verified_blobs<T: BeaconChainTypes>(
    chain: &BeaconChain<T>,
    block: &Arc<SignedBeaconBlock<T::EthSpec, FullPayload<T::EthSpec>>>,
    blobs: Option<(KzgProofs<T::EthSpec>, BlobsList<T::EthSpec>)>,
) -> Result<Option<GossipVerifiedBlobList<T>>, BlockContentsError<T::EthSpec>> {
    let Some((kzg_proofs, blobs)) = blobs else {
        return Ok(None);
    };

    let mut gossip_verified_blobs = Vec::with_capacity(kzg_proofs.len());
    for (index, (kzg_proof, blob)) in kzg_proofs.iter().zip(blobs).enumerate() {
        // Scope the timer so it measures only the sidecar (inclusion proof) construction.
        let sidecar = {
            let _timer =
                metrics::start_timer(&metrics::BLOB_SIDECAR_INCLUSION_PROOF_COMPUTATION);
            BlobSidecar::new(index, blob, block, *kzg_proof)
                .map_err(BlockContentsError::BlobSidecarError)?
        };
        let verified = GossipVerifiedBlob::new(Arc::new(sidecar), index as u64, chain)?;
        gossip_verified_blobs.push(verified);
    }

    Ok(Some(VariableList::from(gossip_verified_blobs)))
}

fn build_gossip_verified_data_columns<T: BeaconChainTypes>(
chain: &BeaconChain<T>,
block: &SignedBeaconBlock<T::EthSpec, FullPayload<T::EthSpec>>,
gossip_verified_blobs: Option<&GossipVerifiedBlobList<T>>,
blobs: Option<BlobsList<T::EthSpec>>,
) -> Result<Option<GossipVerifiedDataColumnList<T>>, BlockContentsError<T::EthSpec>> {
gossip_verified_blobs
blobs
// Only attempt to build data columns if blobs is non empty to avoid skewing the metrics.
.filter(|b| !b.is_empty())
.map(|blobs| {
Expand All @@ -780,11 +793,8 @@ fn build_gossip_verified_data_columns<T: BeaconChainTypes>(
GossipDataColumnError::<T::EthSpec>::KzgNotInitialized,
))?;

let blob_sidecar_list: Vec<_> = blobs.iter().map(|blob| blob.clone_blob()).collect();
let blob_sidecar_list = BlobSidecarList::new(blob_sidecar_list)
.map_err(DataColumnSidecarError::SszError)?;
let timer = metrics::start_timer(&metrics::DATA_COLUMN_SIDECAR_COMPUTATION);
let sidecars = DataColumnSidecar::build_sidecars(&blob_sidecar_list, block, kzg)?;
let sidecars = DataColumnSidecar::build_sidecars(&blobs, block, kzg)?;
drop(timer);
let mut gossip_verified_data_columns = vec![];
for sidecar in sidecars {
Expand Down
13 changes: 2 additions & 11 deletions beacon_node/beacon_chain/src/data_availability_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,14 +419,14 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
/// Determines the blob requirements for a block. If the block is pre-Deneb, no blobs are required.
/// If the epoch is prior to the data availability boundary, no blobs are required.
pub fn blobs_required_for_epoch(&self, epoch: Epoch) -> bool {
self.da_check_required_for_epoch(epoch) && !self.is_peer_das_enabled_for_epoch(epoch)
self.da_check_required_for_epoch(epoch) && !self.spec.is_peer_das_enabled_for_epoch(epoch)
}

/// Determines the data column requirements for an epoch.
/// - If the epoch is pre-peerdas, no data columns are required.
/// - If the epoch is from prior to the data availability boundary, no data columns are required.
pub fn data_columns_required_for_epoch(&self, epoch: Epoch) -> bool {
self.da_check_required_for_epoch(epoch) && self.is_peer_das_enabled_for_epoch(epoch)
self.da_check_required_for_epoch(epoch) && self.spec.is_peer_das_enabled_for_epoch(epoch)
}

/// See `Self::blobs_required_for_epoch`
Expand All @@ -439,15 +439,6 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
block.num_expected_blobs() > 0 && self.data_columns_required_for_epoch(block.epoch())
}

/// Returns true if the given epoch is greater than or equal to the `EIP7594_FORK_EPOCH`,
/// i.e. PeerDAS is active for that epoch. Returns false when no fork epoch is scheduled.
fn is_peer_das_enabled_for_epoch(&self, block_epoch: Epoch) -> bool {
    matches!(
        self.spec.eip7594_fork_epoch,
        Some(fork_epoch) if block_epoch >= fork_epoch
    )
}

/// The epoch at which we require a data availability check in block processing.
/// `None` if the `Deneb` fork is disabled.
pub fn data_availability_boundary(&self) -> Option<Epoch> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use crate::store::{DBColumn, KeyValueStore};
use crate::BeaconChainTypes;
use lru::LruCache;
use parking_lot::{Mutex, RwLock, RwLockUpgradableReadGuard};
use slog::{debug, trace, Logger};
use slog::{trace, Logger};
use ssz::{Decode, Encode};
use ssz_derive::{Decode, Encode};
use ssz_types::{FixedVector, VariableList};
Expand Down Expand Up @@ -235,7 +235,7 @@ impl<E: EthSpec> PendingComponents<E> {
) -> bool {
match block_import_requirement {
BlockImportRequirement::AllBlobs => {
debug!(
trace!(
log,
"Checking block and blob importability";
"block_root" => %self.block_root,
Expand All @@ -250,7 +250,6 @@ impl<E: EthSpec> PendingComponents<E> {
}
BlockImportRequirement::CustodyColumns(num_expected_columns) => {
let num_received_data_columns = self.num_received_data_columns();

trace!(
log,
"Checking block and data column importability";
Expand Down Expand Up @@ -284,7 +283,11 @@ impl<E: EthSpec> PendingComponents<E> {
///
/// WARNING: This function can potentially take a lot of time if the state needs to be
/// reconstructed from disk. Ensure you are not holding any write locks while calling this.
pub fn make_available<R>(self, recover: R) -> Result<Availability<E>, AvailabilityCheckError>
pub fn make_available<R>(
self,
spec: &ChainSpec,
recover: R,
) -> Result<Availability<E>, AvailabilityCheckError>
where
R: FnOnce(
DietAvailabilityPendingExecutedBlock<E>,
Expand All @@ -306,17 +309,23 @@ impl<E: EthSpec> PendingComponents<E> {
let Some(diet_executed_block) = executed_block else {
return Err(AvailabilityCheckError::Unexpected);
};
let num_blobs_expected = diet_executed_block.num_blobs_expected();
let Some(verified_blobs) = verified_blobs
.into_iter()
.cloned()
.map(|b| b.map(|b| b.to_blob()))
.take(num_blobs_expected)
.collect::<Option<Vec<_>>>()
else {
return Err(AvailabilityCheckError::Unexpected);
};
let verified_blobs = VariableList::new(verified_blobs)?;

let is_before_peer_das = !spec.is_peer_das_enabled_for_epoch(diet_executed_block.epoch());
let blobs = is_before_peer_das
.then(|| {
let num_blobs_expected = diet_executed_block.num_blobs_expected();
let Some(verified_blobs) = verified_blobs
.into_iter()
.cloned()
.map(|b| b.map(|b| b.to_blob()))
.take(num_blobs_expected)
.collect::<Option<Vec<_>>>()
else {
return Err(AvailabilityCheckError::Unexpected);
};
Ok(VariableList::new(verified_blobs)?)
})
.transpose()?;

let executed_block = recover(diet_executed_block)?;

Expand All @@ -329,7 +338,7 @@ impl<E: EthSpec> PendingComponents<E> {
let available_block = AvailableBlock {
block_root,
block,
blobs: Some(verified_blobs),
blobs,
blobs_available_timestamp,
// TODO(das) Do we need a check here for number of expected custody columns?
// TODO(das): Update store types to prevent this conversion
Expand Down Expand Up @@ -790,10 +799,7 @@ impl<T: BeaconChainTypes> OverflowLRUCache<T> {
.epoch()
.ok_or(AvailabilityCheckError::UnableToDetermineImportRequirement)?;

let peer_das_enabled = self
.spec
.eip7594_fork_epoch
.map_or(false, |eip7594_fork_epoch| epoch >= eip7594_fork_epoch);
let peer_das_enabled = self.spec.is_peer_das_enabled_for_epoch(epoch);
if peer_das_enabled {
Ok(BlockImportRequirement::CustodyColumns(
self.custody_column_count,
Expand Down Expand Up @@ -824,7 +830,7 @@ impl<T: BeaconChainTypes> OverflowLRUCache<T> {
if pending_components.is_available(block_import_requirement, &self.log) {
// No need to hold the write lock anymore
drop(write_lock);
pending_components.make_available(|diet_block| {
pending_components.make_available(&self.spec, |diet_block| {
self.state_cache.recover_pending_executed_block(diet_block)
})
} else {
Expand Down Expand Up @@ -864,7 +870,7 @@ impl<T: BeaconChainTypes> OverflowLRUCache<T> {
if pending_components.is_available(block_import_requirement, &self.log) {
// No need to hold the write lock anymore
drop(write_lock);
pending_components.make_available(|diet_block| {
pending_components.make_available(&self.spec, |diet_block| {
self.state_cache.recover_pending_executed_block(diet_block)
})
} else {
Expand Down Expand Up @@ -904,7 +910,7 @@ impl<T: BeaconChainTypes> OverflowLRUCache<T> {
if pending_components.is_available(block_import_requirement, &self.log) {
// No need to hold the write lock anymore
drop(write_lock);
pending_components.make_available(|diet_block| {
pending_components.make_available(&self.spec, |diet_block| {
self.state_cache.recover_pending_executed_block(diet_block)
})
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ impl<E: EthSpec> DietAvailabilityPendingExecutedBlock<E> {
.cloned()
.unwrap_or_default()
}

/// Returns the epoch of the block's slot.
pub fn epoch(&self) -> Epoch {
    let slot = self.block.slot();
    slot.epoch(E::slots_per_epoch())
}
}

/// This LRU cache holds BeaconStates used for block import. If the cache overflows,
Expand Down
4 changes: 4 additions & 0 deletions beacon_node/http_api/src/publish_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,10 @@ pub async fn publish_block<T: BeaconChainTypes, B: IntoGossipVerifiedBlockConten
}
};

// TODO(das): We could potentially get rid of these conversions and pass `GossipVerified` types
// to `publish_block`, i.e. have `GossipVerified` types in `PubsubMessage`?
// This saves us from extra code and provides guarantee that published
// components are verified.
// Clone here, so we can take advantage of the `Arc`. The block in `BlockContents` is not,
// `Arc`'d but blobs are.
let block = gossip_verified_block.block.block_cloned();
Expand Down
10 changes: 10 additions & 0 deletions beacon_node/network/src/network_beacon_processor/gossip_methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1158,6 +1158,16 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
);
return None;
}
Err(e @ BlockError::BlobNotRequired(_)) => {
// TODO(das): penalty not implemented yet as other clients may still send us blobs
// during early stage of implementation.
debug!(self.log, "Received blobs for slot after PeerDAS epoch from peer";
"error" => %e,
"peer_id" => %peer_id,
);
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
return None;
}
};

metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_VERIFIED_TOTAL);
Expand Down
7 changes: 7 additions & 0 deletions consensus/types/src/chain_spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,13 @@ impl ChainSpec {
}
}

/// Returns true if the given epoch is greater than or equal to the `EIP7594_FORK_EPOCH`.
/// A spec with no scheduled EIP-7594 fork epoch always returns false.
pub fn is_peer_das_enabled_for_epoch(&self, block_epoch: Epoch) -> bool {
    match self.eip7594_fork_epoch {
        Some(fork_epoch) => block_epoch >= fork_epoch,
        None => false,
    }
}

/// Returns a full `Fork` struct for a given epoch.
pub fn fork_at_epoch(&self, epoch: Epoch) -> Fork {
let current_fork_name = self.fork_name_at_epoch(epoch);
Expand Down
Loading

0 comments on commit 9f495e7

Please sign in to comment.