Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Add RestartLastVotedForkSlots for wen_restart. #33239

Merged
merged 20 commits into from
Oct 9, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
ce81bbc
Add RestartLastVotedForkSlots and RestartHeaviestFork for wen_restart.
wen-coding Sep 13, 2023
0edf40e
Fix linter errors.
wen-coding Sep 13, 2023
1ec981b
Merge branch 'master' into wen_restart_gossip_change
wen-coding Sep 13, 2023
a3749b2
Revert RestartHeaviestFork, it will be added in another PR.
wen-coding Sep 13, 2023
79bedf8
Merge branch 'master' into wen_restart_gossip_change
wen-coding Sep 13, 2023
719534d
Merge branch 'master' into wen_restart_gossip_change
wen-coding Sep 14, 2023
25e0117
Update frozen abi message.
wen-coding Sep 15, 2023
1fcc81d
Fix wrong number in test generation, change to pub(crate) to limit sc…
wen-coding Sep 15, 2023
b7077a7
Separate push_epoch_slots and push_restart_last_voted_fork_slots.
wen-coding Sep 15, 2023
354673a
Add RestartLastVotedForkSlots data structure.
wen-coding Sep 17, 2023
0028f64
Remove unused parts to make PR smaller.
wen-coding Sep 18, 2023
46d2054
Remove unused clone.
wen-coding Sep 18, 2023
cc11d42
Use CompressedSlotsVec to share code between EpochSlots and RestartLa…
wen-coding Sep 18, 2023
b0cead0
Merge branch 'master' into wen_restart_gossip_change
wen-coding Sep 18, 2023
5b7a724
Add total_messages to show how many messages are there.
wen-coding Oct 2, 2023
4821d8a
Merge branch 'master' into wen_restart_gossip_change
wen-coding Oct 2, 2023
ae8d01d
Reduce RestartLastVotedForkSlots to one packet (16k slots).
wen-coding Oct 2, 2023
1ee2699
Replace last_vote_slot with shred_version, revert CompressedSlotsVec.
wen-coding Oct 6, 2023
78a9ded
Merge branch 'master' into wen_restart_gossip_change
wen-coding Oct 9, 2023
866228b
Merge branch 'master' into wen_restart_gossip_change
wen-coding Oct 9, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Remove unused parts to make PR smaller.
  • Loading branch information
wen-coding committed Sep 18, 2023
commit 0028f648365488cc260eb58a638387cd6b1223e1
127 changes: 2 additions & 125 deletions gossip/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ use {
},
crds_value::{
self, AccountsHashes, CrdsData, CrdsValue, CrdsValueLabel, EpochSlotsIndex,
LegacySnapshotHashes, LowestSlot, NodeInstance, RestartLastVotedForkSlots,
SnapshotHashes, Version, Vote, MAX_WALLCLOCK,
LegacySnapshotHashes, LowestSlot, NodeInstance, SnapshotHashes, Version, Vote,
MAX_WALLCLOCK,
},
duplicate_shred::DuplicateShred,
epoch_slots::EpochSlots,
Expand Down Expand Up @@ -969,44 +969,6 @@ impl ClusterInfo {
}
}

// This is currently only called from one thread during startup, so no race conditions.
// The data format is similar to EpochSlots, but we overwrite previous results each time,
// so here we always start from 0. All pieces of the same push use the same timestamp.
pub fn push_restart_last_voted_fork_slots(
&self,
mut update: &[Slot],
last_vote_bankhash: Hash,
) {
let self_pubkey = self.id();
let last_vote_slot = *update.last().unwrap();
let mut epoch_slot_index = 0;
let mut entries = Vec::default();
let keypair = self.keypair();
let now = timestamp();
while !update.is_empty() {
let mut slots = RestartLastVotedForkSlots::new(
self_pubkey,
now,
last_vote_slot,
last_vote_bankhash,
);
let n = slots.fill(update);
update = &update[n..];
if n > 0 {
let data = CrdsData::RestartLastVotedForkSlots(epoch_slot_index, slots);
let entry = CrdsValue::new_signed(data, &keypair);
entries.push(entry);
}
epoch_slot_index += 1;
}
let mut gossip_crds = self.gossip.crds.write().unwrap();
for entry in entries {
if let Err(err) = gossip_crds.insert(entry, now, GossipRoute::LocalMessage) {
error!("push_restart_last_voted_fork_slots failed: {:?}", err);
}
}
}

