Skip to content

Commit

Permalink
enable repair ping/pong cache (solana-labs#28408)
Browse files Browse the repository at this point in the history
  • Loading branch information
jbiseda authored and gnapoli23 committed Dec 16, 2022
1 parent a03b581 commit 9ba8029
Showing 1 changed file with 71 additions and 1 deletion.
72 changes: 71 additions & 1 deletion core/src/serve_repair.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,8 @@ struct ServeRepairStats {
orphan: usize,
pong: usize,
ancestor_hashes: usize,
ping_cache_check_failed: usize,
pings_sent: usize,
err_time_skew: usize,
err_malformed: usize,
err_sig_verify: usize,
Expand Down Expand Up @@ -531,6 +533,12 @@ impl ServeRepair {
i64
),
("pong", stats.pong, i64),
(
"ping_cache_check_failed",
stats.ping_cache_check_failed,
i64
),
("pings_sent", stats.pings_sent, i64),
("err_time_skew", stats.err_time_skew, i64),
("err_malformed", stats.err_malformed, i64),
("err_sig_verify", stats.err_sig_verify, i64),
Expand Down Expand Up @@ -659,6 +667,43 @@ impl ServeRepair {
true
}

fn check_ping_cache(
ping_cache: &mut PingCache,
request: &RepairProtocol,
from_addr: &SocketAddr,
identity_keypair: &Keypair,
) -> (bool, Option<Packet>) {
let mut rng = rand::thread_rng();
let mut pingf = move || Ping::new_rand(&mut rng, identity_keypair).ok();
let (check, ping) =
ping_cache.check(Instant::now(), (*request.sender(), *from_addr), &mut pingf);
let ping_pkt = if let Some(ping) = ping {
match request {
RepairProtocol::LegacyWindowIndex(_, _, _)
| RepairProtocol::LegacyHighestWindowIndex(_, _, _)
| RepairProtocol::LegacyOrphan(_, _)
| RepairProtocol::LegacyWindowIndexWithNonce(_, _, _, _)
| RepairProtocol::LegacyHighestWindowIndexWithNonce(_, _, _, _)
| RepairProtocol::LegacyOrphanWithNonce(_, _, _)
| RepairProtocol::WindowIndex { .. }
| RepairProtocol::HighestWindowIndex { .. }
| RepairProtocol::Orphan { .. } => {
let ping = RepairResponse::Ping(ping);
Packet::from_data(Some(from_addr), ping).ok()
}
RepairProtocol::LegacyAncestorHashes(_, _, _)
| RepairProtocol::AncestorHashes { .. } => {
let ping = AncestorHashesResponse::Ping(ping);
Packet::from_data(Some(from_addr), ping).ok()
}
RepairProtocol::Pong(_) => None,
}
} else {
None
};
(check, ping_pkt)
}

fn handle_packets(
&self,
ping_cache: &mut PingCache,
Expand All @@ -672,6 +717,8 @@ impl ServeRepair {
) {
let identity_keypair = self.cluster_info.keypair().clone();
let my_id = identity_keypair.pubkey();
let socket_addr_space = *self.cluster_info.socket_addr_space();
let mut pending_pings = Vec::default();

// iter over the packets
for (i, packet) in packet_batch.iter().enumerate() {
Expand All @@ -683,6 +730,12 @@ impl ServeRepair {
}
};

let from_addr = packet.meta.socket_addr();
if !ContactInfo::is_valid_address(&from_addr, &socket_addr_space) {
stats.err_malformed += 1;
continue;
}

let staked = epoch_staked_nodes
.as_ref()
.map(|nodes| nodes.contains_key(request.sender()))
Expand All @@ -704,7 +757,18 @@ impl ServeRepair {
stats.unsigned_requests += 1;
}

let from_addr = packet.meta.socket_addr();
if !matches!(&request, RepairProtocol::Pong(_)) {
let (check, ping_pkt) =
Self::check_ping_cache(ping_cache, &request, &from_addr, &identity_keypair);
if let Some(ping_pkt) = ping_pkt {
pending_pings.push(ping_pkt);
}
if !check {
// collect stats for ping/pong verification
stats.ping_cache_check_failed += 1;
}
}

stats.processed += 1;
let rsp = match Self::handle_repair(
recycler, &from_addr, blockstore, request, stats, ping_cache,
Expand All @@ -726,6 +790,12 @@ impl ServeRepair {
break;
}
}

if !pending_pings.is_empty() {
stats.pings_sent += pending_pings.len();
let batch = PacketBatch::new(pending_pings);
let _ignore = response_sender.send(batch);
}
}

pub fn ancestor_repair_request_bytes(
Expand Down

0 comments on commit 9ba8029

Please sign in to comment.