Skip to content

Commit

Permalink
parallelize update_index
Browse files Browse the repository at this point in the history
  • Loading branch information
jeffwashington committed Oct 15, 2021
1 parent 4beabb3 commit 6eb3a9a
Showing 1 changed file with 45 additions and 27 deletions.
72 changes: 45 additions & 27 deletions runtime/src/accounts_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1432,9 +1432,13 @@ impl ShrinkStats {
}
}

fn quarter_thread_count() -> usize {
std::cmp::max(2, num_cpus::get() / 4)
}

pub fn make_min_priority_thread_pool() -> ThreadPool {
// Use lower thread count to reduce priority.
let num_threads = std::cmp::max(2, num_cpus::get() / 4);
let num_threads = quarter_thread_count();
rayon::ThreadPoolBuilder::new()
.thread_name(|i| format!("solana-cleanup-accounts-{}", i))
.num_threads(num_threads)
Expand Down Expand Up @@ -5761,28 +5765,38 @@ impl AccountsDb {

// previous_slot_entry_was_cached = true means we just need to assert that after this update is complete
// that there are no items we would have put in reclaims that are not cached
fn update_index(
fn update_index<T: ReadableAccount + Sync>(
&self,
slot: Slot,
infos: Vec<AccountInfo>,
accounts: &[(&Pubkey, &impl ReadableAccount)],
accounts: &[(&Pubkey, &T)],
previous_slot_entry_was_cached: bool,
) -> SlotList<AccountInfo> {
let mut reclaims = SlotList::<AccountInfo>::with_capacity(infos.len() * 2);
for (info, pubkey_account) in infos.into_iter().zip(accounts.iter()) {
let pubkey = pubkey_account.0;
self.accounts_index.upsert(
slot,
pubkey,
pubkey_account.1.owner(),
pubkey_account.1.data(),
&self.account_indexes,
info,
&mut reclaims,
previous_slot_entry_was_cached,
);
}
reclaims
// using a thread pool here results in deadlock panics from bank_hashes.write()
// so, instead we limit how many threads will be created to the same size as the bg thread pool
let chunk_size = std::cmp::max(1, accounts.len() / quarter_thread_count()); // # pubkeys/thread
infos
.par_chunks(chunk_size)
.zip(accounts.par_chunks(chunk_size))
.map(|(infos_chunk, accounts_chunk)| {
let mut reclaims = Vec::with_capacity(infos_chunk.len() / 2);
for (info, pubkey_account) in infos_chunk.iter().zip(accounts_chunk.iter()) {
let pubkey = pubkey_account.0;
self.accounts_index.upsert(
slot,
pubkey,
pubkey_account.1.owner(),
pubkey_account.1.data(),
&self.account_indexes,
*info,
&mut reclaims,
previous_slot_entry_was_cached,
);
}
reclaims
})
.flatten()
.collect::<Vec<_>>()
}

fn should_not_shrink(aligned_bytes: u64, total_bytes: u64, num_stores: usize) -> bool {
Expand Down Expand Up @@ -6129,6 +6143,7 @@ impl AccountsDb {
}

/// Store the account update.
/// only called by tests
pub fn store_uncached(&self, slot: Slot, accounts: &[(&Pubkey, &AccountSharedData)]) {
self.store(slot, accounts, false);
}
Expand All @@ -6152,11 +6167,14 @@ impl AccountsDb {
.store_total_data
.fetch_add(total_data as u64, Ordering::Relaxed);

let mut bank_hashes = self.bank_hashes.write().unwrap();
let slot_info = bank_hashes
.entry(slot)
.or_insert_with(BankHashInfo::default);
slot_info.stats.merge(&stats);
{
// we need to drop bank_hashes to prevent deadlocks
let mut bank_hashes = self.bank_hashes.write().unwrap();
let slot_info = bank_hashes
.entry(slot)
.or_insert_with(BankHashInfo::default);
slot_info.stats.merge(&stats);
}

// we use default hashes for now since the same account may be stored to the cache multiple times
self.store_accounts_unfrozen(slot, accounts, None, is_cached_store);
Expand Down Expand Up @@ -6300,10 +6318,10 @@ impl AccountsDb {
);
}

fn store_accounts_frozen<'a>(
fn store_accounts_frozen<'a, T: ReadableAccount + Sync>(
&'a self,
slot: Slot,
accounts: &[(&Pubkey, &impl ReadableAccount)],
accounts: &[(&Pubkey, &T)],
hashes: Option<&[impl Borrow<Hash>]>,
storage_finder: Option<StorageFinder<'a>>,
write_version_producer: Option<Box<dyn Iterator<Item = StoredMetaWriteVersion>>>,
Expand All @@ -6324,10 +6342,10 @@ impl AccountsDb {
)
}

fn store_accounts_custom<'a>(
fn store_accounts_custom<'a, T: ReadableAccount + Sync>(
&'a self,
slot: Slot,
accounts: &[(&Pubkey, &impl ReadableAccount)],
accounts: &[(&Pubkey, &T)],
hashes: Option<&[impl Borrow<Hash>]>,
storage_finder: Option<StorageFinder<'a>>,
write_version_producer: Option<Box<dyn Iterator<Item = u64>>>,
Expand Down

0 comments on commit 6eb3a9a

Please sign in to comment.