diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index c86e35980ba..042d14a4fa4 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -104,6 +104,7 @@ pub struct BeaconChainBuilder { kzg: Option>, task_executor: Option, validator_monitor_config: Option, + import_all_data_columns: bool, } impl @@ -145,6 +146,7 @@ where kzg: None, task_executor: None, validator_monitor_config: None, + import_all_data_columns: false, } } @@ -615,6 +617,12 @@ where self } + /// Sets whether to require and import all data columns when importing block. + pub fn import_all_data_columns(mut self, import_all_data_columns: bool) -> Self { + self.import_all_data_columns = import_all_data_columns; + self + } + /// Sets the `BeaconChain` event handler backend. /// /// For example, provide `ServerSentEventHandler` as a `handler`. @@ -965,8 +973,15 @@ where validator_monitor: RwLock::new(validator_monitor), genesis_backfill_slot, data_availability_checker: Arc::new( - DataAvailabilityChecker::new(slot_clock, self.kzg.clone(), store, &log, self.spec) - .map_err(|e| format!("Error initializing DataAvailabiltyChecker: {:?}", e))?, + DataAvailabilityChecker::new( + slot_clock, + self.kzg.clone(), + store, + self.import_all_data_columns, + &log, + self.spec, + ) + .map_err(|e| format!("Error initializing DataAvailabilityChecker: {:?}", e))?, ), kzg: self.kzg.clone(), }; diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index ce5995a5581..b4336a054e2 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -95,11 +95,16 @@ impl DataAvailabilityChecker { slot_clock: T::SlotClock, kzg: Option>, store: BeaconStore, + import_all_data_columns: bool, log: &Logger, spec: ChainSpec, ) -> Result { - // TODO(das): support supernode or custom custody requirement - let custody_subnet_count = spec.custody_requirement as usize; + let custody_subnet_count = if import_all_data_columns { + spec.data_column_sidecar_subnet_count as usize + } else { + spec.custody_requirement as usize + }; + let custody_column_count = custody_subnet_count.saturating_mul(spec.data_columns_per_subnet()); @@ -112,8 +117,8 @@ impl DataAvailabilityChecker { Ok(Self { availability_cache: Arc::new(overflow_cache), slot_clock, - log: log.clone(), kzg, + log: log.clone(), spec, }) } diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index 393ce35f000..6695f3c4bc1 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -207,6 +207,7 @@ where .beacon_graffiti(beacon_graffiti) .event_handler(event_handler) .execution_layer(execution_layer) + .import_all_data_columns(config.network.subscribe_all_data_column_subnets) .validator_monitor_config(config.validator_monitor.clone()); let builder = if let Some(slasher) = self.slasher.clone() { diff --git a/beacon_node/lighthouse_network/src/config.rs b/beacon_node/lighthouse_network/src/config.rs index 91c5b62d0b2..7c95977140e 100644 --- a/beacon_node/lighthouse_network/src/config.rs +++ b/beacon_node/lighthouse_network/src/config.rs @@ -42,7 +42,7 @@ pub struct Config { pub network_dir: PathBuf, /// IP addresses to listen on. - listen_addresses: ListenAddress, + pub(crate) listen_addresses: ListenAddress, /// The address to broadcast to peers about which address we are listening on. None indicates /// that no discovery address has been set in the CLI args. @@ -100,6 +100,9 @@ pub struct Config { /// Attempt to construct external port mappings with UPnP. pub upnp_enabled: bool, + /// Subscribe to all data column subnets for the duration of the runtime. + pub subscribe_all_data_column_subnets: bool, + /// Subscribe to all subnets for the duration of the runtime. pub subscribe_all_subnets: bool, @@ -338,6 +341,7 @@ impl Default for Config { upnp_enabled: true, network_load: 4, private: false, + subscribe_all_data_column_subnets: false, subscribe_all_subnets: false, import_all_attestations: false, shutdown_after_sync: false, diff --git a/beacon_node/lighthouse_network/src/discovery/enr.rs b/beacon_node/lighthouse_network/src/discovery/enr.rs index 51e50808e1d..04ae9971502 100644 --- a/beacon_node/lighthouse_network/src/discovery/enr.rs +++ b/beacon_node/lighthouse_network/src/discovery/enr.rs @@ -14,7 +14,7 @@ use std::fs::File; use std::io::prelude::*; use std::path::Path; use std::str::FromStr; -use types::{EnrForkId, EthSpec}; +use types::{ChainSpec, EnrForkId, EthSpec}; use super::enr_ext::{EnrExt, QUIC6_ENR_KEY, QUIC_ENR_KEY}; @@ -24,6 +24,8 @@ pub const ETH2_ENR_KEY: &str = "eth2"; pub const ATTESTATION_BITFIELD_ENR_KEY: &str = "attnets"; /// The ENR field specifying the sync committee subnet bitfield. pub const SYNC_COMMITTEE_BITFIELD_ENR_KEY: &str = "syncnets"; +/// The ENR field specifying the peerdas custody subnet count. +pub const PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY: &str = "csc"; /// Extension trait for ENR's within Eth2. pub trait Eth2Enr { @@ -35,6 +37,9 @@ pub trait Eth2Enr { &self, ) -> Result, &'static str>; + /// The peerdas custody subnet count associated with the ENR. + fn custody_subnet_count(&self, spec: &ChainSpec) -> u64; + fn eth2(&self) -> Result; } @@ -59,6 +64,16 @@ impl Eth2Enr for Enr { .map_err(|_| "Could not decode the ENR syncnets bitfield") } + /// if the custody value is non-existent in the ENR, then we assume the minimum custody value + /// defined in the spec. + fn custody_subnet_count(&self, spec: &ChainSpec) -> u64 { + self.get_decodable::(PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY) + .and_then(|r| r.ok()) + // If value supplied in ENR is invalid, fallback to `custody_requirement` + .filter(|csc| csc <= &spec.data_column_sidecar_subnet_count) + .unwrap_or(spec.custody_requirement) + } + fn eth2(&self) -> Result { let eth2_bytes = self.get(ETH2_ENR_KEY).ok_or("ENR has no eth2 field")?; @@ -126,12 +141,13 @@ pub fn build_or_load_enr( config: &NetworkConfig, enr_fork_id: &EnrForkId, log: &slog::Logger, + spec: &ChainSpec, ) -> Result { // Build the local ENR. // Note: Discovery should update the ENR record's IP to the external IP as seen by the // majority of our peers, if the CLI doesn't expressly forbid it. let enr_key = CombinedKey::from_libp2p(local_key)?; - let mut local_enr = build_enr::(&enr_key, config, enr_fork_id)?; + let mut local_enr = build_enr::(&enr_key, config, enr_fork_id, spec)?; use_or_load_enr(&enr_key, &mut local_enr, config, log)?; Ok(local_enr) @@ -142,6 +158,7 @@ pub fn build_enr( enr_key: &CombinedKey, config: &NetworkConfig, enr_fork_id: &EnrForkId, + spec: &ChainSpec, ) -> Result { let mut builder = discv5::enr::Enr::builder(); let (maybe_ipv4_address, maybe_ipv6_address) = &config.enr_address; @@ -221,6 +238,16 @@ pub fn build_enr( builder.add_value(SYNC_COMMITTEE_BITFIELD_ENR_KEY, &bitfield.as_ssz_bytes()); + // only set `csc` if PeerDAS fork epoch has been scheduled + if spec.is_peer_das_scheduled() { + let custody_subnet_count = if config.subscribe_all_data_column_subnets { + spec.data_column_sidecar_subnet_count + } else { + spec.custody_requirement + }; + builder.add_value(PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY, &custody_subnet_count); + } + builder .build(enr_key) .map_err(|e| format!("Could not build Local ENR: {:?}", e)) @@ -244,10 +271,12 @@ fn compare_enr(local_enr: &Enr, disk_enr: &Enr) -> bool { // take preference over disk udp port if one is not specified && (local_enr.udp4().is_none() || local_enr.udp4() == disk_enr.udp4()) && (local_enr.udp6().is_none() || local_enr.udp6() == disk_enr.udp6()) - // we need the ATTESTATION_BITFIELD_ENR_KEY and SYNC_COMMITTEE_BITFIELD_ENR_KEY key to match, - // otherwise we use a new ENR. This will likely only be true for non-validating nodes + // we need the ATTESTATION_BITFIELD_ENR_KEY and SYNC_COMMITTEE_BITFIELD_ENR_KEY and + // PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY key to match, otherwise we use a new ENR. This will + // likely only be true for non-validating nodes. && local_enr.get(ATTESTATION_BITFIELD_ENR_KEY) == disk_enr.get(ATTESTATION_BITFIELD_ENR_KEY) && local_enr.get(SYNC_COMMITTEE_BITFIELD_ENR_KEY) == disk_enr.get(SYNC_COMMITTEE_BITFIELD_ENR_KEY) + && local_enr.get(PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY) == disk_enr.get(PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY) } /// Loads enr from the given directory @@ -280,3 +309,77 @@ pub fn save_enr_to_disk(dir: &Path, enr: &Enr, log: &slog::Logger) { } } } + +#[cfg(test)] +mod test { + use super::*; + use crate::config::Config as NetworkConfig; + use types::{Epoch, MainnetEthSpec}; + + type E = MainnetEthSpec; + + fn make_eip7594_spec() -> ChainSpec { + let mut spec = E::default_spec(); + spec.eip7594_fork_epoch = Some(Epoch::new(10)); + spec + } + + #[test] + fn custody_subnet_count_default() { + let config = NetworkConfig { + subscribe_all_data_column_subnets: false, + ..NetworkConfig::default() + }; + let spec = make_eip7594_spec(); + + let enr = build_enr_with_config(config, &spec).0; + + assert_eq!( + enr.custody_subnet_count::(&spec), + spec.custody_requirement, + ); + } + + #[test] + fn custody_subnet_count_all() { + let config = NetworkConfig { + subscribe_all_data_column_subnets: true, + ..NetworkConfig::default() + }; + let spec = make_eip7594_spec(); + let enr = build_enr_with_config(config, &spec).0; + + assert_eq!( + enr.custody_subnet_count::(&spec), + spec.data_column_sidecar_subnet_count, + ); + } + + #[test] + fn custody_subnet_count_fallback_default() { + let config = NetworkConfig::default(); + let spec = make_eip7594_spec(); + let (mut enr, enr_key) = build_enr_with_config(config, &spec); + let invalid_subnet_count = 99u64; + + enr.insert( + PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY, + &invalid_subnet_count, + &enr_key, + ) + .unwrap(); + + assert_eq!( + enr.custody_subnet_count::(&spec), + spec.custody_requirement, + ); + } + + fn build_enr_with_config(config: NetworkConfig, spec: &ChainSpec) -> (Enr, CombinedKey) { + let keypair = libp2p::identity::secp256k1::Keypair::generate(); + let enr_key = CombinedKey::from_secp256k1(&keypair); + let enr_fork_id = EnrForkId::default(); + let enr = build_enr::(&enr_key, &config, &enr_fork_id, spec).unwrap(); + (enr, enr_key) + } +} diff --git a/beacon_node/lighthouse_network/src/discovery/mod.rs b/beacon_node/lighthouse_network/src/discovery/mod.rs index 865d707495f..300c190cdaf 100644 --- a/beacon_node/lighthouse_network/src/discovery/mod.rs +++ b/beacon_node/lighthouse_network/src/discovery/mod.rs @@ -1220,7 +1220,7 @@ mod tests { let mut config = NetworkConfig::default(); config.set_listening_addr(crate::ListenAddress::unused_v4_ports()); let enr_key: CombinedKey = CombinedKey::from_secp256k1(&keypair); - let enr: Enr = build_enr::(&enr_key, &config, &EnrForkId::default()).unwrap(); + let enr: Enr = build_enr::(&enr_key, &config, &EnrForkId::default(), &spec).unwrap(); let log = build_log(slog::Level::Debug, false); let globals = NetworkGlobals::new( enr, diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index c2a2a03fe87..fe649f4199a 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -162,6 +162,7 @@ impl Network { &config, &ctx.enr_fork_id, &log, + ctx.chain_spec, )?; // Construct the metadata let meta_data = utils::load_or_build_metadata(&config.network_dir, &log); diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index e522285a9e3..db5fc7636ec 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -16,6 +16,7 @@ use futures::prelude::*; use futures::StreamExt; use lighthouse_network::service::Network; use lighthouse_network::types::GossipKind; +use lighthouse_network::Eth2Enr; use lighthouse_network::{prometheus_client::registry::Registry, MessageAcceptance}; use lighthouse_network::{ rpc::{GoodbyeReason, RPCResponseErrorCode}, @@ -35,8 +36,8 @@ use task_executor::ShutdownReason; use tokio::sync::mpsc; use tokio::time::Sleep; use types::{ - ChainSpec, EthSpec, ForkContext, Slot, SubnetId, SyncCommitteeSubscription, SyncSubnetId, - Unsigned, ValidatorSubscription, + ChainSpec, DataColumnSubnetId, EthSpec, ForkContext, Slot, SubnetId, SyncCommitteeSubscription, + SyncSubnetId, Unsigned, ValidatorSubscription, }; mod tests; @@ -183,6 +184,8 @@ pub struct NetworkService { next_fork_subscriptions: Pin>>, /// A delay that expires when we need to unsubscribe from old fork topics. next_unsubscribe: Pin>>, + /// Subscribe to all the data column subnets. + subscribe_all_data_column_subnets: bool, /// Subscribe to all the subnets once synced. subscribe_all_subnets: bool, /// Shutdown beacon node after sync is complete. @@ -349,6 +352,7 @@ impl NetworkService { next_fork_update, next_fork_subscriptions, next_unsubscribe, + subscribe_all_data_column_subnets: config.subscribe_all_data_column_subnets, subscribe_all_subnets: config.subscribe_all_subnets, shutdown_after_sync: config.shutdown_after_sync, metrics_enabled: config.metrics_enabled, @@ -733,6 +737,15 @@ impl NetworkService { } } + // TODO(das): This is added here for the purpose of testing, *without* having to + // activate Electra. This should happen as part of the Electra upgrade and we should + // move the subscription logic once it's ready to rebase PeerDAS on Electra, or if + // we decide to activate via the soft fork route: + // https://github.com/sigp/lighthouse/pull/5899 + if self.fork_context.spec.is_peer_das_scheduled() { + self.subscribe_to_peer_das_topics(&mut subscribed_topics); + } + // If we are to subscribe to all subnets we do it here if self.subscribe_all_subnets { for subnet_id in 0..<::EthSpec as EthSpec>::SubnetBitfieldLength::to_u64() { @@ -779,6 +792,45 @@ impl NetworkService { } } + fn subscribe_to_peer_das_topics(&mut self, subscribed_topics: &mut Vec) { + if self.subscribe_all_data_column_subnets { + for column_subnet in 0..self.fork_context.spec.data_column_sidecar_subnet_count { + for fork_digest in self.required_gossip_fork_digests() { + let gossip_kind = + Subnet::DataColumn(DataColumnSubnetId::new(column_subnet)).into(); + let topic = + GossipTopic::new(gossip_kind, GossipEncoding::default(), fork_digest); + if self.libp2p.subscribe(topic.clone()) { + subscribed_topics.push(topic); + } else { + warn!(self.log, "Could not subscribe to topic"; "topic" => %topic); + } + } + } + } else { + for column_subnet in DataColumnSubnetId::compute_custody_subnets::( + self.network_globals.local_enr().node_id().raw().into(), + self.network_globals + .local_enr() + .custody_subnet_count::<::EthSpec>( + &self.fork_context.spec, + ), + &self.fork_context.spec, + ) { + for fork_digest in self.required_gossip_fork_digests() { + let gossip_kind = Subnet::DataColumn(column_subnet).into(); + let topic = + GossipTopic::new(gossip_kind, GossipEncoding::default(), fork_digest); + if self.libp2p.subscribe(topic.clone()) { + subscribed_topics.push(topic); + } else { + warn!(self.log, "Could not subscribe to topic"; "topic" => %topic); + } + } + } + } + } + /// Handle a message sent to the network service. async fn on_validator_subscription_msg(&mut self, msg: ValidatorSubscriptionMessage) { match msg { diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index 2e1b1c093c8..3f991d4db28 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -54,6 +54,18 @@ pub fn cli_app() -> Command { /* * Network parameters. */ + .arg( + Arg::new("subscribe-all-data-column-subnets") + .long("subscribe-all-data-column-subnets") + .action(ArgAction::SetTrue) + .help_heading(FLAG_HEADER) + .help("Subscribe to all data column subnets and participate in data custody for \ + all columns. This will also advertise the beacon node as being long-lived \ + subscribed to all data column subnets. \ + NOTE: this is an experimental flag and may change any time without notice!") + .display_order(0) + .hide(true) + ) .arg( Arg::new("subscribe-all-subnets") .long("subscribe-all-subnets") diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index b4fa38da7d7..24bef73f7c2 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -1130,6 +1130,10 @@ pub fn set_network_config( config.network_dir = data_dir.join(DEFAULT_NETWORK_DIR); }; + if parse_flag(cli_args, "subscribe-all-data-column-subnets") { + config.subscribe_all_data_column_subnets = true; + } + if parse_flag(cli_args, "subscribe-all-subnets") { config.subscribe_all_subnets = true; } diff --git a/consensus/types/src/chain_spec.rs b/consensus/types/src/chain_spec.rs index ca4df32d1e5..ed929061ffb 100644 --- a/consensus/types/src/chain_spec.rs +++ b/consensus/types/src/chain_spec.rs @@ -426,6 +426,13 @@ impl ChainSpec { }) } + /// Returns true if `EIP7594_FORK_EPOCH` is set and is not set to `FAR_FUTURE_EPOCH`. + pub fn is_peer_das_scheduled(&self) -> bool { + self.eip7594_fork_epoch.map_or(false, |eip7594_fork_epoch| { + eip7594_fork_epoch != self.far_future_epoch + }) + } + /// Returns a full `Fork` struct for a given epoch. pub fn fork_at_epoch(&self, epoch: Epoch) -> Fork { let current_fork_name = self.fork_name_at_epoch(epoch); diff --git a/lcli/src/generate_bootnode_enr.rs b/lcli/src/generate_bootnode_enr.rs index 52960b929d8..26e17ba73ee 100644 --- a/lcli/src/generate_bootnode_enr.rs +++ b/lcli/src/generate_bootnode_enr.rs @@ -10,7 +10,7 @@ use std::{fs, net::Ipv4Addr}; use std::{fs::File, num::NonZeroU16}; use types::{ChainSpec, EnrForkId, Epoch, EthSpec, Hash256}; -pub fn run(matches: &ArgMatches) -> Result<(), String> { +pub fn run(matches: &ArgMatches, spec: &ChainSpec) -> Result<(), String> { let ip: Ipv4Addr = clap_utils::parse_required(matches, "ip")?; let udp_port: NonZeroU16 = clap_utils::parse_required(matches, "udp-port")?; let tcp_port: NonZeroU16 = clap_utils::parse_required(matches, "tcp-port")?; @@ -37,7 +37,7 @@ pub fn run(matches: &ArgMatches) -> Result<(), String> { next_fork_version: genesis_fork_version, next_fork_epoch: Epoch::max_value(), // FAR_FUTURE_EPOCH }; - let enr = build_enr::(&enr_key, &config, &enr_fork_id) + let enr = build_enr::(&enr_key, &config, &enr_fork_id, spec) .map_err(|e| format!("Unable to create ENR: {:?}", e))?; fs::create_dir_all(&output_dir).map_err(|e| format!("Unable to create output-dir: {:?}", e))?; diff --git a/lcli/src/main.rs b/lcli/src/main.rs index 380aeb6aceb..f055a23b362 100644 --- a/lcli/src/main.rs +++ b/lcli/src/main.rs @@ -707,8 +707,10 @@ fn run(env_builder: EnvironmentBuilder, matches: &ArgMatches) -> } Some(("check-deposit-data", matches)) => check_deposit_data::run(matches) .map_err(|e| format!("Failed to run check-deposit-data command: {}", e)), - Some(("generate-bootnode-enr", matches)) => generate_bootnode_enr::run::(matches) - .map_err(|e| format!("Failed to run generate-bootnode-enr command: {}", e)), + Some(("generate-bootnode-enr", matches)) => { + generate_bootnode_enr::run::(matches, &env.eth2_config.spec) + .map_err(|e| format!("Failed to run generate-bootnode-enr command: {}", e)) + } Some(("mnemonic-validators", matches)) => mnemonic_validators::run(matches) .map_err(|e| format!("Failed to run mnemonic-validators command: {}", e)), Some(("indexed-attestations", matches)) => indexed_attestations::run::(matches)