v2.0: Avoid unneeded start_session() with cleanups (bp: anza-xyz#1815, anza-xyz#1861) (anza-xyz#1854)

* Avoid unneeded start_session() when spawning (anza-xyz#1815)

* Avoid unneeded start_session() when spawning

* Add comments

(cherry picked from commit 40a9851)

* Apply cosmetic changes to unified scheduler (anza-xyz#1861)

* Apply cosmetic changes to unified scheduler

* Use first instead of old-fashioned firstly

Co-authored-by: Andrew Fitzgerald <apfitzge@gmail.com>

---------

Co-authored-by: Andrew Fitzgerald <apfitzge@gmail.com>

---------

Co-authored-by: Ryo Onodera <ryoqun@gmail.com>
Co-authored-by: Andrew Fitzgerald <apfitzge@gmail.com>
3 people authored and neutrinoks committed Jul 17, 2024
1 parent 442ebcf commit ed3fc93
Showing 2 changed files with 78 additions and 70 deletions.
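The heart of anza-xyz#1815 is a small control-flow change: previously, spawning a scheduler started its threads "empty", and a follow-up start_session() call pushed the initial SchedulingContext and ResultWithTimings to them as a channel message. After this commit, spawn() hands both values directly to start_threads(), so the very first session needs no channel round-trip. A minimal before/after sketch of that idea; the types and function names here are simplified stand-ins, not the real agave definitions:

```rust
use std::{sync::mpsc, thread};

// Stand-ins for SchedulingContext and ResultWithTimings.
struct Context(u64);
struct SessionResult(u64);

enum Payload {
    OpenSubchannel((Context, SessionResult)),
}

// Before: the thread starts "empty" and blocks until start_session() sends
// the initial session as a channel message.
fn spawn_then_start_session() -> thread::JoinHandle<()> {
    let (tx, rx) = mpsc::channel::<Payload>();
    let handle = thread::spawn(move || {
        let Ok(Payload::OpenSubchannel((_ctx, _result))) = rx.recv() else {
            return;
        };
        // ...run the first session...
    });
    // start_session(): an extra send whose only job is seeding session one.
    tx.send(Payload::OpenSubchannel((Context(1), SessionResult(0))))
        .unwrap();
    handle
}

// After: the initial session is moved into the thread at spawn time; the
// channel is only needed for subsequent sessions.
fn spawn_with_initial_session(ctx: Context, result: SessionResult) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        let (_ctx, _result) = (ctx, result); // already owned, no recv() needed
        // ...run the first session...
    })
}

fn main() {
    spawn_then_start_session().join().unwrap();
    spawn_with_initial_session(Context(1), SessionResult(0))
        .join()
        .unwrap();
}
```

The diff below realizes this by deleting PooledScheduler::do_spawn() and threading the session state through start_threads().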
9 changes: 5 additions & 4 deletions runtime/src/installed_scheduler_pool.rs
@@ -25,15 +25,16 @@
 use {
     log::*,
     solana_program_runtime::timings::ExecuteTimings,
     solana_sdk::{
-        clock::Slot,
         hash::Hash,
+        slot_history::Slot,
         transaction::{Result, SanitizedTransaction, TransactionError},
     },
     std::{
         fmt::{self, Debug},
         mem,
         ops::Deref,
         sync::{Arc, RwLock},
+        thread,
     },
 };
 #[cfg(feature = "dev-context-only-utils")]
@@ -623,7 +624,7 @@ impl BankWithSchedulerInner {
             "wait_for_scheduler_termination(slot: {}, reason: {:?}): started at {:?}...",
             bank.slot(),
             reason,
-            std::thread::current(),
+            thread::current(),
         );
 
         let mut scheduler = scheduler.write().unwrap();
@@ -656,7 +657,7 @@ impl BankWithSchedulerInner {
             reason,
             was_noop,
             result_with_timings.as_ref().map(|(result, _)| result),
-            std::thread::current(),
+            thread::current(),
         );
         trace!(
             "wait_for_scheduler_termination(result_with_timings: {:?})",
@@ -667,7 +668,7 @@ impl BankWithSchedulerInner {
     }
 
     fn drop_scheduler(&self) {
-        if std::thread::panicking() {
+        if thread::panicking() {
             error!(
                 "BankWithSchedulerInner::drop_scheduler(): slot: {} skipping due to already panicking...",
                 self.bank.slot(),
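The runtime-side changes above are part of the cosmetic cleanup (anza-xyz#1861): `Slot` now comes from `solana_sdk::slot_history`, and importing `std::thread` once lets every call site shrink from `std::thread::current()` to `thread::current()`. A trivial sketch of the same import pattern (the function name is illustrative):

```rust
use std::thread;

fn log_current_thread() {
    // was: std::thread::current()
    println!("{:?}", thread::current());
}

fn main() {
    log_current_thread();
}
```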
139 changes: 73 additions & 66 deletions unified-scheduler-pool/src/lib.rs
@@ -761,23 +761,6 @@ struct ThreadManager<S: SpawnableScheduler<TH>, TH: TaskHandler> {
     handler_threads: Vec<JoinHandle<()>>,
 }
 
-impl<TH: TaskHandler> PooledScheduler<TH> {
-    fn do_spawn(
-        pool: Arc<SchedulerPool<Self, TH>>,
-        initial_context: SchedulingContext,
-        result_with_timings: ResultWithTimings,
-    ) -> Self {
-        Self::from_inner(
-            PooledSchedulerInner::<Self, TH> {
-                thread_manager: ThreadManager::new(pool),
-                usage_queue_loader: UsageQueueLoader::default(),
-            },
-            initial_context,
-            result_with_timings,
-        )
-    }
-}
-
 struct HandlerPanicked;
 type HandlerResult = std::result::Result<Box<ExecutedTask>, HandlerPanicked>;

@@ -847,7 +830,15 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
         );
     }
 
-    fn start_threads(&mut self, context: &SchedulingContext) {
+    // This method must take same set of session-related arguments as start_session() to avoid
+    // unneeded channel operations to minimize overhead. Starting threads incurs a very high cost
+    // already... Also, pre-creating threads isn't desirable as well to avoid `Option`-ed types
+    // for type safety.
+    fn start_threads(
+        &mut self,
+        context: SchedulingContext,
+        mut result_with_timings: ResultWithTimings,
+    ) {
         // Firstly, setup bi-directional messaging between the scheduler and handlers to pass
         // around tasks, by creating 2 channels (one for to-be-handled tasks from the scheduler to
         // the handlers and the other for finished tasks from the handlers to the scheduler).
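Note the new shape of the signature: both session values are taken by value, and result_with_timings is a `mut` binding so the scheduler loop can overwrite it between sessions. A sketch of why that shape avoids work, with simplified placeholder types (not the real agave definitions):

```rust
use std::thread;

struct SchedulingContext(u64);
type ResultWithTimings = (Result<(), String>, u64);

// Mirrors the new signature: ownership flows from the caller into the thread,
// so no clone() and no seeding channel send are required.
fn start_threads(context: SchedulingContext, mut result_with_timings: ResultWithTimings) {
    thread::spawn(move || {
        let _ctx = context; // moved in, not received over a channel
        result_with_timings.1 += 1; // the real loop rebinds this between sessions
    })
    .join()
    .unwrap();
}

fn main() {
    start_threads(SchedulingContext(42), (Ok(()), 0));
}
```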
@@ -925,7 +916,7 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
         // prioritization further. Consequently, this also contributes to alleviate the known
         // heuristic's caveat for the first task of linearized runs, which is described above.
         let (mut runnable_task_sender, runnable_task_receiver) =
-            chained_channel::unbounded::<Task, SchedulingContext>(context.clone());
+            chained_channel::unbounded::<Task, SchedulingContext>(context);
         // Create two handler-to-scheduler channels to prioritize the finishing of blocked tasks,
         // because it is more likely that a blocked task will have more blocked tasks behind it,
         // which should be scheduled while minimizing the delay to clear buffered linearized runs
@@ -944,7 +935,7 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
         // 4. the handler thread processes the dispatched task.
         // 5. the handler thread reply back to the scheduler thread as an executed task.
         // 6. the scheduler thread post-processes the executed task.
-        let scheduler_main_loop = || {
+        let scheduler_main_loop = {
             let handler_count = self.pool.handler_count;
             let session_result_sender = self.session_result_sender.clone();
             // Taking new_task_receiver here is important to ensure there's a single receiver. In
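Another cosmetic change: scheduler_main_loop used to be a closure that had to be invoked once to produce the real thread body (hence spawn_tracked(scheduler_main_loop()) further down, now spawn_tracked(scheduler_main_loop)); it is now a block expression that performs the per-thread setup and evaluates to that body directly. A small sketch of the two shapes:

```rust
use std::thread;

fn demo() -> thread::JoinHandle<()> {
    let session_state = String::from("moved into the thread");

    // Before (shape only):
    //     let scheduler_main_loop = || { /* setup */ move || { /* loop */ } };
    //     thread::spawn(scheduler_main_loop())
    //
    // After: the block runs its setup immediately, once, and yields the
    // `move` closure itself.
    let scheduler_main_loop = {
        let session_state = session_state; // captures are prepared here
        move || drop(session_state)
    };

    thread::spawn(scheduler_main_loop)
}

fn main() {
    demo().join().unwrap();
}
```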
@@ -1008,29 +999,14 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
                 let mut state_machine = unsafe {
                     SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
                 };
-                let mut result_with_timings = initialized_result_with_timings();
 
+                // The following loop maintains and updates ResultWithTimings as its
+                // externally-provided mutable state for each session in this way:
+                //
+                // 1. Initial result_with_timing is propagated implicitly by the moved variable.
+                // 2. Subsequent result_with_timings are propagated explicitly from
+                //    the new_task_receiver.recv() invocation located at the end of loop.
                 'nonaborted_main_loop: loop {
-                    match new_task_receiver.recv() {
-                        Ok(NewTaskPayload::OpenSubchannel((
-                            new_context,
-                            new_result_with_timings,
-                        ))) => {
-                            // signal about new SchedulingContext to handler threads
-                            runnable_task_sender
-                                .send_chained_channel(new_context, handler_count)
-                                .unwrap();
-                            result_with_timings = new_result_with_timings;
-                        }
-                        Ok(_) => {
-                            unreachable!();
-                        }
-                        Err(_) => {
-                            // This unusual condition must be triggered by ThreadManager::drop();
-                            break 'nonaborted_main_loop;
-                        }
-                    }
-
                     let mut is_finished = false;
                     while !is_finished {
                         // ALL recv selectors are eager-evaluated ALWAYS by current crossbeam impl,
@@ -1081,9 +1057,8 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
                             Ok(NewTaskPayload::CloseSubchannel) => {
                                 session_ending = true;
                             }
-                            Ok(NewTaskPayload::OpenSubchannel(_context_and_result_with_timings)) => {
-                                unreachable!();
-                            }
+                            Ok(NewTaskPayload::OpenSubchannel(_context_and_result_with_timings)) =>
+                                unreachable!(),
                             Err(RecvError) => {
                                 // Mostly likely is that this scheduler is dropped for pruned blocks of
                                 // abandoned forks...
@@ -1106,15 +1081,36 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
                         is_finished = session_ending && state_machine.has_no_active_task();
                     }
 
-                    if session_ending {
-                        state_machine.reinitialize();
-                        session_result_sender
-                            .send(std::mem::replace(
-                                &mut result_with_timings,
-                                initialized_result_with_timings(),
-                            ))
-                            .expect("always outlived receiver");
-                        session_ending = false;
+                    // Finalize the current session after asserting it's explicitly requested so.
+                    assert!(session_ending);
+                    // Send result first because this is blocking the replay code-path.
+                    session_result_sender
+                        .send(result_with_timings)
+                        .expect("always outlived receiver");
+                    state_machine.reinitialize();
+                    session_ending = false;
+
+                    // Prepare for the new session.
+                    match new_task_receiver.recv() {
+                        Ok(NewTaskPayload::OpenSubchannel((
+                            new_context,
+                            new_result_with_timings,
+                        ))) => {
+                            // We just received subsequent (= not initial) session and about to
+                            // enter into the preceding `while(!is_finished) {...}` loop again.
+                            // Before that, propagate new SchedulingContext to handler threads
+                            runnable_task_sender
+                                .send_chained_channel(new_context, handler_count)
+                                .unwrap();
+                            result_with_timings = new_result_with_timings;
+                        }
+                        Err(_) => {
+                            // This unusual condition must be triggered by ThreadManager::drop().
+                            // Initialize result_with_timings with a harmless value...
+                            result_with_timings = initialized_result_with_timings();
+                            break 'nonaborted_main_loop;
+                        }
+                        Ok(_) => unreachable!(),
+                    }
                 }

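This hunk is the core of the restructuring: the OpenSubchannel recv() moves from the top of the loop (where even the very first session had to be received) to the bottom, after the finished session's result has been sent back; the send happens first because the replay code-path blocks on that result. A condensed sketch of the new loop shape, with simplified channel and payload types standing in for the crossbeam-based originals:

```rust
use std::sync::mpsc::{Receiver, Sender};

struct Context;
type SessionResult = String; // stand-in for ResultWithTimings

enum NewTaskPayload {
    OpenSubchannel((Context, SessionResult)),
    CloseSubchannel,
}

fn scheduler_loop(
    new_task_receiver: Receiver<NewTaskPayload>,
    session_result_sender: Sender<SessionResult>,
    // The initial session is moved in by the caller, never recv()-ed.
    mut result_with_timings: SessionResult,
) {
    'nonaborted_main_loop: loop {
        // ...run the session until a CloseSubchannel message ends it...

        // 1. Unblock the replay code-path first.
        session_result_sender
            .send(result_with_timings)
            .expect("always outlived receiver");

        // 2. Only then block for the next session (or for ThreadManager::drop()).
        match new_task_receiver.recv() {
            Ok(NewTaskPayload::OpenSubchannel((_new_context, new_result))) => {
                result_with_timings = new_result;
            }
            Err(_) => break 'nonaborted_main_loop,
            Ok(_) => unreachable!(),
        }
    }
}

fn main() {
    let (task_tx, task_rx) = std::sync::mpsc::channel();
    let (result_tx, result_rx) = std::sync::mpsc::channel();
    let scheduler = std::thread::spawn(move || {
        scheduler_loop(task_rx, result_tx, "session one".to_string())
    });
    assert_eq!(result_rx.recv().unwrap(), "session one"); // result arrives first
    drop(task_tx); // dropping the sender ends the loop, as in ThreadManager::drop()
    scheduler.join().unwrap();
}
```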
@@ -1147,6 +1143,14 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
                 let finished_blocked_task_sender = finished_blocked_task_sender.clone();
                 let finished_idle_task_sender = finished_idle_task_sender.clone();
 
+                // The following loop maintains and updates SchedulingContext as its
+                // externally-provided state for each session in this way:
+                //
+                // 1. Initial context is propagated implicitly by the moved runnable_task_receiver,
+                //    which is clone()-d just above for this particular thread.
+                // 2. Subsequent contexts are propagated explicitly inside `.after_select()` as part of
+                //    `select_biased!`, which are sent from `.send_chained_channel()` in the scheduler
+                //    thread for all-but-initial sessions.
                 move || loop {
                     let (task, sender) = select_biased! {
                         recv(runnable_task_receiver.for_select()) -> message => {
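The comment block added above documents how handler threads track sessions: the task channel is "chained", i.e. the same channel can carry a context switch in-band, so handlers need no separate control channel and always observe tasks and context switches in sending order. A rough sketch of that idea (the real implementation is the crate-internal chained_channel module built on crossbeam; this stand-in uses std::sync::mpsc):

```rust
use std::sync::mpsc::Receiver;

// One message type carries both work and session changes, in order.
enum Chained<Task, Context> {
    Payload(Task),
    ContextSwitch(Context),
}

fn handler_loop(receiver: Receiver<Chained<u64, String>>) {
    // The initial context arrives by move at spawn time in the real code.
    let mut context = String::from("initial session");
    while let Ok(message) = receiver.recv() {
        match message {
            Chained::Payload(task) => println!("handle task {task} under {context}"),
            // A later session's context rides the same channel, strictly
            // behind any tasks sent before it.
            Chained::ContextSwitch(new_context) => context = new_context,
        }
    }
}

fn main() {
    let (tx, rx) = std::sync::mpsc::channel();
    tx.send(Chained::Payload(1)).unwrap();
    tx.send(Chained::ContextSwitch(String::from("next session"))).unwrap();
    tx.send(Chained::Payload(2)).unwrap();
    drop(tx);
    handler_loop(rx);
}
```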
@@ -1201,7 +1205,7 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
         self.scheduler_thread = Some(
             thread::Builder::new()
                 .name("solScheduler".to_owned())
-                .spawn_tracked(scheduler_main_loop())
+                .spawn_tracked(scheduler_main_loop)
                 .unwrap(),
         );
 
Expand Down Expand Up @@ -1322,13 +1326,14 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {

fn start_session(
&mut self,
context: &SchedulingContext,
context: SchedulingContext,
result_with_timings: ResultWithTimings,
) {
assert!(!self.are_threads_joined());
assert_matches!(self.session_result_with_timings, None);
self.new_task_sender
.send(NewTaskPayload::OpenSubchannel((
context.clone(),
context,
result_with_timings,
)))
.expect("no new session after aborted");
@@ -1348,7 +1353,7 @@ pub trait SpawnableScheduler<TH: TaskHandler>: InstalledScheduler {
 
     fn spawn(
         pool: Arc<SchedulerPool<Self, TH>>,
-        initial_context: SchedulingContext,
+        context: SchedulingContext,
         result_with_timings: ResultWithTimings,
     ) -> Self
     where
@@ -1374,21 +1379,23 @@ impl<TH: TaskHandler> SpawnableScheduler<TH> for PooledScheduler<TH> {
     ) -> Self {
         inner
             .thread_manager
-            .start_session(&context, result_with_timings);
+            .start_session(context.clone(), result_with_timings);
         Self { inner, context }
     }
 
     fn spawn(
         pool: Arc<SchedulerPool<Self, TH>>,
-        initial_context: SchedulingContext,
+        context: SchedulingContext,
         result_with_timings: ResultWithTimings,
     ) -> Self {
-        let mut scheduler = Self::do_spawn(pool, initial_context, result_with_timings);
-        scheduler
-            .inner
+        let mut inner = Self::Inner {
+            thread_manager: ThreadManager::new(pool),
+            usage_queue_loader: UsageQueueLoader::default(),
+        };
+        inner
             .thread_manager
-            .start_threads(&scheduler.context);
-        scheduler
+            .start_threads(context.clone(), result_with_timings);
+        Self { inner, context }
     }
 }

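With do_spawn() gone, spawn() builds the inner state and starts the threads already seeded with the first session, while from_inner() (the path that re-activates an existing scheduler with a new context) still goes through start_session(); this symmetry is why start_threads() and start_session() now take the same argument set. A shape-only sketch with simplified types, not the real agave API:

```rust
#[derive(Clone)]
struct Ctx;
struct Res;

struct ThreadManager;

impl ThreadManager {
    fn start_threads(&mut self, _context: Ctx, _result: Res) {
        // spawn scheduler + handler threads with the first session moved in
    }
    fn start_session(&mut self, _context: Ctx, _result: Res) {
        // send OpenSubchannel to threads that are already running
    }
}

struct Inner {
    thread_manager: ThreadManager,
}

struct Scheduler {
    inner: Inner,
    context: Ctx,
}

impl Scheduler {
    // Re-activating an existing scheduler: one channel message.
    fn from_inner(mut inner: Inner, context: Ctx, result: Res) -> Self {
        inner.thread_manager.start_session(context.clone(), result);
        Scheduler { inner, context }
    }

    // Fresh scheduler: no channel message; the threads start pre-seeded.
    fn spawn(context: Ctx, result: Res) -> Self {
        let mut inner = Inner {
            thread_manager: ThreadManager,
        };
        inner.thread_manager.start_threads(context.clone(), result);
        Scheduler { inner, context }
    }
}

fn main() {
    let fresh = Scheduler::spawn(Ctx, Res);
    let _reused = Scheduler::from_inner(fresh.inner, Ctx, Res);
}
```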
@@ -2770,13 +2777,13 @@ mod tests {
 
     fn spawn(
         pool: Arc<SchedulerPool<Self, DefaultTaskHandler>>,
-        initial_context: SchedulingContext,
+        context: SchedulingContext,
         _result_with_timings: ResultWithTimings,
     ) -> Self {
         AsyncScheduler::<TRIGGER_RACE_CONDITION>(
             Mutex::new(initialized_result_with_timings()),
             Mutex::new(vec![]),
-            initial_context,
+            context,
             pool,
         )
     }