fn time_gossip_read_lock<'a>(
&'a self,
label: &'static str,
Expand Down Expand Up @@ -1282,27 +1244,6 @@ impl ClusterInfo {
.collect()
}

/// Returns last-voted-fork-slots inserted since the given cursor.
/// Excludes entries from nodes with unkown or different shred version.
pub fn get_restart_last_voted_fork_slots(
&self,
cursor: &mut Cursor,
) -> Vec<RestartLastVotedForkSlots> {
let self_shred_version = Some(self.my_shred_version());
let gossip_crds = self.gossip.crds.read().unwrap();
gossip_crds
.get_restart_last_voted_fork_slots(cursor)
.filter(|entry| {
let origin = entry.value.pubkey();
gossip_crds.get_shred_version(&origin) == self_shred_version
})
.map(|entry| match &entry.value.data {
CrdsData::RestartLastVotedForkSlots(_index, slots) => slots.clone(),
_ => panic!("this should not happen!"),
})
.collect()
}

/// Returns duplicate-shreds inserted since the given cursor.
pub(crate) fn get_duplicate_shreds(&self, cursor: &mut Cursor) -> Vec<DuplicateShred> {
let gossip_crds = self.gossip.crds.read().unwrap();
Expand Down Expand Up @@ -4063,70 +4004,6 @@ mod tests {
assert_eq!(slots[1].from, node_pubkey);
}

#[test]
fn test_push_restart_last_voted_fork_slots() {
solana_logger::setup();
let keypair = Arc::new(Keypair::new());
let contact_info = ContactInfo::new_localhost(&keypair.pubkey(), 0);
let cluster_info = ClusterInfo::new(contact_info, keypair, SocketAddrSpace::Unspecified);
let slots = cluster_info.get_restart_last_voted_fork_slots(&mut Cursor::default());
assert!(slots.is_empty());
let mut update: Vec<Slot> = vec![0];
for i in 0..81 {
for j in 0..1000 {
update.push(i * 1050 + j);
}
}
cluster_info.push_restart_last_voted_fork_slots(&update, Hash::default());

let mut cursor = Cursor::default();
let slots = cluster_info.get_restart_last_voted_fork_slots(&mut cursor);
assert_eq!(slots.len(), 2);
assert_eq!(slots[0].slots.to_slots(0).len(), 42468);
assert_eq!(slots[1].slots.to_slots(0).len(), 38532);

let slots = cluster_info.get_restart_last_voted_fork_slots(&mut cursor);
assert!(slots.is_empty());

// Test with different shred versions.
let mut rng = rand::thread_rng();
let node_pubkey = Pubkey::new_unique();
let mut node = LegacyContactInfo::new_rand(&mut rng, Some(node_pubkey));
node.set_shred_version(42);
let slots = RestartLastVotedForkSlots::new_rand(&mut rng, Some(node_pubkey));
let entries = vec![
CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(node)),
CrdsValue::new_unsigned(CrdsData::RestartLastVotedForkSlots(0, slots)),
];
{
let mut gossip_crds = cluster_info.gossip.crds.write().unwrap();
for entry in entries {
assert!(gossip_crds
.insert(entry, /*now=*/ 0, GossipRoute::LocalMessage)
.is_ok());
}
}
// Should exclude other node's last-voted-fork-slot because of different
// shred-version.
let slots = cluster_info.get_restart_last_voted_fork_slots(&mut Cursor::default());
assert_eq!(slots.len(), 2);
assert_eq!(slots[0].slots.from, cluster_info.id());
assert_eq!(slots[1].slots.from, cluster_info.id());
// Match shred versions.
{
let mut node = cluster_info.my_contact_info.write().unwrap();
node.set_shred_version(42);
}
cluster_info.push_self();
cluster_info.flush_push_queue();
// Should now include both epoch slots.
let slots = cluster_info.get_restart_last_voted_fork_slots(&mut Cursor::default());
assert_eq!(slots.len(), 3);
assert_eq!(slots[0].slots.from, cluster_info.id());
assert_eq!(slots[1].slots.from, cluster_info.id());
assert_eq!(slots[2].slots.from, node_pubkey);
}

