diff --git a/Cargo.lock b/Cargo.lock index 64d124ac9c7..3b101ab0188 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4911,6 +4911,7 @@ dependencies = [ "libp2p-mplex", "lighthouse_metrics", "lighthouse_version", + "logging", "lru", "lru_cache", "parking_lot 0.12.1", diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index cc117c3fb92..4f5f6bd6948 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -1261,12 +1261,14 @@ pub fn serve( .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and(network_tx_filter.clone()) + .and(network_globals.clone()) .and(log_filter.clone()) .then( move |block_contents: PublishBlockRequest, task_spawner: TaskSpawner, chain: Arc>, network_tx: UnboundedSender>, + network_globals: Arc>, log: Logger| { task_spawner.spawn_async_with_rejection(Priority::P0, async move { publish_blocks::publish_block( @@ -1277,6 +1279,7 @@ pub fn serve( log, BroadcastValidation::default(), duplicate_block_status_code, + network_globals, ) .await }) @@ -1292,6 +1295,7 @@ pub fn serve( .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and(network_tx_filter.clone()) + .and(network_globals.clone()) .and(log_filter.clone()) .then( move |block_bytes: Bytes, @@ -1299,6 +1303,7 @@ pub fn serve( task_spawner: TaskSpawner, chain: Arc>, network_tx: UnboundedSender>, + network_globals: Arc>, log: Logger| { task_spawner.spawn_async_with_rejection(Priority::P0, async move { let block_contents = PublishBlockRequest::::from_ssz_bytes( @@ -1316,6 +1321,7 @@ pub fn serve( log, BroadcastValidation::default(), duplicate_block_status_code, + network_globals, ) .await }) @@ -1331,6 +1337,7 @@ pub fn serve( .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and(network_tx_filter.clone()) + .and(network_globals.clone()) .and(log_filter.clone()) .then( move |validation_level: api_types::BroadcastValidationQuery, @@ -1338,6 +1345,7 @@ pub fn serve( task_spawner: TaskSpawner, chain: Arc>, network_tx: UnboundedSender>, + network_globals: Arc>, log: Logger| { task_spawner.spawn_async_with_rejection(Priority::P0, async move { publish_blocks::publish_block( @@ -1348,6 +1356,7 @@ pub fn serve( log, validation_level.broadcast_validation, duplicate_block_status_code, + network_globals, ) .await }) @@ -1364,6 +1373,7 @@ pub fn serve( .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and(network_tx_filter.clone()) + .and(network_globals.clone()) .and(log_filter.clone()) .then( move |validation_level: api_types::BroadcastValidationQuery, @@ -1372,6 +1382,7 @@ pub fn serve( task_spawner: TaskSpawner, chain: Arc>, network_tx: UnboundedSender>, + network_globals: Arc>, log: Logger| { task_spawner.spawn_async_with_rejection(Priority::P0, async move { let block_contents = PublishBlockRequest::::from_ssz_bytes( @@ -1389,6 +1400,7 @@ pub fn serve( log, validation_level.broadcast_validation, duplicate_block_status_code, + network_globals, ) .await }) @@ -1408,12 +1420,14 @@ pub fn serve( .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and(network_tx_filter.clone()) + .and(network_globals.clone()) .and(log_filter.clone()) .then( move |block_contents: Arc>, task_spawner: TaskSpawner, chain: Arc>, network_tx: UnboundedSender>, + network_globals: Arc>, log: Logger| { task_spawner.spawn_async_with_rejection(Priority::P0, async move { publish_blocks::publish_blinded_block( @@ -1423,6 +1437,7 @@ pub fn serve( log, BroadcastValidation::default(), duplicate_block_status_code, + network_globals, ) .await }) @@ -1438,12 +1453,14 @@ pub fn serve( .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and(network_tx_filter.clone()) + .and(network_globals.clone()) .and(log_filter.clone()) .then( move |block_bytes: Bytes, task_spawner: TaskSpawner, chain: Arc>, network_tx: UnboundedSender>, + network_globals: Arc>, log: Logger| { task_spawner.spawn_async_with_rejection(Priority::P0, async move { let block = SignedBlindedBeaconBlock::::from_ssz_bytes( @@ -1461,6 +1478,7 @@ pub fn serve( log, BroadcastValidation::default(), duplicate_block_status_code, + network_globals, ) .await }) @@ -1476,6 +1494,7 @@ pub fn serve( .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and(network_tx_filter.clone()) + .and(network_globals.clone()) .and(log_filter.clone()) .then( move |validation_level: api_types::BroadcastValidationQuery, @@ -1483,6 +1502,7 @@ pub fn serve( task_spawner: TaskSpawner, chain: Arc>, network_tx: UnboundedSender>, + network_globals: Arc>, log: Logger| { task_spawner.spawn_async_with_rejection(Priority::P0, async move { publish_blocks::publish_blinded_block( @@ -1492,6 +1512,7 @@ pub fn serve( log, validation_level.broadcast_validation, duplicate_block_status_code, + network_globals, ) .await }) @@ -1507,6 +1528,7 @@ pub fn serve( .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and(network_tx_filter.clone()) + .and(network_globals.clone()) .and(log_filter.clone()) .then( move |validation_level: api_types::BroadcastValidationQuery, @@ -1514,6 +1536,7 @@ pub fn serve( task_spawner: TaskSpawner, chain: Arc>, network_tx: UnboundedSender>, + network_globals: Arc>, log: Logger| { task_spawner.spawn_async_with_rejection(Priority::P0, async move { let block = SignedBlindedBeaconBlock::::from_ssz_bytes( @@ -1531,6 +1554,7 @@ pub fn serve( log, validation_level.broadcast_validation, duplicate_block_status_code, + network_globals, ) .await }) diff --git a/beacon_node/http_api/src/publish_blocks.rs b/beacon_node/http_api/src/publish_blocks.rs index 59ab3388d8f..7155f6c4889 100644 --- a/beacon_node/http_api/src/publish_blocks.rs +++ b/beacon_node/http_api/src/publish_blocks.rs @@ -9,7 +9,7 @@ use beacon_chain::{ use eth2::types::{into_full_block_and_blobs, BroadcastValidation, ErrorMessage}; use eth2::types::{FullPayloadContents, PublishBlockRequest}; use execution_layer::ProvenancedPayload; -use lighthouse_network::PubsubMessage; +use lighthouse_network::{NetworkGlobals, PubsubMessage}; use network::NetworkMessage; use slog::{debug, error, info, warn, Logger}; use slot_clock::SlotClock; @@ -46,6 +46,7 @@ impl> ProvenancedBloc } /// Handles a request from the HTTP API for full blocks. +#[allow(clippy::too_many_arguments)] pub async fn publish_block>( block_root: Option, provenanced_block: ProvenancedBlock, @@ -54,6 +55,7 @@ pub async fn publish_block>, ) -> Result { let seen_timestamp = timestamp_now(); @@ -238,6 +240,35 @@ pub async fn publish_block &msg + ); + Err(warp_utils::reject::custom_bad_request(msg)) + }; + } + } + } + } + match Box::pin(chain.process_block( block_root, gossip_verified_block, @@ -324,6 +355,7 @@ pub async fn publish_blinded_block( log: Logger, validation_level: BroadcastValidation, duplicate_status_code: StatusCode, + network_globals: Arc>, ) -> Result { let block_root = blinded_block.canonical_root(); let full_block: ProvenancedBlock> = @@ -336,6 +368,7 @@ pub async fn publish_blinded_block( log, validation_level, duplicate_status_code, + network_globals, ) .await } diff --git a/beacon_node/http_api/tests/broadcast_validation_tests.rs b/beacon_node/http_api/tests/broadcast_validation_tests.rs index 6a3f7947e6b..702b636ff9c 100644 --- a/beacon_node/http_api/tests/broadcast_validation_tests.rs +++ b/beacon_node/http_api/tests/broadcast_validation_tests.rs @@ -370,6 +370,7 @@ pub async fn consensus_partial_pass_only_consensus() { /* submit `block_b` which should induce equivocation */ let channel = tokio::sync::mpsc::unbounded_channel(); + let network_globals = tester.ctx.network_globals.clone().unwrap(); let publication_result = publish_block( None, @@ -379,6 +380,7 @@ pub async fn consensus_partial_pass_only_consensus() { test_logger, validation_level.unwrap(), StatusCode::ACCEPTED, + network_globals, ) .await; @@ -659,6 +661,7 @@ pub async fn equivocation_consensus_late_equivocation() { assert!(gossip_block_contents_a.is_err()); let channel = tokio::sync::mpsc::unbounded_channel(); + let network_globals = tester.ctx.network_globals.clone().unwrap(); let publication_result = publish_block( None, @@ -668,6 +671,7 @@ pub async fn equivocation_consensus_late_equivocation() { test_logger, validation_level.unwrap(), StatusCode::ACCEPTED, + network_globals, ) .await; @@ -1305,6 +1309,7 @@ pub async fn blinded_equivocation_consensus_late_equivocation() { assert!(gossip_block_a.is_err()); let channel = tokio::sync::mpsc::unbounded_channel(); + let network_globals = tester.ctx.network_globals.clone().unwrap(); let publication_result = publish_blinded_block( block_b, @@ -1313,6 +1318,7 @@ pub async fn blinded_equivocation_consensus_late_equivocation() { test_logger, validation_level.unwrap(), StatusCode::ACCEPTED, + network_globals, ) .await; diff --git a/beacon_node/lighthouse_network/Cargo.toml b/beacon_node/lighthouse_network/Cargo.toml index 494fd6892a9..a8e8e13668a 100644 --- a/beacon_node/lighthouse_network/Cargo.toml +++ b/beacon_node/lighthouse_network/Cargo.toml @@ -70,6 +70,7 @@ tempfile = { workspace = true } quickcheck = { workspace = true } quickcheck_macros = { workspace = true } async-channel = { workspace = true } +logging = { workspace = true } [features] libp2p-websocket = [] diff --git a/beacon_node/lighthouse_network/src/discovery/enr.rs b/beacon_node/lighthouse_network/src/discovery/enr.rs index a2aa640eb2b..72d0afd9cad 100644 --- a/beacon_node/lighthouse_network/src/discovery/enr.rs +++ b/beacon_node/lighthouse_network/src/discovery/enr.rs @@ -240,6 +240,7 @@ pub fn build_enr( // set the "custody_subnet_count" field on our ENR let custody_subnet_count = E::min_custody_requirement() as u64; + // TODO(das) read from cli builder.add_value( PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY, &custody_subnet_count.as_ssz_bytes(), diff --git a/beacon_node/lighthouse_network/src/types/globals.rs b/beacon_node/lighthouse_network/src/types/globals.rs index f9ed2c9f740..ff2cb97d057 100644 --- a/beacon_node/lighthouse_network/src/types/globals.rs +++ b/beacon_node/lighthouse_network/src/types/globals.rs @@ -2,12 +2,13 @@ use crate::peer_manager::peerdb::PeerDB; use crate::rpc::{MetaData, MetaDataV2}; use crate::types::{BackFillState, SyncState}; -use crate::Client; use crate::EnrExt; +use crate::{Client, Eth2Enr}; use crate::{Enr, GossipTopic, Multiaddr, PeerId}; use parking_lot::RwLock; use std::collections::HashSet; -use types::EthSpec; +use types::data_column_sidecar::ColumnIndex; +use types::{DataColumnSubnetId, Epoch, EthSpec}; pub struct NetworkGlobals { /// The current local ENR. @@ -110,6 +111,17 @@ impl NetworkGlobals { std::mem::replace(&mut *self.sync_state.write(), new_state) } + /// Compute custody data columns the node is assigned to custody. + pub fn custody_columns(&self, _epoch: Epoch) -> Result, &'static str> { + let enr = self.local_enr(); + let node_id = enr.node_id().raw().into(); + let custody_subnet_count = enr.custody_subnet_count::()?; + Ok( + DataColumnSubnetId::compute_custody_columns::(node_id, custody_subnet_count) + .collect(), + ) + } + /// TESTING ONLY. Build a dummy NetworkGlobals instance. pub fn new_test_globals(trusted_peers: Vec, log: &slog::Logger) -> NetworkGlobals { use crate::CombinedKeyExt; @@ -129,3 +141,20 @@ impl NetworkGlobals { ) } } + +#[cfg(test)] +mod test { + use crate::NetworkGlobals; + use types::{Epoch, EthSpec, MainnetEthSpec as E}; + + #[test] + fn test_custody_count_default() { + let log = logging::test_logger(); + let default_custody_requirement_column_count = + E::number_of_columns() / E::data_column_subnet_count(); + let globals = NetworkGlobals::::new_test_globals(vec![], &log); + let any_epoch = Epoch::new(0); + let columns = globals.custody_columns(any_epoch).unwrap(); + assert_eq!(columns.len(), default_custody_requirement_column_count); + } +} diff --git a/consensus/types/src/data_column_subnet_id.rs b/consensus/types/src/data_column_subnet_id.rs index b35abf2bbd6..778bfe94124 100644 --- a/consensus/types/src/data_column_subnet_id.rs +++ b/consensus/types/src/data_column_subnet_id.rs @@ -1,4 +1,5 @@ //! Identifies each data column subnet by an integer identifier. +use crate::data_column_sidecar::ColumnIndex; use crate::EthSpec; use ethereum_types::U256; use safe_arith::{ArithError, SafeArith}; @@ -86,7 +87,7 @@ impl DataColumnSubnetId { pub fn compute_custody_columns( node_id: U256, custody_subnet_count: u64, - ) -> impl Iterator { + ) -> impl Iterator { Self::compute_custody_subnets::(node_id, custody_subnet_count) .flat_map(|subnet| subnet.columns::()) }