diff --git a/beacon_node/client/src/address_change_broadcast.rs b/beacon_node/client/src/address_change_broadcast.rs deleted file mode 100644 index 69614159fec..00000000000 --- a/beacon_node/client/src/address_change_broadcast.rs +++ /dev/null @@ -1,322 +0,0 @@ -use crate::*; -use lighthouse_network::PubsubMessage; -use network::NetworkMessage; -use slog::{debug, info, warn, Logger}; -use slot_clock::SlotClock; -use std::cmp; -use std::collections::HashSet; -use std::mem; -use std::time::Duration; -use tokio::sync::mpsc::UnboundedSender; -use tokio::time::sleep; -use types::EthSpec; - -/// The size of each chunk of addresses changes to be broadcast at the Capella -/// fork. -const BROADCAST_CHUNK_SIZE: usize = 128; -/// The delay between broadcasting each chunk. -const BROADCAST_CHUNK_DELAY: Duration = Duration::from_millis(500); - -/// If the Capella fork has already been reached, `broadcast_address_changes` is -/// called immediately. -/// -/// If the Capella fork has not been reached, waits until the start of the fork -/// epoch and then calls `broadcast_address_changes`. -pub async fn broadcast_address_changes_at_capella( - chain: &BeaconChain, - network_send: UnboundedSender>, - log: &Logger, -) { - let spec = &chain.spec; - let slot_clock = &chain.slot_clock; - - let capella_fork_slot = if let Some(epoch) = spec.capella_fork_epoch { - epoch.start_slot(T::EthSpec::slots_per_epoch()) - } else { - // Exit now if Capella is not defined. - return; - }; - - // Wait until the Capella fork epoch. - while chain.slot().map_or(true, |slot| slot < capella_fork_slot) { - match slot_clock.duration_to_slot(capella_fork_slot) { - Some(duration) => { - // Sleep until the Capella fork. - sleep(duration).await; - break; - } - None => { - // We were unable to read the slot clock wait another slot - // and then try again. - sleep(slot_clock.slot_duration()).await; - } - } - } - - // The following function will be called in two scenarios: - // - // 1. The node has been running for some time and the Capella fork has just - // been reached. - // 2. The node has just started and it is *after* the Capella fork. - broadcast_address_changes(chain, network_send, log).await -} - -/// Broadcasts any address changes that are flagged for broadcasting at the -/// Capella fork epoch. -/// -/// Address changes are published in chunks, with a delay between each chunk. -/// This helps reduce the load on the P2P network and also helps prevent us from -/// clogging our `network_send` channel and being late to publish -/// blocks, attestations, etc. -pub async fn broadcast_address_changes( - chain: &BeaconChain, - network_send: UnboundedSender>, - log: &Logger, -) { - let head = chain.head_snapshot(); - let mut changes = chain - .op_pool - .get_bls_to_execution_changes_received_pre_capella(&head.beacon_state, &chain.spec); - - while !changes.is_empty() { - // This `split_off` approach is to allow us to have owned chunks of the - // `changes` vec. The `std::slice::Chunks` method uses references and - // the `itertools` iterator that achives this isn't `Send` so it doesn't - // work well with the `sleep` at the end of the loop. - let tail = changes.split_off(cmp::min(BROADCAST_CHUNK_SIZE, changes.len())); - let chunk = mem::replace(&mut changes, tail); - - let mut published_indices = HashSet::with_capacity(BROADCAST_CHUNK_SIZE); - let mut num_ok = 0; - let mut num_err = 0; - - // Publish each individual address change. - for address_change in chunk { - let validator_index = address_change.message.validator_index; - - let pubsub_message = PubsubMessage::BlsToExecutionChange(Box::new(address_change)); - let message = NetworkMessage::Publish { - messages: vec![pubsub_message], - }; - // It seems highly unlikely that this unbounded send will fail, but - // we handle the result nonetheless. - if let Err(e) = network_send.send(message) { - debug!( - log, - "Failed to publish change message"; - "error" => ?e, - "validator_index" => validator_index - ); - num_err += 1; - } else { - debug!( - log, - "Published address change message"; - "validator_index" => validator_index - ); - num_ok += 1; - published_indices.insert(validator_index); - } - } - - // Remove any published indices from the list of indices that need to be - // published. - chain - .op_pool - .register_indices_broadcasted_at_capella(&published_indices); - - info!( - log, - "Published address change messages"; - "num_published" => num_ok, - ); - - if num_err > 0 { - warn!( - log, - "Failed to publish address changes"; - "info" => "failed messages will be retried", - "num_unable_to_publish" => num_err, - ); - } - - sleep(BROADCAST_CHUNK_DELAY).await; - } - - debug!( - log, - "Address change routine complete"; - ); -} - -#[cfg(not(debug_assertions))] // Tests run too slow in debug. -#[cfg(test)] -mod tests { - use super::*; - use beacon_chain::test_utils::{BeaconChainHarness, EphemeralHarnessType}; - use operation_pool::ReceivedPreCapella; - use state_processing::{SigVerifiedOp, VerifyOperation}; - use std::collections::HashSet; - use tokio::sync::mpsc; - use types::*; - - type E = MainnetEthSpec; - - pub const VALIDATOR_COUNT: usize = BROADCAST_CHUNK_SIZE * 3; - pub const EXECUTION_ADDRESS: Address = Address::repeat_byte(42); - - struct Tester { - harness: BeaconChainHarness>, - /// Changes which should be broadcast at the Capella fork. - received_pre_capella_changes: Vec>, - /// Changes which should *not* be broadcast at the Capella fork. - not_received_pre_capella_changes: Vec>, - } - - impl Tester { - fn new() -> Self { - let altair_fork_epoch = Epoch::new(0); - let bellatrix_fork_epoch = Epoch::new(0); - let capella_fork_epoch = Epoch::new(2); - - let mut spec = E::default_spec(); - spec.altair_fork_epoch = Some(altair_fork_epoch); - spec.bellatrix_fork_epoch = Some(bellatrix_fork_epoch); - spec.capella_fork_epoch = Some(capella_fork_epoch); - - let harness = BeaconChainHarness::builder(E::default()) - .spec(spec) - .logger(logging::test_logger()) - .deterministic_keypairs(VALIDATOR_COUNT) - .deterministic_withdrawal_keypairs(VALIDATOR_COUNT) - .fresh_ephemeral_store() - .mock_execution_layer() - .build(); - - Self { - harness, - received_pre_capella_changes: <_>::default(), - not_received_pre_capella_changes: <_>::default(), - } - } - - fn produce_verified_address_change( - &self, - validator_index: u64, - ) -> SigVerifiedOp { - let change = self - .harness - .make_bls_to_execution_change(validator_index, EXECUTION_ADDRESS); - let head = self.harness.chain.head_snapshot(); - - change - .validate(&head.beacon_state, &self.harness.spec) - .unwrap() - } - - fn produce_received_pre_capella_changes(mut self, indices: Vec) -> Self { - for validator_index in indices { - self.received_pre_capella_changes - .push(self.produce_verified_address_change(validator_index)); - } - self - } - - fn produce_not_received_pre_capella_changes(mut self, indices: Vec) -> Self { - for validator_index in indices { - self.not_received_pre_capella_changes - .push(self.produce_verified_address_change(validator_index)); - } - self - } - - async fn run(self) { - let harness = self.harness; - let chain = harness.chain.clone(); - - let mut broadcast_indices = HashSet::new(); - for change in self.received_pre_capella_changes { - broadcast_indices.insert(change.as_inner().message.validator_index); - chain - .op_pool - .insert_bls_to_execution_change(change, ReceivedPreCapella::Yes); - } - - let mut non_broadcast_indices = HashSet::new(); - for change in self.not_received_pre_capella_changes { - non_broadcast_indices.insert(change.as_inner().message.validator_index); - chain - .op_pool - .insert_bls_to_execution_change(change, ReceivedPreCapella::No); - } - - harness.set_current_slot( - chain - .spec - .capella_fork_epoch - .unwrap() - .start_slot(E::slots_per_epoch()), - ); - - let (sender, mut receiver) = mpsc::unbounded_channel(); - - broadcast_address_changes_at_capella(&chain, sender, &logging::test_logger()).await; - - let mut broadcasted_changes = vec![]; - while let Some(NetworkMessage::Publish { mut messages }) = receiver.recv().await { - match messages.pop().unwrap() { - PubsubMessage::BlsToExecutionChange(change) => broadcasted_changes.push(change), - _ => panic!("unexpected message"), - } - } - - assert_eq!( - broadcasted_changes.len(), - broadcast_indices.len(), - "all expected changes should have been broadcast" - ); - - for broadcasted in &broadcasted_changes { - assert!( - !non_broadcast_indices.contains(&broadcasted.message.validator_index), - "messages not flagged for broadcast should not have been broadcast" - ); - } - - let head = chain.head_snapshot(); - assert!( - chain - .op_pool - .get_bls_to_execution_changes_received_pre_capella( - &head.beacon_state, - &chain.spec, - ) - .is_empty(), - "there shouldn't be any capella broadcast changes left in the op pool" - ); - } - } - - // Useful for generating even-numbered indices. Required since only even - // numbered genesis validators have BLS credentials. - fn even_indices(start: u64, count: usize) -> Vec { - (start..).filter(|i| i % 2 == 0).take(count).collect() - } - - #[tokio::test] - async fn one_chunk() { - Tester::new() - .produce_received_pre_capella_changes(even_indices(0, 4)) - .produce_not_received_pre_capella_changes(even_indices(10, 4)) - .run() - .await; - } - - #[tokio::test] - async fn multiple_chunks() { - Tester::new() - .produce_received_pre_capella_changes(even_indices(0, BROADCAST_CHUNK_SIZE * 3 / 2)) - .run() - .await; - } -} diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index 558e5cbc84f..243dd132408 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -1,4 +1,3 @@ -use crate::address_change_broadcast::broadcast_address_changes_at_capella; use crate::compute_light_client_updates::{ compute_light_client_updates, LIGHT_CLIENT_SERVER_CHANNEL_CAPACITY, }; @@ -920,25 +919,6 @@ where beacon_chain.slot_clock.clone(), ); } - - // Spawn a service to publish BLS to execution changes at the Capella fork. - if let Some(network_senders) = self.network_senders.clone() { - let inner_chain = beacon_chain.clone(); - let broadcast_context = - runtime_context.service_context("addr_bcast".to_string()); - let log = broadcast_context.log().clone(); - broadcast_context.executor.spawn( - async move { - broadcast_address_changes_at_capella( - &inner_chain, - network_senders.network_send(), - &log, - ) - .await - }, - "addr_broadcast", - ); - } } // Spawn service to publish light_client updates at some interval into the slot. diff --git a/beacon_node/client/src/lib.rs b/beacon_node/client/src/lib.rs index 2f14d87efc0..fd92c282554 100644 --- a/beacon_node/client/src/lib.rs +++ b/beacon_node/client/src/lib.rs @@ -1,6 +1,5 @@ extern crate slog; -mod address_change_broadcast; mod compute_light_client_updates; pub mod config; mod metrics;