From ca8fef5855e33a360e1bb9795811f9b60c142a4c Mon Sep 17 00:00:00 2001 From: Jeff Biseda Date: Tue, 4 Jan 2022 00:24:16 -0800 Subject: [PATCH] retransmit consecutive leader blocks (#22157) --- core/src/progress_map.rs | 70 ++++--- core/src/replay_stage.rs | 272 +++++++++++++++++++--------- ledger/src/leader_schedule_utils.rs | 4 + 3 files changed, 232 insertions(+), 114 deletions(-) diff --git a/core/src/progress_map.rs b/core/src/progress_map.rs index bbfdc5754b00d0..3a0f566cb3ce8f 100644 --- a/core/src/progress_map.rs +++ b/core/src/progress_map.rs @@ -437,6 +437,11 @@ impl ProgressMap { .map(|fork_progress| &mut fork_progress.propagated_stats) } + pub fn get_propagated_stats_must_exist(&self, slot: Slot) -> &PropagatedStats { + self.get_propagated_stats(slot) + .unwrap_or_else(|| panic!("slot={} must exist in ProgressMap", slot)) + } + pub fn get_fork_stats(&self, slot: Slot) -> Option<&ForkStats> { self.progress_map .get(&slot) @@ -449,7 +454,13 @@ impl ProgressMap { .map(|fork_progress| &mut fork_progress.fork_stats) } - pub fn get_retransmit_info(&mut self, slot: Slot) -> Option<&mut RetransmitInfo> { + pub fn get_retransmit_info(&self, slot: Slot) -> Option<&RetransmitInfo> { + self.progress_map + .get(&slot) + .map(|fork_progress| &fork_progress.retransmit_info) + } + + pub fn get_retransmit_info_mut(&mut self, slot: Slot) -> Option<&mut RetransmitInfo> { self.progress_map .get_mut(&slot) .map(|fork_progress| &mut fork_progress.retransmit_info) @@ -467,30 +478,13 @@ impl ProgressMap { .and_then(|fork_progress| fork_progress.fork_stats.bank_hash) } - pub fn is_propagated(&self, slot: Slot) -> bool { - let leader_slot_to_check = self.get_latest_leader_slot(slot); - - // prev_leader_slot doesn't exist because already rooted - // or this validator hasn't been scheduled as a leader - // yet. In both cases the latest leader is vacuously - // confirmed - leader_slot_to_check - .map(|leader_slot_to_check| { - // If the leader's stats are None (isn't in the - // progress map), this means that prev_leader slot is - // rooted, so return true - self.get_propagated_stats(leader_slot_to_check) - .map(|stats| stats.is_propagated) - .unwrap_or(true) - }) - .unwrap_or(true) + pub fn is_propagated(&self, slot: Slot) -> Option { + self.get_propagated_stats(slot) + .map(|stats| stats.is_propagated) } - pub fn get_latest_leader_slot(&self, slot: Slot) -> Option { - let propagated_stats = self - .get_propagated_stats(slot) - .expect("All frozen banks must exist in the Progress map"); - + pub fn get_latest_leader_slot_must_exist(&self, slot: Slot) -> Option { + let propagated_stats = self.get_propagated_stats_must_exist(slot); if propagated_stats.is_leader_slot { Some(slot) } else { @@ -498,6 +492,24 @@ impl ProgressMap { } } + pub fn get_leader_propagation_slot_must_exist(&self, slot: Slot) -> (bool, Option) { + if let Some(leader_slot) = self.get_latest_leader_slot_must_exist(slot) { + // If the leader's stats are None (isn't in the + // progress map), this means that prev_leader slot is + // rooted, so return true + ( + self.is_propagated(leader_slot).unwrap_or(true), + Some(leader_slot), + ) + } else { + // prev_leader_slot doesn't exist because already rooted + // or this validator hasn't been scheduled as a leader + // yet. In both cases the latest leader is vacuously + // confirmed + (true, None) + } + } + pub fn my_latest_landed_vote(&self, slot: Slot) -> Option { self.progress_map .get(&slot) @@ -723,27 +735,27 @@ mod test { ); // None of these slot have parents which are confirmed - assert!(!progress_map.is_propagated(9)); - assert!(!progress_map.is_propagated(10)); + assert!(!progress_map.get_leader_propagation_slot_must_exist(9).0); + assert!(!progress_map.get_leader_propagation_slot_must_exist(10).0); // Insert new ForkProgress for slot 8 with no previous leader. // The previous leader before 8, slot 7, does not exist in // progress map, so is_propagated(8) should return true as // this implies the parent is rooted progress_map.insert(8, ForkProgress::new(Hash::default(), Some(7), None, 0, 0)); - assert!(progress_map.is_propagated(8)); + assert!(progress_map.get_leader_propagation_slot_must_exist(8).0); // If we set the is_propagated = true, is_propagated should return true progress_map .get_propagated_stats_mut(9) .unwrap() .is_propagated = true; - assert!(progress_map.is_propagated(9)); + assert!(progress_map.get_leader_propagation_slot_must_exist(9).0); assert!(progress_map.get(&9).unwrap().propagated_stats.is_propagated); // Because slot 9 is now confirmed, then slot 10 is also confirmed b/c 9 // is the last leader slot before 10 - assert!(progress_map.is_propagated(10)); + assert!(progress_map.get_leader_propagation_slot_must_exist(10).0); // If we make slot 10 a leader slot though, even though its previous // leader slot 9 has been confirmed, slot 10 itself is not confirmed @@ -751,6 +763,6 @@ mod test { .get_propagated_stats_mut(10) .unwrap() .is_leader_slot = true; - assert!(!progress_map.is_propagated(10)); + assert!(!progress_map.get_leader_propagation_slot_must_exist(10).0); } } diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index e3888218073606..5fd50c19ce79a9 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -35,6 +35,7 @@ use { blockstore::Blockstore, blockstore_processor::{self, BlockstoreProcessorError, TransactionStatusSender}, leader_schedule_cache::LeaderScheduleCache, + leader_schedule_utils::first_of_consecutive_leader_slots, }, solana_measure::measure::Measure, solana_metrics::inc_new_counter_info, @@ -606,7 +607,7 @@ impl ReplayStage { for r in heaviest_fork_failures { if let HeaviestForkFailures::NoPropagatedConfirmation(slot) = r { if let Some(latest_leader_slot) = - progress.get_latest_leader_slot(slot) + progress.get_latest_leader_slot_must_exist(slot) { progress.log_propagated_stats(latest_leader_slot, &bank_forks); } @@ -850,36 +851,66 @@ impl ReplayStage { } } + fn maybe_retransmit_unpropagated_slots( + metric_name: &'static str, + retransmit_slots_sender: &RetransmitSlotsSender, + progress: &mut ProgressMap, + latest_leader_slot: Slot, + ) { + let first_leader_group_slot = first_of_consecutive_leader_slots(latest_leader_slot); + + for slot in first_leader_group_slot..=latest_leader_slot { + let is_propagated = progress.is_propagated(slot); + if let Some(retransmit_info) = progress.get_retransmit_info_mut(slot) { + if !is_propagated.expect( + "presence of retransmit_info ensures that propagation status is present", + ) { + if retransmit_info.reached_retransmit_threshold() { + info!( + "Retrying retransmit: latest_leader_slot={} slot={} retransmit_info={:?}", + latest_leader_slot, + slot, + &retransmit_info, + ); + datapoint_info!( + metric_name, + ("latest_leader_slot", latest_leader_slot, i64), + ("slot", slot, i64), + ("retry_iteration", retransmit_info.retry_iteration, i64), + ); + let _ = retransmit_slots_sender.send(slot); + retransmit_info.increment_retry_iteration(); + } else { + debug!( + "Bypass retransmit of slot={} retransmit_info={:?}", + slot, &retransmit_info + ); + } + } + } + } + } + fn retransmit_latest_unpropagated_leader_slot( poh_recorder: &Arc>, retransmit_slots_sender: &RetransmitSlotsSender, progress: &mut ProgressMap, ) { let start_slot = poh_recorder.lock().unwrap().start_slot(); - if let Some(latest_leader_slot) = progress.get_latest_leader_slot(start_slot) { - if !progress - .get_propagated_stats(latest_leader_slot) - .map(|stats| stats.is_propagated) - .unwrap_or(true) - { - warn!("Slot not propagated: slot={}", latest_leader_slot); - let retransmit_info = progress.get_retransmit_info(latest_leader_slot).unwrap(); - if retransmit_info.reached_retransmit_threshold() { - info!( - "Retrying retransmit: start_slot={} latest_leader_slot={} retransmit_info={:?}", - start_slot, latest_leader_slot, &retransmit_info, - ); - datapoint_info!( - "replay_stage-retransmit-timing-based", - ("slot", latest_leader_slot, i64), - ("retry_iteration", retransmit_info.retry_iteration, i64), - ); - let _ = retransmit_slots_sender.send(latest_leader_slot); - retransmit_info.increment_retry_iteration(); - } else { - info!("Bypass retry: {:?}", &retransmit_info); - } - } + + if let (false, Some(latest_leader_slot)) = + progress.get_leader_propagation_slot_must_exist(start_slot) + { + debug!( + "Slot not propagated: start_slot={} latest_leader_slot={}", + start_slot, latest_leader_slot + ); + Self::maybe_retransmit_unpropagated_slots( + "replay_stage-retransmit-timing-based", + retransmit_slots_sender, + progress, + latest_leader_slot, + ); } } @@ -1372,7 +1403,9 @@ impl ReplayStage { // `poh_slot` and `parent_slot`, because they're in the same // `NUM_CONSECUTIVE_LEADER_SLOTS` block, we still skip the propagated // check because it's still within the propagation grace period. - if let Some(latest_leader_slot) = progress_map.get_latest_leader_slot(parent_slot) { + if let Some(latest_leader_slot) = + progress_map.get_latest_leader_slot_must_exist(parent_slot) + { let skip_propagated_check = poh_slot - latest_leader_slot < NUM_CONSECUTIVE_LEADER_SLOTS; if skip_propagated_check { @@ -1384,7 +1417,9 @@ impl ReplayStage { // propagation of `parent_slot`, it checks propagation of the latest ancestor // of `parent_slot` (hence the call to `get_latest_leader_slot()` in the // check above) - progress_map.is_propagated(parent_slot) + progress_map + .get_leader_propagation_slot_must_exist(parent_slot) + .0 } fn should_retransmit(poh_slot: Slot, last_retransmit_slot: &mut Slot) -> bool { @@ -1469,7 +1504,7 @@ impl ReplayStage { ); if !Self::check_propagation_for_start_leader(poh_slot, parent_slot, progress_map) { - let latest_unconfirmed_leader_slot = progress_map.get_latest_leader_slot(parent_slot) + let latest_unconfirmed_leader_slot = progress_map.get_latest_leader_slot_must_exist(parent_slot) .expect("In order for propagated check to fail, latest leader must exist in progress map"); if poh_slot != skipped_slots_info.last_skipped_slot { datapoint_info!( @@ -1485,29 +1520,13 @@ impl ReplayStage { progress_map.log_propagated_stats(latest_unconfirmed_leader_slot, bank_forks); skipped_slots_info.last_skipped_slot = poh_slot; } - if Self::should_retransmit(poh_slot, &mut skipped_slots_info.last_retransmit_slot) { - let retransmit_info = progress_map - .get_retransmit_info(latest_unconfirmed_leader_slot) - .unwrap(); - if retransmit_info.reached_retransmit_threshold() { - info!( - "Retrying retransmit: retransmit_info={:?}", - &retransmit_info - ); - datapoint_info!( - "replay_stage-retransmit", - ("slot", latest_unconfirmed_leader_slot, i64), - ("retry_iteration", retransmit_info.retry_iteration, i64), - ); - let _ = retransmit_slots_sender.send(latest_unconfirmed_leader_slot); - retransmit_info.increment_retry_iteration(); - } else { - info!( - "Bypassing retransmit of my leader slot retransmit_info={:?}", - &retransmit_info - ); - } + Self::maybe_retransmit_unpropagated_slots( + "replay_stage-retransmit", + retransmit_slots_sender, + progress_map, + latest_unconfirmed_leader_slot, + ); } return; } @@ -2345,38 +2364,24 @@ impl ReplayStage { cluster_slots: &ClusterSlots, ) { // If propagation has already been confirmed, return - if progress.is_propagated(slot) { + if progress.get_leader_propagation_slot_must_exist(slot).0 { return; } // Otherwise we have to check the votes for confirmation - let mut slot_vote_tracker = progress - .get_propagated_stats(slot) - .expect("All frozen banks must exist in the Progress map") - .slot_vote_tracker - .clone(); + let mut propagated_stats = progress + .get_propagated_stats_mut(slot) + .unwrap_or_else(|| panic!("slot={} must exist in ProgressMap", slot)); - if slot_vote_tracker.is_none() { - slot_vote_tracker = vote_tracker.get_slot_vote_tracker(slot); - progress - .get_propagated_stats_mut(slot) - .expect("All frozen banks must exist in the Progress map") - .slot_vote_tracker = slot_vote_tracker.clone(); + if propagated_stats.slot_vote_tracker.is_none() { + propagated_stats.slot_vote_tracker = vote_tracker.get_slot_vote_tracker(slot); } + let slot_vote_tracker = propagated_stats.slot_vote_tracker.clone(); - let mut cluster_slot_pubkeys = progress - .get_propagated_stats(slot) - .expect("All frozen banks must exist in the Progress map") - .cluster_slot_pubkeys - .clone(); - - if cluster_slot_pubkeys.is_none() { - cluster_slot_pubkeys = cluster_slots.lookup(slot); - progress - .get_propagated_stats_mut(slot) - .expect("All frozen banks must exist in the Progress map") - .cluster_slot_pubkeys = cluster_slot_pubkeys.clone(); + if propagated_stats.cluster_slot_pubkeys.is_none() { + propagated_stats.cluster_slot_pubkeys = cluster_slots.lookup(slot); } + let cluster_slot_pubkeys = propagated_stats.cluster_slot_pubkeys.clone(); let newly_voted_pubkeys = slot_vote_tracker .as_ref() @@ -2538,7 +2543,10 @@ impl ReplayStage { ) }; - let propagation_confirmed = is_leader_slot || progress.is_propagated(bank.slot()); + let propagation_confirmed = is_leader_slot + || progress + .get_leader_propagation_slot_must_exist(bank.slot()) + .0; if is_locked_out { failure_reasons.push(HeaviestForkFailures::LockedOut(bank.slot())); @@ -2584,7 +2592,7 @@ impl ReplayStage { fork_tip: Slot, bank_forks: &RwLock, ) { - let mut current_leader_slot = progress.get_latest_leader_slot(fork_tip); + let mut current_leader_slot = progress.get_latest_leader_slot_must_exist(fork_tip); let mut did_newly_reach_threshold = false; let root = bank_forks.read().unwrap().root(); loop { @@ -2951,6 +2959,7 @@ pub mod tests { use { super::*, crate::{ + broadcast_stage::RetransmitSlotsReceiver, consensus::Tower, progress_map::{ValidatorStakeInfo, RETRANSMIT_BASE_DELAY_MS}, replay_stage::ReplayStage, @@ -4388,7 +4397,7 @@ pub mod tests { // Make sure is_propagated == false so that the propagation logic // runs in `update_propagation_status` - assert!(!progress_map.is_propagated(10)); + assert!(!progress_map.get_leader_propagation_slot_must_exist(10).0); let vote_tracker = VoteTracker::new(&bank_forks.root_bank()); vote_tracker.insert_vote(10, vote_pubkey); @@ -5015,7 +5024,11 @@ pub mod tests { vote_tracker.insert_vote(root_bank.slot(), *vote_key); } - assert!(!progress.is_propagated(root_bank.slot())); + assert!( + !progress + .get_leader_propagation_slot_must_exist(root_bank.slot()) + .0 + ); // Update propagation status let tower = Tower::new_for_tests(0, 0.67); @@ -5033,7 +5046,11 @@ pub mod tests { ); // Check status is true - assert!(progress.is_propagated(root_bank.slot())); + assert!( + progress + .get_leader_propagation_slot_must_exist(root_bank.slot()) + .0 + ); } #[test] @@ -5984,7 +6001,7 @@ pub mod tests { "retry_iteration=0, elapsed < 2^0 * RETRANSMIT_BASE_DELAY_MS" ); - progress.get_retransmit_info(0).unwrap().retry_time = + progress.get_retransmit_info_mut(0).unwrap().retry_time = Some(Instant::now() - Duration::from_millis(RETRANSMIT_BASE_DELAY_MS + 1)); ReplayStage::retransmit_latest_unpropagated_leader_slot( &poh_recorder, @@ -6013,7 +6030,7 @@ pub mod tests { "retry_iteration=1, elapsed < 2^1 * RETRY_BASE_DELAY_MS" ); - progress.get_retransmit_info(0).unwrap().retry_time = + progress.get_retransmit_info_mut(0).unwrap().retry_time = Some(Instant::now() - Duration::from_millis(RETRANSMIT_BASE_DELAY_MS + 1)); ReplayStage::retransmit_latest_unpropagated_leader_slot( &poh_recorder, @@ -6026,7 +6043,7 @@ pub mod tests { "retry_iteration=1, elapsed < 2^1 * RETRANSMIT_BASE_DELAY_MS" ); - progress.get_retransmit_info(0).unwrap().retry_time = + progress.get_retransmit_info_mut(0).unwrap().retry_time = Some(Instant::now() - Duration::from_millis(2 * RETRANSMIT_BASE_DELAY_MS + 1)); ReplayStage::retransmit_latest_unpropagated_leader_slot( &poh_recorder, @@ -6046,11 +6063,11 @@ pub mod tests { // increment to retry iteration 3 progress - .get_retransmit_info(0) + .get_retransmit_info_mut(0) .unwrap() .increment_retry_iteration(); - progress.get_retransmit_info(0).unwrap().retry_time = + progress.get_retransmit_info_mut(0).unwrap().retry_time = Some(Instant::now() - Duration::from_millis(2 * RETRANSMIT_BASE_DELAY_MS + 1)); ReplayStage::retransmit_latest_unpropagated_leader_slot( &poh_recorder, @@ -6063,7 +6080,7 @@ pub mod tests { "retry_iteration=3, elapsed < 2^3 * RETRANSMIT_BASE_DELAY_MS" ); - progress.get_retransmit_info(0).unwrap().retry_time = + progress.get_retransmit_info_mut(0).unwrap().retry_time = Some(Instant::now() - Duration::from_millis(8 * RETRANSMIT_BASE_DELAY_MS + 1)); ReplayStage::retransmit_latest_unpropagated_leader_slot( &poh_recorder, @@ -6082,6 +6099,91 @@ pub mod tests { ); } + fn receive_slots(retransmit_slots_receiver: &RetransmitSlotsReceiver) -> Vec { + let mut slots = Vec::default(); + while let Ok(slot) = retransmit_slots_receiver.recv_timeout(Duration::from_millis(10)) { + slots.push(slot); + } + slots + } + + #[test] + fn test_maybe_retransmit_unpropagated_slots() { + let ReplayBlockstoreComponents { + validator_node_to_vote_keys, + leader_schedule_cache, + vote_simulator, + .. + } = replay_blockstore_components(None, 10, None::); + + let VoteSimulator { + mut progress, + ref bank_forks, + .. + } = vote_simulator; + + let (retransmit_slots_sender, retransmit_slots_receiver) = unbounded(); + + let mut prev_index = 0; + for i in (1..10).chain(11..15) { + let bank = Bank::new_from_parent( + bank_forks.read().unwrap().get(prev_index).unwrap(), + &leader_schedule_cache.slot_leader_at(i, None).unwrap(), + i, + ); + progress.insert( + i, + ForkProgress::new_from_bank( + &bank, + bank.collector_id(), + validator_node_to_vote_keys + .get(bank.collector_id()) + .unwrap(), + Some(0), + 0, + 0, + ), + ); + assert!(progress.get_propagated_stats(i).unwrap().is_leader_slot); + bank.freeze(); + bank_forks.write().unwrap().insert(bank); + prev_index = i; + } + + // expect single slot when latest_leader_slot is the start of a consecutive range + let latest_leader_slot = 0; + ReplayStage::maybe_retransmit_unpropagated_slots( + "test", + &retransmit_slots_sender, + &mut progress, + latest_leader_slot, + ); + let received_slots = receive_slots(&retransmit_slots_receiver); + assert_eq!(received_slots, vec![0]); + + // expect range of slots from start of consecutive slots + let latest_leader_slot = 6; + ReplayStage::maybe_retransmit_unpropagated_slots( + "test", + &retransmit_slots_sender, + &mut progress, + latest_leader_slot, + ); + let received_slots = receive_slots(&retransmit_slots_receiver); + assert_eq!(received_slots, vec![4, 5, 6]); + + // expect range of slots skipping a discontinuity in the range + let latest_leader_slot = 11; + ReplayStage::maybe_retransmit_unpropagated_slots( + "test", + &retransmit_slots_sender, + &mut progress, + latest_leader_slot, + ); + let received_slots = receive_slots(&retransmit_slots_receiver); + assert_eq!(received_slots, vec![8, 9, 11]); + } + fn run_compute_and_select_forks( bank_forks: &RwLock, progress: &mut ProgressMap, diff --git a/ledger/src/leader_schedule_utils.rs b/ledger/src/leader_schedule_utils.rs index 365c3075c3225d..3ed5528af5ea90 100644 --- a/ledger/src/leader_schedule_utils.rs +++ b/ledger/src/leader_schedule_utils.rs @@ -61,6 +61,10 @@ pub fn num_ticks_left_in_slot(bank: &Bank, tick_height: u64) -> u64 { bank.ticks_per_slot() - tick_height % bank.ticks_per_slot() } +pub fn first_of_consecutive_leader_slots(slot: Slot) -> Slot { + (slot / NUM_CONSECUTIVE_LEADER_SLOTS) * NUM_CONSECUTIVE_LEADER_SLOTS +} + fn sort_stakes(stakes: &mut Vec<(Pubkey, u64)>) { // Sort first by stake. If stakes are the same, sort by pubkey to ensure a // deterministic result.