#[test]
fn test_append_entrypoint_to_pulls() {
let thread_pool = ThreadPoolBuilder::new().build().unwrap();
Expand Down
95 changes: 2 additions & 93 deletions gossip/src/crds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,6 @@ pub struct Crds {
epoch_slots: BTreeMap<u64 /*insert order*/, usize /*index*/>,
// Indices of DuplicateShred keyed by insert order.
duplicate_shreds: BTreeMap<u64 /*insert order*/, usize /*index*/>,
// Indices of RestartLastVotedForkSlots keyed by insert order.
restart_last_voted_fork_slots: BTreeMap<u64 /*insert order*/, usize /*index*/>,
// Indices of all crds values associated with a node.
records: HashMap<Pubkey, IndexSet<usize>>,
// Indices of all entries keyed by insert order.
Expand Down Expand Up @@ -171,7 +169,6 @@ impl Default for Crds {
votes: BTreeMap::default(),
epoch_slots: BTreeMap::default(),
duplicate_shreds: BTreeMap::default(),
restart_last_voted_fork_slots: BTreeMap::default(),
records: HashMap::default(),
entries: BTreeMap::default(),
purged: VecDeque::default(),
Expand Down Expand Up @@ -245,10 +242,6 @@ impl Crds {
CrdsData::DuplicateShred(_, _) => {
self.duplicate_shreds.insert(value.ordinal, entry_index);
}
CrdsData::RestartLastVotedForkSlots(_, _) => {
self.restart_last_voted_fork_slots
.insert(value.ordinal, entry_index);
}
_ => (),
};
self.entries.insert(value.ordinal, entry_index);
Expand Down Expand Up @@ -284,12 +277,6 @@ impl Crds {
self.duplicate_shreds.remove(&entry.get().ordinal);
self.duplicate_shreds.insert(value.ordinal, entry_index);
}
CrdsData::RestartLastVotedForkSlots(_, _) => {
self.restart_last_voted_fork_slots
.remove(&entry.get().ordinal);
self.restart_last_voted_fork_slots
.insert(value.ordinal, entry_index);
}
_ => (),
}
self.entries.remove(&entry.get().ordinal);
Expand Down Expand Up @@ -390,21 +377,6 @@ impl Crds {
})
}

/// Returns last_voted_fork_slots inserted since the given cursor.
/// Updates the cursor as the values are consumed.
pub(crate) fn get_restart_last_voted_fork_slots<'a>(
&'a self,
cursor: &'a mut Cursor,
) -> impl Iterator<Item = &'a VersionedCrdsValue> {
let range = (Bound::Included(cursor.ordinal()), Bound::Unbounded);
self.restart_last_voted_fork_slots
.range(range)
.map(move |(ordinal, index)| {
cursor.consume(*ordinal);
self.table.index(*index)
})
}

