Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion compiler/rustc_data_structures/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ pub use self::freeze::{FreezeLock, FreezeReadGuard, FreezeWriteGuard};
pub use self::lock::{Lock, LockGuard, Mode};
pub use self::mode::{is_dyn_thread_safe, set_dyn_thread_safe_mode};
pub use self::parallel::{
broadcast, join, par_for_each_in, par_map, parallel_guard, scope, spawn, try_par_for_each_in,
broadcast, par_fns, par_for_each_in, par_join, par_map, parallel_guard, spawn,
try_par_for_each_in,
};
pub use self::vec::{AppendOnlyIndexVec, AppendOnlyVec};
pub use self::worker_local::{Registry, WorkerLocal};
Expand Down
78 changes: 34 additions & 44 deletions compiler/rustc_data_structures/src/sync/parallel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,41 +56,6 @@ where
(a.unwrap(), b.unwrap())
}

/// Runs a list of blocks in parallel. The first block is executed immediately on
/// the current thread. Use that for the longest running block.
///
/// Invoke it as `parallel!({ .. }, { .. }, ...)`. When
/// `$crate::sync::is_dyn_thread_safe()` returns `false`, nothing is spawned: every
/// block is run through `guard.run` on the current thread, in the order written.
///
/// The arms starting with `impl` are internal helpers and are only meant to be
/// reached through the recursive calls made by the macro itself.
#[macro_export]
macro_rules! parallel {
// Internal arm: moves the next pending block from the front of the second list to
// the FRONT of the accumulator (first list), then recurses. Once every block has
// been moved, the accumulator holds the later blocks in reverse order.
(impl $fblock:block [$($c:expr,)*] [$block:expr $(, $rest:expr)*]) => {
parallel!(impl $fblock [$block, $($c,)*] [$($rest),*])
};
// Internal arm: the pending list is empty. Spawns each accumulated block on the
// scope, then runs the first block directly on the current thread.
(impl $fblock:block [$($blocks:expr,)*] []) => {
$crate::sync::parallel_guard(|guard| {
$crate::sync::scope(|s| {
$(
let block = $crate::sync::FromDyn::from(|| $blocks);
s.spawn(move |_| {
guard.run(move || block.into_inner()());
});
)*
guard.run(|| $fblock);
});
});
};
// Entry point: one leading block followed by zero or more comma-separated blocks.
($fblock:block, $($blocks:block),*) => {
if $crate::sync::is_dyn_thread_safe() {
// Reverse the order of the later blocks since Rayon executes them in reverse order
// when using a single thread. This ensures the execution order matches that
// of a single threaded rustc.
parallel!(impl $fblock [] [$($blocks),*]);
} else {
// Sequential mode: run the first block, then the remaining ones in source order.
$crate::sync::parallel_guard(|guard| {
guard.run(|| $fblock);
$(guard.run(|| $blocks);)*
});
}
};
}

