Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Add RestartLastVotedForkSlots for wen_restart. #33239

Merged
merged 20 commits into from
Oct 9, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
ce81bbc
Add RestartLastVotedForkSlots and RestartHeaviestFork for wen_restart.
wen-coding Sep 13, 2023
0edf40e
Fix linter errors.
wen-coding Sep 13, 2023
1ec981b
Merge branch 'master' into wen_restart_gossip_change
wen-coding Sep 13, 2023
a3749b2
Revert RestartHeaviestFork, it will be added in another PR.
wen-coding Sep 13, 2023
79bedf8
Merge branch 'master' into wen_restart_gossip_change
wen-coding Sep 13, 2023
719534d
Merge branch 'master' into wen_restart_gossip_change
wen-coding Sep 14, 2023
25e0117
Update frozen abi message.
wen-coding Sep 15, 2023
1fcc81d
Fix wrong number in test generation, change to pub(crate) to limit sc…
wen-coding Sep 15, 2023
b7077a7
Separate push_epoch_slots and push_restart_last_voted_fork_slots.
wen-coding Sep 15, 2023
354673a
Add RestartLastVotedForkSlots data structure.
wen-coding Sep 17, 2023
0028f64
Remove unused parts to make PR smaller.
wen-coding Sep 18, 2023
46d2054
Remove unused clone.
wen-coding Sep 18, 2023
cc11d42
Use CompressedSlotsVec to share code between EpochSlots and RestartLa…
wen-coding Sep 18, 2023
b0cead0
Merge branch 'master' into wen_restart_gossip_change
wen-coding Sep 18, 2023
5b7a724
Add total_messages to show how many messages are there.
wen-coding Oct 2, 2023
4821d8a
Merge branch 'master' into wen_restart_gossip_change
wen-coding Oct 2, 2023
ae8d01d
Reduce RestartLastVotedForkSlots to one packet (16k slots).
wen-coding Oct 2, 2023
1ee2699
Replace last_vote_slot with shred_version, revert CompressedSlotsVec.
wen-coding Oct 6, 2023
78a9ded
Merge branch 'master' into wen_restart_gossip_change
wen-coding Oct 9, 2023
866228b
Merge branch 'master' into wen_restart_gossip_change
wen-coding Oct 9, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Revert RestartHeaviestFork, it will be added in another PR.
  • Loading branch information
