From d4b30adffeeaa715c9126faac1b564515d2bc9df Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Mon, 27 Mar 2023 21:52:48 +0000 Subject: [PATCH] reworks gossip crds timeouts (#30468) CrdsGossipPull::make_timeouts iterates over the stakes hashmap and creates a new hashmap which is unnecessary: https://github.com/solana-labs/solana/blob/c032dc275/gossip/src/crds_gossip_pull.rs#L517-L539 The commit instead keeps a reference to the stakes hashmap. --- gossip/benches/crds.rs | 13 ++- gossip/src/cluster_info.rs | 28 +++++-- gossip/src/crds.rs | 148 +++++++++++++++++++++++---------- gossip/src/crds_gossip.rs | 17 ++-- gossip/src/crds_gossip_pull.rs | 89 +++++++++++++------- gossip/tests/crds_gossip.rs | 13 ++- 6 files changed, 209 insertions(+), 99 deletions(-) diff --git a/gossip/benches/crds.rs b/gossip/benches/crds.rs index 9ee78a6aa6118d..176439204768f5 100644 --- a/gossip/benches/crds.rs +++ b/gossip/benches/crds.rs @@ -7,11 +7,11 @@ use { rayon::ThreadPoolBuilder, solana_gossip::{ crds::{Crds, GossipRoute}, - crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS, + crds_gossip_pull::{CrdsTimeouts, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS}, crds_value::CrdsValue, }, solana_sdk::pubkey::Pubkey, - std::collections::HashMap, + std::{collections::HashMap, time::Duration}, test::Bencher, }; @@ -24,8 +24,13 @@ fn bench_find_old_labels(bencher: &mut Bencher) { std::iter::repeat_with(|| (CrdsValue::new_rand(&mut rng, None), rng.gen_range(0, now))) .take(50_000) .for_each(|(v, ts)| assert!(crds.insert(v, ts, GossipRoute::LocalMessage).is_ok())); - let mut timeouts = HashMap::new(); - timeouts.insert(Pubkey::default(), CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS); + let stakes = HashMap::from([(Pubkey::new_unique(), 1u64)]); + let timeouts = CrdsTimeouts::new( + Pubkey::new_unique(), + CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS, // default_timeout + Duration::from_secs(48 * 3600), // epoch_duration + &stakes, + ); bencher.iter(|| { let out = crds.find_old_labels(&thread_pool, now, &timeouts); assert!(out.len() > 10); diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index e8da8ef426cf8c..c024b635ac693d 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -28,7 +28,9 @@ use { crds::{Crds, Cursor, GossipRoute}, crds_gossip::CrdsGossip, crds_gossip_error::CrdsGossipError, - crds_gossip_pull::{CrdsFilter, ProcessPullStats, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS}, + crds_gossip_pull::{ + CrdsFilter, CrdsTimeouts, ProcessPullStats, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS, + }, crds_value::{ self, CrdsData, CrdsValue, CrdsValueLabel, EpochSlotsIndex, IncrementalSnapshotHashes, LowestSlot, NodeInstance, SnapshotHashes, Version, Vote, MAX_WALLCLOCK, @@ -2158,7 +2160,7 @@ impl ClusterInfo { &self, from: &Pubkey, crds_values: Vec, - timeouts: &HashMap, + timeouts: &CrdsTimeouts, ) -> (usize, usize, usize) { let len = crds_values.len(); trace!("PullResponse me: {} from: {} len={}", self.id(), from, len); @@ -3305,9 +3307,13 @@ RPC Enabled Nodes: 1"#; }); let entrypoint_pubkey = solana_sdk::pubkey::new_rand(); let data = test_crds_values(entrypoint_pubkey); - let timeouts = [(Pubkey::default(), CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS)] - .into_iter() - .collect(); + let stakes = HashMap::from([(Pubkey::new_unique(), 1u64)]); + let timeouts = CrdsTimeouts::new( + cluster_info.id(), + CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS, // default_timeout + Duration::from_secs(48 * 3600), // epoch_duration + &stakes, + ); assert_eq!( (0, 0, 1), ClusterInfo::handle_pull_response( @@ -4081,9 +4087,10 @@ RPC Enabled Nodes: 1"#; let entrypoint_crdsvalue = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(entrypoint.clone())); let cluster_info = Arc::new(cluster_info); + let stakes = HashMap::from([(Pubkey::new_unique(), 1u64)]); let timeouts = cluster_info.gossip.make_timeouts( cluster_info.id(), - &HashMap::default(), // stakes, + &stakes, Duration::from_millis(cluster_info.gossip.pull.crds_timeout), ); ClusterInfo::handle_pull_response( @@ -4729,8 +4736,13 @@ RPC Enabled Nodes: 1"#; }) .take(NO_ENTRIES) .collect(); - let mut timeouts = HashMap::new(); - timeouts.insert(Pubkey::default(), CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS * 4); + let stakes = HashMap::from([(Pubkey::new_unique(), 1u64)]); + let timeouts = CrdsTimeouts::new( + cluster_info.id(), + CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS * 4, // default_timeout + Duration::from_secs(48 * 3600), // epoch_duration + &stakes, + ); assert_eq!( (0, 0, NO_ENTRIES), cluster_info.handle_pull_response(&entrypoint_pubkey, data, &timeouts) diff --git a/gossip/src/crds.rs b/gossip/src/crds.rs index 094e738c893831..64e14f1ff3aeb7 100644 --- a/gossip/src/crds.rs +++ b/gossip/src/crds.rs @@ -27,6 +27,7 @@ use { crate::{ crds_entry::CrdsEntry, + crds_gossip_pull::CrdsTimeouts, crds_shards::CrdsShards, crds_value::{CrdsData, CrdsValue, CrdsValueLabel}, legacy_contact_info::LegacyContactInfo as ContactInfo, @@ -472,15 +473,12 @@ impl Crds { &self, thread_pool: &ThreadPool, now: u64, - timeouts: &HashMap, + timeouts: &CrdsTimeouts, ) -> Vec { - let default_timeout = *timeouts - .get(&Pubkey::default()) - .expect("must have default timeout"); // Given an index of all crd values associated with a pubkey, // returns crds labels of old values to be evicted. let evict = |pubkey, index: &IndexSet| { - let timeout = timeouts.get(pubkey).copied().unwrap_or(default_timeout); + let timeout = timeouts[pubkey]; // If the origin's contact-info hasn't expired yet then preserve // all associated values. let origin = CrdsValueLabel::LegacyContactInfo(*pubkey); @@ -732,7 +730,7 @@ mod tests { signature::{Keypair, Signer}, timing::timestamp, }, - std::{collections::HashSet, iter::repeat_with, net::Ipv4Addr}, + std::{collections::HashSet, iter::repeat_with, net::Ipv4Addr, time::Duration}, }; #[test] @@ -888,17 +886,34 @@ mod tests { crds.insert(val.clone(), 1, GossipRoute::LocalMessage), Ok(()) ); - let mut set = HashMap::new(); - set.insert(Pubkey::default(), 0); - assert!(crds.find_old_labels(&thread_pool, 0, &set).is_empty()); - set.insert(Pubkey::default(), 1); + let pubkey = Pubkey::new_unique(); + let stakes = HashMap::from([(Pubkey::new_unique(), 1u64)]); + let epoch_duration = Duration::from_secs(48 * 3600); + let timeouts = CrdsTimeouts::new( + pubkey, + 0u64, // default_timeout, + epoch_duration, + &stakes, + ); + assert!(crds.find_old_labels(&thread_pool, 0, &timeouts).is_empty()); + let timeouts = CrdsTimeouts::new( + pubkey, + 1u64, // default_timeout, + epoch_duration, + &stakes, + ); assert_eq!( - crds.find_old_labels(&thread_pool, 2, &set), + crds.find_old_labels(&thread_pool, 2, &timeouts), vec![val.label()] ); - set.insert(Pubkey::default(), 2); + let timeouts = CrdsTimeouts::new( + pubkey, + 2u64, // default_timeout, + epoch_duration, + &stakes, + ); assert_eq!( - crds.find_old_labels(&thread_pool, 4, &set), + crds.find_old_labels(&thread_pool, 4, &timeouts), vec![val.label()] ); } @@ -907,24 +922,51 @@ mod tests { let thread_pool = ThreadPoolBuilder::new().build().unwrap(); let mut rng = thread_rng(); let mut crds = Crds::default(); - let mut timeouts = HashMap::new(); let val = CrdsValue::new_rand(&mut rng, None); - timeouts.insert(Pubkey::default(), 3); + let mut stakes = HashMap::from([(Pubkey::new_unique(), 1u64)]); + let timeouts = CrdsTimeouts::new( + Pubkey::new_unique(), + 3, // default_timeout + Duration::from_secs(48 * 3600), // epoch_duration + &stakes, + ); assert_eq!( crds.insert(val.clone(), 0, GossipRoute::LocalMessage), Ok(()) ); assert!(crds.find_old_labels(&thread_pool, 2, &timeouts).is_empty()); - timeouts.insert(val.pubkey(), 1); + stakes.insert(val.pubkey(), 1u64); + let timeouts = CrdsTimeouts::new( + Pubkey::new_unique(), + 1, // default_timeout + Duration::from_millis(1), // epoch_duration + &stakes, + ); assert_eq!( crds.find_old_labels(&thread_pool, 2, &timeouts), vec![val.label()] ); - timeouts.insert(val.pubkey(), u64::MAX); + let timeouts = CrdsTimeouts::new( + Pubkey::new_unique(), + 3, // default_timeout + Duration::from_secs(48 * 3600), // epoch_duration + &stakes, + ); assert!(crds.find_old_labels(&thread_pool, 2, &timeouts).is_empty()); - timeouts.insert(Pubkey::default(), 1); + let timeouts = CrdsTimeouts::new( + Pubkey::new_unique(), + 1, // default_timeout + Duration::from_secs(48 * 3600), // epoch_duration + &stakes, + ); assert!(crds.find_old_labels(&thread_pool, 2, &timeouts).is_empty()); - timeouts.remove(&val.pubkey()); + stakes.remove(&val.pubkey()); + let timeouts = CrdsTimeouts::new( + Pubkey::new_unique(), + 1, // default_timeout + Duration::from_secs(48 * 3600), // epoch_duration + &stakes, + ); assert_eq!( crds.find_old_labels(&thread_pool, 2, &timeouts), vec![val.label()] @@ -940,14 +982,19 @@ mod tests { crds.insert(val.clone(), 1, GossipRoute::LocalMessage), Ok(_) ); - let mut set = HashMap::new(); - set.insert(Pubkey::default(), 1); + let stakes = HashMap::from([(Pubkey::new_unique(), 1u64)]); + let timeouts = CrdsTimeouts::new( + Pubkey::new_unique(), + 1, // default_timeout + Duration::from_secs(48 * 3600), // epoch_duration + &stakes, + ); assert_eq!( - crds.find_old_labels(&thread_pool, 2, &set), + crds.find_old_labels(&thread_pool, 2, &timeouts), vec![val.label()] ); crds.remove(&val.label(), /*now=*/ 0); - assert!(crds.find_old_labels(&thread_pool, 2, &set).is_empty()); + assert!(crds.find_old_labels(&thread_pool, 2, &timeouts).is_empty()); } #[test] fn test_find_old_records_staked() { @@ -961,28 +1008,35 @@ mod tests { crds.insert(val.clone(), 1, GossipRoute::LocalMessage), Ok(()) ); - let mut set = HashMap::new(); + let mut stakes = HashMap::from([(Pubkey::new_unique(), 1u64)]); + let timeouts = CrdsTimeouts::new( + Pubkey::new_unique(), + 0, // default_timeout + Duration::from_secs(48 * 3600), // epoch_duration + &stakes, + ); //now < timestamp - set.insert(Pubkey::default(), 0); - set.insert(val.pubkey(), 0); - assert!(crds.find_old_labels(&thread_pool, 0, &set).is_empty()); + assert!(crds.find_old_labels(&thread_pool, 0, &timeouts).is_empty()); //pubkey shouldn't expire since its timeout is MAX - set.insert(val.pubkey(), std::u64::MAX); - assert!(crds.find_old_labels(&thread_pool, 2, &set).is_empty()); - - //default has max timeout, but pubkey should still expire - set.insert(Pubkey::default(), std::u64::MAX); - set.insert(val.pubkey(), 1); - assert_eq!( - crds.find_old_labels(&thread_pool, 2, &set), - vec![val.label()] + stakes.insert(val.pubkey(), 1u64); + let timeouts = CrdsTimeouts::new( + Pubkey::new_unique(), + 0, // default_timeout + Duration::from_secs(48 * 3600), // epoch_duration + &stakes, ); + assert!(crds.find_old_labels(&thread_pool, 2, &timeouts).is_empty()); - set.insert(val.pubkey(), 2); - assert!(crds.find_old_labels(&thread_pool, 2, &set).is_empty()); + let timeouts = CrdsTimeouts::new( + Pubkey::new_unique(), + 0, // default_timeout + Duration::from_millis(2), // epoch_duration + &stakes, + ); + assert!(crds.find_old_labels(&thread_pool, 2, &timeouts).is_empty()); assert_eq!( - crds.find_old_labels(&thread_pool, 3, &set), + crds.find_old_labels(&thread_pool, 3, &timeouts), vec![val.label()] ); } @@ -1353,17 +1407,19 @@ mod tests { crds.insert(val.clone(), 1, GossipRoute::LocalMessage), Ok(_) ); - let mut set = HashMap::new(); - - //default has max timeout, but pubkey should still expire - set.insert(Pubkey::default(), std::u64::MAX); - set.insert(val.pubkey(), 1); + let stakes = HashMap::from([(Pubkey::new_unique(), 1u64)]); + let timeouts = CrdsTimeouts::new( + Pubkey::new_unique(), + 1, // default_timeout + Duration::from_millis(1), // epoch_duration + &stakes, + ); assert_eq!( - crds.find_old_labels(&thread_pool, 2, &set), + crds.find_old_labels(&thread_pool, 2, &timeouts), vec![val.label()] ); crds.remove(&val.label(), /*now=*/ 0); - assert!(crds.find_old_labels(&thread_pool, 2, &set).is_empty()); + assert!(crds.find_old_labels(&thread_pool, 2, &timeouts).is_empty()); } #[test] diff --git a/gossip/src/crds_gossip.rs b/gossip/src/crds_gossip.rs index bd1f2064ffd09a..ee0edb1b6e9674 100644 --- a/gossip/src/crds_gossip.rs +++ b/gossip/src/crds_gossip.rs @@ -10,7 +10,7 @@ use { cluster_info_metrics::GossipStats, crds::{Crds, GossipRoute}, crds_gossip_error::CrdsGossipError, - crds_gossip_pull::{CrdsFilter, CrdsGossipPull, ProcessPullStats}, + crds_gossip_pull::{CrdsFilter, CrdsGossipPull, CrdsTimeouts, ProcessPullStats}, crds_gossip_push::CrdsGossipPush, crds_value::{CrdsData, CrdsValue}, duplicate_shred::{self, DuplicateShredIndex, MAX_DUPLICATE_SHREDS}, @@ -258,7 +258,7 @@ impl CrdsGossip { pub fn filter_pull_responses( &self, - timeouts: &HashMap, + timeouts: &CrdsTimeouts, response: Vec, now: u64, process_pull_stats: &mut ProcessPullStats, @@ -292,12 +292,12 @@ impl CrdsGossip { ); } - pub fn make_timeouts( + pub fn make_timeouts<'a>( &self, self_pubkey: Pubkey, - stakes: &HashMap, + stakes: &'a HashMap, epoch_duration: Duration, - ) -> HashMap { + ) -> CrdsTimeouts<'a> { self.pull.make_timeouts(self_pubkey, stakes, epoch_duration) } @@ -306,13 +306,12 @@ impl CrdsGossip { self_pubkey: &Pubkey, thread_pool: &ThreadPool, now: u64, - timeouts: &HashMap, + timeouts: &CrdsTimeouts, ) -> usize { let mut rv = 0; if now > self.pull.crds_timeout { - //sanity check - assert_eq!(timeouts[self_pubkey], std::u64::MAX); - assert!(timeouts.contains_key(&Pubkey::default())); + debug_assert_eq!(timeouts[self_pubkey], u64::MAX); + debug_assert_ne!(timeouts[&Pubkey::default()], 0u64); rv = CrdsGossipPull::purge_active(thread_pool, &self.crds, now, timeouts); } self.crds diff --git a/gossip/src/crds_gossip_pull.rs b/gossip/src/crds_gossip_pull.rs index b68ee7d1ccc921..b7be36282da148 100644 --- a/gossip/src/crds_gossip_pull.rs +++ b/gossip/src/crds_gossip_pull.rs @@ -41,6 +41,7 @@ use { convert::TryInto, iter::{repeat, repeat_with}, net::SocketAddr, + ops::Index, sync::{ atomic::{AtomicI64, AtomicUsize, Ordering}, Mutex, RwLock, @@ -316,19 +317,18 @@ impl CrdsGossipPull { pub(crate) fn filter_pull_responses( &self, crds: &RwLock, - timeouts: &HashMap, + timeouts: &CrdsTimeouts, responses: Vec, now: u64, stats: &mut ProcessPullStats, ) -> (Vec, Vec, Vec) { let mut active_values = vec![]; let mut expired_values = vec![]; - let default_timeout = timeouts[&Pubkey::default()]; let crds = crds.read().unwrap(); let upsert = |response: CrdsValue| { let owner = response.label().pubkey(); // Check if the crds value is older than the msg_timeout - let timeout = timeouts.get(&owner).copied().unwrap_or(default_timeout); + let timeout = timeouts[&owner]; // Before discarding this value, check if a ContactInfo for the // owner exists in the table. If it doesn't, that implies that this // value can be discarded @@ -514,28 +514,13 @@ impl CrdsGossipPull { ret } - pub(crate) fn make_timeouts( + pub(crate) fn make_timeouts<'a>( &self, self_pubkey: Pubkey, - stakes: &HashMap, + stakes: &'a HashMap, epoch_duration: Duration, - ) -> HashMap { - let extended_timeout = self.crds_timeout.max(epoch_duration.as_millis() as u64); - let default_timeout = if stakes.values().all(|stake| *stake == 0) { - extended_timeout - } else { - self.crds_timeout - }; - stakes - .iter() - .filter(|(_, &stake)| stake > 0u64) - .map(|(&pubkey, _)| pubkey) - .zip(repeat(extended_timeout)) - .chain([ - (Pubkey::default(), default_timeout), - (self_pubkey, u64::MAX), - ]) - .collect() + ) -> CrdsTimeouts<'a> { + CrdsTimeouts::new(self_pubkey, self.crds_timeout, epoch_duration, stakes) } /// Purge values from the crds that are older then `active_timeout` @@ -543,7 +528,7 @@ impl CrdsGossipPull { thread_pool: &ThreadPool, crds: &RwLock, now: u64, - timeouts: &HashMap, + timeouts: &CrdsTimeouts, ) -> usize { let mut crds = crds.write().unwrap(); let labels = crds.find_old_labels(thread_pool, now, timeouts); @@ -559,7 +544,7 @@ impl CrdsGossipPull { &self, crds: &RwLock, from: &Pubkey, - timeouts: &HashMap, + timeouts: &CrdsTimeouts, response: Vec, now: u64, ) -> (usize, usize, usize) { @@ -583,6 +568,49 @@ impl CrdsGossipPull { } } +pub struct CrdsTimeouts<'a> { + pubkey: Pubkey, + stakes: &'a HashMap, + default_timeout: u64, + extended_timeout: u64, +} + +impl<'a> CrdsTimeouts<'a> { + pub fn new( + pubkey: Pubkey, + default_timeout: u64, + epoch_duration: Duration, + stakes: &'a HashMap, + ) -> Self { + let extended_timeout = default_timeout.max(epoch_duration.as_millis() as u64); + let default_timeout = if stakes.values().all(|&stake| stake == 0u64) { + extended_timeout + } else { + default_timeout + }; + Self { + pubkey, + stakes, + default_timeout, + extended_timeout, + } + } +} + +impl<'a> Index<&Pubkey> for CrdsTimeouts<'a> { + type Output = u64; + + fn index(&self, pubkey: &Pubkey) -> &Self::Output { + if pubkey == &self.pubkey { + &u64::MAX + } else if self.stakes.get(pubkey) > Some(&0u64) { + &self.extended_timeout + } else { + &self.default_timeout + } + } +} + #[cfg(test)] pub(crate) mod tests { use { @@ -1218,7 +1246,8 @@ pub(crate) mod tests { ); // purge let node_crds = RwLock::new(node_crds); - let timeouts = node.make_timeouts(node_pubkey, &HashMap::new(), Duration::default()); + let stakes = HashMap::from([(Pubkey::new_unique(), 1u64)]); + let timeouts = node.make_timeouts(node_pubkey, &stakes, Duration::default()); CrdsGossipPull::purge_active(&thread_pool, &node_crds, node.crds_timeout, &timeouts); //verify self is still valid after purge @@ -1333,9 +1362,13 @@ pub(crate) mod tests { let peer_entry = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo( ContactInfo::new_localhost(&peer_pubkey, 0), )); - let mut timeouts = HashMap::new(); - timeouts.insert(Pubkey::default(), node.crds_timeout); - timeouts.insert(peer_pubkey, node.crds_timeout + 1); + let stakes = HashMap::from([(peer_pubkey, 1u64)]); + let timeouts = CrdsTimeouts::new( + Pubkey::new_unique(), + node.crds_timeout, // default_timeout + Duration::from_millis(node.crds_timeout + 1), + &stakes, + ); // inserting a fresh value should be fine. assert_eq!( node.process_pull_response( diff --git a/gossip/tests/crds_gossip.rs b/gossip/tests/crds_gossip.rs index 326713185a60fc..c77c94c598f6f4 100644 --- a/gossip/tests/crds_gossip.rs +++ b/gossip/tests/crds_gossip.rs @@ -10,7 +10,7 @@ use { crds::GossipRoute, crds_gossip::*, crds_gossip_error::CrdsGossipError, - crds_gossip_pull::{ProcessPullStats, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS}, + crds_gossip_pull::{CrdsTimeouts, ProcessPullStats, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS}, crds_gossip_push::CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS, crds_value::{CrdsData, CrdsValue, CrdsValueLabel}, legacy_contact_info::LegacyContactInfo as ContactInfo, @@ -345,7 +345,7 @@ fn network_run_push( let node_pubkey = node.keypair.pubkey(); let timeouts = node.gossip.make_timeouts( node_pubkey, - &HashMap::default(), // stakes + &stakes, Duration::from_millis(node.gossip.pull.crds_timeout), ); node.gossip.purge(&node_pubkey, thread_pool, now, &timeouts); @@ -477,8 +477,7 @@ fn network_run_pull( let mut convergance = 0f64; let num = network.len(); let network_values: Vec = network.values().cloned().collect(); - let mut timeouts = HashMap::new(); - timeouts.insert(Pubkey::default(), CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS); + let stakes = stakes(network); for node in &network_values { let mut ping_cache = node.ping_cache.lock().unwrap(); for other in &network_values { @@ -566,6 +565,12 @@ fn network_run_pull( msgs += rsp.len(); if let Some(node) = network.get(&from) { let mut stats = ProcessPullStats::default(); + let timeouts = CrdsTimeouts::new( + node.keypair.pubkey(), + CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS, // default_timeout + Duration::from_secs(48 * 3600), // epoch_duration + &stakes, + ); let (vers, vers_expired_timeout, failed_inserts) = node .gossip .filter_pull_responses(&timeouts, rsp, now, &mut stats);