forked from solana-labs/solana
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrpc_completed_slots_service.rs
58 lines (55 loc) · 2.06 KB
/
rpc_completed_slots_service.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
use {
crate::{rpc_subscriptions::RpcSubscriptions, slot_status_notifier::SlotStatusNotifier},
crossbeam_channel::RecvTimeoutError,
solana_ledger::blockstore::CompletedSlotsReceiver,
solana_rpc_client_api::response::SlotUpdate,
solana_sdk::timing::timestamp,
std::{
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
thread::{Builder, JoinHandle},
time::Duration,
},
};
/// Timeout, in milliseconds, passed to each `recv_timeout` call on the completed-slots channel
/// in [`RpcCompletedSlotsService::spawn`]. Because the exit flag is checked once per loop
/// iteration, this is also how long the thread can sit idle between exit checks.
pub const COMPLETE_SLOT_REPORT_SLEEP_MS: u64 = 100;
/// Stateless marker type whose associated function `spawn` starts the thread that reports
/// completed slots to RPC subscribers and, optionally, to a slot status notifier.
pub struct RpcCompletedSlotsService;
impl RpcCompletedSlotsService {
    /// Starts the `solRpcComplSlot` thread and returns its join handle.
    ///
    /// The thread repeatedly waits on `completed_slots_receiver`, giving up after
    /// [`COMPLETE_SLOT_REPORT_SLEEP_MS`] milliseconds per attempt. For every slot in a received
    /// batch it first sends a [`SlotUpdate::Completed`] (stamped with the current `timestamp()`)
    /// to `rpc_subscriptions`, then calls `notify_completed` on `slot_status_notifier` when one
    /// was supplied.
    ///
    /// The thread ends when `exit` is observed set at the top of an iteration, or when the
    /// channel reports that it is disconnected.
    ///
    /// # Panics
    ///
    /// Panics in the caller if the thread cannot be spawned. The spawned thread panics if
    /// `read()` on the slot status notifier returns an error.
    pub fn spawn(
        completed_slots_receiver: CompletedSlotsReceiver,
        rpc_subscriptions: Arc<RpcSubscriptions>,
        slot_status_notifier: Option<SlotStatusNotifier>,
        exit: Arc<AtomicBool>,
    ) -> JoinHandle<()> {
        let worker = move || {
            let poll_interval = Duration::from_millis(COMPLETE_SLOT_REPORT_SLEEP_MS);

            // The exit flag is polled once per iteration, before each wait on the channel.
            while !exit.load(Ordering::Relaxed) {
                let completed = match completed_slots_receiver.recv_timeout(poll_interval) {
                    Ok(completed) => completed,
                    // Nothing arrived in this interval; go back and re-check the exit flag.
                    Err(RecvTimeoutError::Timeout) => continue,
                    Err(RecvTimeoutError::Disconnected) => {
                        info!("RpcCompletedSlotService channel disconnected, exiting.");
                        break;
                    }
                };

                for slot in completed {
                    // RPC subscribers are told first, then the optional status notifier.
                    let update = SlotUpdate::Completed {
                        slot,
                        timestamp: timestamp(),
                    };
                    rpc_subscriptions.notify_slot_update(update);

                    if let Some(notifier) = slot_status_notifier.as_ref() {
                        notifier.read().unwrap().notify_completed(slot);
                    }
                }
            }
        };

        Builder::new()
            .name("solRpcComplSlot".to_string())
            .spawn(worker)
            .unwrap()
    }
}