Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Commit

Permalink
reworks gossip crds timeouts (#30468)
Browse files Browse the repository at this point in the history
`CrdsGossipPull::make_timeouts` iterates over the stakes hashmap and
creates a new hashmap, which is unnecessary:
https://github.com/solana-labs/solana/blob/c032dc275/gossip/src/crds_gossip_pull.rs#L517-L539

Instead, this commit keeps a reference to the stakes hashmap.
  • Loading branch information
behzadnouri authored Mar 27, 2023
1 parent b53656b commit d4b30ad
Show file tree
Hide file tree
Showing 6 changed files with 209 additions and 99 deletions.
13 changes: 9 additions & 4 deletions gossip/benches/crds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ use {
rayon::ThreadPoolBuilder,
solana_gossip::{
crds::{Crds, GossipRoute},
crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS,
crds_gossip_pull::{CrdsTimeouts, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS},
crds_value::CrdsValue,
},
solana_sdk::pubkey::Pubkey,
std::collections::HashMap,
std::{collections::HashMap, time::Duration},
test::Bencher,
};

Expand All @@ -24,8 +24,13 @@ fn bench_find_old_labels(bencher: &mut Bencher) {
std::iter::repeat_with(|| (CrdsValue::new_rand(&mut rng, None), rng.gen_range(0, now)))
.take(50_000)
.for_each(|(v, ts)| assert!(crds.insert(v, ts, GossipRoute::LocalMessage).is_ok()));
let mut timeouts = HashMap::new();
timeouts.insert(Pubkey::default(), CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS);
let stakes = HashMap::from([(Pubkey::new_unique(), 1u64)]);
let timeouts = CrdsTimeouts::new(
Pubkey::new_unique(),
CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS, // default_timeout
Duration::from_secs(48 * 3600), // epoch_duration
&stakes,
);
bencher.iter(|| {
let out = crds.find_old_labels(&thread_pool, now, &timeouts);
assert!(out.len() > 10);
Expand Down
28 changes: 20 additions & 8 deletions gossip/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ use {
crds::{Crds, Cursor, GossipRoute},
crds_gossip::CrdsGossip,
crds_gossip_error::CrdsGossipError,
crds_gossip_pull::{CrdsFilter, ProcessPullStats, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS},
crds_gossip_pull::{
CrdsFilter, CrdsTimeouts, ProcessPullStats, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS,
},
crds_value::{
self, CrdsData, CrdsValue, CrdsValueLabel, EpochSlotsIndex, IncrementalSnapshotHashes,
LowestSlot, NodeInstance, SnapshotHashes, Version, Vote, MAX_WALLCLOCK,
Expand Down Expand Up @@ -2158,7 +2160,7 @@ impl ClusterInfo {
&self,
from: &Pubkey,
crds_values: Vec<CrdsValue>,
timeouts: &HashMap<Pubkey, u64>,
timeouts: &CrdsTimeouts,
) -> (usize, usize, usize) {
let len = crds_values.len();
trace!("PullResponse me: {} from: {} len={}", self.id(), from, len);
Expand Down Expand Up @@ -3305,9 +3307,13 @@ RPC Enabled Nodes: 1"#;
});
let entrypoint_pubkey = solana_sdk::pubkey::new_rand();
let data = test_crds_values(entrypoint_pubkey);
let timeouts = [(Pubkey::default(), CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS)]
.into_iter()
.collect();
let stakes = HashMap::from([(Pubkey::new_unique(), 1u64)]);
let timeouts = CrdsTimeouts::new(
cluster_info.id(),
CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS, // default_timeout
Duration::from_secs(48 * 3600), // epoch_duration
&stakes,
);
assert_eq!(
(0, 0, 1),
ClusterInfo::handle_pull_response(
Expand Down Expand Up @@ -4081,9 +4087,10 @@ RPC Enabled Nodes: 1"#;
let entrypoint_crdsvalue =
CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(entrypoint.clone()));
let cluster_info = Arc::new(cluster_info);
let stakes = HashMap::from([(Pubkey::new_unique(), 1u64)]);
let timeouts = cluster_info.gossip.make_timeouts(
cluster_info.id(),
&HashMap::default(), // stakes,
&stakes,
Duration::from_millis(cluster_info.gossip.pull.crds_timeout),
);
ClusterInfo::handle_pull_response(
Expand Down Expand Up @@ -4729,8 +4736,13 @@ RPC Enabled Nodes: 1"#;
})
.take(NO_ENTRIES)
.collect();
let mut timeouts = HashMap::new();
timeouts.insert(Pubkey::default(), CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS * 4);
let stakes = HashMap::from([(Pubkey::new_unique(), 1u64)]);
let timeouts = CrdsTimeouts::new(
cluster_info.id(),
CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS * 4, // default_timeout
Duration::from_secs(48 * 3600), // epoch_duration
&stakes,
);
assert_eq!(
(0, 0, NO_ENTRIES),
cluster_info.handle_pull_response(&entrypoint_pubkey, data, &timeouts)
Expand Down
148 changes: 102 additions & 46 deletions gossip/src/crds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
use {
crate::{
crds_entry::CrdsEntry,
crds_gossip_pull::CrdsTimeouts,
crds_shards::CrdsShards,
crds_value::{CrdsData, CrdsValue, CrdsValueLabel},
legacy_contact_info::LegacyContactInfo as ContactInfo,
Expand Down Expand Up @@ -472,15 +473,12 @@ impl Crds {
&self,
thread_pool: &ThreadPool,
now: u64,
timeouts: &HashMap<Pubkey, u64>,
timeouts: &CrdsTimeouts,
) -> Vec<CrdsValueLabel> {
let default_timeout = *timeouts
.get(&Pubkey::default())
.expect("must have default timeout");
// Given an index of all crd values associated with a pubkey,
// returns crds labels of old values to be evicted.
let evict = |pubkey, index: &IndexSet<usize>| {
let timeout = timeouts.get(pubkey).copied().unwrap_or(default_timeout);
let timeout = timeouts[pubkey];
// If the origin's contact-info hasn't expired yet then preserve
// all associated values.
let origin = CrdsValueLabel::LegacyContactInfo(*pubkey);
Expand Down Expand Up @@ -732,7 +730,7 @@ mod tests {
signature::{Keypair, Signer},
timing::timestamp,
},
std::{collections::HashSet, iter::repeat_with, net::Ipv4Addr},
std::{collections::HashSet, iter::repeat_with, net::Ipv4Addr, time::Duration},
};

#[test]
Expand Down Expand Up @@ -888,17 +886,34 @@ mod tests {
crds.insert(val.clone(), 1, GossipRoute::LocalMessage),
Ok(())
);
let mut set = HashMap::new();
set.insert(Pubkey::default(), 0);
assert!(crds.find_old_labels(&thread_pool, 0, &set).is_empty());
set.insert(Pubkey::default(), 1);
let pubkey = Pubkey::new_unique();
let stakes = HashMap::from([(Pubkey::new_unique(), 1u64)]);
let epoch_duration = Duration::from_secs(48 * 3600);
let timeouts = CrdsTimeouts::new(
pubkey,
0u64, // default_timeout,
epoch_duration,
&stakes,
);
assert!(crds.find_old_labels(&thread_pool, 0, &timeouts).is_empty());
let timeouts = CrdsTimeouts::new(
pubkey,
1u64, // default_timeout,
epoch_duration,
&stakes,
);
assert_eq!(
crds.find_old_labels(&thread_pool, 2, &set),
crds.find_old_labels(&thread_pool, 2, &timeouts),
vec![val.label()]
);
set.insert(Pubkey::default(), 2);
let timeouts = CrdsTimeouts::new(
pubkey,
2u64, // default_timeout,
epoch_duration,
&stakes,
);
assert_eq!(
crds.find_old_labels(&thread_pool, 4, &set),
crds.find_old_labels(&thread_pool, 4, &timeouts),
vec![val.label()]
);
}
Expand All @@ -907,24 +922,51 @@ mod tests {
let thread_pool = ThreadPoolBuilder::new().build().unwrap();
let mut rng = thread_rng();
let mut crds = Crds::default();
let mut timeouts = HashMap::new();
let val = CrdsValue::new_rand(&mut rng, None);
timeouts.insert(Pubkey::default(), 3);
let mut stakes = HashMap::from([(Pubkey::new_unique(), 1u64)]);
let timeouts = CrdsTimeouts::new(
Pubkey::new_unique(),
3, // default_timeout
Duration::from_secs(48 * 3600), // epoch_duration
&stakes,
);
assert_eq!(
crds.insert(val.clone(), 0, GossipRoute::LocalMessage),
Ok(())
);
assert!(crds.find_old_labels(&thread_pool, 2, &timeouts).is_empty());
timeouts.insert(val.pubkey(), 1);
stakes.insert(val.pubkey(), 1u64);
let timeouts = CrdsTimeouts::new(
Pubkey::new_unique(),
1, // default_timeout
Duration::from_millis(1), // epoch_duration
&stakes,
);
assert_eq!(
crds.find_old_labels(&thread_pool, 2, &timeouts),
vec![val.label()]
);
timeouts.insert(val.pubkey(), u64::MAX);
let timeouts = CrdsTimeouts::new(
Pubkey::new_unique(),
3, // default_timeout
Duration::from_secs(48 * 3600), // epoch_duration
&stakes,
);
assert!(crds.find_old_labels(&thread_pool, 2, &timeouts).is_empty());
timeouts.insert(Pubkey::default(), 1);
let timeouts = CrdsTimeouts::new(
Pubkey::new_unique(),
1, // default_timeout
Duration::from_secs(48 * 3600), // epoch_duration
&stakes,
);
assert!(crds.find_old_labels(&thread_pool, 2, &timeouts).is_empty());
timeouts.remove(&val.pubkey());
stakes.remove(&val.pubkey());
let timeouts = CrdsTimeouts::new(
Pubkey::new_unique(),
1, // default_timeout
Duration::from_secs(48 * 3600), // epoch_duration
&stakes,
);
assert_eq!(
crds.find_old_labels(&thread_pool, 2, &timeouts),
vec![val.label()]
Expand All @@ -940,14 +982,19 @@ mod tests {
crds.insert(val.clone(), 1, GossipRoute::LocalMessage),
Ok(_)
);
let mut set = HashMap::new();
set.insert(Pubkey::default(), 1);
let stakes = HashMap::from([(Pubkey::new_unique(), 1u64)]);
let timeouts = CrdsTimeouts::new(
Pubkey::new_unique(),
1, // default_timeout
Duration::from_secs(48 * 3600), // epoch_duration
&stakes,
);
assert_eq!(
crds.find_old_labels(&thread_pool, 2, &set),
crds.find_old_labels(&thread_pool, 2, &timeouts),
vec![val.label()]
);
crds.remove(&val.label(), /*now=*/ 0);
assert!(crds.find_old_labels(&thread_pool, 2, &set).is_empty());
assert!(crds.find_old_labels(&thread_pool, 2, &timeouts).is_empty());
}
#[test]
fn test_find_old_records_staked() {
Expand All @@ -961,28 +1008,35 @@ mod tests {
crds.insert(val.clone(), 1, GossipRoute::LocalMessage),
Ok(())
);
let mut set = HashMap::new();
let mut stakes = HashMap::from([(Pubkey::new_unique(), 1u64)]);
let timeouts = CrdsTimeouts::new(
Pubkey::new_unique(),
0, // default_timeout
Duration::from_secs(48 * 3600), // epoch_duration
&stakes,
);
//now < timestamp
set.insert(Pubkey::default(), 0);
set.insert(val.pubkey(), 0);
assert!(crds.find_old_labels(&thread_pool, 0, &set).is_empty());
assert!(crds.find_old_labels(&thread_pool, 0, &timeouts).is_empty());

//pubkey shouldn't expire since its timeout is MAX
set.insert(val.pubkey(), std::u64::MAX);
assert!(crds.find_old_labels(&thread_pool, 2, &set).is_empty());

//default has max timeout, but pubkey should still expire
set.insert(Pubkey::default(), std::u64::MAX);
set.insert(val.pubkey(), 1);
assert_eq!(
crds.find_old_labels(&thread_pool, 2, &set),
vec![val.label()]
stakes.insert(val.pubkey(), 1u64);
let timeouts = CrdsTimeouts::new(
Pubkey::new_unique(),
0, // default_timeout
Duration::from_secs(48 * 3600), // epoch_duration
&stakes,
);
assert!(crds.find_old_labels(&thread_pool, 2, &timeouts).is_empty());

set.insert(val.pubkey(), 2);
assert!(crds.find_old_labels(&thread_pool, 2, &set).is_empty());
let timeouts = CrdsTimeouts::new(
Pubkey::new_unique(),
0, // default_timeout
Duration::from_millis(2), // epoch_duration
&stakes,
);
assert!(crds.find_old_labels(&thread_pool, 2, &timeouts).is_empty());
assert_eq!(
crds.find_old_labels(&thread_pool, 3, &set),
crds.find_old_labels(&thread_pool, 3, &timeouts),
vec![val.label()]
);
}
Expand Down Expand Up @@ -1353,17 +1407,19 @@ mod tests {
crds.insert(val.clone(), 1, GossipRoute::LocalMessage),
Ok(_)
);
let mut set = HashMap::new();

//default has max timeout, but pubkey should still expire
set.insert(Pubkey::default(), std::u64::MAX);
set.insert(val.pubkey(), 1);
let stakes = HashMap::from([(Pubkey::new_unique(), 1u64)]);
let timeouts = CrdsTimeouts::new(
Pubkey::new_unique(),
1, // default_timeout
Duration::from_millis(1), // epoch_duration
&stakes,
);
assert_eq!(
crds.find_old_labels(&thread_pool, 2, &set),
crds.find_old_labels(&thread_pool, 2, &timeouts),
vec![val.label()]
);
crds.remove(&val.label(), /*now=*/ 0);
assert!(crds.find_old_labels(&thread_pool, 2, &set).is_empty());
assert!(crds.find_old_labels(&thread_pool, 2, &timeouts).is_empty());
}

#[test]
Expand Down
17 changes: 8 additions & 9 deletions gossip/src/crds_gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use {
cluster_info_metrics::GossipStats,
crds::{Crds, GossipRoute},
crds_gossip_error::CrdsGossipError,
crds_gossip_pull::{CrdsFilter, CrdsGossipPull, ProcessPullStats},
crds_gossip_pull::{CrdsFilter, CrdsGossipPull, CrdsTimeouts, ProcessPullStats},
crds_gossip_push::CrdsGossipPush,
crds_value::{CrdsData, CrdsValue},
duplicate_shred::{self, DuplicateShredIndex, MAX_DUPLICATE_SHREDS},
Expand Down Expand Up @@ -258,7 +258,7 @@ impl CrdsGossip {

pub fn filter_pull_responses(
&self,
timeouts: &HashMap<Pubkey, u64>,
timeouts: &CrdsTimeouts,
response: Vec<CrdsValue>,
now: u64,
process_pull_stats: &mut ProcessPullStats,
Expand Down Expand Up @@ -292,12 +292,12 @@ impl CrdsGossip {
);
}

pub fn make_timeouts(
pub fn make_timeouts<'a>(
&self,
self_pubkey: Pubkey,
stakes: &HashMap<Pubkey, u64>,
stakes: &'a HashMap<Pubkey, u64>,
epoch_duration: Duration,
) -> HashMap<Pubkey, u64> {
) -> CrdsTimeouts<'a> {
self.pull.make_timeouts(self_pubkey, stakes, epoch_duration)
}

Expand All @@ -306,13 +306,12 @@ impl CrdsGossip {
self_pubkey: &Pubkey,
thread_pool: &ThreadPool,
now: u64,
timeouts: &HashMap<Pubkey, u64>,
timeouts: &CrdsTimeouts,
) -> usize {
let mut rv = 0;
if now > self.pull.crds_timeout {
//sanity check
assert_eq!(timeouts[self_pubkey], std::u64::MAX);
assert!(timeouts.contains_key(&Pubkey::default()));
debug_assert_eq!(timeouts[self_pubkey], u64::MAX);
debug_assert_ne!(timeouts[&Pubkey::default()], 0u64);
rv = CrdsGossipPull::purge_active(thread_pool, &self.crds, now, timeouts);
}
self.crds
Expand Down
Loading

0 comments on commit d4b30ad

Please sign in to comment.