Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup the multithreaded executor #12969

Merged
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Cleanup the multithreaded executor
  • Loading branch information
james7132 committed Apr 14, 2024
commit f480bbd3af00c8f39843e6b6c33fc387f49b69e4
41 changes: 11 additions & 30 deletions crates/bevy_ecs/src/schedule/executor/multi_threaded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ struct SystemTaskMetadata {
/// The result of running a system that is sent across a channel.
struct SystemResult {
system_index: usize,
success: bool,
}

/// Runs the schedule using a thread pool. Non-conflicting systems can run in parallel.
Expand All @@ -90,6 +89,7 @@ pub struct MultiThreadedExecutor {
apply_final_deferred: bool,
/// When set, tells the executor that a thread has panicked.
panic_payload: Mutex<Option<Box<dyn Any + Send>>>,
starting_systems: FixedBitSet,
/// Cached tracing span
#[cfg(feature = "trace")]
executor_span: Span,
Expand All @@ -105,8 +105,6 @@ pub struct ExecutorState {
local_thread_running: bool,
/// Returns `true` if an exclusive system is running.
exclusive_running: bool,
/// The number of systems expected to run.
num_systems: usize,
/// The number of systems that are running.
num_running_systems: usize,
/// The number of systems that have completed.
Expand All @@ -127,8 +125,6 @@ pub struct ExecutorState {
completed_systems: FixedBitSet,
/// Systems that have run but have not had their buffers applied.
unapplied_systems: FixedBitSet,
/// When set, stops the executor from running any more systems.
stop_spawning: bool,
}

/// References to data required by the executor.
Expand Down Expand Up @@ -159,6 +155,7 @@ impl SystemExecutor for MultiThreadedExecutor {
let set_count = schedule.set_ids.len();

self.system_completion = ConcurrentQueue::bounded(sys_count.max(1));
self.starting_systems = FixedBitSet::with_capacity(sys_count);
state.evaluated_sets = FixedBitSet::with_capacity(set_count);
state.ready_systems = FixedBitSet::with_capacity(sys_count);
state.ready_systems_copy = FixedBitSet::with_capacity(sys_count);
Expand All @@ -175,6 +172,9 @@ impl SystemExecutor for MultiThreadedExecutor {
is_send: schedule.systems[index].is_send(),
is_exclusive: schedule.systems[index].is_exclusive(),
});
if schedule.system_dependencies[index] == 0 {
self.starting_systems.insert(index);
}
}

state.num_dependencies_remaining = Vec::with_capacity(sys_count);
Expand All @@ -188,8 +188,7 @@ impl SystemExecutor for MultiThreadedExecutor {
) {
let state = self.state.get_mut().unwrap();
// reset counts
state.num_systems = schedule.systems.len();
if state.num_systems == 0 {
if schedule.systems.is_empty() {
return;
}
state.num_running_systems = 0;
Expand All @@ -198,13 +197,8 @@ impl SystemExecutor for MultiThreadedExecutor {
state
.num_dependencies_remaining
.extend_from_slice(&schedule.system_dependencies);

for (system_index, dependencies) in state.num_dependencies_remaining.iter_mut().enumerate()
{
if *dependencies == 0 {
state.ready_systems.insert(system_index);
}
}
debug_assert!(state.ready_systems.is_clear());
state.ready_systems.union_with(&self.starting_systems);

// If stepping is enabled, make sure we skip those systems that should
// not be run.
Expand Down Expand Up @@ -290,7 +284,6 @@ impl<'scope, 'env: 'scope, 'sys> Context<'scope, 'env, 'sys> {
.system_completion
.push(SystemResult {
system_index,
success: res.is_ok(),
})
.unwrap_or_else(|error| unreachable!("{}", error));
if let Err(payload) = res {
Expand Down Expand Up @@ -332,6 +325,7 @@ impl MultiThreadedExecutor {
Self {
state: Mutex::new(ExecutorState::new()),
system_completion: ConcurrentQueue::unbounded(),
starting_systems: FixedBitSet::new(),
apply_final_deferred: true,
panic_payload: Mutex::new(None),
#[cfg(feature = "trace")]
Expand All @@ -344,7 +338,6 @@ impl ExecutorState {
fn new() -> Self {
Self {
system_task_metadata: Vec::new(),
num_systems: 0,
num_running_systems: 0,
num_completed_systems: 0,
num_dependencies_remaining: Vec::new(),
Expand All @@ -358,7 +351,6 @@ impl ExecutorState {
skipped_systems: FixedBitSet::new(),
completed_systems: FixedBitSet::new(),
unapplied_systems: FixedBitSet::new(),
stop_spawning: false,
}
}

Expand Down Expand Up @@ -409,7 +401,7 @@ impl ExecutorState {
ready_systems.union_with(&self.ready_systems);

for system_index in ready_systems.ones() {
assert!(!self.running_systems.contains(system_index));
debug_assert!(!self.running_systems.contains(system_index));
// SAFETY: Caller assured that these systems are not running.
// Therefore, no other reference to this system exists and there is no aliasing.
let system = unsafe { &mut *context.environment.systems[system_index].get() };
Expand Down Expand Up @@ -668,7 +660,7 @@ impl ExecutorState {
fn finish_system_and_handle_dependents(&mut self, result: SystemResult) {
let SystemResult {
system_index,
success,
..
} = result;

if self.system_task_metadata[system_index].is_exclusive {
Expand All @@ -687,10 +679,6 @@ impl ExecutorState {
self.unapplied_systems.insert(system_index);

self.signal_dependents(system_index);

if !success {
self.stop_spawning_systems();
}
}

fn skip_system_and_signal_dependents(&mut self, system_index: usize) {
Expand All @@ -710,13 +698,6 @@ impl ExecutorState {
}
}

fn stop_spawning_systems(&mut self) {
if !self.stop_spawning {
self.num_systems = self.num_completed_systems + self.num_running_systems;
self.stop_spawning = true;
}
}

fn rebuild_active_access(&mut self) {
self.active_access.clear();
for index in self.running_systems.ones() {
Expand Down