pub fn spawn(func: impl FnOnce() + DynSend + 'static) {
if mode::is_dyn_thread_safe() {
let func = FromDyn::from(func);
Expand All @@ -102,18 +67,43 @@ pub fn spawn(func: impl FnOnce() + DynSend + 'static) {
}
}

// This function only works when `mode::is_dyn_thread_safe()`.
pub fn scope<'scope, OP, R>(op: OP) -> R
where
OP: FnOnce(&rustc_thread_pool::Scope<'scope>) -> R + DynSend,
R: DynSend,
{
let op = FromDyn::from(op);
rustc_thread_pool::scope(|s| FromDyn::from(op.into_inner()(s))).into_inner()
/// Runs the functions in parallel.
///
/// The first function is executed immediately on the current thread.
/// Use that for the longest running function for better scheduling.
///
/// Every function is invoked through `guard.run`. If `funcs` is empty, nothing is
/// run. When `mode::is_dyn_thread_safe()` returns `false`, nothing is spawned and
/// the functions are called one after another, in slice order.
pub fn par_fns(funcs: &mut [&mut (dyn FnMut() + DynSend)]) {
parallel_guard(|guard: &ParallelGuard| {
if mode::is_dyn_thread_safe() {
let funcs = FromDyn::from(funcs);
rustc_thread_pool::scope(|s| {
// `split_at_mut_checked(1)` returns `None` only when the slice is empty,
// in which case there is nothing to run.
let Some((first, rest)) = funcs.into_inner().split_at_mut_checked(1) else {
return;
};

// Reverse the order of the later functions since Rayon executes them in reverse
// order when using a single thread. This ensures the execution order matches
// that of a single threaded rustc.
for f in rest.iter_mut().rev() {
let f = FromDyn::from(f);
s.spawn(|_| {
guard.run(|| (f.into_inner())());
});
}

// Run the first function without spawning to
// ensure it executes immediately on this thread.
guard.run(|| first[0]());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably worth a brief comment about how/why the first function is run on the current thread, to avoid an unnecessary spawn.

});
} else {
// Sequential mode: call each function in the order given.
for f in funcs {
guard.run(|| f());
}
}
});
}

#[inline]
pub fn join<A, B, RA: DynSend, RB: DynSend>(oper_a: A, oper_b: B) -> (RA, RB)
pub fn par_join<A, B, RA: DynSend, RB: DynSend>(oper_a: A, oper_b: B) -> (RA, RB)
where
A: FnOnce() -> RA + DynSend,
B: FnOnce() -> RB + DynSend,
Expand Down
4 changes: 2 additions & 2 deletions compiler/rustc_incremental/src/persist/save.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::fs;
use std::sync::Arc;

use rustc_data_structures::fx::FxIndexMap;
use rustc_data_structures::sync::join;
use rustc_data_structures::sync::par_join;
use rustc_middle::dep_graph::{
DepGraph, SerializedDepGraph, WorkProduct, WorkProductId, WorkProductMap,
};
Expand Down Expand Up @@ -44,7 +44,7 @@ pub(crate) fn save_dep_graph(tcx: TyCtxt<'_>) {
sess.time("assert_dep_graph", || assert_dep_graph(tcx));
sess.time("check_clean", || clean::check_clean_annotations(tcx));

join(
par_join(
move || {
sess.time("incr_comp_persist_dep_graph", || {
if let Err(err) = fs::rename(&staging_dep_graph_path, &dep_graph_path) {
Expand Down
40 changes: 20 additions & 20 deletions compiler/rustc_interface/src/passes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ use rustc_codegen_ssa::{CodegenResults, CrateInfo};
use rustc_data_structures::indexmap::IndexMap;
use rustc_data_structures::jobserver::Proxy;
use rustc_data_structures::steal::Steal;
use rustc_data_structures::sync::{AppendOnlyIndexVec, FreezeLock, WorkerLocal};
use rustc_data_structures::{parallel, thousands};
use rustc_data_structures::sync::{AppendOnlyIndexVec, FreezeLock, WorkerLocal, par_fns};
use rustc_data_structures::thousands;
use rustc_errors::timings::TimingSection;
use rustc_expand::base::{ExtCtxt, LintStoreExpand};
use rustc_feature::Features;
Expand Down Expand Up @@ -1052,8 +1052,8 @@ fn run_required_analyses(tcx: TyCtxt<'_>) {

let sess = tcx.sess;
sess.time("misc_checking_1", || {
parallel!(
{
par_fns(&mut [
&mut || {
sess.time("looking_for_entry_point", || tcx.ensure_ok().entry_fn(()));
sess.time("check_externally_implementable_items", || {
tcx.ensure_ok().check_externally_implementable_items(())
Expand All @@ -1065,22 +1065,22 @@ fn run_required_analyses(tcx: TyCtxt<'_>) {

CStore::from_tcx(tcx).report_unused_deps(tcx);
},
{
&mut || {
tcx.ensure_ok().exportable_items(LOCAL_CRATE);
tcx.ensure_ok().stable_order_of_exportable_impls(LOCAL_CRATE);
tcx.par_hir_for_each_module(|module| {
tcx.ensure_ok().check_mod_attrs(module);
tcx.ensure_ok().check_mod_unstable_api_usage(module);
});
},
{
&mut || {
// We force these queries to run,
// since they might not otherwise get called.
// This marks the corresponding crate-level attributes
// as used, and ensures that their values are valid.
tcx.ensure_ok().limits(());
}
);
},
]);
});

rustc_hir_analysis::check_crate(tcx);
Expand Down Expand Up @@ -1156,39 +1156,39 @@ fn analysis(tcx: TyCtxt<'_>, (): ()) {
}

sess.time("misc_checking_3", || {
parallel!(
{
par_fns(&mut [
&mut || {
tcx.ensure_ok().effective_visibilities(());

parallel!(
{
par_fns(&mut [
&mut || {
tcx.par_hir_for_each_module(|module| {
tcx.ensure_ok().check_private_in_public(module)
})
},
{
&mut || {
tcx.par_hir_for_each_module(|module| {
tcx.ensure_ok().check_mod_deathness(module)
});
},
{
&mut || {
sess.time("lint_checking", || {
rustc_lint::check_crate(tcx);
});
},
{
&mut || {
tcx.ensure_ok().clashing_extern_declarations(());
}
);
},
]);
},
{
&mut || {
sess.time("privacy_checking_modules", || {
tcx.par_hir_for_each_module(|module| {
tcx.ensure_ok().check_mod_privacy(module);
});
});
}
);
},
]);

// This check has to be run after all lints are done processing. We don't
// define a lint filter, as all lint checks should have finished at this point.
Expand Down
4 changes: 2 additions & 2 deletions compiler/rustc_lint/src/late.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::any::Any;
use std::cell::Cell;

use rustc_data_structures::stack::ensure_sufficient_stack;
use rustc_data_structures::sync::join;
use rustc_data_structures::sync::par_join;
use rustc_hir::def_id::{LocalDefId, LocalModDefId};
use rustc_hir::{self as hir, AmbigArg, HirId, intravisit as hir_visit};
use rustc_middle::hir::nested_filter;
Expand Down Expand Up @@ -461,7 +461,7 @@ fn late_lint_crate_inner<'tcx, T: LateLintPass<'tcx>>(

/// Performs lint checking on a crate.
pub fn check_crate<'tcx>(tcx: TyCtxt<'tcx>) {
join(
par_join(
|| {
tcx.sess.time("crate_lints", || {
// Run whole crate non-incremental lints
Expand Down
4 changes: 2 additions & 2 deletions compiler/rustc_metadata/src/rmeta/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::sync::Arc;

use rustc_data_structures::fx::{FxIndexMap, FxIndexSet};
use rustc_data_structures::memmap::{Mmap, MmapMut};
use rustc_data_structures::sync::{join, par_for_each_in};
use rustc_data_structures::sync::{par_for_each_in, par_join};
use rustc_data_structures::temp_dir::MaybeTempDir;
use rustc_data_structures::thousands::usize_with_underscores;
use rustc_feature::Features;
Expand Down Expand Up @@ -2458,7 +2458,7 @@ pub fn encode_metadata(tcx: TyCtxt<'_>, path: &Path, ref_path: Option<&Path>) {
// Prefetch some queries used by metadata encoding.
// This is not necessary for correctness, but is only done for performance reasons.
// It can be removed if it turns out to cause trouble or be detrimental to performance.
join(
par_join(
|| prefetch_mir(tcx),
|| {
let _ = tcx.exported_non_generic_symbols(LOCAL_CRATE);
Expand Down
4 changes: 2 additions & 2 deletions compiler/rustc_monomorphize/src/partitioning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ use std::io::Write;
use std::path::{Path, PathBuf};

use rustc_data_structures::fx::{FxIndexMap, FxIndexSet};
use rustc_data_structures::sync;
use rustc_data_structures::sync::par_join;
use rustc_data_structures::unord::{UnordMap, UnordSet};
use rustc_hir::LangItem;
use rustc_hir::attrs::{InlineAttr, Linkage};
Expand Down Expand Up @@ -1145,7 +1145,7 @@ fn collect_and_partition_mono_items(tcx: TyCtxt<'_>, (): ()) -> MonoItemPartitio
tcx.dcx().abort_if_errors();

let (codegen_units, _) = tcx.sess.time("partition_and_assert_distinct_symbols", || {
sync::join(
par_join(
|| {
let mut codegen_units = partition(tcx, items.iter().copied(), &usage_map);
codegen_units[0].make_primary();
Expand Down
Loading