Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove BlobSidecar construction and publish after PeerDAS activated #5759

Merged
merged 3 commits into from
May 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3006,6 +3006,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
return Err(BlockError::BlockIsAlreadyKnown(blob.block_root()));
}

// No need to process and import blobs beyond the PeerDAS epoch.
if self.spec.is_peer_das_enabled_for_epoch(blob.epoch()) {
return Err(BlockError::BlobNotRequired(blob.slot()));
}

if let Some(event_handler) = self.event_handler.as_ref() {
if event_handler.has_blob_sidecar_subscribers() {
event_handler.register(EventKind::BlobSidecar(SseBlobSidecar::from_blob_sidecar(
Expand Down
7 changes: 6 additions & 1 deletion beacon_node/beacon_chain/src/blob_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ use ssz_types::VariableList;
use std::time::Duration;
use tree_hash::TreeHash;
use types::blob_sidecar::BlobIdentifier;
use types::{BeaconStateError, BlobSidecar, EthSpec, Hash256, SignedBeaconBlockHeader, Slot};
use types::{
BeaconStateError, BlobSidecar, Epoch, EthSpec, Hash256, SignedBeaconBlockHeader, Slot,
};

/// An error occurred while validating a gossip blob.
#[derive(Debug)]
Expand Down Expand Up @@ -231,6 +233,9 @@ impl<T: BeaconChainTypes> GossipVerifiedBlob<T> {
pub fn slot(&self) -> Slot {
self.blob.blob.slot()
}
pub fn epoch(&self) -> Epoch {
self.slot().epoch(T::EthSpec::slots_per_epoch())
}
pub fn index(&self) -> u64 {
self.blob.blob.index
}
Expand Down
82 changes: 46 additions & 36 deletions beacon_node/beacon_chain/src/block_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,10 @@ use task_executor::JoinHandle;
use tree_hash::TreeHash;
use types::data_column_sidecar::DataColumnSidecarError;
use types::{
BeaconBlockRef, BeaconState, BeaconStateError, BlobSidecarList, ChainSpec, DataColumnSidecar,
BeaconBlockRef, BeaconState, BeaconStateError, BlobsList, ChainSpec, DataColumnSidecar,
DataColumnSubnetId, Epoch, EthSpec, ExecutionBlockHash, FullPayload, Hash256, InconsistentFork,
PublicKey, PublicKeyBytes, RelativeEpoch, SignedBeaconBlock, SignedBeaconBlockHeader, Slot,
KzgProofs, PublicKey, PublicKeyBytes, RelativeEpoch, SignedBeaconBlock,
SignedBeaconBlockHeader, Slot,
};
use types::{BlobSidecar, ExecPayload};

Expand Down Expand Up @@ -308,6 +309,14 @@ pub enum BlockError<E: EthSpec> {
/// TODO: We may need to penalize the peer that gave us a potentially invalid rpc blob.
/// https://github.com/sigp/lighthouse/issues/4546
AvailabilityCheck(AvailabilityCheckError),
/// A Blob with a slot after PeerDAS is received and is not required to be imported.
/// This can happen because we stay subscribed to the blob subnet after 2 epochs, as we could
/// still receive valid blobs from a Deneb epoch after PeerDAS is activated.
///
/// ## Peer scoring
///
/// This indicates the peer is sending an unexpected gossip blob and should be penalised.
BlobNotRequired(Slot),
}

impl<E: EthSpec> From<AvailabilityCheckError> for BlockError<E> {
Expand Down Expand Up @@ -717,36 +726,15 @@ impl<T: BeaconChainTypes> IntoGossipVerifiedBlockContents<T> for PublishBlockReq
chain: &BeaconChain<T>,
) -> Result<GossipVerifiedBlockContents<T>, BlockContentsError<T::EthSpec>> {
let (block, blobs) = self.deconstruct();
let peer_das_enabled = chain.spec.is_peer_das_enabled_for_epoch(block.epoch());

let gossip_verified_blobs = blobs
.map(|(kzg_proofs, blobs)| {
let mut gossip_verified_blobs = vec![];
for (i, (kzg_proof, blob)) in kzg_proofs.iter().zip(blobs).enumerate() {
let _timer =
metrics::start_timer(&metrics::BLOB_SIDECAR_INCLUSION_PROOF_COMPUTATION);
let blob = BlobSidecar::new(i, blob, &block, *kzg_proof)
.map_err(BlockContentsError::BlobSidecarError)?;
drop(_timer);
let gossip_verified_blob =
GossipVerifiedBlob::new(Arc::new(blob), i as u64, chain)?;
gossip_verified_blobs.push(gossip_verified_blob);
}
let gossip_verified_blobs = VariableList::from(gossip_verified_blobs);
Ok::<_, BlockContentsError<T::EthSpec>>(gossip_verified_blobs)
})
.transpose()?;

let peer_das_enabled = chain
.spec
.eip7594_fork_epoch
.map_or(false, |eip7594_fork_epoch| {
block.epoch() >= eip7594_fork_epoch
});

let gossip_verified_data_columns = if peer_das_enabled {
build_gossip_verified_data_columns(chain, &block, gossip_verified_blobs.as_ref())?
let (gossip_verified_blobs, gossip_verified_data_columns) = if peer_das_enabled {
let gossip_verified_data_columns =
build_gossip_verified_data_columns(chain, &block, blobs.map(|(_, blobs)| blobs))?;
(None, gossip_verified_data_columns)
} else {
None
let gossip_verified_blobs = build_gossip_verified_blobs(chain, &block, blobs)?;
(gossip_verified_blobs, None)
};

let gossip_verified_block = GossipVerifiedBlock::new(block, chain)?;
Expand All @@ -763,12 +751,37 @@ impl<T: BeaconChainTypes> IntoGossipVerifiedBlockContents<T> for PublishBlockReq
}
}

#[allow(clippy::type_complexity)]
fn build_gossip_verified_blobs<T: BeaconChainTypes>(
chain: &BeaconChain<T>,
block: &Arc<SignedBeaconBlock<T::EthSpec, FullPayload<T::EthSpec>>>,
blobs: Option<(KzgProofs<T::EthSpec>, BlobsList<T::EthSpec>)>,
) -> Result<Option<GossipVerifiedBlobList<T>>, BlockContentsError<T::EthSpec>> {
blobs
.map(|(kzg_proofs, blobs)| {
let mut gossip_verified_blobs = vec![];
for (i, (kzg_proof, blob)) in kzg_proofs.iter().zip(blobs).enumerate() {
let _timer =
metrics::start_timer(&metrics::BLOB_SIDECAR_INCLUSION_PROOF_COMPUTATION);
let blob = BlobSidecar::new(i, blob, block, *kzg_proof)
.map_err(BlockContentsError::BlobSidecarError)?;
drop(_timer);
let gossip_verified_blob =
GossipVerifiedBlob::new(Arc::new(blob), i as u64, chain)?;
gossip_verified_blobs.push(gossip_verified_blob);
}
let gossip_verified_blobs = VariableList::from(gossip_verified_blobs);
Ok::<_, BlockContentsError<T::EthSpec>>(gossip_verified_blobs)
})
.transpose()
}

fn build_gossip_verified_data_columns<T: BeaconChainTypes>(
chain: &BeaconChain<T>,
block: &SignedBeaconBlock<T::EthSpec, FullPayload<T::EthSpec>>,
gossip_verified_blobs: Option<&GossipVerifiedBlobList<T>>,
blobs: Option<BlobsList<T::EthSpec>>,
) -> Result<Option<GossipVerifiedDataColumnList<T>>, BlockContentsError<T::EthSpec>> {
gossip_verified_blobs
blobs
// Only attempt to build data columns if blobs is non empty to avoid skewing the metrics.
.filter(|b| !b.is_empty())
.map(|blobs| {
Expand All @@ -780,11 +793,8 @@ fn build_gossip_verified_data_columns<T: BeaconChainTypes>(
GossipDataColumnError::<T::EthSpec>::KzgNotInitialized,
))?;

let blob_sidecar_list: Vec<_> = blobs.iter().map(|blob| blob.clone_blob()).collect();
let blob_sidecar_list = BlobSidecarList::new(blob_sidecar_list)
.map_err(DataColumnSidecarError::SszError)?;
let timer = metrics::start_timer(&metrics::DATA_COLUMN_SIDECAR_COMPUTATION);
let sidecars = DataColumnSidecar::build_sidecars(&blob_sidecar_list, block, kzg)?;
let sidecars = DataColumnSidecar::build_sidecars(&blobs, block, kzg)?;
drop(timer);
let mut gossip_verified_data_columns = vec![];
for sidecar in sidecars {
Expand Down
13 changes: 2 additions & 11 deletions beacon_node/beacon_chain/src/data_availability_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,14 +419,14 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
/// Determines the blob requirements for a block. If the block is pre-deneb, no blobs are required.
/// If the epoch is from prior to the data availability boundary, no blobs are required.
pub fn blobs_required_for_epoch(&self, epoch: Epoch) -> bool {
self.da_check_required_for_epoch(epoch) && !self.is_peer_das_enabled_for_epoch(epoch)
self.da_check_required_for_epoch(epoch) && !self.spec.is_peer_das_enabled_for_epoch(epoch)
}

/// Determines the data column requirements for an epoch.
/// - If the epoch is pre-peerdas, no data columns are required.
/// - If the epoch is from prior to the data availability boundary, no data columns are required.
pub fn data_columns_required_for_epoch(&self, epoch: Epoch) -> bool {
self.da_check_required_for_epoch(epoch) && self.is_peer_das_enabled_for_epoch(epoch)
self.da_check_required_for_epoch(epoch) && self.spec.is_peer_das_enabled_for_epoch(epoch)
}

/// See `Self::blobs_required_for_epoch`
Expand All @@ -439,15 +439,6 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
block.num_expected_blobs() > 0 && self.data_columns_required_for_epoch(block.epoch())
}

/// Returns true if the given epoch is greater than or equal to the `EIP7594_FORK_EPOCH`.
fn is_peer_das_enabled_for_epoch(&self, block_epoch: Epoch) -> bool {
self.spec
.eip7594_fork_epoch
.map_or(false, |eip7594_fork_epoch| {
block_epoch >= eip7594_fork_epoch
})
}

/// The epoch at which we require a data availability check in block processing.
/// `None` if the `Deneb` fork is disabled.
pub fn data_availability_boundary(&self) -> Option<Epoch> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use crate::store::{DBColumn, KeyValueStore};
use crate::BeaconChainTypes;
use lru::LruCache;
use parking_lot::{Mutex, RwLock, RwLockUpgradableReadGuard};
use slog::{debug, trace, Logger};
use slog::{trace, Logger};
use ssz::{Decode, Encode};
use ssz_derive::{Decode, Encode};
use ssz_types::{FixedVector, VariableList};
Expand Down Expand Up @@ -235,7 +235,7 @@ impl<E: EthSpec> PendingComponents<E> {
) -> bool {
match block_import_requirement {
BlockImportRequirement::AllBlobs => {
debug!(
trace!(
log,
"Checking block and blob importability";
"block_root" => %self.block_root,
Expand All @@ -250,7 +250,6 @@ impl<E: EthSpec> PendingComponents<E> {
}
BlockImportRequirement::CustodyColumns(num_expected_columns) => {
let num_received_data_columns = self.num_received_data_columns();

trace!(
log,
"Checking block and data column importability";
Expand Down Expand Up @@ -284,7 +283,11 @@ impl<E: EthSpec> PendingComponents<E> {
///
/// WARNING: This function can potentially take a lot of time if the state needs to be
/// reconstructed from disk. Ensure you are not holding any write locks while calling this.
pub fn make_available<R>(self, recover: R) -> Result<Availability<E>, AvailabilityCheckError>
pub fn make_available<R>(
self,
spec: &ChainSpec,
recover: R,
) -> Result<Availability<E>, AvailabilityCheckError>
where
R: FnOnce(
DietAvailabilityPendingExecutedBlock<E>,
Expand All @@ -306,17 +309,23 @@ impl<E: EthSpec> PendingComponents<E> {
let Some(diet_executed_block) = executed_block else {
return Err(AvailabilityCheckError::Unexpected);
};
let num_blobs_expected = diet_executed_block.num_blobs_expected();
let Some(verified_blobs) = verified_blobs
.into_iter()
.cloned()
.map(|b| b.map(|b| b.to_blob()))
.take(num_blobs_expected)
.collect::<Option<Vec<_>>>()
else {
return Err(AvailabilityCheckError::Unexpected);
};
let verified_blobs = VariableList::new(verified_blobs)?;

let is_before_peer_das = !spec.is_peer_das_enabled_for_epoch(diet_executed_block.epoch());
let blobs = is_before_peer_das
.then(|| {
let num_blobs_expected = diet_executed_block.num_blobs_expected();
let Some(verified_blobs) = verified_blobs
.into_iter()
.cloned()
.map(|b| b.map(|b| b.to_blob()))
.take(num_blobs_expected)
.collect::<Option<Vec<_>>>()
else {
return Err(AvailabilityCheckError::Unexpected);
};
Ok(VariableList::new(verified_blobs)?)
})
.transpose()?;

let executed_block = recover(diet_executed_block)?;

Expand All @@ -329,7 +338,7 @@ impl<E: EthSpec> PendingComponents<E> {
let available_block = AvailableBlock {
block_root,
block,
blobs: Some(verified_blobs),
blobs,
blobs_available_timestamp,
// TODO(das) Do we need a check here for number of expected custody columns?
// TODO(das): Update store types to prevent this conversion
Expand Down Expand Up @@ -790,10 +799,7 @@ impl<T: BeaconChainTypes> OverflowLRUCache<T> {
.epoch()
.ok_or(AvailabilityCheckError::UnableToDetermineImportRequirement)?;

let peer_das_enabled = self
.spec
.eip7594_fork_epoch
.map_or(false, |eip7594_fork_epoch| epoch >= eip7594_fork_epoch);
let peer_das_enabled = self.spec.is_peer_das_enabled_for_epoch(epoch);
if peer_das_enabled {
Ok(BlockImportRequirement::CustodyColumns(
self.custody_column_count,
Expand Down Expand Up @@ -824,7 +830,7 @@ impl<T: BeaconChainTypes> OverflowLRUCache<T> {
if pending_components.is_available(block_import_requirement, &self.log) {
// No need to hold the write lock anymore
drop(write_lock);
pending_components.make_available(|diet_block| {
pending_components.make_available(&self.spec, |diet_block| {
self.state_cache.recover_pending_executed_block(diet_block)
})
} else {
Expand Down Expand Up @@ -864,7 +870,7 @@ impl<T: BeaconChainTypes> OverflowLRUCache<T> {
if pending_components.is_available(block_import_requirement, &self.log) {
// No need to hold the write lock anymore
drop(write_lock);
pending_components.make_available(|diet_block| {
pending_components.make_available(&self.spec, |diet_block| {
self.state_cache.recover_pending_executed_block(diet_block)
})
} else {
Expand Down Expand Up @@ -904,7 +910,7 @@ impl<T: BeaconChainTypes> OverflowLRUCache<T> {
if pending_components.is_available(block_import_requirement, &self.log) {
// No need to hold the write lock anymore
drop(write_lock);
pending_components.make_available(|diet_block| {
pending_components.make_available(&self.spec, |diet_block| {
self.state_cache.recover_pending_executed_block(diet_block)
})
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ impl<E: EthSpec> DietAvailabilityPendingExecutedBlock<E> {
.cloned()
.unwrap_or_default()
}

/// Returns the epoch corresponding to `self.slot()`.
pub fn epoch(&self) -> Epoch {
self.block.slot().epoch(E::slots_per_epoch())
}
}

/// This LRU cache holds BeaconStates used for block import. If the cache overflows,
Expand Down
4 changes: 4 additions & 0 deletions beacon_node/http_api/src/publish_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,10 @@ pub async fn publish_block<T: BeaconChainTypes, B: IntoGossipVerifiedBlockConten
}
};

// TODO(das): We could potentially get rid of these conversions and pass `GossipVerified` types
// to `publish_block`, i.e. have `GossipVerified` types in `PubsubMessage`?
// This saves us from extra code and provides guarantee that published
// components are verified.
// Clone here, so we can take advantage of the `Arc`. The block in `BlockContents` is not,
// `Arc`'d but blobs are.
let block = gossip_verified_block.block.block_cloned();
Expand Down
10 changes: 10 additions & 0 deletions beacon_node/network/src/network_beacon_processor/gossip_methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1158,6 +1158,16 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
);
return None;
}
Err(e @ BlockError::BlobNotRequired(_)) => {
// TODO(das): penalty not implemented yet as other clients may still send us blobs
// during early stage of implementation.
debug!(self.log, "Received blobs for slot after PeerDAS epoch from peer";
"error" => %e,
"peer_id" => %peer_id,
);
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
return None;
}
};

metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_VERIFIED_TOTAL);
Expand Down
7 changes: 7 additions & 0 deletions consensus/types/src/chain_spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,13 @@ impl ChainSpec {
}
}

/// Returns true if the given epoch is greater than or equal to the `EIP7594_FORK_EPOCH`.
pub fn is_peer_das_enabled_for_epoch(&self, block_epoch: Epoch) -> bool {
self.eip7594_fork_epoch.map_or(false, |eip7594_fork_epoch| {
block_epoch >= eip7594_fork_epoch
})
}

/// Returns a full `Fork` struct for a given epoch.
pub fn fork_at_epoch(&self, epoch: Epoch) -> Fork {
let current_fork_name = self.fork_name_at_epoch(epoch);
Expand Down
Loading
Loading