/// Returns all entries inserted since the given cursor.
pub(crate) fn get_entries<'a>(
&'a self,
Expand Down Expand Up @@ -572,9 +544,6 @@ impl Crds {
CrdsData::DuplicateShred(_, _) => {
self.duplicate_shreds.remove(&value.ordinal);
}
CrdsData::RestartLastVotedForkSlots(_, _) => {
self.restart_last_voted_fork_slots.remove(&value.ordinal);
}
_ => (),
}
self.entries.remove(&value.ordinal);
Expand Down Expand Up @@ -612,10 +581,6 @@ impl Crds {
CrdsData::DuplicateShred(_, _) => {
self.duplicate_shreds.insert(value.ordinal, index);
}
CrdsData::RestartLastVotedForkSlots(_, _) => {
self.restart_last_voted_fork_slots
.insert(value.ordinal, index);
}
_ => (),
};
self.entries.insert(value.ordinal, index);
Expand Down Expand Up @@ -1154,7 +1119,6 @@ mod tests {
usize, // number of nodes
usize, // number of votes
usize, // number of epoch slots
usize, // number of restart last voted fork slots
) {
let size = crds.table.len();
let since = if size == 0 || rng.gen() {
Expand Down Expand Up @@ -1183,33 +1147,6 @@ mod tests {
assert!(value.ordinal >= since);
assert_matches!(value.value.data, CrdsData::EpochSlots(_, _));
}
let num_last_voted_fork_slots = crds
.table
.values()
.filter(|v| v.ordinal >= since)
.filter(|v| matches!(v.value.data, CrdsData::RestartLastVotedForkSlots(_, _)))
.count();
let mut cursor = Cursor(since);
assert_eq!(
num_last_voted_fork_slots,
crds.get_restart_last_voted_fork_slots(&mut cursor).count()
);
assert_eq!(
cursor.0,
crds.restart_last_voted_fork_slots
.iter()
.last()
.map(|(k, _)| k + 1)
.unwrap_or_default()
.max(since)
);
for value in crds.get_restart_last_voted_fork_slots(&mut Cursor(since)) {
assert!(value.ordinal >= since);
match value.value.data {
CrdsData::RestartLastVotedForkSlots(_, _) => (),
_ => panic!("not a last-voted-fork-slot!"),
}
}
let num_votes = crds
.table
.values()
Expand Down Expand Up @@ -1267,11 +1204,6 @@ mod tests {
.values()
.filter(|v| matches!(v.value.data, CrdsData::EpochSlots(_, _)))
.count();
let num_restart_last_voted_fork_slots = crds
.table
.values()
.filter(|v| matches!(v.value.data, CrdsData::RestartLastVotedForkSlots(_, _)))
.count();
assert_eq!(
crds.table.len(),
crds.get_entries(&mut Cursor::default()).count()
Expand All @@ -1282,31 +1214,13 @@ mod tests {
num_epoch_slots,
crds.get_epoch_slots(&mut Cursor::default()).count()
);
assert_eq!(
num_restart_last_voted_fork_slots,
crds.get_restart_last_voted_fork_slots(&mut Cursor::default())
.count()
);
for vote in crds.get_votes(&mut Cursor::default()) {
assert_matches!(vote.value.data, CrdsData::Vote(_, _));
}
for epoch_slots in crds.get_epoch_slots(&mut Cursor::default()) {
assert_matches!(epoch_slots.value.data, CrdsData::EpochSlots(_, _));
}
for restart_last_voted_fork_slots in
crds.get_restart_last_voted_fork_slots(&mut Cursor::default())
{
match restart_last_voted_fork_slots.value.data {
CrdsData::RestartLastVotedForkSlots(_, _) => (),
_ => panic!("not a restart-last-voted-fork-slot!"),
}
}
(
num_nodes,
num_votes,
num_epoch_slots,
num_last_voted_fork_slots,
)
(num_nodes, num_votes, num_epoch_slots)
}

#[test]
Expand All @@ -1332,16 +1246,11 @@ mod tests {
assert!(crds.table.len() > 200);
assert_eq!(crds.num_purged() + crds.table.len(), 4096);
assert!(num_inserts > crds.table.len());
let (num_nodes, num_votes, num_epoch_slots, num_restart_last_voted_fork_slots) =
check_crds_value_indices(&mut rng, &crds);
let (num_nodes, num_votes, num_epoch_slots) = check_crds_value_indices(&mut rng, &crds);
assert!(num_nodes * 3 < crds.table.len());
assert!(num_nodes > 100, "num nodes: {num_nodes}");
assert!(num_votes > 100, "num votes: {num_votes}");
assert!(num_epoch_slots > 100, "num epoch slots: {num_epoch_slots}");
assert!(
num_restart_last_voted_fork_slots > 0,
"num restart last voted fork slots: {num_restart_last_voted_fork_slots}"
);
// Remove values one by one and assert that nodes indices stay valid.
while !crds.table.is_empty() {
let index = rng.gen_range(0..crds.table.len());
Expand Down
Loading