Skip to content

Commit

Permalink
Revert "Gossip PullRequests tend to return a lot of duplicates. (#10326
Browse files Browse the repository at this point in the history
…)" (#10455) (#10557)

automerge
  • Loading branch information
mergify[bot] authored Jun 14, 2020
1 parent d27f24e commit 605f490
Show file tree
Hide file tree
Showing 4 changed files with 7 additions and 69 deletions.
14 changes: 1 addition & 13 deletions core/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,6 @@ pub struct ClusterInfo {
my_contact_info: RwLock<ContactInfo>,
id: Pubkey,
stats: GossipStats,
start_time: u64,
}

impl Default for ClusterInfo {
Expand Down Expand Up @@ -399,7 +398,6 @@ impl ClusterInfo {
let id = contact_info.id;
let me = Self {
gossip: RwLock::new(CrdsGossip::default()),
start_time: timestamp(),
keypair,
entrypoint: RwLock::new(None),
outbound_budget: RwLock::new(DataBudget {
Expand Down Expand Up @@ -434,7 +432,6 @@ impl ClusterInfo {
my_contact_info: RwLock::new(my_contact_info),
id: *new_id,
stats: GossipStats::default(),
start_time: self.start_time,
}
}

Expand Down Expand Up @@ -1727,18 +1724,9 @@ impl ClusterInfo {
let now = timestamp();
let self_id = me.id();

//skip messages that are likely to be pushed
let min_filter_time = me.start_time + 10 * CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS;
let push_timer = if min_filter_time < now {
// reason for / 3 is to allow push_self which has a /2 timeout to propagate
// first through push before responding with those values.
Some(now - CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS / 3)
} else {
None
};
let pull_responses = me
.time_gossip_read_lock("generate_pull_responses", &me.stats.generate_pull_responses)
.generate_pull_responses(&caller_and_filters, push_timer);
.generate_pull_responses(&caller_and_filters);

me.time_gossip_write_lock("process_pull_reqs", &me.stats.process_pull_requests)
.process_pull_requests(caller_and_filters, now);
Expand Down
3 changes: 1 addition & 2 deletions core/src/crds_gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,8 @@ impl CrdsGossip {
pub fn generate_pull_responses(
&self,
filters: &[(CrdsValue, CrdsFilter)],
now: Option<u64>,
) -> Vec<Vec<CrdsValue>> {
self.pull.generate_pull_responses(&self.crds, filters, now)
self.pull.generate_pull_responses(&self.crds, filters)
}

pub fn filter_pull_responses(
Expand Down
57 changes: 4 additions & 53 deletions core/src/crds_gossip_pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,6 @@ impl CrdsFilter {
accum
}
pub fn test_mask(&self, item: &Hash) -> bool {
if self.mask_bits == 0 {
return true;
}
// only consider the highest mask_bits bits from the hash and set the rest to 1.
let ones = (!0u64).checked_shr(self.mask_bits).unwrap_or(!0u64);
let bits = Self::hash_as_u64(item) | ones;
Expand Down Expand Up @@ -238,9 +235,8 @@ impl CrdsGossipPull {
&self,
crds: &Crds,
requests: &[(CrdsValue, CrdsFilter)],
now: Option<u64>,
) -> Vec<Vec<CrdsValue>> {
Self::filter_crds_values(crds, requests, now)
self.filter_crds_values(crds, requests)
}

// Checks if responses should be inserted and
Expand Down Expand Up @@ -363,17 +359,12 @@ impl CrdsGossipPull {
}
/// filter values that fail the bloom filter up to max_bytes
fn filter_crds_values(
&self,
crds: &Crds,
filters: &[(CrdsValue, CrdsFilter)],
now: Option<u64>,
) -> Vec<Vec<CrdsValue>> {
let mut ret = vec![vec![]; filters.len()];
let now = now.unwrap_or(u64::MAX);
for v in crds.table.values() {
//skip messages that are newer than now
if v.insert_timestamp > now {
continue;
}
filters.iter().enumerate().for_each(|(i, (_, filter))| {
if !filter.contains(&v.value_hash) {
ret[i].push(v.value.clone());
Expand Down Expand Up @@ -664,7 +655,7 @@ mod test {
let mut dest = CrdsGossipPull::default();
let (_, filters, caller) = req.unwrap();
let filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect();
let rsp = dest.generate_pull_responses(&dest_crds, &filters, None);
let rsp = dest.generate_pull_responses(&dest_crds, &filters);
dest.process_pull_requests(&mut dest_crds, filters, 1);
assert!(rsp.iter().all(|rsp| rsp.is_empty()));
assert!(dest_crds.lookup(&caller.label()).is_some());
Expand Down Expand Up @@ -735,7 +726,7 @@ mod test {
);
let (_, filters, caller) = req.unwrap();
let filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect();
let mut rsp = dest.generate_pull_responses(&dest_crds, &filters, None);
let mut rsp = dest.generate_pull_responses(&dest_crds, &filters);
dest.process_pull_requests(&mut dest_crds, filters, 0);
// if there is a false positive this is empty
// prob should be around 0.1 per iteration
Expand Down Expand Up @@ -777,37 +768,6 @@ mod test {
}
assert!(done);
}
#[test]
fn test_filter_crds_values() {
let mut node_crds = Crds::default();
let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&Pubkey::new_rand(),
0,
)));

let caller = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&Pubkey::new_rand(),
0,
)));

let node_label = entry.label();
let node_pubkey = node_label.pubkey();
node_crds.insert(entry, 1).unwrap();
let mut filter = CrdsFilter::new_rand(1, 128);
filter.mask_bits = 0;
let requests = [(caller, filter)];

let rsp = CrdsGossipPull::filter_crds_values(&node_crds, &requests, None);
assert_eq!(rsp[0][0].pubkey(), node_pubkey);

//skip 1 since its newer than 0
let rsp = CrdsGossipPull::filter_crds_values(&node_crds, &requests, Some(0));
assert!(rsp[0].is_empty());

let rsp = CrdsGossipPull::filter_crds_values(&node_crds, &requests, Some(1));
assert_eq!(rsp[0][0].pubkey(), node_pubkey);
}

#[test]
fn test_gossip_purge() {
let mut node_crds = Crds::default();
Expand Down Expand Up @@ -871,15 +831,6 @@ mod test {
let h: Hash = hash(h.as_ref());
assert!(!filter.contains(&h));
}

#[test]
fn test_crds_filter_test_mask_default() {
let filter = CrdsFilter::default();
assert_eq!(filter.mask_bits, 0);
let h: Hash = Hash::default();
assert_eq!(filter.test_mask(&h), true);
}

#[test]
fn test_crds_filter_add_mask() {
let mut filter = CrdsFilter::new_rand(1000, 10);
Expand Down
2 changes: 1 addition & 1 deletion core/tests/crds_gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ fn network_run_pull(
let rsp = node
.lock()
.unwrap()
.generate_pull_responses(&filters, None)
.generate_pull_responses(&filters)
.into_iter()
.flatten()
.collect();
Expand Down

0 comments on commit 605f490

Please sign in to comment.