diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index c717950c0896c4..e815d090ac43b8 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -1432,9 +1432,13 @@ impl ShrinkStats { } } +fn quarter_thread_count() -> usize { + std::cmp::max(2, num_cpus::get() / 4) +} + pub fn make_min_priority_thread_pool() -> ThreadPool { // Use lower thread count to reduce priority. - let num_threads = std::cmp::max(2, num_cpus::get() / 4); + let num_threads = quarter_thread_count(); rayon::ThreadPoolBuilder::new() .thread_name(|i| format!("solana-cleanup-accounts-{}", i)) .num_threads(num_threads) @@ -5761,28 +5765,38 @@ impl AccountsDb { // previous_slot_entry_was_cached = true means we just need to assert that after this update is complete // that there are no items we would have put in reclaims that are not cached - fn update_index( + fn update_index( &self, slot: Slot, infos: Vec, - accounts: &[(&Pubkey, &impl ReadableAccount)], + accounts: &[(&Pubkey, &T)], previous_slot_entry_was_cached: bool, ) -> SlotList { - let mut reclaims = SlotList::::with_capacity(infos.len() * 2); - for (info, pubkey_account) in infos.into_iter().zip(accounts.iter()) { - let pubkey = pubkey_account.0; - self.accounts_index.upsert( - slot, - pubkey, - pubkey_account.1.owner(), - pubkey_account.1.data(), - &self.account_indexes, - info, - &mut reclaims, - previous_slot_entry_was_cached, - ); - } - reclaims + // using a thread pool here results in deadlock panics from bank_hashes.write() + // so, instead we limit how many threads will be created to the same size as the bg thread pool + let chunk_size = std::cmp::max(1, accounts.len() / quarter_thread_count()); // # pubkeys/thread + infos + .par_chunks(chunk_size) + .zip(accounts.par_chunks(chunk_size)) + .map(|(infos_chunk, accounts_chunk)| { + let mut reclaims = Vec::with_capacity(infos_chunk.len() / 2); + for (info, pubkey_account) in infos_chunk.iter().zip(accounts_chunk.iter()) { + let pubkey = pubkey_account.0; + self.accounts_index.upsert( + slot, + pubkey, + pubkey_account.1.owner(), + pubkey_account.1.data(), + &self.account_indexes, + *info, + &mut reclaims, + previous_slot_entry_was_cached, + ); + } + reclaims + }) + .flatten() + .collect::>() } fn should_not_shrink(aligned_bytes: u64, total_bytes: u64, num_stores: usize) -> bool { @@ -6129,6 +6143,7 @@ impl AccountsDb { } /// Store the account update. + /// only called by tests pub fn store_uncached(&self, slot: Slot, accounts: &[(&Pubkey, &AccountSharedData)]) { self.store(slot, accounts, false); } @@ -6152,11 +6167,14 @@ impl AccountsDb { .store_total_data .fetch_add(total_data as u64, Ordering::Relaxed); - let mut bank_hashes = self.bank_hashes.write().unwrap(); - let slot_info = bank_hashes - .entry(slot) - .or_insert_with(BankHashInfo::default); - slot_info.stats.merge(&stats); + { + // we need to drop bank_hashes to prevent deadlocks + let mut bank_hashes = self.bank_hashes.write().unwrap(); + let slot_info = bank_hashes + .entry(slot) + .or_insert_with(BankHashInfo::default); + slot_info.stats.merge(&stats); + } // we use default hashes for now since the same account may be stored to the cache multiple times self.store_accounts_unfrozen(slot, accounts, None, is_cached_store); @@ -6300,10 +6318,10 @@ impl AccountsDb { ); } - fn store_accounts_frozen<'a>( + fn store_accounts_frozen<'a, T: ReadableAccount + Sync>( &'a self, slot: Slot, - accounts: &[(&Pubkey, &impl ReadableAccount)], + accounts: &[(&Pubkey, &T)], hashes: Option<&[impl Borrow]>, storage_finder: Option>, write_version_producer: Option>>, @@ -6324,10 +6342,10 @@ impl AccountsDb { ) } - fn store_accounts_custom<'a>( + fn store_accounts_custom<'a, T: ReadableAccount + Sync>( &'a self, slot: Slot, - accounts: &[(&Pubkey, &impl ReadableAccount)], + accounts: &[(&Pubkey, &T)], hashes: Option<&[impl Borrow]>, storage_finder: Option>, write_version_producer: Option>>,