Cleanup the multithreaded executor (#12969)
# Objective
Improve the code quality of the multithreaded executor.

## Solution
* Remove some unused variables.
* Use `Mutex::get_mut` where applicable instead of locking (see the first
sketch below).
* Use a `starting_systems` `FixedBitSet` to pre-compute the set of starting
systems once, instead of rebuilding it bit-by-bit on every schedule run (see
the second sketch below).
* Replace `FixedBitSet::clear` followed by `FixedBitSet::union_with` with
`FixedBitSet::clone_from`, which performs only a single copy and will not
allocate if the target bitset's existing allocation is large enough.
* Replace the `Mutex` around `Conditions` with `SyncUnsafeCell`, and add a
`Context::try_lock` that forces the conditions to be fetched together with
the lock on the executor state (see the third sketch below).

This might produce minimal performance gains, but the focus here is on
the code quality improvements.
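
Below are three minimal sketches of the patterns above. They are illustrative
stand-ins written for this description, not code lifted from the executor; all
type and function names in them are hypothetical.

First, `Mutex::get_mut`: when you already hold `&mut` to the mutex, the
contents can be borrowed directly without taking the lock:

```rust
use std::sync::Mutex;

fn store_panic_payload(slot: &mut Mutex<Option<String>>, payload: String) {
    // With `&mut Mutex<T>`, `get_mut` returns `&mut T` without locking;
    // the `Result` only reports poisoning from a previously panicked holder.
    *slot.get_mut().unwrap() = Some(payload);
}
```

Second, the pre-computed starting set plus `FixedBitSet::clone_from`, a
standalone sketch using the `fixedbitset` crate (`ReadyList` and its fields
are stand-ins):

```rust
use fixedbitset::FixedBitSet;

struct ReadyList {
    /// Computed once when the schedule is (re)built: systems with no dependencies.
    starting_systems: FixedBitSet,
    /// Rebuilt at the start of every run.
    ready_systems: FixedBitSet,
}

impl ReadyList {
    fn rebuild(&mut self, dependency_counts: &[usize]) {
        self.starting_systems = FixedBitSet::with_capacity(dependency_counts.len());
        for (index, &count) in dependency_counts.iter().enumerate() {
            if count == 0 {
                self.starting_systems.insert(index);
            }
        }
    }

    fn begin_run(&mut self) {
        // Previously: self.ready_systems.clear();
        //             self.ready_systems.union_with(&self.starting_systems);
        // `clone_from` does a single copy and reuses the existing allocation
        // of `ready_systems` when it is already large enough.
        self.ready_systems.clone_from(&self.starting_systems);
    }
}
```

Third, the shape of the `SyncUnsafeCell` + `try_lock` pairing: the cell's
contents are only handed out together with the guard on the executor state, so
the lock synchronizes both. This sketch mirrors the `Context::try_lock` added
in the diff, but uses `std`'s `UnsafeCell` since `SyncUnsafeCell` is a Bevy
utility type:

```rust
use std::cell::UnsafeCell;
use std::sync::{Mutex, MutexGuard};

struct Shared<S, C> {
    state: Mutex<S>,
    conditions: UnsafeCell<C>,
}

// SAFETY: `conditions` is only ever accessed while the `state` lock is held,
// which serializes all access across threads.
unsafe impl<S: Send, C: Send> Sync for Shared<S, C> {}

impl<S, C> Shared<S, C> {
    fn try_lock(&self) -> Option<(&mut C, MutexGuard<'_, S>)> {
        let guard = self.state.try_lock().ok()?;
        // SAFETY: exclusive access to `conditions` is synchronized by `guard`;
        // nothing else fetches the cell's contents mutably.
        let conditions = unsafe { &mut *self.conditions.get() };
        Some((conditions, guard))
    }
}
```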
james7132 authored Apr 16, 2024
1 parent a5fa32e commit 9dde99f
Showing 2 changed files with 42 additions and 75 deletions.
113 changes: 39 additions & 74 deletions crates/bevy_ecs/src/schedule/executor/multi_threaded.rs
@@ -1,6 +1,6 @@
use std::{
any::Any,
sync::{Arc, Mutex},
sync::{Arc, Mutex, MutexGuard},
};

use bevy_tasks::{ComputeTaskPool, Scope, TaskPool, ThreadExecutor};
@@ -30,7 +30,7 @@ use super::__rust_begin_short_backtrace;
struct Environment<'env, 'sys> {
executor: &'env MultiThreadedExecutor,
systems: &'sys [SyncUnsafeCell<BoxedSystem>],
conditions: Mutex<Conditions<'sys>>,
conditions: SyncUnsafeCell<Conditions<'sys>>,
world_cell: UnsafeWorldCell<'env>,
}

@@ -50,7 +50,7 @@ impl<'env, 'sys> Environment<'env, 'sys> {
Environment {
executor,
systems: SyncUnsafeCell::from_mut(schedule.systems.as_mut_slice()).as_slice_of_cells(),
conditions: Mutex::new(Conditions {
conditions: SyncUnsafeCell::new(Conditions {
system_conditions: &mut schedule.system_conditions,
set_conditions: &mut schedule.set_conditions,
sets_with_conditions_of_systems: &schedule.sets_with_conditions_of_systems,
@@ -77,7 +77,6 @@ struct SystemTaskMetadata {
/// The result of running a system that is sent across a channel.
struct SystemResult {
system_index: usize,
success: bool,
}

/// Runs the schedule using a thread pool. Non-conflicting systems can run in parallel.
@@ -90,6 +89,7 @@ pub struct MultiThreadedExecutor {
apply_final_deferred: bool,
/// When set, tells the executor that a thread has panicked.
panic_payload: Mutex<Option<Box<dyn Any + Send>>>,
starting_systems: FixedBitSet,
/// Cached tracing span
#[cfg(feature = "trace")]
executor_span: Span,
@@ -105,12 +105,8 @@ pub struct ExecutorState {
local_thread_running: bool,
/// Returns `true` if an exclusive system is running.
exclusive_running: bool,
/// The number of systems expected to run.
num_systems: usize,
/// The number of systems that are running.
num_running_systems: usize,
/// The number of systems that have completed.
num_completed_systems: usize,
/// The number of dependencies each system has that have not completed.
num_dependencies_remaining: Vec<usize>,
/// System sets whose conditions have been evaluated.
@@ -127,8 +123,6 @@ pub struct ExecutorState {
completed_systems: FixedBitSet,
/// Systems that have run but have not had their buffers applied.
unapplied_systems: FixedBitSet,
/// When set, stops the executor from running any more systems.
stop_spawning: bool,
}

/// References to data required by the executor.
@@ -159,6 +153,7 @@ impl SystemExecutor for MultiThreadedExecutor {
let set_count = schedule.set_ids.len();

self.system_completion = ConcurrentQueue::bounded(sys_count.max(1));
self.starting_systems = FixedBitSet::with_capacity(sys_count);
state.evaluated_sets = FixedBitSet::with_capacity(set_count);
state.ready_systems = FixedBitSet::with_capacity(sys_count);
state.ready_systems_copy = FixedBitSet::with_capacity(sys_count);
@@ -175,6 +170,9 @@
is_send: schedule.systems[index].is_send(),
is_exclusive: schedule.systems[index].is_exclusive(),
});
if schedule.system_dependencies[index] == 0 {
self.starting_systems.insert(index);
}
}

state.num_dependencies_remaining = Vec::with_capacity(sys_count);
@@ -188,23 +186,14 @@
) {
let state = self.state.get_mut().unwrap();
// reset counts
state.num_systems = schedule.systems.len();
if state.num_systems == 0 {
if schedule.systems.is_empty() {
return;
}
state.num_running_systems = 0;
state.num_completed_systems = 0;
state.num_dependencies_remaining.clear();
state
.num_dependencies_remaining
.extend_from_slice(&schedule.system_dependencies);

for (system_index, dependencies) in state.num_dependencies_remaining.iter_mut().enumerate()
{
if *dependencies == 0 {
state.ready_systems.insert(system_index);
}
}
.clone_from(&schedule.system_dependencies);
state.ready_systems.clone_from(&self.starting_systems);

// If stepping is enabled, make sure we skip those systems that should
// not be run.
@@ -213,13 +202,12 @@ impl SystemExecutor for MultiThreadedExecutor {
debug_assert_eq!(skipped_systems.len(), state.completed_systems.len());
// mark skipped systems as completed
state.completed_systems |= skipped_systems;
state.num_completed_systems = state.completed_systems.count_ones(..);

// signal the dependencies for each of the skipped systems, as
// though they had run
for system_index in skipped_systems.ones() {
state.signal_dependents(system_index);
state.ready_systems.set(system_index, false);
state.ready_systems.remove(system_index);
}
}

@@ -251,15 +239,14 @@
// Commands should be applied while on the scope's thread, not the executor's thread
let res = apply_deferred(&state.unapplied_systems, systems, world);
if let Err(payload) = res {
let mut panic_payload = self.panic_payload.lock().unwrap();
let panic_payload = self.panic_payload.get_mut().unwrap();
*panic_payload = Some(payload);
}
state.unapplied_systems.clear();
debug_assert!(state.unapplied_systems.is_clear());
}

// check to see if there was a panic
let mut payload = self.panic_payload.lock().unwrap();
let payload = self.panic_payload.get_mut().unwrap();
if let Some(payload) = payload.take() {
std::panic::resume_unwind(payload);
}
@@ -288,10 +275,7 @@ impl<'scope, 'env: 'scope, 'sys> Context<'scope, 'env, 'sys> {
self.environment
.executor
.system_completion
.push(SystemResult {
system_index,
success: res.is_ok(),
})
.push(SystemResult { system_index })
.unwrap_or_else(|error| unreachable!("{}", error));
if let Err(payload) = res {
eprintln!("Encountered a panic in system `{}`!", &*system.name());
@@ -304,17 +288,25 @@ impl<'scope, 'env: 'scope, 'sys> Context<'scope, 'env, 'sys> {
self.tick_executor();
}

fn try_lock<'a>(&'a self) -> Option<(&'a mut Conditions<'sys>, MutexGuard<'a, ExecutorState>)> {
let guard = self.environment.executor.state.try_lock().ok()?;
// SAFETY: This is an exclusive access as no other location fetches conditions mutably, and
// is synchronized by the lock on the executor state.
let conditions = unsafe { &mut *self.environment.conditions.get() };
Some((conditions, guard))
}

fn tick_executor(&self) {
// Ensure that the executor handles any events pushed to the system_completion queue by this thread.
// If this thread acquires the lock, the executor runs after the push() and they are processed.
// If this thread does not acquire the lock, then the is_empty() check on the other thread runs
// after the lock is released, which is after try_lock() failed, which is after the push()
// on this thread, so the is_empty() check will see the new events and loop.
loop {
let Ok(mut guard) = self.environment.executor.state.try_lock() else {
let Some((conditions, mut guard)) = self.try_lock() else {
return;
};
guard.tick(self);
guard.tick(self, conditions);
// Make sure we drop the guard before checking system_completion.is_empty(), or we could lose events.
drop(guard);
if self.environment.executor.system_completion.is_empty() {
@@ -332,6 +324,7 @@ impl MultiThreadedExecutor {
Self {
state: Mutex::new(ExecutorState::new()),
system_completion: ConcurrentQueue::unbounded(),
starting_systems: FixedBitSet::new(),
apply_final_deferred: true,
panic_payload: Mutex::new(None),
#[cfg(feature = "trace")]
@@ -344,9 +337,7 @@ impl ExecutorState {
fn new() -> Self {
Self {
system_task_metadata: Vec::new(),
num_systems: 0,
num_running_systems: 0,
num_completed_systems: 0,
num_dependencies_remaining: Vec::new(),
active_access: default(),
local_thread_running: false,
@@ -358,11 +349,10 @@
skipped_systems: FixedBitSet::new(),
completed_systems: FixedBitSet::new(),
unapplied_systems: FixedBitSet::new(),
stop_spawning: false,
}
}

fn tick(&mut self, context: &Context) {
fn tick(&mut self, context: &Context, conditions: &mut Conditions) {
#[cfg(feature = "trace")]
let _span = context.environment.executor.executor_span.enter();

@@ -376,7 +366,7 @@
// - `finish_system_and_handle_dependents` has updated the currently running systems.
// - `rebuild_active_access` locks access for all currently running systems.
unsafe {
self.spawn_system_tasks(context);
self.spawn_system_tasks(context, conditions);
}
}

Expand All @@ -385,17 +375,11 @@ impl ExecutorState {
/// have been mutably borrowed (such as the systems currently running).
/// - `world_cell` must have permission to access all world data (not counting
/// any world data that is claimed by systems currently running on this executor).
unsafe fn spawn_system_tasks(&mut self, context: &Context) {
unsafe fn spawn_system_tasks(&mut self, context: &Context, conditions: &mut Conditions) {
if self.exclusive_running {
return;
}

let mut conditions = context
.environment
.conditions
.try_lock()
.expect("Conditions should only be locked while owning the executor state");

// can't borrow since loop mutably borrows `self`
let mut ready_systems = std::mem::take(&mut self.ready_systems_copy);

@@ -405,19 +389,18 @@
while check_for_new_ready_systems {
check_for_new_ready_systems = false;

ready_systems.clear();
ready_systems.union_with(&self.ready_systems);
ready_systems.clone_from(&self.ready_systems);

for system_index in ready_systems.ones() {
assert!(!self.running_systems.contains(system_index));
debug_assert!(!self.running_systems.contains(system_index));
// SAFETY: Caller assured that these systems are not running.
// Therefore, no other reference to this system exists and there is no aliasing.
let system = unsafe { &mut *context.environment.systems[system_index].get() };

if !self.can_run(
system_index,
system,
&mut conditions,
conditions,
context.environment.world_cell,
) {
// NOTE: exclusive systems with ambiguities are susceptible to
Expand All @@ -427,7 +410,7 @@ impl ExecutorState {
continue;
}

self.ready_systems.set(system_index, false);
self.ready_systems.remove(system_index);

// SAFETY: `can_run` returned true, which means that:
// - It must have called `update_archetype_component_access` for each run condition.
Expand All @@ -436,7 +419,7 @@ impl ExecutorState {
!self.should_run(
system_index,
system,
&mut conditions,
conditions,
context.environment.world_cell,
)
} {
@@ -523,11 +506,9 @@ impl ExecutorState {
return false;
}

// PERF: use an optimized clear() + extend() operation
let meta_access =
&mut self.system_task_metadata[system_index].archetype_component_access;
meta_access.clear();
meta_access.extend(system.archetype_component_access());
self.system_task_metadata[system_index]
.archetype_component_access
.clone_from(system.archetype_component_access());
}

true
@@ -666,10 +647,7 @@
}

fn finish_system_and_handle_dependents(&mut self, result: SystemResult) {
let SystemResult {
system_index,
success,
} = result;
let SystemResult { system_index, .. } = result;

if self.system_task_metadata[system_index].is_exclusive {
self.exclusive_running = false;
@@ -681,20 +659,14 @@

debug_assert!(self.num_running_systems >= 1);
self.num_running_systems -= 1;
self.num_completed_systems += 1;
self.running_systems.set(system_index, false);
self.running_systems.remove(system_index);
self.completed_systems.insert(system_index);
self.unapplied_systems.insert(system_index);

self.signal_dependents(system_index);

if !success {
self.stop_spawning_systems();
}
}

fn skip_system_and_signal_dependents(&mut self, system_index: usize) {
self.num_completed_systems += 1;
self.completed_systems.insert(system_index);
self.signal_dependents(system_index);
}
@@ -710,13 +682,6 @@
}
}

fn stop_spawning_systems(&mut self) {
if !self.stop_spawning {
self.num_systems = self.num_completed_systems + self.num_running_systems;
self.stop_spawning = true;
}
}

fn rebuild_active_access(&mut self) {
self.active_access.clear();
for index in self.running_systems.ones() {
4 changes: 3 additions & 1 deletion crates/bevy_utils/src/syncunsafecell.rs
@@ -101,12 +101,14 @@ impl<T> SyncUnsafeCell<[T]> {
/// assert_eq!(slice_cell.len(), 3);
/// ```
pub fn as_slice_of_cells(&self) -> &[SyncUnsafeCell<T>] {
let self_ptr: *const SyncUnsafeCell<[T]> = ptr::from_ref(self);
let slice_ptr = self_ptr as *const [SyncUnsafeCell<T>];
// SAFETY: `UnsafeCell<T>` and `SyncUnsafeCell<T>` have #[repr(transparent)]
// therefore:
// - `SyncUnsafeCell<T>` has the same layout as `T`
// - `SyncUnsafeCell<[T]>` has the same layout as `[T]`
// - `SyncUnsafeCell<[T]>` has the same layout as `[SyncUnsafeCell<T>]`
unsafe { &*(ptr::from_ref(self) as *const [SyncUnsafeCell<T>]) }
unsafe { &*slice_ptr }
}
}

