Skip to content

Commit

Permalink
Add DataColumnSidecar gossip topic and verification (sigp#5050 and s…
Browse files Browse the repository at this point in the history
  • Loading branch information
jimmygchen committed Jul 22, 2024
1 parent 06dff60 commit 0ef9a1a
Show file tree
Hide file tree
Showing 30 changed files with 1,769 additions and 29 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

93 changes: 91 additions & 2 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crate::chain_config::ChainConfig;
use crate::data_availability_checker::{
Availability, AvailabilityCheckError, AvailableBlock, DataAvailabilityChecker,
};
use crate::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn};
use crate::early_attester_cache::EarlyAttesterCache;
use crate::errors::{BeaconChainError as Error, BlockProductionError};
use crate::eth1_chain::{Eth1Chain, Eth1ChainBackend};
Expand Down Expand Up @@ -51,8 +52,8 @@ use crate::observed_aggregates::{
use crate::observed_attesters::{
ObservedAggregators, ObservedAttesters, ObservedSyncAggregators, ObservedSyncContributors,
};
use crate::observed_blob_sidecars::ObservedBlobSidecars;
use crate::observed_block_producers::ObservedBlockProducers;
use crate::observed_data_sidecars::ObservedDataSidecars;
use crate::observed_operations::{ObservationOutcome, ObservedOperations};
use crate::observed_slashable::ObservedSlashable;
use crate::persisted_beacon_chain::{PersistedBeaconChain, DUMMY_CANONICAL_HEAD_BLOCK_ROOT};
Expand Down Expand Up @@ -426,7 +427,9 @@ pub struct BeaconChain<T: BeaconChainTypes> {
/// Maintains a record of which validators have proposed blocks for each slot.
pub observed_block_producers: RwLock<ObservedBlockProducers<T::EthSpec>>,
/// Maintains a record of blob sidecars seen over the gossip network.
pub observed_blob_sidecars: RwLock<ObservedBlobSidecars<T::EthSpec>>,
pub observed_blob_sidecars: RwLock<ObservedDataSidecars<BlobSidecar<T::EthSpec>>>,
/// Maintains a record of column sidecars seen over the gossip network.
pub observed_column_sidecars: RwLock<ObservedDataSidecars<DataColumnSidecar<T::EthSpec>>>,
/// Maintains a record of slashable message seen over the gossip network or RPC.
pub observed_slashable: RwLock<ObservedSlashable<T::EthSpec>>,
/// Maintains a record of which validators have submitted voluntary exits.
Expand Down Expand Up @@ -2118,6 +2121,19 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
})
}

pub fn verify_data_column_sidecar_for_gossip(
self: &Arc<Self>,
data_column_sidecar: Arc<DataColumnSidecar<T::EthSpec>>,
subnet_id: u64,
) -> Result<GossipVerifiedDataColumn<T>, GossipDataColumnError<T::EthSpec>> {
metrics::inc_counter(&metrics::BLOBS_COLUMN_SIDECAR_PROCESSING_REQUESTS);
let _timer = metrics::start_timer(&metrics::DATA_COLUMN_SIDECAR_GOSSIP_VERIFICATION_TIMES);
GossipVerifiedDataColumn::new(data_column_sidecar, subnet_id, self).map(|v| {
metrics::inc_counter(&metrics::DATA_COLUMNS_SIDECAR_PROCESSING_SUCCESSES);
v
})
}

pub fn verify_blob_sidecar_for_gossip(
self: &Arc<Self>,
blob_sidecar: Arc<BlobSidecar<T::EthSpec>>,
Expand Down Expand Up @@ -2964,6 +2980,39 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self.remove_notified(&block_root, r)
}

/// Cache the data columns in the processing cache, process it, then evict it from the cache if it was
/// imported or errors.
pub async fn process_gossip_data_columns(
self: &Arc<Self>,
data_columns: Vec<GossipVerifiedDataColumn<T>>,
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
let Ok(block_root) = data_columns
.iter()
.map(|c| c.block_root())
.unique()
.exactly_one()
else {
return Err(BlockError::InternalError(
"Columns should be from the same block".to_string(),
));
};

// If this block has already been imported to forkchoice it must have been available, so
// we don't need to process its samples again.
if self
.canonical_head
.fork_choice_read_lock()
.contains_block(&block_root)
{
return Err(BlockError::BlockIsAlreadyKnown(block_root));
}

let r = self
.check_gossip_data_columns_availability_and_import(data_columns)
.await;
self.remove_notified_custody_columns(&block_root, r)
}

/// Cache the blobs in the processing cache, process it, then evict it from the cache if it was
/// imported or errors.
pub async fn process_rpc_blobs(
Expand Down Expand Up @@ -3013,6 +3062,21 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
r
}

/// Remove any block components from the *processing cache* if we no longer require them. If the
/// block was imported full or erred, we no longer require them.
fn remove_notified_custody_columns(
&self,
block_root: &Hash256,
r: Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>>,
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
let has_missing_components =
matches!(r, Ok(AvailabilityProcessingStatus::MissingComponents(_, _)));
if !has_missing_components {
self.reqresp_pre_import_cache.write().remove(block_root);
}
r
}

/// Wraps `process_block` in logic to cache the block's commitments in the processing cache
/// and evict if the block was imported or errored.
pub async fn process_block_with_early_caching<B: IntoExecutionPendingBlock<T>>(
Expand Down Expand Up @@ -3257,6 +3321,31 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self.process_availability(slot, availability).await
}

