Skip to content

Commit

Permalink
Only import custody data columns after publishing a block.
Browse files Browse the repository at this point in the history
  • Loading branch information
jimmygchen committed Apr 16, 2024
1 parent 41d6225 commit cfab1a2
Show file tree
Hide file tree
Showing 8 changed files with 100 additions and 4 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 24 additions & 0 deletions beacon_node/http_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1261,12 +1261,14 @@ pub fn serve<T: BeaconChainTypes>(
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and(network_tx_filter.clone())
.and(network_globals.clone())
.and(log_filter.clone())
.then(
move |block_contents: PublishBlockRequest<T::EthSpec>,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
network_globals: Arc<NetworkGlobals<T::EthSpec>>,
log: Logger| {
task_spawner.spawn_async_with_rejection(Priority::P0, async move {
publish_blocks::publish_block(
Expand All @@ -1277,6 +1279,7 @@ pub fn serve<T: BeaconChainTypes>(
log,
BroadcastValidation::default(),
duplicate_block_status_code,
network_globals,
)
.await
})
Expand All @@ -1292,13 +1295,15 @@ pub fn serve<T: BeaconChainTypes>(
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and(network_tx_filter.clone())
.and(network_globals.clone())
.and(log_filter.clone())
.then(
move |block_bytes: Bytes,
consensus_version: ForkName,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
network_globals: Arc<NetworkGlobals<T::EthSpec>>,
log: Logger| {
task_spawner.spawn_async_with_rejection(Priority::P0, async move {
let block_contents = PublishBlockRequest::<T::EthSpec>::from_ssz_bytes(
Expand All @@ -1316,6 +1321,7 @@ pub fn serve<T: BeaconChainTypes>(
log,
BroadcastValidation::default(),
duplicate_block_status_code,
network_globals,
)
.await
})
Expand All @@ -1331,13 +1337,15 @@ pub fn serve<T: BeaconChainTypes>(
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and(network_tx_filter.clone())
.and(network_globals.clone())
.and(log_filter.clone())
.then(
move |validation_level: api_types::BroadcastValidationQuery,
block_contents: PublishBlockRequest<T::EthSpec>,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
network_globals: Arc<NetworkGlobals<T::EthSpec>>,
log: Logger| {
task_spawner.spawn_async_with_rejection(Priority::P0, async move {
publish_blocks::publish_block(
Expand All @@ -1348,6 +1356,7 @@ pub fn serve<T: BeaconChainTypes>(
log,
validation_level.broadcast_validation,
duplicate_block_status_code,
network_globals,
)
.await
})
Expand All @@ -1364,6 +1373,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and(network_tx_filter.clone())
.and(network_globals.clone())
.and(log_filter.clone())
.then(
move |validation_level: api_types::BroadcastValidationQuery,
Expand All @@ -1372,6 +1382,7 @@ pub fn serve<T: BeaconChainTypes>(
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
network_globals: Arc<NetworkGlobals<T::EthSpec>>,
log: Logger| {
task_spawner.spawn_async_with_rejection(Priority::P0, async move {
let block_contents = PublishBlockRequest::<T::EthSpec>::from_ssz_bytes(
Expand All @@ -1389,6 +1400,7 @@ pub fn serve<T: BeaconChainTypes>(
log,
validation_level.broadcast_validation,
duplicate_block_status_code,
network_globals,
)
.await
})
Expand All @@ -1408,12 +1420,14 @@ pub fn serve<T: BeaconChainTypes>(
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and(network_tx_filter.clone())
.and(network_globals.clone())
.and(log_filter.clone())
.then(
move |block_contents: Arc<SignedBlindedBeaconBlock<T::EthSpec>>,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
network_globals: Arc<NetworkGlobals<T::EthSpec>>,
log: Logger| {
task_spawner.spawn_async_with_rejection(Priority::P0, async move {
publish_blocks::publish_blinded_block(
Expand All @@ -1423,6 +1437,7 @@ pub fn serve<T: BeaconChainTypes>(
log,
BroadcastValidation::default(),
duplicate_block_status_code,
network_globals,
)
.await
})
Expand All @@ -1438,12 +1453,14 @@ pub fn serve<T: BeaconChainTypes>(
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and(network_tx_filter.clone())
.and(network_globals.clone())
.and(log_filter.clone())
.then(
move |block_bytes: Bytes,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
network_globals: Arc<NetworkGlobals<T::EthSpec>>,
log: Logger| {
task_spawner.spawn_async_with_rejection(Priority::P0, async move {
let block = SignedBlindedBeaconBlock::<T::EthSpec>::from_ssz_bytes(
Expand All @@ -1461,6 +1478,7 @@ pub fn serve<T: BeaconChainTypes>(
log,
BroadcastValidation::default(),
duplicate_block_status_code,
network_globals,
)
.await
})
Expand All @@ -1476,13 +1494,15 @@ pub fn serve<T: BeaconChainTypes>(
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and(network_tx_filter.clone())
.and(network_globals.clone())
.and(log_filter.clone())
.then(
move |validation_level: api_types::BroadcastValidationQuery,
blinded_block: Arc<SignedBlindedBeaconBlock<T::EthSpec>>,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
network_globals: Arc<NetworkGlobals<T::EthSpec>>,
log: Logger| {
task_spawner.spawn_async_with_rejection(Priority::P0, async move {
publish_blocks::publish_blinded_block(
Expand All @@ -1492,6 +1512,7 @@ pub fn serve<T: BeaconChainTypes>(
log,
validation_level.broadcast_validation,
duplicate_block_status_code,
network_globals,
)
.await
})
Expand All @@ -1507,13 +1528,15 @@ pub fn serve<T: BeaconChainTypes>(
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and(network_tx_filter.clone())
.and(network_globals.clone())
.and(log_filter.clone())
.then(
move |validation_level: api_types::BroadcastValidationQuery,
block_bytes: Bytes,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
network_globals: Arc<NetworkGlobals<T::EthSpec>>,
log: Logger| {
task_spawner.spawn_async_with_rejection(Priority::P0, async move {
let block = SignedBlindedBeaconBlock::<T::EthSpec>::from_ssz_bytes(
Expand All @@ -1531,6 +1554,7 @@ pub fn serve<T: BeaconChainTypes>(
log,
validation_level.broadcast_validation,
duplicate_block_status_code,
network_globals,
)
.await
})
Expand Down
35 changes: 34 additions & 1 deletion beacon_node/http_api/src/publish_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use beacon_chain::{
use eth2::types::{into_full_block_and_blobs, BroadcastValidation, ErrorMessage};
use eth2::types::{FullPayloadContents, PublishBlockRequest};
use execution_layer::ProvenancedPayload;
use lighthouse_network::PubsubMessage;
use lighthouse_network::{NetworkGlobals, PubsubMessage};
use network::NetworkMessage;
use slog::{debug, error, info, warn, Logger};
use slot_clock::SlotClock;
Expand Down Expand Up @@ -46,6 +46,7 @@ impl<T: BeaconChainTypes, B: IntoGossipVerifiedBlockContents<T>> ProvenancedBloc
}

/// Handles a request from the HTTP API for full blocks.
#[allow(clippy::too_many_arguments)]
pub async fn publish_block<T: BeaconChainTypes, B: IntoGossipVerifiedBlockContents<T>>(
block_root: Option<Hash256>,
provenanced_block: ProvenancedBlock<T, B>,
Expand All @@ -54,6 +55,7 @@ pub async fn publish_block<T: BeaconChainTypes, B: IntoGossipVerifiedBlockConten
log: Logger,
validation_level: BroadcastValidation,
duplicate_status_code: StatusCode,
network_globals: Arc<NetworkGlobals<T::EthSpec>>,
) -> Result<Response, Rejection> {
let seen_timestamp = timestamp_now();

Expand Down Expand Up @@ -238,6 +240,35 @@ pub async fn publish_block<T: BeaconChainTypes, B: IntoGossipVerifiedBlockConten
}
}

if let Some(gossip_verified_data_columns) = gossip_verified_data_columns {
let custody_columns = network_globals
.custody_columns(block.epoch())
.map_err(|e| {
warp_utils::reject::broadcast_without_import(format!(
"Failed to compute custody column indices: {:?}",
e
))
})?;

for data_column in gossip_verified_data_columns {
if custody_columns.contains(&data_column.index()) {
if let Err(e) = Box::pin(chain.process_gossip_data_column(data_column)).await {
let msg = format!("Invalid data column: {e}");
return if let BroadcastValidation::Gossip = validation_level {
Err(warp_utils::reject::broadcast_without_import(msg))
} else {
error!(
log,
"Invalid blob provided to HTTP API";
"reason" => &msg
);
Err(warp_utils::reject::custom_bad_request(msg))
};
}
}
}
}

match Box::pin(chain.process_block(
block_root,
gossip_verified_block,
Expand Down Expand Up @@ -324,6 +355,7 @@ pub async fn publish_blinded_block<T: BeaconChainTypes>(
log: Logger,
validation_level: BroadcastValidation,
duplicate_status_code: StatusCode,
network_globals: Arc<NetworkGlobals<T::EthSpec>>,
) -> Result<Response, Rejection> {
let block_root = blinded_block.canonical_root();
let full_block: ProvenancedBlock<T, PublishBlockRequest<T::EthSpec>> =
Expand All @@ -336,6 +368,7 @@ pub async fn publish_blinded_block<T: BeaconChainTypes>(
log,
validation_level,
duplicate_status_code,
network_globals,
)
.await
}
Expand Down
6 changes: 6 additions & 0 deletions beacon_node/http_api/tests/broadcast_validation_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,7 @@ pub async fn consensus_partial_pass_only_consensus() {

/* submit `block_b` which should induce equivocation */
let channel = tokio::sync::mpsc::unbounded_channel();
let network_globals = tester.ctx.network_globals.clone().unwrap();

let publication_result = publish_block(
None,
Expand All @@ -379,6 +380,7 @@ pub async fn consensus_partial_pass_only_consensus() {
test_logger,
validation_level.unwrap(),
StatusCode::ACCEPTED,
network_globals,
)
.await;

Expand Down Expand Up @@ -659,6 +661,7 @@ pub async fn equivocation_consensus_late_equivocation() {
assert!(gossip_block_contents_a.is_err());

let channel = tokio::sync::mpsc::unbounded_channel();
let network_globals = tester.ctx.network_globals.clone().unwrap();

let publication_result = publish_block(
None,
Expand All @@ -668,6 +671,7 @@ pub async fn equivocation_consensus_late_equivocation() {
test_logger,
validation_level.unwrap(),
StatusCode::ACCEPTED,
network_globals,
)
.await;

Expand Down Expand Up @@ -1305,6 +1309,7 @@ pub async fn blinded_equivocation_consensus_late_equivocation() {
assert!(gossip_block_a.is_err());

let channel = tokio::sync::mpsc::unbounded_channel();
let network_globals = tester.ctx.network_globals.clone().unwrap();

let publication_result = publish_blinded_block(
block_b,
Expand All @@ -1313,6 +1318,7 @@ pub async fn blinded_equivocation_consensus_late_equivocation() {
test_logger,
validation_level.unwrap(),
StatusCode::ACCEPTED,
network_globals,
)
.await;

Expand Down
1 change: 1 addition & 0 deletions beacon_node/lighthouse_network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ tempfile = { workspace = true }
quickcheck = { workspace = true }
quickcheck_macros = { workspace = true }
async-channel = { workspace = true }
logging = { workspace = true }

[features]
libp2p-websocket = []
Expand Down
1 change: 1 addition & 0 deletions beacon_node/lighthouse_network/src/discovery/enr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ pub fn build_enr<E: EthSpec>(
// set the "custody_subnet_count" field on our ENR
let custody_subnet_count = E::min_custody_requirement() as u64;

// TODO(das) read from cli
builder.add_value(
PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY,
&custody_subnet_count.as_ssz_bytes(),
Expand Down
33 changes: 31 additions & 2 deletions beacon_node/lighthouse_network/src/types/globals.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@
use crate::peer_manager::peerdb::PeerDB;
use crate::rpc::{MetaData, MetaDataV2};
use crate::types::{BackFillState, SyncState};
use crate::Client;
use crate::EnrExt;
use crate::{Client, Eth2Enr};
use crate::{Enr, GossipTopic, Multiaddr, PeerId};
use parking_lot::RwLock;
use std::collections::HashSet;
use types::EthSpec;
use types::data_column_sidecar::ColumnIndex;
use types::{DataColumnSubnetId, Epoch, EthSpec};

pub struct NetworkGlobals<E: EthSpec> {
/// The current local ENR.
Expand Down Expand Up @@ -110,6 +111,17 @@ impl<E: EthSpec> NetworkGlobals<E> {
std::mem::replace(&mut *self.sync_state.write(), new_state)
}

/// Compute custody data columns the node is assigned to custody.
pub fn custody_columns(&self, _epoch: Epoch) -> Result<Vec<ColumnIndex>, &'static str> {
let enr = self.local_enr();
let node_id = enr.node_id().raw().into();
let custody_subnet_count = enr.custody_subnet_count::<E>()?;
Ok(
DataColumnSubnetId::compute_custody_columns::<E>(node_id, custody_subnet_count)
.collect(),
)
}

/// TESTING ONLY. Build a dummy NetworkGlobals instance.
pub fn new_test_globals(trusted_peers: Vec<PeerId>, log: &slog::Logger) -> NetworkGlobals<E> {
use crate::CombinedKeyExt;
Expand All @@ -129,3 +141,20 @@ impl<E: EthSpec> NetworkGlobals<E> {
)
}
}

#[cfg(test)]
mod test {
use crate::NetworkGlobals;
use types::{Epoch, EthSpec, MainnetEthSpec as E};

#[test]
fn test_custody_count_default() {
let log = logging::test_logger();
let default_custody_requirement_column_count =
E::number_of_columns() / E::data_column_subnet_count();
let globals = NetworkGlobals::<E>::new_test_globals(vec![], &log);
let any_epoch = Epoch::new(0);
let columns = globals.custody_columns(any_epoch).unwrap();
assert_eq!(columns.len(), default_custody_requirement_column_count);
}
}
Loading

0 comments on commit cfab1a2

Please sign in to comment.