Skip to content

Commit

Permalink
Skip gossip requests with different shred version and split lock (#10240
Browse files Browse the repository at this point in the history
)
  • Loading branch information
sakridge authored May 28, 2020
1 parent 9227874 commit 3f508b3
Show file tree
Hide file tree
Showing 4 changed files with 271 additions and 60 deletions.
254 changes: 224 additions & 30 deletions core/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,10 @@ struct GossipStats {
process_pull_response_count: Counter,
process_pull_response_len: Counter,
process_pull_response_timeout: Counter,
process_pull_response_fail: Counter,
process_pull_response_success: Counter,
process_pull_requests: Counter,
generate_pull_responses: Counter,
process_prune: Counter,
process_push_message: Counter,
prune_received_cache: Counter,
Expand All @@ -228,6 +231,11 @@ struct GossipStats {
push_message: Counter,
new_pull_requests: Counter,
mark_pull_request: Counter,
skip_pull_response_shred_version: Counter,
skip_pull_shred_version: Counter,
skip_push_message_shred_version: Counter,
push_message_count: Counter,
push_message_value_count: Counter,
}

pub struct ClusterInfo {
Expand Down Expand Up @@ -1526,12 +1534,17 @@ impl ClusterInfo {
if contact_info.id == me.id() {
warn!("PullRequest ignored, I'm talking to myself");
inc_new_counter_debug!("cluster_info-window-request-loopback", 1);
} else {
} else if contact_info.shred_version == 0
|| contact_info.shred_version == me.my_shred_version()
|| me.my_shred_version() == 0
{
gossip_pull_data.push(PullData {
from_addr,
caller,
filter,
});
} else {
me.stats.skip_pull_shred_version.add_relaxed(1);
}
}
datapoint_debug!(
Expand Down Expand Up @@ -1620,6 +1633,26 @@ impl ClusterInfo {
}
}

fn update_data_budget(&self, stakes: &HashMap<Pubkey, u64>) {
let mut w_outbound_budget = self.outbound_budget.write().unwrap();

let now = timestamp();
const INTERVAL_MS: u64 = 100;
// allow 50kBps per staked validator, epoch slots + votes ~= 1.5kB/slot ~= 4kB/s
const BYTES_PER_INTERVAL: usize = 5000;
const MAX_BUDGET_MULTIPLE: usize = 5; // allow budget build-up to 5x the interval default

if now - w_outbound_budget.last_timestamp_ms > INTERVAL_MS {
let len = std::cmp::max(stakes.len(), 2);
w_outbound_budget.bytes += len * BYTES_PER_INTERVAL;
w_outbound_budget.bytes = std::cmp::min(
w_outbound_budget.bytes,
MAX_BUDGET_MULTIPLE * len * BYTES_PER_INTERVAL,
);
w_outbound_budget.last_timestamp_ms = now;
}
}

// Pull requests take an incoming bloom filter of contained entries from a node
// and tries to send back to them the values it detects are missing.
fn handle_pull_requests(
Expand All @@ -1632,33 +1665,19 @@ impl ClusterInfo {
let mut caller_and_filters = vec![];
let mut addrs = vec![];
let mut time = Measure::start("handle_pull_requests");
{
let mut w_outbound_budget = me.outbound_budget.write().unwrap();

let now = timestamp();
const INTERVAL_MS: u64 = 100;
// allow 50kBps per staked validator, epoch slots + votes ~= 1.5kB/slot ~= 4kB/s
const BYTES_PER_INTERVAL: usize = 5000;
const MAX_BUDGET_MULTIPLE: usize = 5; // allow budget build-up to 5x the interval default

if now - w_outbound_budget.last_timestamp_ms > INTERVAL_MS {
let len = std::cmp::max(stakes.len(), 2);
w_outbound_budget.bytes += len * BYTES_PER_INTERVAL;
w_outbound_budget.bytes = std::cmp::min(
w_outbound_budget.bytes,
MAX_BUDGET_MULTIPLE * len * BYTES_PER_INTERVAL,
);
w_outbound_budget.last_timestamp_ms = now;
}
}
me.update_data_budget(stakes);
for pull_data in requests {
caller_and_filters.push((pull_data.caller, pull_data.filter));
addrs.push(pull_data.from_addr);
}
let now = timestamp();
let self_id = me.id();

let pull_responses = me
.time_gossip_write_lock("process_pull_reqs", &me.stats.process_pull_requests)
.time_gossip_read_lock("generate_pull_responses", &me.stats.generate_pull_responses)
.generate_pull_responses(&caller_and_filters);

me.time_gossip_write_lock("process_pull_reqs", &me.stats.process_pull_requests)
.process_pull_requests(caller_and_filters, now);

// Filter bad to addresses
Expand Down Expand Up @@ -1755,37 +1774,94 @@ impl ClusterInfo {
Some(packets)
}

// Returns (failed, timeout, success)
fn handle_pull_response(
me: &Self,
from: &Pubkey,
data: Vec<CrdsValue>,
mut crds_values: Vec<CrdsValue>,
timeouts: &HashMap<Pubkey, u64>,
) {
let len = data.len();
) -> (usize, usize, usize) {
let len = crds_values.len();
trace!("PullResponse me: {} from: {} len={}", me.id, from, len);
let (_fail, timeout_count) = me

if let Some(shred_version) = me.lookup_contact_info(from, |ci| ci.shred_version) {
Self::filter_by_shred_version(
from,
&mut crds_values,
shred_version,
me.my_shred_version(),
);
}
let filtered_len = crds_values.len();

let (fail, timeout_count, success) = me
.time_gossip_write_lock("process_pull", &me.stats.process_pull_response)
.process_pull_response(from, timeouts, data, timestamp());
.process_pull_response(from, timeouts, crds_values, timestamp());

me.stats
.skip_pull_response_shred_version
.add_relaxed((len - filtered_len) as u64);
me.stats.process_pull_response_count.add_relaxed(1);
me.stats.process_pull_response_len.add_relaxed(len as u64);
me.stats
.process_pull_response_len
.add_relaxed(filtered_len as u64);
me.stats
.process_pull_response_timeout
.add_relaxed(timeout_count as u64);
me.stats.process_pull_response_fail.add_relaxed(fail as u64);
me.stats
.process_pull_response_success
.add_relaxed(success as u64);

(fail, timeout_count, success)
}

fn filter_by_shred_version(
from: &Pubkey,
crds_values: &mut Vec<CrdsValue>,
shred_version: u16,
my_shred_version: u16,
) {
if my_shred_version != 0 && shred_version != 0 && shred_version != my_shred_version {
// Allow someone to update their own ContactInfo so they
// can change shred versions if needed.
crds_values.retain(|crds_value| match &crds_value.data {
CrdsData::ContactInfo(contact_info) => contact_info.id == *from,
_ => false,
});
}
}

fn handle_push_message(
me: &Self,
recycler: &PacketsRecycler,
from: &Pubkey,
data: Vec<CrdsValue>,
mut crds_values: Vec<CrdsValue>,
stakes: &HashMap<Pubkey, u64>,
) -> Option<Packets> {
let self_id = me.id();
inc_new_counter_debug!("cluster_info-push_message", 1);
me.stats.push_message_count.add_relaxed(1);
let len = crds_values.len();

if let Some(shred_version) = me.lookup_contact_info(from, |ci| ci.shred_version) {
Self::filter_by_shred_version(
from,
&mut crds_values,
shred_version,
me.my_shred_version(),
);
}
let filtered_len = crds_values.len();
me.stats
.push_message_value_count
.add_relaxed(filtered_len as u64);
me.stats
.skip_push_message_shred_version
.add_relaxed((len - filtered_len) as u64);

let updated: Vec<_> = me
.time_gossip_write_lock("process_push", &me.stats.process_push_message)
.process_push_message(from, data, timestamp());
.process_push_message(from, crds_values, timestamp());

let updated_labels: Vec<_> = updated.into_iter().map(|u| u.value.label()).collect();
let prunes_map: HashMap<Pubkey, HashSet<Pubkey>> = me
Expand Down Expand Up @@ -1945,6 +2021,11 @@ impl ClusterInfo {
self.stats.process_pull_requests.clear(),
i64
),
(
"generate_pull_responses",
self.stats.generate_pull_responses.clear(),
i64
),
("process_prune", self.stats.process_prune.clear(), i64),
(
"process_push_message",
Expand Down Expand Up @@ -1974,6 +2055,34 @@ impl ClusterInfo {
i64
),
);
datapoint_info!(
"cluster_info_shred_version_skip",
(
"skip_push_message_shred_version",
self.stats.skip_push_message_shred_version.clear(),
i64
),
(
"skip_pull_response_shred_version",
self.stats.skip_pull_response_shred_version.clear(),
i64
),
(
"skip_pull_shred_version",
self.stats.skip_pull_shred_version.clear(),
i64
),
(
"push_message_count",
self.stats.push_message_count.clear(),
i64
),
(
"push_message_value_count",
self.stats.push_message_value_count.clear(),
i64
),
);

*last_print = Instant::now();
}
Expand Down Expand Up @@ -2289,6 +2398,91 @@ mod tests {
assert!(ClusterInfo::is_spy_node(&node));
}

#[test]
fn test_handle_pull() {
let node = Node::new_localhost();
let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair(node.info));

let entrypoint_pubkey = Pubkey::new_rand();
let data = test_crds_values(entrypoint_pubkey);
let timeouts = HashMap::new();
assert_eq!(
(0, 0, 1),
ClusterInfo::handle_pull_response(
&cluster_info,
&entrypoint_pubkey,
data.clone(),
&timeouts
)
);

let entrypoint_pubkey2 = Pubkey::new_rand();
assert_eq!(
(1, 0, 0),
ClusterInfo::handle_pull_response(&cluster_info, &entrypoint_pubkey2, data, &timeouts)
);
}

fn test_crds_values(pubkey: Pubkey) -> Vec<CrdsValue> {
let entrypoint = ContactInfo::new_localhost(&pubkey, timestamp());
let entrypoint_crdsvalue =
CrdsValue::new_unsigned(CrdsData::ContactInfo(entrypoint.clone()));
vec![entrypoint_crdsvalue]
}

#[test]
fn test_filter_shred_version() {
let from = Pubkey::new_rand();
let my_shred_version = 1;
let other_shred_version = 1;

// Allow same shred_version
let mut values = test_crds_values(from);
ClusterInfo::filter_by_shred_version(
&from,
&mut values,
other_shred_version,
my_shred_version,
);
assert_eq!(values.len(), 1);

// Allow shred_version=0.
let other_shred_version = 0;
ClusterInfo::filter_by_shred_version(
&from,
&mut values,
other_shred_version,
my_shred_version,
);
assert_eq!(values.len(), 1);

// Change to sender's ContactInfo version, allow that.
let other_shred_version = 2;
ClusterInfo::filter_by_shred_version(
&from,
&mut values,
other_shred_version,
my_shred_version,
);
assert_eq!(values.len(), 1);

let snapshot_hash_data = CrdsValue::new_unsigned(CrdsData::SnapshotHashes(SnapshotHash {
from: Pubkey::new_rand(),
hashes: vec![],
wallclock: 0,
}));
values.push(snapshot_hash_data);
// Change to sender's ContactInfo version, allow that.
let other_shred_version = 2;
ClusterInfo::filter_by_shred_version(
&from,
&mut values,
other_shred_version,
my_shred_version,
);
assert_eq!(values.len(), 1);
}

#[test]
fn test_cluster_spy_gossip() {
//check that gossip doesn't try to push to invalid addresses
Expand Down
18 changes: 11 additions & 7 deletions core/src/crds_gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,22 +158,26 @@ impl CrdsGossip {
self.pull.mark_pull_request_creation_time(from, now)
}
/// process a pull request and create a response
pub fn process_pull_requests(
&mut self,
filters: Vec<(CrdsValue, CrdsFilter)>,
now: u64,
) -> Vec<Vec<CrdsValue>> {
pub fn process_pull_requests(&mut self, filters: Vec<(CrdsValue, CrdsFilter)>, now: u64) {
self.pull
.process_pull_requests(&mut self.crds, filters, now)
.process_pull_requests(&mut self.crds, filters, now);
}

pub fn generate_pull_responses(
&self,
filters: &[(CrdsValue, CrdsFilter)],
) -> Vec<Vec<CrdsValue>> {
self.pull.generate_pull_responses(&self.crds, filters)
}

/// process a pull response
pub fn process_pull_response(
&mut self,
from: &Pubkey,
timeouts: &HashMap<Pubkey, u64>,
response: Vec<CrdsValue>,
now: u64,
) -> (usize, usize) {
) -> (usize, usize, usize) {
self.pull
.process_pull_response(&mut self.crds, from, timeouts, response, now)
}
Expand Down
Loading

0 comments on commit 3f508b3

Please sign in to comment.