Skip to content
This repository was archived by the owner on Jan 22, 2025. It is now read-only.

Commit b0e6d78

Browse files
committed
Adds LoadedPrograms::next_cooperative_loading_task() and LoadedPrograms::cooperative_loading_task_complete().
1 parent 11c6f81 commit b0e6d78

File tree

2 files changed

+108
-40
lines changed

2 files changed

+108
-40
lines changed

program-runtime/src/loaded_programs.rs

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -908,6 +908,81 @@ impl<FG: ForkGraph> LoadedPrograms<FG> {
908908
extracted
909909
}
910910

911+
/// In cooperative loading a TX batch calls this to receive the next task.
///
/// Scans `extracted.loading` for the first entry whose "processing" flag is not yet set,
/// sets that flag, and hands the entry out to the caller.
///
/// Returns `Some((key, entry, reload))` holding the program key, a clone of the entry's
/// `Arc` and the `reload` flag stored next to it. Returns `None` when no entry in
/// `extracted.loading` is available, i.e. it holds no entries or all of them are already
/// marked as being processed.
///
/// # Panics
///
/// Panics if `lock()` on `extracted` or on an entry's inner mutex returns an error
/// (both results are `unwrap()`ed).
912+
pub fn next_cooperative_loading_task(
913+
&mut self,
914+
extracted: &Arc<Mutex<ExtractedPrograms>>,
915+
) -> Option<(Pubkey, Arc<LoadedProgram>, bool)> {
916+
// The Mutexes are strictly speaking unnecessary
917+
// because the entire `LoadedPrograms` cache is already locked here.
918+
let extracted = extracted.lock().unwrap();
919+
// Pick the first entry nobody is processing yet; the trailing `?` returns `None`
// from this function when there is no such entry.
let (key, (entry, reload)) =
920+
extracted.loading.iter().find(|(_key, (entry, _reload))| {
921+
// Anything other than the `Loading` variant trips the assertion in debug builds
// and is simply skipped otherwise.
let LoadedProgramType::Loading(mutex) = &entry.program else {
922+
debug_assert!(false);
923+
return false;
924+
};
925+
// First field of the value behind the mutex is the "processing" flag.
let processing = mutex.lock().unwrap().0;
926+
!processing
927+
})?;
928+
// Copy / clone everything out so the guard on `extracted` can be released below.
let (key, entry, reload) = (*key, entry.clone(), *reload);
929+
drop(extracted);
930+
// Claim the task: mark the entry as being processed before handing it out.
{
931+
let LoadedProgramType::Loading(mutex) = &entry.program else {
932+
debug_assert!(false);
933+
return None;
934+
};
935+
let processing = &mut mutex.lock().unwrap().0;
936+
*processing = true;
937+
}
938+
Some((key, entry, reload))
939+
}
940+
941+
/// Upon completing a task in cooperative loading a TX batch calls this to submit the result.
///
/// * `key` - address of the program that was loaded.
/// * `loading` - the placeholder entry of the task; it must be of the
///   `LoadedProgramType::Loading` variant, otherwise the call asserts in debug builds and
///   returns without doing anything else.
/// * `loaded` - the resulting program entry.
///
/// Clears the "processing" flag of `loading`, hands `loaded` to every waiter in the
/// placeholder's waiting list that is allowed to see it, removes the placeholder from the
/// cache once nobody is waiting on it anymore, and finally assigns `loaded` to the cache.
///
/// # Panics
///
/// Panics if the cache has no fork graph, if the fork graph cannot be locked for reading,
/// or if `lock()` on the placeholder's mutex or on a waiter's mutex returns an error.
942+
pub fn cooperative_loading_task_complete(
943+
&mut self,
944+
key: Pubkey,
945+
loading: Arc<LoadedProgram>,
946+
loaded: Arc<LoadedProgram>,
947+
) {
948+
let LoadedProgramType::Loading(mutex) = &loading.program else {
949+
debug_assert!(false);
950+
return;
951+
};
952+
// NOTE(review): this guard is never dropped explicitly, so it appears to stay alive until
// the end of the function, i.e. across `remove_program` and `assign_program` below —
// confirm neither of them locks this entry's mutex.
let mut mutex = mutex.lock().unwrap();
953+
// First field behind the mutex is the "processing" flag; the task is done, so clear it.
let processing = &mut mutex.0;
954+
*processing = false;
955+
let waiting_list_is_empty = {
956+
let fork_graph = self
957+
.fork_graph
958+
.as_ref()
959+
.expect("Program cache doesn't have fork graph.");
960+
let fork_graph = fork_graph
961+
.read()
962+
.expect("Failed to lock fork graph for reading.");
963+
// Second field behind the mutex is the list of waiters for this program
// (presumably the `ExtractedPrograms` of other TX batches — TODO confirm).
let waiting_list = &mut mutex.1;
964+
// `retain` keeps the waiters for which the closure returns `true`, so a waiter that
// receives the program here is dropped from the list.
waiting_list.retain(|waiting| {
965+
// The Mutex around `waiting` is strictly speaking unnecessary
966+
// because the entire `LoadedPrograms` cache is already locked here.
967+
let mut waiting = waiting.lock().unwrap();
968+
let relation = fork_graph.relationship(loaded.deployment_slot, waiting.loaded.slot);
969+
// Deliver the program if it was deployed at or before the latest root slot, or if the
// fork graph reports the relation as `Equal` or `Descendant`.
if loaded.deployment_slot <= self.latest_root_slot
970+
|| matches!(relation, BlockRelation::Equal | BlockRelation::Descendant)
971+
{
972+
waiting.loading.remove(&key);
973+
waiting.loaded.assign_program(key, loaded.clone());
974+
return false;
975+
}
976+
true
977+
});
978+
waiting_list.is_empty()
979+
};
980+
// Nobody is waiting on the placeholder anymore, so take it out of the cache.
if waiting_list_is_empty {
981+
self.remove_program(key, &loading);
982+
}
983+
// The loaded program is stored in the cache regardless of the waiting list's state.
self.assign_program(key, loaded);
984+
}
985+
911986
pub fn merge(&mut self, tx_batch_cache: &LoadedProgramsForTxBatch) {
912987
tx_batch_cache.entries.iter().for_each(|(key, entry)| {
913988
self.assign_program(*key, entry.clone());

runtime/src/bank.rs

Lines changed: 33 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -112,8 +112,8 @@ use {
112112
compute_budget_processor::process_compute_budget_instructions,
113113
invoke_context::BuiltinFunctionWithContext,
114114
loaded_programs::{
115-
ExtractedPrograms, LoadProgramMetrics, LoadedProgram, LoadedProgramMatchCriteria,
116-
LoadedProgramType, LoadedPrograms, LoadedProgramsForTxBatch, ProgramRuntimeEnvironment,
115+
LoadProgramMetrics, LoadedProgram, LoadedProgramMatchCriteria, LoadedProgramType,
116+
LoadedPrograms, LoadedProgramsForTxBatch, ProgramRuntimeEnvironment,
117117
ProgramRuntimeEnvironments, WorkingSlot, DELAY_VISIBILITY_SLOT_OFFSET,
118118
},
119119
log_collector::LogCollector,
@@ -5068,50 +5068,43 @@ impl Bank {
50685068
.collect()
50695069
};
50705070

5071-
let ExtractedPrograms {
5072-
loaded: mut loaded_programs_for_txs,
5073-
missing,
5074-
unloaded,
5075-
} = {
5071+
let extracted = {
50765072
// Lock the global cache to figure out which programs need to be loaded
5077-
let loaded_programs_cache = self.loaded_programs_cache.read().unwrap();
5073+
let mut loaded_programs_cache = self.loaded_programs_cache.write().unwrap();
50785074
loaded_programs_cache.extract(self, programs_and_slots.into_iter())
50795075
};
50805076

5081-
// Load missing programs while global cache is unlocked
5082-
let missing_programs: Vec<(Pubkey, Arc<LoadedProgram>)> = missing
5083-
.iter()
5084-
.map(|(key, count)| {
5085-
let program = self.load_program(key, false, None);
5086-
program.tx_usage_counter.store(*count, Ordering::Relaxed);
5087-
(*key, program)
5088-
})
5089-
.collect();
5090-
5091-
// Reload unloaded programs while global cache is unlocked
5092-
let unloaded_programs: Vec<(Pubkey, Arc<LoadedProgram>)> = unloaded
5093-
.iter()
5094-
.map(|(key, count)| {
5095-
let program = self.load_program(key, true, None);
5096-
program.tx_usage_counter.store(*count, Ordering::Relaxed);
5097-
(*key, program)
5098-
})
5099-
.collect();
5100-
5101-
// Lock the global cache again to replenish the missing programs
5102-
let mut loaded_programs_cache = self.loaded_programs_cache.write().unwrap();
5103-
for (key, program) in missing_programs {
5104-
let entry = loaded_programs_cache.assign_program(key, program);
5105-
// Use the returned entry as that might have been deduplicated globally
5106-
loaded_programs_for_txs.assign_program(key, entry);
5107-
}
5108-
for (key, program) in unloaded_programs {
5109-
let entry = loaded_programs_cache.assign_program(key, program);
5110-
// Use the returned entry as that might have been deduplicated globally
5111-
loaded_programs_for_txs.assign_program(key, entry);
5077+
// Cooperative loading phase
5078+
let mut finished_task = None;
5079+
loop {
5080+
// Critical section for global coordination
5081+
let (key, loading, reload) = {
5082+
let mut loaded_programs_cache = self.loaded_programs_cache.write().unwrap();
5083+
if let Some((key, loading, loaded)) = finished_task.take() {
5084+
loaded_programs_cache.cooperative_loading_task_complete(key, loading, loaded);
5085+
}
5086+
if Arc::strong_count(&extracted) == 1 {
5087+
// All the missing entries for this batch have been loaded
5088+
break;
5089+
}
5090+
if let Some(task) = loaded_programs_cache.next_cooperative_loading_task(&extracted)
5091+
{
5092+
task
5093+
} else {
5094+
// Waiting for some other TX batch to complete loading the programs needed by this TX batch
5095+
// TODO: Use a Condvar here
5096+
continue;
5097+
}
5098+
};
5099+
// Load, verify and compile the program outside of the critical section
5100+
let loaded = self.load_program(&key, reload, None);
5101+
finished_task = Some((key, loading, loaded));
51125102
}
51135103

5114-
loaded_programs_for_txs
5104+
// When we get here we should be the only remaining owner
5105+
std::sync::Mutex::into_inner(Arc::into_inner(extracted).unwrap())
5106+
.unwrap()
5107+
.loaded
51155108
}
51165109

51175110
/// Returns a hash map of executable program accounts (program accounts that are not writable

0 commit comments

Comments
 (0)