diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 58872978842..145449c553a 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -3030,11 +3030,11 @@ impl BeaconChain { self.remove_notified(&block_root, r) } - /// Cache the data column in the processing cache, process it, then evict it from the cache if it was + /// Cache the data columns in the processing cache, process it, then evict it from the cache if it was /// imported or errors. - pub async fn process_gossip_data_column( + pub async fn process_gossip_data_columns( self: &Arc, - data_column: GossipVerifiedDataColumn, + data_columns: Vec>, ) -> Result< ( AvailabilityProcessingStatus, @@ -3042,7 +3042,16 @@ impl BeaconChain { ), BlockError, > { - let block_root = data_column.block_root(); + let Ok(block_root) = data_columns + .iter() + .map(|c| c.block_root()) + .unique() + .exactly_one() + else { + return Err(BlockError::InternalError( + "Columns should be from the same block".to_string(), + )); + }; // If this block has already been imported to forkchoice it must have been available, so // we don't need to process its samples again. @@ -3055,7 +3064,7 @@ impl BeaconChain { } let r = self - .check_gossip_data_column_availability_and_import(data_column) + .check_gossip_data_columns_availability_and_import(data_columns) .await; self.remove_notified_custody_columns(&block_root, r) } @@ -3402,9 +3411,9 @@ impl BeaconChain { /// Checks if the provided data column can make any cached blocks available, and imports immediately /// if so, otherwise caches the data column in the data availability checker. 
- async fn check_gossip_data_column_availability_and_import( + async fn check_gossip_data_columns_availability_and_import( self: &Arc, - data_column: GossipVerifiedDataColumn, + data_columns: Vec>, ) -> Result< ( AvailabilityProcessingStatus, @@ -3412,13 +3421,21 @@ impl BeaconChain { ), BlockError, > { - let slot = data_column.slot(); if let Some(slasher) = self.slasher.as_ref() { - slasher.accept_block_header(data_column.signed_block_header()); + for data_column in &data_columns { + slasher.accept_block_header(data_column.signed_block_header()); + } } + + let Ok(slot) = data_columns.iter().map(|c| c.slot()).unique().exactly_one() else { + return Err(BlockError::InternalError( + "Columns for the same block should have matching slot".to_string(), + )); + }; + let (availability, data_columns_to_publish) = self .data_availability_checker - .put_gossip_data_column(data_column)?; + .put_gossip_data_columns(data_columns)?; self.process_availability(slot, availability) .await diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 9dc3ee7029f..942930e183a 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -317,6 +317,13 @@ pub enum BlockError { /// /// This indicates the peer is sending an unexpected gossip blob and should be penalised. BlobNotRequired(Slot), + /// An internal error has occurred when processing the block or sidecars. + /// + /// ## Peer scoring + /// + /// We were unable to process this block due to an internal error. It's unclear if the block is + /// valid. 
+ InternalError(String), } impl From for BlockError { diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index 7e11a163d5a..12b7502e917 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -233,22 +233,26 @@ impl DataAvailabilityChecker { /// /// This should only accept gossip verified data columns, so we should not have to worry about dupes. #[allow(clippy::type_complexity)] - pub fn put_gossip_data_column( + pub fn put_gossip_data_columns( &self, - gossip_data_column: GossipVerifiedDataColumn, + gossip_data_columns: Vec>, ) -> Result<(Availability, DataColumnsToPublish), AvailabilityCheckError> { let Some(kzg) = self.kzg.as_ref() else { return Err(AvailabilityCheckError::KzgNotInitialized); }; - let block_root = gossip_data_column.block_root(); + let block_root = gossip_data_columns + .first() + .ok_or(AvailabilityCheckError::MissingCustodyColumns)? + .block_root(); - // TODO(das): ensure that our custody requirements include this column - let custody_column = - KzgVerifiedCustodyDataColumn::from_asserted_custody(gossip_data_column.into_inner()); + let custody_columns = gossip_data_columns + .into_iter() + .map(|c| KzgVerifiedCustodyDataColumn::from_asserted_custody(c.into_inner())) + .collect::>(); self.availability_cache - .put_kzg_verified_data_columns(kzg, block_root, vec![custody_column]) + .put_kzg_verified_data_columns(kzg, block_root, custody_columns) } /// Check if we have all the blobs for a block. 
Returns `Availability` which has information diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs index d6fd796158d..71f55c61468 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs @@ -906,7 +906,7 @@ impl OverflowLRUCache { /// Potentially trigger reconstruction if: /// - Our custody requirement is all columns - /// - We >= 50% of columns + /// - We >= 50% of columns, but not all columns fn should_reconstruct( &self, block_import_requirement: &BlockImportRequirement, @@ -917,9 +917,13 @@ impl OverflowLRUCache { return false; }; - !pending_components.reconstruction_started - && *num_expected_columns == T::EthSpec::number_of_columns() - && pending_components.verified_data_columns.len() >= T::EthSpec::number_of_columns() / 2 + let num_of_columns = T::EthSpec::number_of_columns(); + let has_missing_columns = pending_components.verified_data_columns.len() < num_of_columns; + + has_missing_columns + && !pending_components.reconstruction_started + && *num_expected_columns == num_of_columns + && pending_components.verified_data_columns.len() >= num_of_columns / 2 } pub fn put_kzg_verified_blobs>>( diff --git a/beacon_node/http_api/src/publish_blocks.rs b/beacon_node/http_api/src/publish_blocks.rs index 787e14b6a00..87e79924d3c 100644 --- a/beacon_node/http_api/src/publish_blocks.rs +++ b/beacon_node/http_api/src/publish_blocks.rs @@ -273,31 +273,33 @@ pub async fn publish_block &msg - ); - Err(warp_utils::reject::custom_bad_request(msg)) - }; - } - } + let custody_columns_indices = + network_globals + .custody_columns(block.epoch()) + .map_err(|e| { + warp_utils::reject::broadcast_without_import(format!( + "Failed to compute custody column indices: {:?}", + e + )) + })?; + + let custody_columns = gossip_verified_data_columns + .into_iter() 
+ .filter(|data_column| custody_columns_indices.contains(&data_column.index())) + .collect(); + + if let Err(e) = Box::pin(chain.process_gossip_data_columns(custody_columns)).await { + let msg = format!("Invalid data column: {e}"); + return if let BroadcastValidation::Gossip = validation_level { + Err(warp_utils::reject::broadcast_without_import(msg)) + } else { + error!( + log, + "Invalid data column provided to HTTP API"; + "reason" => &msg + ); + Err(warp_utils::reject::custom_bad_request(msg)) + }; } } diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index edd145a7168..f3a3f00a125 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -981,7 +981,7 @@ impl NetworkBeaconProcessor { match self .chain - .process_gossip_data_column(verified_data_column) + .process_gossip_data_columns(vec![verified_data_column]) .await { Ok((availability, data_columns_to_publish)) => { @@ -1266,6 +1266,12 @@ impl NetworkBeaconProcessor { ); return None; } + Err(e @ BlockError::InternalError(_)) => { + error!(self.log, "Internal block gossip validation error"; + "error" => %e + ); + return None; + } Err(e @ BlockError::BlobNotRequired(_)) => { // TODO(das): penalty not implemented yet as other clients may still send us blobs // during early stage of implementation.