Skip to content

Commit

Permalink
Don't perform reconstruction for proposer node as it already has all …
Browse files Browse the repository at this point in the history
…the columns. (#5806)
  • Loading branch information
jimmygchen authored May 17, 2024
1 parent c98bb52 commit a88ca3c
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 47 deletions.
37 changes: 27 additions & 10 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3030,19 +3030,28 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self.remove_notified(&block_root, r)
}

/// Cache the data column in the processing cache, process it, then evict it from the cache if it was
/// Cache the data columns in the processing cache, process it, then evict it from the cache if it was
/// imported or errors.
pub async fn process_gossip_data_column(
pub async fn process_gossip_data_columns(
self: &Arc<Self>,
data_column: GossipVerifiedDataColumn<T>,
data_columns: Vec<GossipVerifiedDataColumn<T>>,
) -> Result<
(
AvailabilityProcessingStatus,
DataColumnsToPublish<T::EthSpec>,
),
BlockError<T::EthSpec>,
> {
let block_root = data_column.block_root();
let Ok(block_root) = data_columns
.iter()
.map(|c| c.block_root())
.unique()
.exactly_one()
else {
return Err(BlockError::InternalError(
"Columns should be from the same block".to_string(),
));
};

// If this block has already been imported to forkchoice it must have been available, so
// we don't need to process its samples again.
Expand All @@ -3055,7 +3064,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}

let r = self
.check_gossip_data_column_availability_and_import(data_column)
.check_gossip_data_columns_availability_and_import(data_columns)
.await;
self.remove_notified_custody_columns(&block_root, r)
}
Expand Down Expand Up @@ -3402,23 +3411,31 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

