diff --git a/Cargo.lock b/Cargo.lock
index df005da6961..92911f18e9f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -835,7 +835,7 @@ dependencies = [
 
 [[package]]
 name = "beacon_node"
-version = "5.2.1"
+version = "5.3.0"
 dependencies = [
  "beacon_chain",
  "clap",
@@ -1057,7 +1057,7 @@ dependencies = [
 
 [[package]]
 name = "boot_node"
-version = "5.2.1"
+version = "5.3.0"
 dependencies = [
  "beacon_node",
  "clap",
@@ -4438,7 +4438,7 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
 
 [[package]]
 name = "lcli"
-version = "5.2.1"
+version = "5.3.0"
 dependencies = [
  "account_utils",
  "beacon_chain",
@@ -5009,7 +5009,7 @@ dependencies = [
 
 [[package]]
 name = "lighthouse"
-version = "5.2.1"
+version = "5.3.0"
 dependencies = [
  "account_manager",
  "account_utils",
@@ -6576,8 +6576,7 @@ checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0"
 [[package]]
 name = "quick-protobuf"
 version = "0.8.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9d6da84cc204722a989e01ba2f6e1e276e190f22263d0cb6ce8526fcdb0d2e1f"
+source = "git+https://github.com/sigp/quick-protobuf.git?rev=681f413312404ab6e51f0b46f39b0075c6f4ebfd#681f413312404ab6e51f0b46f39b0075c6f4ebfd"
 dependencies = [
  "byteorder",
 ]
diff --git a/Cargo.toml b/Cargo.toml
index 901fff2ea60..608b798f5c4 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -107,7 +107,7 @@ bytes = "1"
 clap = { version = "4.5.4", features = ["derive", "cargo", "wrap_help"] }
 # Turn off c-kzg's default features which include `blst/portable`. We can turn on blst's portable
 # feature ourselves when desired.
-c-kzg = { version = "1", default-features = false }
+c-kzg = { version = "1", default-features = false }
 compare_fields_derive = { path = "common/compare_fields_derive" }
 criterion = "0.5"
 delay_map = "0.3"
@@ -240,6 +240,9 @@ validator_client = { path = "validator_client" }
 validator_dir = { path = "common/validator_dir" }
 warp_utils = { path = "common/warp_utils" }
 
+[patch.crates-io]
+quick-protobuf = { git = "https://github.com/sigp/quick-protobuf.git", rev = "681f413312404ab6e51f0b46f39b0075c6f4ebfd" }
+
 [profile.maxperf]
 inherits = "release"
 lto = "fat"
diff --git a/beacon_node/Cargo.toml b/beacon_node/Cargo.toml
index a5fd29c971f..146f1c1018e 100644
--- a/beacon_node/Cargo.toml
+++ b/beacon_node/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "beacon_node"
-version = "5.2.1"
+version = "5.3.0"
 authors = [
     "Paul Hauner <paul@paulhauner.com>",
     "Age Manning <Age@AgeManning.com>",
diff --git a/beacon_node/beacon_chain/src/canonical_head.rs b/beacon_node/beacon_chain/src/canonical_head.rs
--- a/beacon_node/beacon_chain/src/canonical_head.rs
+++ b/beacon_node/beacon_chain/src/canonical_head.rs
@@ ... @@
         metrics::set_gauge(
             &metrics::FORK_CHOICE_REORG_DISTANCE,
             reorg_distance.as_u64() as i64,
         );
-        warn!(
+        info!(
             log,
             "Beacon chain re-org";
             "previous_head" => ?old_block_root,
diff --git a/beacon_node/eth1/src/service.rs b/beacon_node/eth1/src/service.rs
index 9cc1da13826..e5d60fac49c 100644
--- a/beacon_node/eth1/src/service.rs
+++ b/beacon_node/eth1/src/service.rs
@@ -1129,7 +1129,7 @@ impl Service {
 
         Ok(BlockCacheUpdateOutcome {
             blocks_imported,
-            head_block_number: self.inner.block_cache.read().highest_block_number(),
+            head_block_number: block_cache.highest_block_number(),
         })
     }
 }
diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs
index 556b4194dd8..d92dcd4851c 100644
--- a/beacon_node/network/src/sync/range_sync/chain.rs
+++ b/beacon_node/network/src/sync/range_sync/chain.rs
@@ -111,9 +111,6 @@ pub struct SyncingChain<T: BeaconChainTypes> {
     /// The current processing batch, if any.
     current_processing_batch: Option<BatchId>,
 
-    /// Batches validated by this chain.
-    validated_batches: u64,
-
     /// The chain's log.
     log: slog::Logger,
 }
 
@@ -161,7 +158,6 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
             attempted_optimistic_starts: HashSet::default(),
             state: ChainSyncingState::Stopped,
             current_processing_batch: None,
-            validated_batches: 0,
             log: log.new(o!("chain" => id)),
         }
     }
@@ -182,8 +178,10 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
     }
 
     /// Progress in epochs made by the chain
-    pub fn validated_epochs(&self) -> u64 {
-        self.validated_batches * EPOCHS_PER_BATCH
+    pub fn processed_epochs(&self) -> u64 {
+        self.processing_target
+            .saturating_sub(self.start_epoch)
+            .into()
     }
 
     /// Returns the total count of pending blocks in all the batches of this chain
@@ -655,7 +653,6 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
         let removed_batches = std::mem::replace(&mut self.batches, remaining_batches);
 
         for (id, batch) in removed_batches.into_iter() {
-            self.validated_batches = self.validated_batches.saturating_add(1);
             // only for batches awaiting validation can we be sure the last attempt is
             // right, and thus, that any different attempt is wrong
             match batch.state() {
@@ -1212,7 +1209,6 @@ impl<T: BeaconChainTypes> slog::KV for SyncingChain<T> {
         )?;
         serializer.emit_usize("batches", self.batches.len())?;
         serializer.emit_usize("peers", self.peers.len())?;
-        serializer.emit_u64("validated_batches", self.validated_batches)?;
         serializer.emit_arguments("state", &format_args!("{:?}", self.state))?;
         slog::Result::Ok(())
     }
diff --git a/beacon_node/network/src/sync/range_sync/chain_collection.rs b/beacon_node/network/src/sync/range_sync/chain_collection.rs
index 3621a6605af..1217fbf8fed 100644
--- a/beacon_node/network/src/sync/range_sync/chain_collection.rs
+++ b/beacon_node/network/src/sync/range_sync/chain_collection.rs
@@ -24,7 +24,7 @@ use types::{Epoch, Hash256, Slot};
 const PARALLEL_HEAD_CHAINS: usize = 2;
 
 /// Minimum work we require a finalized chain to do before picking a chain with more peers.
-const MIN_FINALIZED_CHAIN_VALIDATED_EPOCHS: u64 = 10;
+const MIN_FINALIZED_CHAIN_PROCESSED_EPOCHS: u64 = 10;
 
 /// The state of the long range/batch sync.
 #[derive(Clone)]
@@ -273,8 +273,8 @@ impl<T: BeaconChainTypes, C: BlockStorage> ChainCollection<T, C> {
                 // chains are different, check that they don't have the same number of peers
                 if let Some(syncing_chain) = self.finalized_chains.get_mut(&syncing_id) {
                     if max_peers > syncing_chain.available_peers()
-                        && syncing_chain.validated_epochs()
-                            > MIN_FINALIZED_CHAIN_VALIDATED_EPOCHS
+                        && syncing_chain.processed_epochs()
+                            > MIN_FINALIZED_CHAIN_PROCESSED_EPOCHS
                     {
                         syncing_chain.stop_syncing();
                         old_id = Some(Some(syncing_id));
diff --git a/book/src/late-block-re-orgs.md b/book/src/late-block-re-orgs.md
index fc4530589d9..4a00f33aa44 100644
--- a/book/src/late-block-re-orgs.md
+++ b/book/src/late-block-re-orgs.md
@@ -50,10 +50,10 @@ A pair of messages at `INFO` level will be logged if a re-org opportunity is det
 
 > INFO Proposing block to re-org current head head_to_reorg: 0xf64f…2b49, slot: 1105320
 
-This should be followed shortly after by a `WARN` log indicating that a re-org occurred. This is
+This should be followed shortly after by an `INFO` log indicating that a re-org occurred. This is
 expected and normal:
 
-> WARN Beacon chain re-org reorg_distance: 1, new_slot: 1105320, new_head: 0x72791549e4ca792f91053bc7cf1e55c6fbe745f78ce7a16fc3acb6f09161becd, previous_slot: 1105319, previous_head: 0xf64f8e5ed617dc18c1e759dab5d008369767c3678416dac2fe1d389562842b49
+> INFO Beacon chain re-org reorg_distance: 1, new_slot: 1105320, new_head: 0x72791549e4ca792f91053bc7cf1e55c6fbe745f78ce7a16fc3acb6f09161becd, previous_slot: 1105319, previous_head: 0xf64f8e5ed617dc18c1e759dab5d008369767c3678416dac2fe1d389562842b49
 
 In case a re-org is not viable (which should be most of the time), Lighthouse will just propose a
 block as normal and log the reason the re-org was not attempted at debug level:
diff --git a/boot_node/Cargo.toml b/boot_node/Cargo.toml
index 4cde8ea2707..46ccd4566be 100644
--- a/boot_node/Cargo.toml
+++ b/boot_node/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "boot_node"
-version = "5.2.1"
+version = "5.3.0"
 authors = ["Sigma Prime <contact@sigmaprime.io>"]
 edition = { workspace = true }
 
diff --git a/common/eth2/src/lib.rs b/common/eth2/src/lib.rs
index 48cdf7031a1..f0c25124ddb 100644
--- a/common/eth2/src/lib.rs
+++ b/common/eth2/src/lib.rs
@@ -121,6 +121,7 @@ impl fmt::Display for Error {
 pub struct Timeouts {
     pub attestation: Duration,
     pub attester_duties: Duration,
+    pub attestation_subscriptions: Duration,
     pub liveness: Duration,
     pub proposal: Duration,
     pub proposer_duties: Duration,
@@ -137,6 +138,7 @@ impl Timeouts {
         Timeouts {
             attestation: timeout,
             attester_duties: timeout,
+            attestation_subscriptions: timeout,
             liveness: timeout,
             proposal: timeout,
             proposer_duties: timeout,
@@ -2540,7 +2542,12 @@ impl BeaconNodeHttpClient {
             .push("validator")
             .push("beacon_committee_subscriptions");
 
-        self.post(path, &subscriptions).await?;
+        self.post_with_timeout(
+            path,
+            &subscriptions,
+            self.timeouts.attestation_subscriptions,
+        )
+        .await?;
 
         Ok(())
     }
diff --git a/common/lighthouse_version/src/lib.rs b/common/lighthouse_version/src/lib.rs
index d32d7994689..f988dd86b1f 100644
--- a/common/lighthouse_version/src/lib.rs
+++ b/common/lighthouse_version/src/lib.rs
@@ -17,8 +17,8 @@ pub const VERSION: &str = git_version!(
         // NOTE: using --match instead of --exclude for compatibility with old Git
         "--match=thiswillnevermatchlol"
     ],
-    prefix = "Lighthouse/v5.2.1-",
-    fallback = "Lighthouse/v5.2.1"
+    prefix = "Lighthouse/v5.3.0-",
+    fallback = "Lighthouse/v5.3.0"
 );
 
 /// Returns the first eight characters of the latest commit hash for this build.
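A note on the `common/eth2` change above: beacon committee subscriptions previously used the client-wide POST timeout, so one hung beacon node could stall the whole duties loop; routing the request through `post_with_timeout` with the new `attestation_subscriptions` field gives it a much tighter deadline. The sketch below is not Lighthouse's actual `post_with_timeout` implementation — the function name and parameters are illustrative — it just demonstrates the per-request-timeout pattern with `reqwest` (assuming the `json` feature is enabled):

```rust
// Minimal sketch of a per-endpoint POST timeout, in the spirit of `Timeouts`
// above. Hypothetical helper, not the Lighthouse implementation.
use std::time::Duration;

async fn post_json_with_timeout<T: serde::Serialize>(
    client: &reqwest::Client,
    url: &str,
    body: &T,
    timeout: Duration,
) -> Result<(), reqwest::Error> {
    client
        .post(url)
        // A per-request timeout overrides any client-wide default, so a slow
        // endpoint fails fast and a fallback node can be tried sooner.
        .timeout(timeout)
        .json(body)
        .send()
        .await?
        .error_for_status()?;
    Ok(())
}
```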
diff --git a/consensus/types/src/chain_spec.rs b/consensus/types/src/chain_spec.rs
index ed929061ffb..2c64d21130f 100644
--- a/consensus/types/src/chain_spec.rs
+++ b/consensus/types/src/chain_spec.rs
@@ -1372,10 +1372,13 @@ pub struct Config {
     #[serde(with = "serde_utils::quoted_u64")]
     max_per_epoch_activation_exit_churn_limit: u64,
 
+    #[serde(default = "default_custody_requirement")]
     #[serde(with = "serde_utils::quoted_u64")]
     custody_requirement: u64,
+    #[serde(default = "default_data_column_sidecar_subnet_count")]
     #[serde(with = "serde_utils::quoted_u64")]
     data_column_sidecar_subnet_count: u64,
+    #[serde(default = "default_number_of_columns")]
     #[serde(with = "serde_utils::quoted_u64")]
     number_of_columns: u64,
 }
@@ -1516,6 +1519,18 @@ const fn default_maximum_gossip_clock_disparity_millis() -> u64 {
     500
 }
 
+const fn default_custody_requirement() -> u64 {
+    1
+}
+
+const fn default_data_column_sidecar_subnet_count() -> u64 {
+    32
+}
+
+const fn default_number_of_columns() -> u64 {
+    128
+}
+
 fn max_blocks_by_root_request_common(max_request_blocks: u64) -> usize {
     let max_request_blocks = max_request_blocks as usize;
     RuntimeVariableList::<Hash256>::from_vec(
diff --git a/lcli/Cargo.toml b/lcli/Cargo.toml
index 3cddd8ee60b..30721f3d5ba 100644
--- a/lcli/Cargo.toml
+++ b/lcli/Cargo.toml
@@ -1,7 +1,7 @@
 [package]
 name = "lcli"
 description = "Lighthouse CLI (modeled after zcli)"
-version = "5.2.1"
+version = "5.3.0"
 authors = ["Paul Hauner <paul@paulhauner.com>"]
 edition = { workspace = true }
 
diff --git a/lighthouse/Cargo.toml b/lighthouse/Cargo.toml
index b720601e70f..7c37aa6d67d 100644
--- a/lighthouse/Cargo.toml
+++ b/lighthouse/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "lighthouse"
-version = "5.2.1"
+version = "5.3.0"
 authors = ["Sigma Prime <contact@sigmaprime.io>"]
 edition = { workspace = true }
 autotests = false
diff --git a/slasher/src/database/lmdb_impl.rs b/slasher/src/database/lmdb_impl.rs
index 20d89a36fb0..74342968cfa 100644
--- a/slasher/src/database/lmdb_impl.rs
+++ b/slasher/src/database/lmdb_impl.rs
@@ -165,8 +165,12 @@ impl<'env> Cursor<'env> {
     }
 
     pub fn get_current(&mut self) -> Result<Option<(Key<'env>, Value<'env>)>, Error> {
+        // FIXME: lmdb has an extremely broken API which can mutate the SHARED REFERENCE
+        // `value` after `get_current` is called. We need to convert it to a Vec here in order
+        // to avoid `value` changing after another cursor operation. I think this represents a bug
+        // in the LMDB bindings, as shared references should be immutable.
         if let Some((Some(key), value)) = self.cursor.get(None, None, MDB_GET_CURRENT).optional()? {
-            Ok(Some((Cow::Borrowed(key), Cow::Borrowed(value))))
+            Ok(Some((Cow::Borrowed(key), Cow::Owned(value.to_vec()))))
         } else {
             Ok(None)
         }
diff --git a/slasher/tests/random.rs b/slasher/tests/random.rs
index 0aaaa63f65c..0ba2986d44b 100644
--- a/slasher/tests/random.rs
+++ b/slasher/tests/random.rs
@@ -235,3 +235,8 @@ fn no_crash_blocks_example1() {
         },
     );
 }
+
+#[test]
+fn no_crash_aug_24() {
+    random_test(13519442335106054152, TestConfig::default())
+}
diff --git a/validator_client/src/beacon_node_fallback.rs b/validator_client/src/beacon_node_fallback.rs
index 4467b807865..58d7f9d8eef 100644
--- a/validator_client/src/beacon_node_fallback.rs
+++ b/validator_client/src/beacon_node_fallback.rs
@@ -134,6 +134,12 @@ impl<E: Debug> fmt::Display for Errors<E> {
     }
 }
 
+impl<E> Errors<E> {
+    pub fn num_errors(&self) -> usize {
+        self.0.len()
+    }
+}
+
 /// Reasons why a candidate might not be ready.
 #[derive(Debug, Clone, Copy)]
 pub enum CandidateError {
@@ -599,46 +605,41 @@ impl<T: SlotClock, E: EthSpec> BeaconNodeFallback<T, E> {
         F: Fn(&'a BeaconNodeHttpClient) -> R,
         R: Future<Output = Result<O, Err>>,
     {
-        let mut results = vec![];
         let mut to_retry = vec![];
         let mut retry_unsynced = vec![];
 
         // Run `func` using a `candidate`, returning the value or capturing errors.
-        //
-        // We use a macro instead of a closure here since it is not trivial to move `func` into a
-        // closure.
-        macro_rules! try_func {
-            ($candidate: ident) => {{
-                inc_counter_vec(&ENDPOINT_REQUESTS, &[$candidate.beacon_node.as_ref()]);
+        let run_on_candidate = |candidate: &'a CandidateBeaconNode<E>| async {
+            inc_counter_vec(&ENDPOINT_REQUESTS, &[candidate.beacon_node.as_ref()]);
 
-                // There exists a race condition where `func` may be called when the candidate is
-                // actually not ready. We deem this an acceptable inefficiency.
-                match func(&$candidate.beacon_node).await {
-                    Ok(val) => results.push(Ok(val)),
-                    Err(e) => {
-                        // If we have an error on this function, make the client as not-ready.
-                        //
-                        // There exists a race condition where the candidate may have been marked
-                        // as ready between the `func` call and now. We deem this an acceptable
-                        // inefficiency.
-                        if matches!(offline_on_failure, OfflineOnFailure::Yes) {
-                            $candidate.set_offline().await;
-                        }
-                        results.push(Err((
-                            $candidate.beacon_node.to_string(),
-                            Error::RequestFailed(e),
-                        )));
-                        inc_counter_vec(&ENDPOINT_ERRORS, &[$candidate.beacon_node.as_ref()]);
+            // There exists a race condition where `func` may be called when the candidate is
+            // actually not ready. We deem this an acceptable inefficiency.
+            match func(&candidate.beacon_node).await {
+                Ok(val) => Ok(val),
+                Err(e) => {
+                    // If we have an error on this function, mark the client as not-ready.
+                    //
+                    // There exists a race condition where the candidate may have been marked
+                    // as ready between the `func` call and now. We deem this an acceptable
+                    // inefficiency.
+                    if matches!(offline_on_failure, OfflineOnFailure::Yes) {
+                        candidate.set_offline().await;
                     }
+                    inc_counter_vec(&ENDPOINT_ERRORS, &[candidate.beacon_node.as_ref()]);
+                    Err((candidate.beacon_node.to_string(), Error::RequestFailed(e)))
                 }
-            }};
-        }
+            }
+        };
 
         // First pass: try `func` on all synced and ready candidates.
        //
        // This ensures that we always choose a synced node if it is available.
+        let mut first_batch_futures = vec![];
         for candidate in &self.candidates {
             match candidate.status(RequireSynced::Yes).await {
+                Ok(_) => {
+                    first_batch_futures.push(run_on_candidate(candidate));
+                }
                 Err(CandidateError::NotSynced) if require_synced == false => {
                     // This client is unsynced we will try it after trying all synced clients
                     retry_unsynced.push(candidate);
@@ -647,22 +648,24 @@ impl<T: SlotClock, E: EthSpec> BeaconNodeFallback<T, E> {
                     // This client was not ready on the first pass, we might try it again later.
                     to_retry.push(candidate);
                 }
-                Ok(_) => try_func!(candidate),
             }
         }
+        let first_batch_results = futures::future::join_all(first_batch_futures).await;
 
         // Second pass: try `func` on ready unsynced candidates. This only runs if we permit
         // unsynced candidates.
        //
         // Due to async race-conditions, it is possible that we will send a request to a candidate
         // that has been set to an offline/unready status. This is acceptable.
-        if require_synced == false {
-            for candidate in retry_unsynced {
-                try_func!(candidate);
-            }
-        }
+        let second_batch_results = if require_synced == false {
+            futures::future::join_all(retry_unsynced.into_iter().map(run_on_candidate)).await
+        } else {
+            vec![]
+        };
 
         // Third pass: try again, attempting to make non-ready clients become ready.
+        let mut third_batch_futures = vec![];
+        let mut third_batch_results = vec![];
         for candidate in to_retry {
             // If the candidate hasn't luckily transferred into the correct state in the meantime,
             // force an update of the state.
@@ -676,16 +679,21 @@ impl<T: SlotClock, E: EthSpec> BeaconNodeFallback<T, E> {
            };
 
            match new_status {
-                Ok(()) => try_func!(candidate),
-                Err(CandidateError::NotSynced) if require_synced == false => try_func!(candidate),
-                Err(e) => {
-                    results.push(Err((
-                        candidate.beacon_node.to_string(),
-                        Error::Unavailable(e),
-                    )));
+                Ok(()) => third_batch_futures.push(run_on_candidate(candidate)),
+                Err(CandidateError::NotSynced) if require_synced == false => {
+                    third_batch_futures.push(run_on_candidate(candidate))
                 }
+                Err(e) => third_batch_results.push(Err((
+                    candidate.beacon_node.to_string(),
+                    Error::Unavailable(e),
+                ))),
            }
        }
+        third_batch_results.extend(futures::future::join_all(third_batch_futures).await);
+
+        let mut results = first_batch_results;
+        results.extend(second_batch_results);
+        results.extend(third_batch_results);
 
         let errors: Vec<_> = results.into_iter().filter_map(|res| res.err()).collect();
 
diff --git a/validator_client/src/duties_service.rs b/validator_client/src/duties_service.rs
index 880f0eaa488..faa157a8592 100644
--- a/validator_client/src/duties_service.rs
+++ b/validator_client/src/duties_service.rs
@@ -86,7 +86,8 @@ const _: () = assert!({
 /// This number is based upon `MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD` value in the
 /// `beacon_node::network::attestation_service` crate. It is not imported directly to avoid
 /// bringing in the entire crate.
-const _: () = assert!(ATTESTATION_SUBSCRIPTION_OFFSETS[0] > 2);
+const MIN_ATTESTATION_SUBSCRIPTION_LOOKAHEAD: u64 = 2;
+const _: () = assert!(ATTESTATION_SUBSCRIPTION_OFFSETS[0] > MIN_ATTESTATION_SUBSCRIPTION_LOOKAHEAD);
 
 // The info in the enum variants is displayed in logging, clippy thinks it's dead code.
 #[derive(Debug)]
@@ -121,6 +122,8 @@ pub struct DutyAndProof {
 pub struct SubscriptionSlots {
     /// Pairs of `(slot, already_sent)` in slot-descending order.
     slots: Vec<(Slot, AtomicBool)>,
+    /// The slot of the duty itself.
+    duty_slot: Slot,
 }
 
 /// Create a selection proof for `duty`.
@@ -172,18 +175,20 @@ impl SubscriptionSlots {
             .filter(|scheduled_slot| *scheduled_slot > current_slot)
             .map(|scheduled_slot| (scheduled_slot, AtomicBool::new(false)))
             .collect();
-        Arc::new(Self { slots })
+        Arc::new(Self { slots, duty_slot })
     }
 
     /// Return `true` if we should send a subscription at `slot`.
     fn should_send_subscription_at(&self, slot: Slot) -> bool {
         // Iterate slots from smallest to largest looking for one that hasn't been completed yet.
-        self.slots
-            .iter()
-            .rev()
-            .any(|(scheduled_slot, already_sent)| {
-                slot >= *scheduled_slot && !already_sent.load(Ordering::Relaxed)
-            })
+        slot + MIN_ATTESTATION_SUBSCRIPTION_LOOKAHEAD <= self.duty_slot
+            && self
+                .slots
+                .iter()
+                .rev()
+                .any(|(scheduled_slot, already_sent)| {
+                    slot >= *scheduled_slot && !already_sent.load(Ordering::Relaxed)
+                })
     }
 
     /// Update our record of subscribed slots to account for successful subscription at `slot`.
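The `should_send_subscription_at` change above adds a guard so a subscription is only sent while the beacon node still has enough notice to act on it — the node needs roughly `MIN_ATTESTATION_SUBSCRIPTION_LOOKAHEAD` slots to discover subnet peers, so subscribing any later is wasted work. A toy standalone version of just that guard, using plain `u64` slots instead of Lighthouse's `Slot` newtype:

```rust
/// Toy version of the lookahead guard from the diff above; names mirror the
/// diff but this is an illustration, not the Lighthouse implementation.
const MIN_ATTESTATION_SUBSCRIPTION_LOOKAHEAD: u64 = 2;

fn subscription_still_useful(current_slot: u64, duty_slot: u64) -> bool {
    // Subscribing is pointless once the duty is less than the minimum
    // lookahead away: the beacon node would not have time to find peers.
    current_slot + MIN_ATTESTATION_SUBSCRIPTION_LOOKAHEAD <= duty_slot
}

fn main() {
    // Duty at slot 100: slot 98 is the last slot at which subscribing helps.
    assert!(subscription_still_useful(98, 100));
    assert!(!subscription_still_useful(99, 100));
}
```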
@@ -737,7 +742,7 @@ async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
     // If there are any subscriptions, push them out to beacon nodes
     if !subscriptions.is_empty() {
         let subscriptions_ref = &subscriptions;
-        if let Err(e) = duties_service
+        let subscription_result = duties_service
             .beacon_nodes
             .request(
                 RequireSynced::No,
@@ -753,15 +758,8 @@
                     .await
                 },
             )
-            .await
-        {
-            error!(
-                log,
-                "Failed to subscribe validators";
-                "error" => %e
-            )
-        } else {
-            // Record that subscriptions were successfully sent.
+            .await;
+        if subscription_result.as_ref().is_ok() {
             debug!(
                 log,
                 "Broadcast attestation subscriptions";
@@ -770,6 +768,25 @@
             for subscription_slots in subscription_slots_to_confirm {
                 subscription_slots.record_successful_subscription_at(current_slot);
             }
+        } else if let Err(e) = subscription_result {
+            if e.num_errors() < duties_service.beacon_nodes.num_total() {
+                warn!(
+                    log,
+                    "Some subscriptions failed";
+                    "error" => %e,
+                );
+                // If subscriptions were sent to at least one node, regard that as a success.
+                // There is some redundancy built into the subscription schedule to handle failures.
+                for subscription_slots in subscription_slots_to_confirm {
+                    subscription_slots.record_successful_subscription_at(current_slot);
+                }
+            } else {
+                error!(
+                    log,
+                    "All subscriptions failed";
+                    "error" => %e
+                );
+            }
         }
     }
 
diff --git a/validator_client/src/lib.rs b/validator_client/src/lib.rs
index 729ff62ee30..dff50582dfe 100644
--- a/validator_client/src/lib.rs
+++ b/validator_client/src/lib.rs
@@ -75,6 +75,7 @@ const WAITING_FOR_GENESIS_POLL_TIME: Duration = Duration::from_secs(12);
 /// This can help ensure that proper endpoint fallback occurs.
 const HTTP_ATTESTATION_TIMEOUT_QUOTIENT: u32 = 4;
 const HTTP_ATTESTER_DUTIES_TIMEOUT_QUOTIENT: u32 = 4;
+const HTTP_ATTESTATION_SUBSCRIPTIONS_TIMEOUT_QUOTIENT: u32 = 24;
 const HTTP_LIVENESS_TIMEOUT_QUOTIENT: u32 = 4;
 const HTTP_PROPOSAL_TIMEOUT_QUOTIENT: u32 = 2;
 const HTTP_PROPOSER_DUTIES_TIMEOUT_QUOTIENT: u32 = 4;
@@ -323,6 +324,8 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
                 Timeouts {
                     attestation: slot_duration / HTTP_ATTESTATION_TIMEOUT_QUOTIENT,
                     attester_duties: slot_duration / HTTP_ATTESTER_DUTIES_TIMEOUT_QUOTIENT,
+                    attestation_subscriptions: slot_duration
+                        / HTTP_ATTESTATION_SUBSCRIPTIONS_TIMEOUT_QUOTIENT,
                     liveness: slot_duration / HTTP_LIVENESS_TIMEOUT_QUOTIENT,
                     proposal: slot_duration / HTTP_PROPOSAL_TIMEOUT_QUOTIENT,
                     proposer_duties: slot_duration / HTTP_PROPOSER_DUTIES_TIMEOUT_QUOTIENT,
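Two closing observations on the validator-client changes. First, with mainnet's 12-second slots, `HTTP_ATTESTATION_SUBSCRIPTIONS_TIMEOUT_QUOTIENT = 24` gives each subscription request a 500 ms budget, so even a fully unresponsive beacon node costs little of the slot before the fallback node is tried. Second, the `num_errors()` / `num_total()` comparison downgrades partial subscription failures from `error!` to `warn!`: as long as one node accepted the broadcast, the redundancy built into the subscription schedule covers the rest. A self-contained sketch of that classification rule (names hypothetical, not the Lighthouse API):

```rust
/// Hypothetical sketch of the partial-failure policy in the diff above:
/// a broadcast counts as a success if at least one beacon node accepted it.
#[derive(Debug, PartialEq)]
enum BroadcastOutcome {
    Success,
    PartialFailure, // some nodes failed; logged at WARN, treated as success
    TotalFailure,   // every node failed; logged at ERROR
}

fn classify(num_errors: usize, num_nodes: usize) -> BroadcastOutcome {
    if num_errors == 0 {
        BroadcastOutcome::Success
    } else if num_errors < num_nodes {
        BroadcastOutcome::PartialFailure
    } else {
        BroadcastOutcome::TotalFailure
    }
}

fn main() {
    assert_eq!(classify(0, 3), BroadcastOutcome::Success);
    assert_eq!(classify(2, 3), BroadcastOutcome::PartialFailure);
    assert_eq!(classify(3, 3), BroadcastOutcome::TotalFailure);
}
```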