Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

Commit

Permalink
Bank drop service
Browse files Browse the repository at this point in the history
  • Loading branch information
sakridge committed Nov 17, 2021
1 parent db6d291 commit 8242a02
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 9 deletions.
37 changes: 37 additions & 0 deletions core/src/drop_bank_service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
use solana_measure::measure::Measure;
use solana_runtime::bank::Bank;
use std::{
sync::{mpsc::Receiver, Arc},
thread::{self, Builder, JoinHandle},
};

pub struct DropBankService {
thread_hdl: JoinHandle<()>,
}

impl DropBankService {
pub fn new(bank_receiver: Receiver<Vec<Arc<Bank>>>) -> Self {
let thread_hdl = Builder::new()
.name("sol-drop-b-service".to_string())
.spawn(move || {
for banks in bank_receiver.iter() {
let len = banks.len();
let mut dropped_banks_time = Measure::start("drop_banks");
drop(banks);
dropped_banks_time.stop();
if dropped_banks_time.as_ms() > 10 {
datapoint_info!(
"handle_new_root-dropped_banks",
("elapsed_ms", dropped_banks_time.as_ms(), i64)("len", len, i64)
);
}
}
})
.unwrap();
Self { thread_hdl }
}

pub fn join(self) -> thread::Result<()> {
self.thread_hdl.join()
}
}
1 change: 1 addition & 0 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub mod commitment_service;
pub mod completed_data_sets_service;
pub mod consensus;
pub mod cost_update_service;
pub mod drop_bank_service;
pub mod duplicate_repair_status;
pub mod fetch_stage;
pub mod fork_choice;
Expand Down
18 changes: 9 additions & 9 deletions core/src/replay_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,7 @@ impl ReplayStage {
cluster_slots_update_sender: ClusterSlotsUpdateSender,
cost_update_sender: Sender<CostUpdate>,
voting_sender: Sender<VoteOp>,
drop_bank_sender: Sender<Vec<Arc<Bank>>>,
) -> Self {
let ReplayStageConfig {
vote_account,
Expand Down Expand Up @@ -644,6 +645,7 @@ impl ReplayStage {
&mut replay_timing,
&voting_sender,
&mut epoch_slots_frozen_slots,
&drop_bank_sender,
);
};
voting_time.stop();
Expand Down Expand Up @@ -1615,6 +1617,7 @@ impl ReplayStage {
replay_timing: &mut ReplayTiming,
voting_sender: &Sender<VoteOp>,
epoch_slots_frozen_slots: &mut EpochSlotsFrozenSlots,
bank_drop_sender: &Sender<Vec<Arc<Bank>>>,
) {
if bank.is_empty() {
inc_new_counter_info!("replay_stage-voted_empty_bank", 1);
Expand Down Expand Up @@ -1665,6 +1668,7 @@ impl ReplayStage {
has_new_vote_been_rooted,
vote_signatures,
epoch_slots_frozen_slots,
bank_drop_sender,
);
rpc_subscriptions.notify_roots(rooted_slots);
if let Some(sender) = bank_notification_sender {
Expand Down Expand Up @@ -2723,21 +2727,17 @@ impl ReplayStage {
has_new_vote_been_rooted: &mut bool,
voted_signatures: &mut Vec<Signature>,
epoch_slots_frozen_slots: &mut EpochSlotsFrozenSlots,
bank_drop_sender: &Sender<Vec<Arc<Bank>>>,
) {
let removed_banks = bank_forks.write().unwrap().set_root(
new_root,
accounts_background_request_sender,
highest_confirmed_root,
);
let mut dropped_banks_time = Measure::start("handle_new_root::drop_banks");
drop(removed_banks);
dropped_banks_time.stop();
if dropped_banks_time.as_ms() > 10 {
datapoint_info!(
"handle_new_root-dropped_banks",
("elapsed_ms", dropped_banks_time.as_ms(), i64)
);
}
bank_drop_sender
.send(removed_banks)
.unwrap_or_else(|err| warn!("bank drop failed: {:?}", err));

// Dropping the bank_forks write lock and reacquiring as a read lock is
// safe because updates to bank_forks are only made by a single thread.
let r_bank_forks = bank_forks.read().unwrap();
Expand Down
8 changes: 8 additions & 0 deletions core/src/tvu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::{
completed_data_sets_service::CompletedDataSetsSender,
consensus::Tower,
cost_update_service::CostUpdateService,
drop_bank_service::DropBankService,
ledger_cleanup_service::LedgerCleanupService,
replay_stage::{ReplayStage, ReplayStageConfig},
retransmit_stage::RetransmitStage,
Expand Down Expand Up @@ -69,6 +70,7 @@ pub struct Tvu {
accounts_hash_verifier: AccountsHashVerifier,
cost_update_service: CostUpdateService,
voting_service: VotingService,
drop_bank_service: DropBankService,
}

pub struct Sockets {
Expand Down Expand Up @@ -302,6 +304,9 @@ impl Tvu {
cost_update_receiver,
);

let (drop_bank_sender, drop_bank_receiver) = channel();
let drop_bank_service = DropBankService::new(drop_bank_receiver);

let replay_stage = ReplayStage::new(
replay_stage_config,
blockstore.clone(),
Expand All @@ -321,6 +326,7 @@ impl Tvu {
cluster_slots_update_sender,
cost_update_sender,
voting_sender,
drop_bank_sender,
);

let ledger_cleanup_service = tvu_config.max_ledger_shreds.map(|max_ledger_shreds| {
Expand Down Expand Up @@ -354,6 +360,7 @@ impl Tvu {
accounts_hash_verifier,
cost_update_service,
voting_service,
drop_bank_service,
}
}

Expand All @@ -369,6 +376,7 @@ impl Tvu {
self.accounts_hash_verifier.join()?;
self.cost_update_service.join()?;
self.voting_service.join()?;
self.drop_bank_service.join()?;
Ok(())
}
}
Expand Down
2 changes: 2 additions & 0 deletions core/src/vote_simulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ impl VoteSimulator {
}

pub fn set_root(&mut self, new_root: Slot) {
let (drop_bank_sender, _drop_bank_receiver) = std::sync::mpsc::channel();
ReplayStage::handle_new_root(
new_root,
&self.bank_forks,
Expand All @@ -215,6 +216,7 @@ impl VoteSimulator {
&mut true,
&mut Vec::new(),
&mut EpochSlotsFrozenSlots::default(),
&drop_bank_sender,
)
}

Expand Down

0 comments on commit 8242a02

Please sign in to comment.