/// Checks if the provided data column can make any cached blocks available, and imports immediately
/// if so, otherwise caches the data column in the data availability checker.
async fn check_gossip_data_column_availability_and_import(
async fn check_gossip_data_columns_availability_and_import(
self: &Arc<Self>,
data_column: GossipVerifiedDataColumn<T>,
data_columns: Vec<GossipVerifiedDataColumn<T>>,
) -> Result<
(
AvailabilityProcessingStatus,
DataColumnsToPublish<T::EthSpec>,
),
BlockError<T::EthSpec>,
> {
let slot = data_column.slot();
if let Some(slasher) = self.slasher.as_ref() {
slasher.accept_block_header(data_column.signed_block_header());
for data_colum in &data_columns {
slasher.accept_block_header(data_colum.signed_block_header());
}
}

let Ok(slot) = data_columns.iter().map(|c| c.slot()).unique().exactly_one() else {
return Err(BlockError::InternalError(
"Columns for the same block should have matching slot".to_string(),
));
};

let (availability, data_columns_to_publish) = self
.data_availability_checker
.put_gossip_data_column(data_column)?;
.put_gossip_data_columns(data_columns)?;

self.process_availability(slot, availability)
.await
Expand Down
7 changes: 7 additions & 0 deletions beacon_node/beacon_chain/src/block_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,13 @@ pub enum BlockError<E: EthSpec> {
///
/// This indicates the peer is sending an unexpected gossip blob and should be penalised.
BlobNotRequired(Slot),
/// An internal error has occurred when processing the block or sidecars.
///
/// ## Peer scoring
///
/// We were unable to process this block due to an internal error. It's unclear if the block is
/// valid.
InternalError(String),
}

impl<E: EthSpec> From<AvailabilityCheckError> for BlockError<E> {
Expand Down
18 changes: 11 additions & 7 deletions beacon_node/beacon_chain/src/data_availability_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,22 +233,26 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
///
/// This should only accept gossip verified data columns, so we should not have to worry about dupes.
#[allow(clippy::type_complexity)]
pub fn put_gossip_data_column(
pub fn put_gossip_data_columns(
&self,
gossip_data_column: GossipVerifiedDataColumn<T>,
gossip_data_columns: Vec<GossipVerifiedDataColumn<T>>,
) -> Result<(Availability<T::EthSpec>, DataColumnsToPublish<T::EthSpec>), AvailabilityCheckError>
{
let Some(kzg) = self.kzg.as_ref() else {
return Err(AvailabilityCheckError::KzgNotInitialized);
};
let block_root = gossip_data_column.block_root();
let block_root = gossip_data_columns
.first()
.ok_or(AvailabilityCheckError::MissingCustodyColumns)?
.block_root();

// TODO(das): ensure that our custody requirements include this column
let custody_column =
KzgVerifiedCustodyDataColumn::from_asserted_custody(gossip_data_column.into_inner());
let custody_columns = gossip_data_columns
.into_iter()
.map(|c| KzgVerifiedCustodyDataColumn::from_asserted_custody(c.into_inner()))
.collect::<Vec<_>>();

self.availability_cache
.put_kzg_verified_data_columns(kzg, block_root, vec![custody_column])
.put_kzg_verified_data_columns(kzg, block_root, custody_columns)
}

/// Check if we have all the blobs for a block. Returns `Availability` which has information
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -906,7 +906,7 @@ impl<T: BeaconChainTypes> OverflowLRUCache<T> {

/// Potentially trigger reconstruction if:
/// - Our custody requirement is all columns
/// - We >= 50% of columns
/// - We >= 50% of columns, but not all columns
fn should_reconstruct(
&self,
block_import_requirement: &BlockImportRequirement,
Expand All @@ -917,9 +917,13 @@ impl<T: BeaconChainTypes> OverflowLRUCache<T> {
return false;
};

!pending_components.reconstruction_started
&& *num_expected_columns == T::EthSpec::number_of_columns()
&& pending_components.verified_data_columns.len() >= T::EthSpec::number_of_columns() / 2
let num_of_columns = T::EthSpec::number_of_columns();
let has_missing_columns = pending_components.verified_data_columns.len() < num_of_columns;

has_missing_columns
&& !pending_components.reconstruction_started
&& *num_expected_columns == num_of_columns
&& pending_components.verified_data_columns.len() >= num_of_columns / 2
}

pub fn put_kzg_verified_blobs<I: IntoIterator<Item = KzgVerifiedBlob<T::EthSpec>>>(
Expand Down
52 changes: 27 additions & 25 deletions beacon_node/http_api/src/publish_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,31 +273,33 @@ pub async fn publish_block<T: BeaconChainTypes, B: IntoGossipVerifiedBlockConten
}

if let Some(gossip_verified_data_columns) = gossip_verified_data_columns {
let custody_columns = network_globals
.custody_columns(block.epoch())
.map_err(|e| {
warp_utils::reject::broadcast_without_import(format!(
"Failed to compute custody column indices: {:?}",
e
))
})?;

for data_column in gossip_verified_data_columns {
if custody_columns.contains(&data_column.index()) {
if let Err(e) = Box::pin(chain.process_gossip_data_column(data_column)).await {
let msg = format!("Invalid data column: {e}");
return if let BroadcastValidation::Gossip = validation_level {
Err(warp_utils::reject::broadcast_without_import(msg))
} else {
error!(
log,
"Invalid blob provided to HTTP API";
"reason" => &msg
);
Err(warp_utils::reject::custom_bad_request(msg))
};
}
}
let custody_columns_indices =
network_globals
.custody_columns(block.epoch())
.map_err(|e| {
warp_utils::reject::broadcast_without_import(format!(
"Failed to compute custody column indices: {:?}",
e
))
})?;

let custody_columns = gossip_verified_data_columns
.into_iter()
.filter(|data_column| custody_columns_indices.contains(&data_column.index()))
.collect();

if let Err(e) = Box::pin(chain.process_gossip_data_columns(custody_columns)).await {
let msg = format!("Invalid data column: {e}");
return if let BroadcastValidation::Gossip = validation_level {
Err(warp_utils::reject::broadcast_without_import(msg))
} else {
error!(
log,
"Invalid blob provided to HTTP API";
"reason" => &msg
);
Err(warp_utils::reject::custom_bad_request(msg))
};
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -981,7 +981,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {

match self
.chain
.process_gossip_data_column(verified_data_column)
.process_gossip_data_columns(vec![verified_data_column])
.await
{
Ok((availability, data_columns_to_publish)) => {
Expand Down Expand Up @@ -1266,6 +1266,12 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
);
return None;
}
Err(e @ BlockError::InternalError(_)) => {
error!(self.log, "Internal block gossip validation error";
"error" => %e
);
return None;
}
Err(e @ BlockError::BlobNotRequired(_)) => {
// TODO(das): penalty not implemented yet as other clients may still send us blobs
// during early stage of implementation.
Expand Down

0 comments on commit a88ca3c

Please sign in to comment.