wen-coding committed Sep 13, 2023
commit a3749b2d21073cee6585fa88b728273dec5cb135
132 changes: 3 additions & 129 deletions gossip/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ use {
},
crds_value::{
self, AccountsHashes, CrdsData, CrdsValue, CrdsValueLabel, EpochSlotsIndex,
LegacySnapshotHashes, LowestSlot, NodeInstance, Percent, SnapshotHashes, Version, Vote,
MAX_PERCENT, MAX_WALLCLOCK,
LegacySnapshotHashes, LowestSlot, NodeInstance, SnapshotHashes, Version, Vote,
MAX_WALLCLOCK,
},
duplicate_shred::DuplicateShred,
epoch_slots::EpochSlots,
Expand Down Expand Up @@ -398,8 +398,7 @@ fn retain_staked(values: &mut Vec<CrdsValue>, stakes: &HashMap<Pubkey, u64>) {
CrdsData::LowestSlot(_, _)
| CrdsData::LegacyVersion(_)
| CrdsData::DuplicateShred(_, _)
| CrdsData::RestartLastVotedForkSlots(_, _, _, _)
| CrdsData::RestartHeaviestFork(_, _, _) => {
| CrdsData::RestartLastVotedForkSlots(_, _, _, _) => {
let stake = stakes.get(&value.pubkey()).copied();
stake.unwrap_or_default() >= MIN_STAKE_FOR_GOSSIP
}
Expand Down Expand Up @@ -1009,28 +1008,6 @@ impl ClusterInfo {
}
}

pub fn push_restart_heaviest_fork(
&self,
slot: Slot,
hash: Hash,
percent: f64,
) -> Result<(), String> {
if !(0.0..1.0).contains(&percent) {
return Err(format!(
"heaviest_fork with out of bound percent, ignored: {}",
percent
));
}

let message = CrdsData::RestartHeaviestFork(
slot,
hash,
Percent::new(self.id(), (percent * MAX_PERCENT as f64) as u16),
);
self.push_message(CrdsValue::new_signed(message, &self.keypair()));
Ok(())
}

fn time_gossip_read_lock<'a>(
&'a self,
label: &'static str,
Expand Down Expand Up @@ -1329,26 +1306,6 @@ impl ClusterInfo {
.collect()
}

/// Returns heaviest-fork inserted since the given cursor.
/// Excludes entries from nodes with unkown or different shred version.
pub fn get_restart_heaviest_fork(&self, cursor: &mut Cursor) -> Vec<(Slot, Hash, Percent)> {
let self_shred_version = Some(self.my_shred_version());
let gossip_crds = self.gossip.crds.read().unwrap();
gossip_crds
.get_restart_heaviest_fork(cursor)
.filter(|entry| {
let origin = entry.value.pubkey();
gossip_crds.get_shred_version(&origin) == self_shred_version
})
.map(|entry| match &entry.value.data {
CrdsData::RestartHeaviestFork(last_vote, last_vote_hash, percent) => {
(*last_vote, *last_vote_hash, percent.clone())
}
_ => panic!("this should not happen!"),
})
.collect()
}

/// Returns duplicate-shreds inserted since the given cursor.
pub(crate) fn get_duplicate_shreds(&self, cursor: &mut Cursor) -> Vec<DuplicateShred> {
let gossip_crds = self.gossip.crds.read().unwrap();
Expand Down Expand Up @@ -4178,89 +4135,6 @@ mod tests {
assert_eq!(slots[2].1.from, node_pubkey);
}

#[test]
fn test_push_restart_heaviest_fork() {
solana_logger::setup();
let keypair = Arc::new(Keypair::new());
let pubkey = keypair.pubkey();
let contact_info = ContactInfo::new_localhost(&pubkey, 0);
let cluster_info = ClusterInfo::new(contact_info, keypair, SocketAddrSpace::Unspecified);

// make sure empty crds is handled correctly
let mut cursor = Cursor::default();
let heaviest_forks = cluster_info.get_restart_heaviest_fork(&mut cursor);
assert_eq!(heaviest_forks, vec![]);

// add new message
let slot1 = 53;
let hash1 = Hash::new_unique();
let percent1 = 0.015;
assert!(cluster_info
.push_restart_heaviest_fork(slot1, hash1, percent1)
.is_ok());
cluster_info.flush_push_queue();

let heaviest_forks = cluster_info.get_restart_heaviest_fork(&mut cursor);
assert_eq!(heaviest_forks.len(), 1);
let (slot, hash, percent) = &heaviest_forks[0];
assert_eq!(slot, &slot1);
assert_eq!(hash, &hash1);
assert!((percent.percent as f64 - percent1 * MAX_PERCENT as f64).abs() < f64::EPSILON);
assert_eq!(percent.from, pubkey);

// ignore bad input
assert_eq!(
cluster_info.push_restart_heaviest_fork(slot1, hash1, 1.5),
Err("heaviest_fork with out of bound percent, ignored: 1.5".to_string())
);
assert_eq!(
cluster_info.push_restart_heaviest_fork(slot1, hash1, -0.3),
Err("heaviest_fork with out of bound percent, ignored: -0.3".to_string())
);

// Test with different shred versions.
let mut rng = rand::thread_rng();
let pubkey2 = Pubkey::new_unique();
let mut new_node = LegacyContactInfo::new_rand(&mut rng, Some(pubkey2));
new_node.set_shred_version(42);
let slot2 = 54;
let hash2 = Hash::new_unique();
let percent2 = 0.023;
let entries = vec![
CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(new_node)),
CrdsValue::new_unsigned(CrdsData::RestartHeaviestFork(
slot2,
hash2,
Percent::new(pubkey2, (percent2 * MAX_PERCENT as f64) as u16),
)),
];
{
let mut gossip_crds = cluster_info.gossip.crds.write().unwrap();
for entry in entries {
assert!(gossip_crds
.insert(entry, /*now=*/ 0, GossipRoute::LocalMessage)
.is_ok());
}
}
// Should exclude other node's heaviest_fork because of different
// shred-version.
let heaviest_forks = cluster_info.get_restart_heaviest_fork(&mut Cursor::default());
assert_eq!(heaviest_forks.len(), 1);
assert_eq!(heaviest_forks[0].2.from, pubkey);
// Match shred versions.
{
let mut node = cluster_info.my_contact_info.write().unwrap();
node.set_shred_version(42);
}
cluster_info.push_self();
cluster_info.flush_push_queue();
// Should now include both epoch slots.
let heaviest_forks = cluster_info.get_restart_heaviest_fork(&mut Cursor::default());
assert_eq!(heaviest_forks.len(), 2);
assert_eq!(heaviest_forks[0].2.from, pubkey);
assert_eq!(heaviest_forks[1].2.from, pubkey2);
}

#[test]
fn test_append_entrypoint_to_pulls() {
let thread_pool = ThreadPoolBuilder::new().build().unwrap();
Expand Down
4 changes: 0 additions & 4 deletions gossip/src/cluster_info_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -637,8 +637,6 @@ pub(crate) fn submit_gossip_stats(
crds_stats.pull.counts[12],
i64
),
("RestartHeaviestFork-push", crds_stats.push.counts[13], i64),
("RestartHeaviestFork-pull", crds_stats.pull.counts[13], i64),
(
"all-push",
crds_stats.push.counts.iter().sum::<usize>(),
Expand Down Expand Up @@ -686,8 +684,6 @@ pub(crate) fn submit_gossip_stats(
crds_stats.pull.fails[12],
i64
),
("RestartHeaviestFork-push", crds_stats.push.fails[13], i64),
("RestartHeaviestFork-pull", crds_stats.pull.fails[13], i64),
("all-push", crds_stats.push.fails.iter().sum::<usize>(), i64),
("all-pull", crds_stats.pull.fails.iter().sum::<usize>(), i64),
);
Expand Down
94 changes: 3 additions & 91 deletions gossip/src/crds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,6 @@ pub struct Crds {
duplicate_shreds: BTreeMap<u64 /*insert order*/, usize /*index*/>,
// Indices of RestartLastVotedForkSlots keyed by insert order.
restart_last_voted_fork_slots: BTreeMap<u64 /*insert order*/, usize /*index*/>,
// Indices of RestartHeaviestFork keyed by insert order.
restart_heaviest_fork: BTreeMap<u64 /*insert order*/, usize /*index*/>,
// Indices of all crds values associated with a node.
records: HashMap<Pubkey, IndexSet<usize>>,
// Indices of all entries keyed by insert order.
Expand All @@ -107,7 +105,7 @@ pub enum GossipRoute<'a> {
PushMessage(/*from:*/ &'a Pubkey),
}

type CrdsCountsArray = [usize; 14];
type CrdsCountsArray = [usize; 13];

pub(crate) struct CrdsDataStats {
pub(crate) counts: CrdsCountsArray,
Expand Down Expand Up @@ -174,7 +172,6 @@ impl Default for Crds {
epoch_slots: BTreeMap::default(),
duplicate_shreds: BTreeMap::default(),
restart_last_voted_fork_slots: BTreeMap::default(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If these values are only going to matter during restart, then we don't need the overhead of this index.
You can just use this function:
https://github.com/solana-labs/solana/blob/6db57f81d/gossip/src/crds.rs#L380-L390
and filter on RestartLastVotedForkSlots.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's fair, removed now.

restart_heaviest_fork: BTreeMap::default(),
records: HashMap::default(),
entries: BTreeMap::default(),
purged: VecDeque::default(),
Expand Down Expand Up @@ -252,10 +249,6 @@ impl Crds {
self.restart_last_voted_fork_slots
.insert(value.ordinal, entry_index);
}
CrdsData::RestartHeaviestFork(_, _, _) => {
self.restart_heaviest_fork
.insert(value.ordinal, entry_index);
}
_ => (),
};
self.entries.insert(value.ordinal, entry_index);
Expand Down Expand Up @@ -297,11 +290,6 @@ impl Crds {
self.restart_last_voted_fork_slots
.insert(value.ordinal, entry_index);
}
CrdsData::RestartHeaviestFork(_, _, _) => {
self.restart_heaviest_fork.remove(&entry.get().ordinal);
self.restart_heaviest_fork
.insert(value.ordinal, entry_index);
}
_ => (),
}
self.entries.remove(&entry.get().ordinal);
Expand Down Expand Up @@ -417,21 +405,6 @@ impl Crds {
})
}

/// Returns heaviest_fork inserted since the given cursor.
/// Updates the cursor as the values are consumed.
pub(crate) fn get_restart_heaviest_fork<'a>(
&'a self,
cursor: &'a mut Cursor,
) -> impl Iterator<Item = &'a VersionedCrdsValue> {
let range = (Bound::Included(cursor.ordinal()), Bound::Unbounded);
self.restart_heaviest_fork
.range(range)
.map(move |(ordinal, index)| {
cursor.consume(*ordinal);
self.table.index(*index)
})
}

