Skip to content

Commit 1441a1f

Browse files
committed
move MainThreadExecutor to schedule_v3
1 parent 3632076 commit 1441a1f

File tree

4 files changed

+74
-60
lines changed

4 files changed

+74
-60
lines changed

crates/bevy_ecs/src/schedule/executor_parallel.rs

+6-16
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,12 @@
1-
use std::sync::Arc;
2-
3-
use crate as bevy_ecs;
41
use crate::{
52
archetype::ArchetypeComponentId,
63
query::Access,
74
schedule::{ParallelSystemExecutor, SystemContainer},
8-
system::Resource,
5+
schedule_v3::MainThreadExecutor,
96
world::World,
107
};
118
use async_channel::{Receiver, Sender};
12-
use bevy_tasks::{ComputeTaskPool, Scope, TaskPool, ThreadExecutor};
9+
use bevy_tasks::{ComputeTaskPool, Scope, TaskPool};
1310
#[cfg(feature = "trace")]
1411
use bevy_utils::tracing::Instrument;
1512
use event_listener::Event;
@@ -18,16 +15,6 @@ use fixedbitset::FixedBitSet;
1815
#[cfg(test)]
1916
use scheduling_event::*;
2017

21-
/// New-typed [`ThreadExecutor`] [`Resource`] that is used to run systems on the main thread
22-
#[derive(Resource, Default, Clone)]
23-
pub struct MainThreadExecutor(pub Arc<ThreadExecutor<'static>>);
24-
25-
impl MainThreadExecutor {
26-
pub fn new() -> Self {
27-
MainThreadExecutor(Arc::new(ThreadExecutor::new()))
28-
}
29-
}
30-
3118
struct SystemSchedulingMetadata {
3219
/// Used to signal the system's task to start the system.
3320
start: Event,
@@ -138,7 +125,10 @@ impl ParallelSystemExecutor for ParallelExecutor {
138125
}
139126
}
140127

141-
let thread_executor = world.get_resource::<MainThreadExecutor>().map(|e| &*e.0);
128+
let thread_executor = world
129+
.get_resource::<MainThreadExecutor>()
130+
.map(|e| e.0.clone());
131+
let thread_executor = thread_executor.as_deref();
142132

143133
ComputeTaskPool::init(TaskPool::default).scope_with_executor(
144134
false,

crates/bevy_ecs/src/schedule_v3/executor/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ mod multi_threaded;
22
mod simple;
33
mod single_threaded;
44

5-
pub use self::multi_threaded::MultiThreadedExecutor;
5+
pub use self::multi_threaded::{MainThreadExecutor, MultiThreadedExecutor};
66
pub use self::simple::SimpleExecutor;
77
pub use self::single_threaded::SingleThreadedExecutor;
88

crates/bevy_ecs/src/schedule_v3/executor/multi_threaded.rs

+65-42
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,16 @@
1-
use bevy_tasks::{ComputeTaskPool, Scope, TaskPool};
1+
use bevy_tasks::{ComputeTaskPool, Scope, TaskPool, ThreadExecutor};
22
use bevy_utils::default;
33
use bevy_utils::syncunsafecell::SyncUnsafeCell;
44
#[cfg(feature = "trace")]
55
use bevy_utils::tracing::{info_span, Instrument};
6+
use std::sync::Arc;
67

78
use async_channel::{Receiver, Sender};
89
use fixedbitset::FixedBitSet;
910

1011
use crate::{
1112
archetype::ArchetypeComponentId,
13+
prelude::Resource,
1214
query::Access,
1315
schedule_v3::{
1416
is_apply_system_buffers, BoxedCondition, ExecutorKind, SystemExecutor, SystemSchedule,
@@ -17,6 +19,8 @@ use crate::{
1719
world::World,
1820
};
1921

22+
use crate as bevy_ecs;
23+
2024
/// A funky borrow split of [`SystemSchedule`] required by the [`MultiThreadedExecutor`].
2125
struct SyncUnsafeSchedule<'a> {
2226
systems: &'a [SyncUnsafeCell<BoxedSystem>],
@@ -145,60 +149,69 @@ impl SystemExecutor for MultiThreadedExecutor {
145149
}
146150
}
147151

152+
let thread_executor = world
153+
.get_resource::<MainThreadExecutor>()
154+
.map(|e| e.0.clone());
155+
let thread_executor = thread_executor.as_deref();
156+
148157
let world = SyncUnsafeCell::from_mut(world);
149158
let SyncUnsafeSchedule {
150159
systems,
151160
mut conditions,
152161
} = SyncUnsafeSchedule::new(schedule);
153162

154-
ComputeTaskPool::init(TaskPool::default).scope(|scope| {
155-
// the executor itself is a `Send` future so that it can run
156-
// alongside systems that claim the local thread
157-
let executor = async {
158-
while self.num_completed_systems < num_systems {
159-
// SAFETY: self.ready_systems does not contain running systems
160-
unsafe {
161-
self.spawn_system_tasks(scope, systems, &mut conditions, world);
162-
}
163-
164-
if self.num_running_systems > 0 {
165-
// wait for systems to complete
166-
let index = self
167-
.receiver
168-
.recv()
169-
.await
170-
.unwrap_or_else(|error| unreachable!("{}", error));
163+
ComputeTaskPool::init(TaskPool::default).scope_with_executor(
164+
false,
165+
thread_executor,
166+
|scope| {
167+
// the executor itself is a `Send` future so that it can run
168+
// alongside systems that claim the local thread
169+
let executor = async {
170+
while self.num_completed_systems < num_systems {
171+
// SAFETY: self.ready_systems does not contain running systems
172+
unsafe {
173+
self.spawn_system_tasks(scope, systems, &mut conditions, world);
174+
}
171175

172-
self.finish_system_and_signal_dependents(index);
176+
if self.num_running_systems > 0 {
177+
// wait for systems to complete
178+
let index = self
179+
.receiver
180+
.recv()
181+
.await
182+
.unwrap_or_else(|error| unreachable!("{}", error));
173183

174-
while let Ok(index) = self.receiver.try_recv() {
175184
self.finish_system_and_signal_dependents(index);
176-
}
177185

178-
self.rebuild_active_access();
186+
while let Ok(index) = self.receiver.try_recv() {
187+
self.finish_system_and_signal_dependents(index);
188+
}
189+
190+
self.rebuild_active_access();
191+
}
179192
}
180-
}
181193

182-
// SAFETY: all systems have completed
183-
let world = unsafe { &mut *world.get() };
184-
apply_system_buffers(&self.unapplied_systems, systems, world);
185-
self.unapplied_systems.clear();
186-
debug_assert!(self.unapplied_systems.is_clear());
187-
188-
debug_assert!(self.ready_systems.is_clear());
189-
debug_assert!(self.running_systems.is_clear());
190-
self.active_access.clear();
191-
self.evaluated_sets.clear();
192-
self.skipped_systems.clear();
193-
self.completed_systems.clear();
194-
};
194+
// SAFETY: all systems have completed
195+
let world = unsafe { &mut *world.get() };
196+
apply_system_buffers(&self.unapplied_systems, systems, world);
197+
self.unapplied_systems.clear();
198+
debug_assert!(self.unapplied_systems.is_clear());
195199

196-
#[cfg(feature = "trace")]
197-
let executor_span = info_span!("schedule_task");
198-
#[cfg(feature = "trace")]
199-
let executor = executor.instrument(executor_span);
200-
scope.spawn(executor);
201-
});
200+
debug_assert!(self.ready_systems.is_clear());
201+
debug_assert!(self.running_systems.is_clear());
202+
self.active_access.clear();
203+
self.evaluated_sets.clear();
204+
self.skipped_systems.clear();
205+
self.completed_systems.clear();
206+
};
207+
208+
#[cfg(feature = "trace")]
209+
let executor_span = info_span!("schedule_task");
210+
#[cfg(feature = "trace")]
211+
let executor = executor.instrument(executor_span);
212+
scope.spawn(executor);
213+
},
214+
);
202215
}
203216
}
204217

@@ -573,3 +586,13 @@ fn evaluate_and_fold_conditions(conditions: &mut [BoxedCondition], world: &World
573586
})
574587
.fold(true, |acc, res| acc && res)
575588
}
589+
590+
/// New-typed [`ThreadExecutor`] [`Resource`] that is used to run systems on the main thread
591+
#[derive(Resource, Default, Clone)]
592+
pub struct MainThreadExecutor(pub Arc<ThreadExecutor<'static>>);
593+
594+
impl MainThreadExecutor {
595+
pub fn new() -> Self {
596+
MainThreadExecutor(Arc::new(ThreadExecutor::new()))
597+
}
598+
}

crates/bevy_render/src/pipelined_rendering.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@ use async_channel::{Receiver, Sender};
22

33
use bevy_app::{App, AppLabel, Plugin, SubApp};
44
use bevy_ecs::{
5-
schedule::{MainThreadExecutor, StageLabel, SystemStage},
5+
schedule::{StageLabel, SystemStage},
6+
schedule_v3::MainThreadExecutor,
67
system::Resource,
78
world::{Mut, World},
89
};

0 commit comments

Comments
 (0)