/// Checks if the provided data column can make any cached blocks available, and imports immediately
/// if so, otherwise caches the data column in the data availability checker.
async fn check_gossip_data_columns_availability_and_import(
self: &Arc<Self>,
data_columns: Vec<GossipVerifiedDataColumn<T>>,
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
if let Some(slasher) = self.slasher.as_ref() {
for data_colum in &data_columns {
slasher.accept_block_header(data_colum.signed_block_header());
}
}

let Ok(slot) = data_columns.iter().map(|c| c.slot()).unique().exactly_one() else {
return Err(BlockError::InternalError(
"Columns for the same block should have matching slot".to_string(),
));
};

let availability = self
.data_availability_checker
.put_gossip_data_columns(data_columns)?;

self.process_availability(slot, availability).await
}

/// Checks if the provided blobs can make any cached blocks available, and imports immediately
/// if so, otherwise caches the blob in the data availability checker.
async fn check_rpc_blob_availability_and_import(
Expand Down
39 changes: 39 additions & 0 deletions beacon_node/beacon_chain/src/block_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ use crate::block_verification_types::{
AsBlock, BlockContentsError, BlockImportData, GossipVerifiedBlockContents, RpcBlock,
};
use crate::data_availability_checker::{AvailabilityCheckError, MaybeAvailableBlock};
use crate::data_column_verification::GossipDataColumnError;
use crate::eth1_finalization_cache::Eth1FinalizationData;
use crate::execution_payload::{
is_optimistic_candidate_block, validate_execution_payload_for_gossip, validate_merge_block,
Expand Down Expand Up @@ -303,6 +304,13 @@ pub enum BlockError<E: EthSpec> {
/// TODO: We may need to penalize the peer that gave us a potentially invalid rpc blob.
/// https://github.com/sigp/lighthouse/issues/4546
AvailabilityCheck(AvailabilityCheckError),
/// An internal error has occurred when processing the block or sidecars.
///
/// ## Peer scoring
///
/// We were unable to process this block due to an internal error. It's unclear if the block is
/// valid.
InternalError(String),
}

impl<E: EthSpec> From<AvailabilityCheckError> for BlockError<E> {
Expand Down Expand Up @@ -523,6 +531,20 @@ impl<E: EthSpec> BlockSlashInfo<GossipBlobError<E>> {
}
}

impl<E: EthSpec> BlockSlashInfo<GossipDataColumnError<E>> {
pub fn from_early_error_data_column(
header: SignedBeaconBlockHeader,
e: GossipDataColumnError<E>,
) -> Self {
match e {
GossipDataColumnError::ProposalSignatureInvalid => BlockSlashInfo::SignatureInvalid(e),
// `InvalidSignature` could indicate any signature in the block, so we want
// to recheck the proposer signature alone.
_ => BlockSlashInfo::SignatureNotChecked(header, e),
}
}
}

/// Process invalid blocks to see if they are suitable for the slasher.
///
/// If no slasher is configured, this is a no-op.
Expand Down Expand Up @@ -2007,6 +2029,23 @@ impl<E: EthSpec> BlockBlobError for GossipBlobError<E> {
}
}

impl<E: EthSpec> BlockBlobError for GossipDataColumnError<E> {
fn not_later_than_parent_error(data_column_slot: Slot, parent_slot: Slot) -> Self {
GossipDataColumnError::IsNotLaterThanParent {
data_column_slot,
parent_slot,
}
}

fn unknown_validator_error(validator_index: u64) -> Self {
GossipDataColumnError::UnknownValidator(validator_index)
}

fn proposer_signature_invalid() -> Self {
GossipDataColumnError::ProposalSignatureInvalid
}
}

/// Performs a cheap (time-efficient) state advancement so the committees and proposer shuffling for
/// `slot` can be obtained from `state`.
///
Expand Down
4 changes: 3 additions & 1 deletion beacon_node/beacon_chain/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::graffiti_calculator::{GraffitiCalculator, GraffitiOrigin};
use crate::head_tracker::HeadTracker;
use crate::light_client_server_cache::LightClientServerCache;
use crate::migrate::{BackgroundMigrator, MigratorConfig};
use crate::observed_data_sidecars::ObservedDataSidecars;
use crate::persisted_beacon_chain::PersistedBeaconChain;
use crate::shuffling_cache::{BlockShufflingIds, ShufflingCache};
use crate::timeout_rw_lock::TimeoutRwLock;
Expand Down Expand Up @@ -918,7 +919,8 @@ where
observed_sync_aggregators: <_>::default(),
// TODO: allow for persisting and loading the pool from disk.
observed_block_producers: <_>::default(),
observed_blob_sidecars: <_>::default(),
observed_column_sidecars: RwLock::new(ObservedDataSidecars::new(self.spec.clone())),
observed_blob_sidecars: RwLock::new(ObservedDataSidecars::new(self.spec.clone())),
observed_slashable: <_>::default(),
observed_voluntary_exits: <_>::default(),
observed_proposer_slashings: <_>::default(),
Expand Down
9 changes: 9 additions & 0 deletions beacon_node/beacon_chain/src/data_availability_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ mod error;
mod overflow_lru_cache;
mod state_lru_cache;

use crate::data_column_verification::GossipVerifiedDataColumn;
pub use error::{Error as AvailabilityCheckError, ErrorCategory as AvailabilityCheckErrorCategory};
use types::non_zero_usize::new_non_zero_usize;

Expand Down Expand Up @@ -175,6 +176,14 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
.put_kzg_verified_blobs(gossip_blob.block_root(), vec![gossip_blob.into_inner()])
}

pub fn put_gossip_data_columns(
&self,
_gossip_data_columns: Vec<GossipVerifiedDataColumn<T>>,
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
// TODO(das) to be implemented
unimplemented!("not implemented")
}

/// Check if we have all the blobs for a block. Returns `Availability` which has information
/// about whether all components have been received or more are required.
pub fn put_pending_executed_block(
Expand Down
Loading

0 comments on commit 0ef9a1a

Please sign in to comment.