From bb6675e42d6dedad1a503c1647c16c4eb7f38cef Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Fri, 13 Oct 2023 16:45:56 +1100 Subject: [PATCH] Clean up progressive balance slashings further (#4834) * Clean up progressive balance slashings further * Fix Rayon deadlock in test utils * Fix cargo-fmt --- beacon_node/beacon_chain/src/test_utils.rs | 23 +++++++++++-------- .../update_progressive_balances_cache.rs | 3 ++- .../progressive_balances_cache.rs | 21 ++++++++++++----- 3 files changed, 31 insertions(+), 16 deletions(-) diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index b1bfdd90509..d99329c7031 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -49,6 +49,7 @@ use std::collections::{HashMap, HashSet}; use std::fmt; use std::marker::PhantomData; use std::str::FromStr; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::time::Duration; use store::{config::StoreConfig, HotColdDB, ItemStore, LevelDB, MemoryStore}; @@ -1149,9 +1150,9 @@ where ) -> (Vec>, Vec) { let MakeAttestationOptions { limit, fork } = opts; let committee_count = state.get_committee_count_at_slot(state.slot()).unwrap(); - let attesters = Mutex::new(vec![]); + let num_attesters = AtomicUsize::new(0); - let attestations = state + let (attestations, split_attesters) = state .get_beacon_committees_at_slot(attestation_slot) .expect("should get committees") .iter() @@ -1164,13 +1165,14 @@ where return None; } - let mut attesters = attesters.lock(); if let Some(limit) = limit { - if attesters.len() >= limit { + // This atomics stuff is necessary because we're under a par_iter, + // and Rayon will deadlock if we use a mutex. + if num_attesters.fetch_add(1, Ordering::Relaxed) >= limit { + num_attesters.fetch_sub(1, Ordering::Relaxed); return None; } } - attesters.push(*validator_index); let mut attestation = self .produce_unaggregated_attestation_for_block( @@ -1210,14 +1212,17 @@ where ) .unwrap(); - Some((attestation, subnet_id)) + Some(((attestation, subnet_id), validator_index)) }) - .collect::>() + .unzip::<_, _, Vec<_>, Vec<_>>() }) - .collect::>(); + .unzip::<_, _, Vec<_>, Vec<_>>(); + + // Flatten attesters. + let attesters = split_attesters.into_iter().flatten().collect::>(); - let attesters = attesters.into_inner(); if let Some(limit) = limit { + assert_eq!(limit, num_attesters.load(Ordering::Relaxed)); assert_eq!( limit, attesters.len(), diff --git a/consensus/state_processing/src/common/update_progressive_balances_cache.rs b/consensus/state_processing/src/common/update_progressive_balances_cache.rs index 37e00ad6ba9..fb65e583ba8 100644 --- a/consensus/state_processing/src/common/update_progressive_balances_cache.rs +++ b/consensus/state_processing/src/common/update_progressive_balances_cache.rs @@ -69,9 +69,10 @@ pub fn update_progressive_balances_on_attestation( validator_effective_balance: u64, validator_slashed: bool, ) -> Result<(), BlockProcessingError> { - if is_progressive_balances_enabled(state) && !validator_slashed { + if is_progressive_balances_enabled(state) { state.progressive_balances_cache_mut().on_new_attestation( epoch, + validator_slashed, flag_index, validator_effective_balance, )?; diff --git a/consensus/types/src/beacon_state/progressive_balances_cache.rs b/consensus/types/src/beacon_state/progressive_balances_cache.rs index 35d8d1a3431..c0a8c29052a 100644 --- a/consensus/types/src/beacon_state/progressive_balances_cache.rs +++ b/consensus/types/src/beacon_state/progressive_balances_cache.rs @@ -64,9 +64,13 @@ impl EpochTotalBalances { pub fn on_new_attestation( &mut self, + is_slashed: bool, flag_index: usize, validator_effective_balance: u64, ) -> Result<(), BeaconStateError> { + if is_slashed { + return Ok(()); + } let balance = self .total_flag_balances .get_mut(flag_index) @@ -152,19 +156,24 @@ impl ProgressiveBalancesCache { pub fn on_new_attestation( &mut self, epoch: Epoch, + is_slashed: bool, flag_index: usize, validator_effective_balance: u64, ) -> Result<(), BeaconStateError> { let cache = self.get_inner_mut()?; if epoch == cache.current_epoch { - cache - .current_epoch_cache - .on_new_attestation(flag_index, validator_effective_balance)?; + cache.current_epoch_cache.on_new_attestation( + is_slashed, + flag_index, + validator_effective_balance, + )?; } else if epoch.safe_add(1)? == cache.current_epoch { - cache - .previous_epoch_cache - .on_new_attestation(flag_index, validator_effective_balance)?; + cache.previous_epoch_cache.on_new_attestation( + is_slashed, + flag_index, + validator_effective_balance, + )?; } else { return Err(BeaconStateError::ProgressiveBalancesCacheInconsistent); }