/// Returns all entries inserted since the given cursor.
pub(crate) fn get_entries<'a>(
&'a self,
Expand Down Expand Up @@ -602,9 +575,6 @@ impl Crds {
CrdsData::RestartLastVotedForkSlots(_, _, _, _) => {
self.restart_last_voted_fork_slots.remove(&value.ordinal);
}
CrdsData::RestartHeaviestFork(_, _, _) => {
self.restart_heaviest_fork.remove(&value.ordinal);
}
_ => (),
}
self.entries.remove(&value.ordinal);
Expand Down Expand Up @@ -646,9 +616,6 @@ impl Crds {
self.restart_last_voted_fork_slots
.insert(value.ordinal, index);
}
CrdsData::RestartHeaviestFork(_, _, _) => {
self.restart_heaviest_fork.insert(value.ordinal, index);
}
_ => (),
};
self.entries.insert(value.ordinal, index);
Expand Down Expand Up @@ -790,7 +757,6 @@ impl CrdsDataStats {
CrdsData::SnapshotHashes(_) => 10,
CrdsData::ContactInfo(_) => 11,
CrdsData::RestartLastVotedForkSlots(_, _, _, _) => 12,
CrdsData::RestartHeaviestFork(_, _, _) => 13,
// Update CrdsCountsArray if new items are added here.
}
}
Expand Down Expand Up @@ -1189,7 +1155,6 @@ mod tests {
usize, // number of votes
usize, // number of epoch slots
usize, // number of restart last voted fork slots
usize, // number of restart heaviest forks
) {
let size = crds.table.len();
let since = if size == 0 || rng.gen() {
Expand Down Expand Up @@ -1250,33 +1215,6 @@ mod tests {
_ => panic!("not a last-voted-fork-slot!"),
}
}
let num_heaviest_forks = crds
.table
.values()
.filter(|v| v.ordinal >= since)
.filter(|v| matches!(v.value.data, CrdsData::RestartHeaviestFork(_, _, _)))
.count();
let mut cursor = Cursor(since);
assert_eq!(
num_heaviest_forks,
crds.get_restart_heaviest_fork(&mut cursor).count()
);
assert_eq!(
cursor.0,
crds.restart_heaviest_fork
.iter()
.last()
.map(|(k, _)| k + 1)
.unwrap_or_default()
.max(since)
);
for value in crds.get_restart_heaviest_fork(&mut Cursor(since)) {
assert!(value.ordinal >= since);
match value.value.data {
CrdsData::RestartHeaviestFork(_, _, _) => (),
_ => panic!("not a heaviest-fork!"),
}
}
let num_votes = crds
.table
.values()
Expand Down Expand Up @@ -1344,11 +1282,6 @@ mod tests {
)
})
.count();
let num_restart_heaviest_forks = crds
.table
.values()
.filter(|v| matches!(v.value.data, CrdsData::RestartHeaviestFork(_, _, _)))
.count();
assert_eq!(
crds.table.len(),
crds.get_entries(&mut Cursor::default()).count()
Expand All @@ -1364,11 +1297,6 @@ mod tests {
crds.get_restart_last_voted_fork_slots(&mut Cursor::default())
.count()
);
assert_eq!(
num_restart_heaviest_forks,
crds.get_restart_heaviest_fork(&mut Cursor::default())
.count()
);
for vote in crds.get_votes(&mut Cursor::default()) {
assert_matches!(vote.value.data, CrdsData::Vote(_, _));
}
Expand All @@ -1383,18 +1311,11 @@ mod tests {
_ => panic!("not a restart-last-voted-fork-slot!"),
}
}
for restart_heaviest_fork in crds.get_restart_heaviest_fork(&mut Cursor::default()) {
match restart_heaviest_fork.value.data {
CrdsData::RestartHeaviestFork(_, _, _) => (),
_ => panic!("not a restart-heaviest-fork!"),
}
}
(
num_nodes,
num_votes,
num_epoch_slots,
num_last_voted_fork_slots,
num_heaviest_forks,
)
}

Expand All @@ -1421,13 +1342,8 @@ mod tests {
assert!(crds.table.len() > 200);
assert_eq!(crds.num_purged() + crds.table.len(), 4096);
assert!(num_inserts > crds.table.len());
let (
num_nodes,
num_votes,
num_epoch_slots,
num_restart_last_voted_fork_slots,
num_restart_heaviest_forks,
) = check_crds_value_indices(&mut rng, &crds);
let (num_nodes, num_votes, num_epoch_slots, num_restart_last_voted_fork_slots) =
check_crds_value_indices(&mut rng, &crds);
assert!(num_nodes * 3 < crds.table.len());
assert!(num_nodes > 100, "num nodes: {num_nodes}");
assert!(num_votes > 100, "num votes: {num_votes}");
Expand All @@ -1436,10 +1352,6 @@ mod tests {
num_restart_last_voted_fork_slots > 0,
"num restart last voted fork slots: {num_restart_last_voted_fork_slots}"
);
assert!(
num_restart_heaviest_forks > 0,
"num restart heaviest forks: {num_restart_heaviest_forks}"
);
// Remove values one by one and assert that nodes indices stay valid.
while !crds.table.is_empty() {
let index = rng.gen_range(0..crds.table.len());
Expand Down
Loading