
Commit c7f8dba

Make repair metrics less chatty (#9094)
1 parent 3b526cc commit c7f8dba
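
This commit stops emitting a metrics datapoint (and, on the serving side, a log line and counter bump) for every individual repair request. Counts and slot extremes are instead accumulated in two small structs — `RepairStats` on the requesting side and `ServeRepairStats` on the serving side — and flushed in one batch: at most once per second in the repair service loop and every couple of seconds in the repair listener. The cluster-monitor dashboard is updated to read the new aggregated `serve_repair-repair` measurement. Below is a minimal, self-contained sketch of the accumulator the diff adds to `core/src/repair_service.rs`; the `main` function and the sample slot numbers are illustrative only, not part of the commit.

```rust
// Sketch of the per-request-type accumulator added in this commit.
// `count` is the number of requests since the last flush; `min`/`max`
// track the slot range touched (with Default, `min` starts at 0, just
// as in the committed code).
#[derive(Default)]
pub struct RepairStatsGroup {
    pub count: u64,
    pub min: u64,
    pub max: u64,
}

impl RepairStatsGroup {
    pub fn update(&mut self, slot: u64) {
        self.count += 1;
        self.min = std::cmp::min(self.min, slot);
        self.max = std::cmp::max(self.max, slot);
    }
}

#[derive(Default)]
pub struct RepairStats {
    pub shred: RepairStatsGroup,
    pub highest_shred: RepairStatsGroup,
    pub orphan: RepairStatsGroup,
}

fn main() {
    // Hypothetical slot numbers, just to show how the counters accumulate.
    let mut stats = RepairStats::default();
    for &slot in &[10u64, 12, 11] {
        stats.shred.update(slot);
    }
    stats.orphan.update(12);
    assert_eq!(stats.shred.count, 3);
    assert_eq!(stats.shred.max, 12);
    let repair_total = stats.shred.count + stats.highest_shred.count + stats.orphan.count;
    assert_eq!(repair_total, 4);
}
```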

File tree: 4 files changed, +123 −36 lines

archiver-lib/src/archiver.rs
core/src/repair_service.rs
core/src/serve_repair.rs
metrics/scripts/grafana-provisioning/dashboards/cluster-monitor.json

archiver-lib/src/archiver.rs

Lines changed: 3 additions & 2 deletions
@@ -14,7 +14,7 @@ use solana_core::{
     gossip_service::GossipService,
     packet::{limited_deserialize, PACKET_DATA_SIZE},
     repair_service,
-    repair_service::{RepairService, RepairSlotRange, RepairStrategy},
+    repair_service::{RepairService, RepairSlotRange, RepairStats, RepairStrategy},
     serve_repair::ServeRepair,
     shred_fetch_stage::ShredFetchStage,
     sigverify_stage::{DisabledSigVerifier, SigVerifyStage},
@@ -839,13 +839,14 @@ impl Archiver {
                 repair_service::MAX_REPAIR_LENGTH,
                 &repair_slot_range,
             );
+            let mut repair_stats = RepairStats::default();
             //iter over the repairs and send them
             if let Ok(repairs) = repairs {
                 let reqs: Vec<_> = repairs
                     .into_iter()
                     .filter_map(|repair_request| {
                         serve_repair
-                            .map_repair_request(&repair_request)
+                            .map_repair_request(&repair_request, &mut repair_stats)
                             .map(|result| ((archiver_info.gossip, result), repair_request))
                             .ok()
                     })
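
In the archiver, the only change is that the request loop now owns a `RepairStats` value and lends it mutably to `map_repair_request` for each request it maps. The sketch below shows just that calling shape with stand-ins: `map_request` and the numeric "repairs" are hypothetical simplifications, not the real `ServeRepair` API.

```rust
// Toy stand-ins: the real code passes &mut RepairStats into
// ServeRepair::map_repair_request inside the archiver's filter_map.
#[derive(Default)]
struct RepairStats {
    count: u64, // simplified; the real struct holds three RepairStatsGroup fields
}

// Hypothetical stand-in for map_repair_request: record the request in the
// stats, then return the serialized request bytes (None would drop it).
fn map_request(slot: u64, stats: &mut RepairStats) -> Option<Vec<u8>> {
    stats.count += 1;
    Some(slot.to_le_bytes().to_vec())
}

fn main() {
    let repairs = vec![3u64, 5, 8];
    let mut repair_stats = RepairStats::default();
    // The &mut borrow can be threaded through the closure because the
    // iterator chain is consumed immediately by collect().
    let reqs: Vec<Vec<u8>> = repairs
        .into_iter()
        .filter_map(|slot| map_request(slot, &mut repair_stats))
        .collect();
    assert_eq!(reqs.len(), 3);
    assert_eq!(repair_stats.count, 3);
}
```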

core/src/repair_service.rs

Lines changed: 44 additions & 2 deletions
@@ -20,9 +20,31 @@ use std::{
     sync::{Arc, RwLock},
     thread::sleep,
     thread::{self, Builder, JoinHandle},
-    time::Duration,
+    time::{Duration, Instant},
 };

+#[derive(Default)]
+pub struct RepairStatsGroup {
+    pub count: u64,
+    pub min: u64,
+    pub max: u64,
+}
+
+impl RepairStatsGroup {
+    pub fn update(&mut self, slot: u64) {
+        self.count += 1;
+        self.min = std::cmp::min(self.min, slot);
+        self.max = std::cmp::max(self.max, slot);
+    }
+}
+
+#[derive(Default)]
+pub struct RepairStats {
+    pub shred: RepairStatsGroup,
+    pub highest_shred: RepairStatsGroup,
+    pub orphan: RepairStatsGroup,
+}
+
 pub const MAX_REPAIR_LENGTH: usize = 512;
 pub const REPAIR_MS: u64 = 100;
 pub const MAX_ORPHANS: usize = 5;
@@ -107,6 +129,8 @@ impl RepairService {
                 cluster_info,
             );
         }
+        let mut repair_stats = RepairStats::default();
+        let mut last_stats = Instant::now();
         loop {
             if exit.load(Ordering::Relaxed) {
                 break;
@@ -148,7 +172,7 @@
                     .into_iter()
                     .filter_map(|repair_request| {
                         serve_repair
-                            .repair_request(&repair_request)
+                            .repair_request(&repair_request, &mut repair_stats)
                             .map(|result| (result, repair_request))
                             .ok()
                     })
@@ -161,6 +185,24 @@
                     });
                 }
             }
+            if last_stats.elapsed().as_secs() > 1 {
+                let repair_total = repair_stats.shred.count
+                    + repair_stats.highest_shred.count
+                    + repair_stats.orphan.count;
+                if repair_total > 0 {
+                    datapoint_info!(
+                        "serve_repair-repair",
+                        ("repair-total", repair_total, i64),
+                        ("shred-count", repair_stats.shred.count, i64),
+                        ("highest-shred-count", repair_stats.highest_shred.count, i64),
+                        ("orphan-count", repair_stats.orphan.count, i64),
+                        ("repair-highest-slot", repair_stats.highest_shred.max, i64),
+                        ("repair-orphan", repair_stats.orphan.max, i64),
+                    );
+                }
+                repair_stats = RepairStats::default();
+                last_stats = Instant::now();
+            }
             sleep(Duration::from_millis(REPAIR_MS));
         }
     }
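
The new reporting branch in `RepairService::run` checks a timer on every pass through the loop and, at most once per second, emits a single aggregated `serve_repair-repair` datapoint before resetting the accumulator. A standalone sketch of that throttling pattern follows; `datapoint_info!` is replaced with a `println!` stand-in so the sketch runs without the metrics crate, and the loop bounds and traffic are arbitrary.

```rust
use std::thread::sleep;
use std::time::{Duration, Instant};

#[derive(Default)]
struct RepairStats {
    shred_count: u64,
    highest_shred_count: u64,
    orphan_count: u64,
}

fn main() {
    let mut repair_stats = RepairStats::default();
    let mut last_stats = Instant::now();

    // Stand-in for the repair loop body: pretend some requests were sent,
    // then flush an aggregated report at most once per second.
    for i in 0..30u64 {
        repair_stats.shred_count += 1;
        if i % 10 == 0 {
            repair_stats.orphan_count += 1;
        }

        if last_stats.elapsed().as_secs() > 1 {
            let repair_total = repair_stats.shred_count
                + repair_stats.highest_shred_count
                + repair_stats.orphan_count;
            if repair_total > 0 {
                // The real loop emits a single datapoint_info!("serve_repair-repair", ...) here.
                println!(
                    "serve_repair-repair total={} shred={} orphan={}",
                    repair_total, repair_stats.shred_count, repair_stats.orphan_count
                );
            }
            repair_stats = RepairStats::default();
            last_stats = Instant::now();
        }

        sleep(Duration::from_millis(100)); // REPAIR_MS in the real loop
    }
}
```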

core/src/serve_repair.rs

Lines changed: 71 additions & 27 deletions
@@ -4,6 +4,7 @@ use crate::{
     cluster_info::{ClusterInfo, ClusterInfoError},
     contact_info::ContactInfo,
     packet::Packet,
+    repair_service::RepairStats,
     result::{Error, Result},
 };
 use bincode::serialize;
@@ -46,6 +47,16 @@ impl RepairType {
     }
 }

+#[derive(Default)]
+pub struct ServeRepairStats {
+    pub total_packets: usize,
+    pub processed: usize,
+    pub self_repair: usize,
+    pub window_index: usize,
+    pub highest_window_index: usize,
+    pub orphan: usize,
+}
+
 /// Window protocol messages
 #[derive(Serialize, Deserialize, Debug)]
 enum RepairProtocol {
@@ -104,25 +115,22 @@ impl ServeRepair {
         from_addr: &SocketAddr,
         blockstore: Option<&Arc<Blockstore>>,
         request: RepairProtocol,
+        stats: &mut ServeRepairStats,
     ) -> Option<Packets> {
         let now = Instant::now();

         //TODO verify from is signed
         let my_id = me.read().unwrap().keypair.pubkey();
         let from = Self::get_repair_sender(&request);
         if from.id == my_id {
-            warn!(
-                "{}: Ignored received repair request from ME {}",
-                my_id, from.id,
-            );
-            inc_new_counter_debug!("serve_repair-handle-repair--eq", 1);
+            stats.self_repair += 1;
             return None;
         }

         let (res, label) = {
             match &request {
                 RepairProtocol::WindowIndex(from, slot, shred_index) => {
-                    inc_new_counter_debug!("serve_repair-request-window-index", 1);
+                    stats.window_index += 1;
                     (
                         Self::run_window_request(
                             recycler,
@@ -138,7 +146,7 @@
                 }

                 RepairProtocol::HighestWindowIndex(_, slot, highest_index) => {
-                    inc_new_counter_debug!("serve_repair-request-highest-window-index", 1);
+                    stats.highest_window_index += 1;
                     (
                         Self::run_highest_window_request(
                             recycler,
@@ -151,7 +159,7 @@
                     )
                 }
                 RepairProtocol::Orphan(_, slot) => {
-                    inc_new_counter_debug!("serve_repair-request-orphan", 1);
+                    stats.orphan += 1;
                     (
                         Self::run_orphan(
                             recycler,
@@ -186,6 +194,7 @@
        requests_receiver: &PacketReceiver,
        response_sender: &PacketSender,
        max_packets: &mut usize,
+        stats: &mut ServeRepairStats,
    ) -> Result<()> {
        //TODO cache connections
        let timeout = Duration::new(1, 0);
@@ -202,7 +211,7 @@

        let mut time = Measure::start("repair::handle_packets");
        for reqs in reqs_v {
-            Self::handle_packets(obj, &recycler, blockstore, reqs, response_sender);
+            Self::handle_packets(obj, &recycler, blockstore, reqs, response_sender, stats);
        }
        time.stop();
        if total_packets >= *max_packets {
@@ -215,6 +224,31 @@
        Ok(())
    }

+    fn report_reset_stats(me: &Arc<RwLock<Self>>, stats: &mut ServeRepairStats) {
+        if stats.self_repair > 0 {
+            let my_id = me.read().unwrap().keypair.pubkey();
+            warn!(
+                "{}: Ignored received repair requests from ME: {}",
+                my_id, stats.self_repair,
+            );
+            inc_new_counter_debug!("serve_repair-handle-repair--eq", stats.self_repair);
+        }
+
+        debug!(
+            "repair_listener: total_packets: {} passed: {}",
+            stats.total_packets, stats.processed
+        );
+
+        inc_new_counter_debug!("serve_repair-request-window-index", stats.window_index);
+        inc_new_counter_debug!(
+            "serve_repair-request-highest-window-index",
+            stats.highest_window_index
+        );
+        inc_new_counter_debug!("serve_repair-request-orphan", stats.orphan);
+
+        *stats = ServeRepairStats::default();
+    }
+
    pub fn listen(
        me: Arc<RwLock<Self>>,
        blockstore: Option<Arc<Blockstore>>,
@@ -228,6 +262,8 @@
            .name("solana-repair-listen".to_string())
            .spawn(move || {
                let mut max_packets = 1024;
+                let mut last_print = Instant::now();
+                let mut stats = ServeRepairStats::default();
                loop {
                    let result = Self::run_listen(
                        &me,
@@ -236,6 +272,7 @@
                        &requests_receiver,
                        &response_sender,
                        &mut max_packets,
+                        &mut stats,
                    );
                    match result {
                        Err(Error::RecvTimeoutError(_)) | Ok(_) => {}
@@ -244,6 +281,10 @@
                    if exit.load(Ordering::Relaxed) {
                        return;
                    }
+                    if last_print.elapsed().as_secs() > 2 {
+                        Self::report_reset_stats(&me, &mut stats);
+                        last_print = Instant::now();
+                    }
                    thread_mem_usage::datapoint("solana-repair-listen");
                }
            })
@@ -256,6 +297,7 @@
        blockstore: Option<&Arc<Blockstore>>,
        packets: Packets,
        response_sender: &PacketSender,
+        stats: &mut ServeRepairStats,
    ) {
        // iter over the packets, collect pulls separately and process everything else
        let allocated = thread_mem_usage::Allocatedp::default();
@@ -265,7 +307,9 @@
            limited_deserialize(&packet.data[..packet.meta.size])
                .into_iter()
                .for_each(|request| {
-                    let rsp = Self::handle_repair(me, recycler, &from_addr, blockstore, request);
+                    stats.processed += 1;
+                    let rsp =
+                        Self::handle_repair(me, recycler, &from_addr, blockstore, request, stats);
                    if let Some(rsp) = rsp {
                        let _ignore_disconnect = response_sender.send(rsp);
                    }
@@ -295,7 +339,11 @@
        Ok(out)
    }

-    pub fn repair_request(&self, repair_request: &RepairType) -> Result<(SocketAddr, Vec<u8>)> {
+    pub fn repair_request(
+        &self,
+        repair_request: &RepairType,
+        repair_stats: &mut RepairStats,
+    ) -> Result<(SocketAddr, Vec<u8>)> {
        // find a peer that appears to be accepting replication and has the desired slot, as indicated
        // by a valid tvu port location
        let valid: Vec<_> = self
@@ -308,31 +356,27 @@
        }
        let n = thread_rng().gen::<usize>() % valid.len();
        let addr = valid[n].serve_repair; // send the request to the peer's serve_repair port
-        let out = self.map_repair_request(repair_request)?;
+        let out = self.map_repair_request(repair_request, repair_stats)?;

        Ok((addr, out))
    }

-    pub fn map_repair_request(&self, repair_request: &RepairType) -> Result<Vec<u8>> {
+    pub fn map_repair_request(
+        &self,
+        repair_request: &RepairType,
+        repair_stats: &mut RepairStats,
+    ) -> Result<Vec<u8>> {
        match repair_request {
            RepairType::Shred(slot, shred_index) => {
-                datapoint_debug!(
-                    "serve_repair-repair",
-                    ("repair-slot", *slot, i64),
-                    ("repair-ix", *shred_index, i64)
-                );
+                repair_stats.shred.update(*slot);
                Ok(self.window_index_request_bytes(*slot, *shred_index)?)
            }
            RepairType::HighestShred(slot, shred_index) => {
-                datapoint_info!(
-                    "serve_repair-repair_highest",
-                    ("repair-highest-slot", *slot, i64),
-                    ("repair-highest-ix", *shred_index, i64)
-                );
+                repair_stats.highest_shred.update(*slot);
                Ok(self.window_highest_index_request_bytes(*slot, *shred_index)?)
            }
            RepairType::Orphan(slot) => {
-                datapoint_info!("serve_repair-repair_orphan", ("repair-orphan", *slot, i64));
+                repair_stats.orphan.update(*slot);
                Ok(self.orphan_bytes(*slot)?)
            }
        }
@@ -592,7 +636,7 @@ mod tests {
        let me = ContactInfo::new_localhost(&Pubkey::new_rand(), timestamp());
        let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair(me)));
        let serve_repair = ServeRepair::new(cluster_info.clone());
-        let rv = serve_repair.repair_request(&RepairType::Shred(0, 0));
+        let rv = serve_repair.repair_request(&RepairType::Shred(0, 0), &mut RepairStats::default());
        assert_matches!(rv, Err(Error::ClusterInfoError(ClusterInfoError::NoPeers)));

        let serve_repair_addr = socketaddr!([127, 0, 0, 1], 1243);
@@ -613,7 +657,7 @@
        };
        cluster_info.write().unwrap().insert_info(nxt.clone());
        let rv = serve_repair
-            .repair_request(&RepairType::Shred(0, 0))
+            .repair_request(&RepairType::Shred(0, 0), &mut RepairStats::default())
            .unwrap();
        assert_eq!(nxt.serve_repair, serve_repair_addr);
        assert_eq!(rv.0, nxt.serve_repair);
@@ -640,7 +684,7 @@
        while !one || !two {
            //this randomly picks an option, so eventually it should pick both
            let rv = serve_repair
-                .repair_request(&RepairType::Shred(0, 0))
+                .repair_request(&RepairType::Shred(0, 0), &mut RepairStats::default())
                .unwrap();
            if rv.0 == serve_repair_addr {
                one = true;
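
`serve_repair.rs` applies the same idea on the serving side: request handling only bumps counters in `ServeRepairStats`, and the listen loop calls `report_reset_stats` every couple of seconds to log and count everything in one batch before zeroing the struct. Below is a condensed sketch of that report-and-reset step; the `warn!`/`debug!`/`inc_new_counter_debug!` calls are replaced with `eprintln!` stand-ins and the counter values fed in are made up.

```rust
#[derive(Default)]
struct ServeRepairStats {
    total_packets: usize,
    processed: usize,
    self_repair: usize,
    window_index: usize,
    highest_window_index: usize,
    orphan: usize,
}

// Mirrors the shape of report_reset_stats: emit one summary, then zero the counters.
fn report_reset_stats(stats: &mut ServeRepairStats) {
    if stats.self_repair > 0 {
        // The real method also logs the node's pubkey with warn! here.
        eprintln!("ignored {} repair requests from self", stats.self_repair);
    }
    eprintln!(
        "repair_listener: total_packets: {} passed: {} (window {}, highest {}, orphan {})",
        stats.total_packets, stats.processed, stats.window_index,
        stats.highest_window_index, stats.orphan
    );
    *stats = ServeRepairStats::default();
}

fn main() {
    let mut stats = ServeRepairStats::default();

    // Made-up traffic: a few requests of each type handled since the last report.
    stats.total_packets += 5;
    stats.processed += 4;
    stats.window_index += 2;
    stats.highest_window_index += 1;
    stats.orphan += 1;
    stats.self_repair += 1;

    // In listen() this call is gated on last_print.elapsed().as_secs() > 2
    // and followed by last_print = Instant::now().
    report_reset_stats(&mut stats);
    assert_eq!(stats.processed, 0); // counters were reset for the next interval
}
```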

metrics/scripts/grafana-provisioning/dashboards/cluster-monitor.json

Lines changed: 5 additions & 5 deletions
@@ -6620,8 +6620,8 @@
          "hide": false,
          "measurement": "cluster_info-vote-count",
          "orderByTime": "ASC",
-          "policy": "autogen",
-          "query": "SELECT sum(\"count\") FROM \"$testnet\".\"autogen\".\"banking_stage-buffered_packets\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)",
+          "policy": "default",
+          "query": "SELECT last(\"repair-highest-slot\") AS \"slot\" FROM \"$testnet\".\"autogen\".\"serve_repair-repair\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)",
          "rawQuery": true,
          "refId": "A",
          "resultFormat": "time_series",
@@ -6658,7 +6658,7 @@
          ],
          "orderByTime": "ASC",
          "policy": "default",
-          "query": "SELECT sum(\"count\") FROM \"$testnet\".\"autogen\".\"banking_stage-forwarded_packets\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)",
+          "query": "SELECT last(\"repair-highest-ix\") AS \"ix\" FROM \"$testnet\".\"autogen\".\"serve_repair-repair\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)",
          "rawQuery": true,
          "refId": "B",
          "resultFormat": "time_series",
@@ -6922,8 +6922,8 @@
          ],
          "measurement": "cluster_info-vote-count",
          "orderByTime": "ASC",
-          "policy": "autogen",
-          "query": "SELECT sum(\"recovered\") AS \"recovered\" FROM \"$testnet\".\"autogen\".\"blockstore-erasure\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval) FILL(0)",
+          "policy": "default",
+          "query": "SELECT last(\"repair-orphan\") AS \"slot\" FROM \"$testnet\".\"autogen\".\"serve_repair-repair\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)",
          "rawQuery": true,
          "refId": "B",
          "resultFormat": "time_series",

0 commit comments
