Commit 1814fe7
Auto merge of #681 - Mark-Simulacrum:toolchain-per-worker, r=Mark-Simulacrum
Distinct build directories per toolchain

We see a number of failures caused by build scripts or tests misbehaving when both toolchains run in the same build directory. These are 99% likely to be bugs in the crate itself, but Crater's goal isn't to find bugs in crates, it is to find bugs and regressions in rustc, so we should stop sharing a build directory between toolchains as we did before. In practice this additional level of nesting should have minimal impact on cache hits, since caching across toolchains wasn't possible anyway; it may even improve things. The purging algorithm is the same as before.
2 parents: 74414da + c58a1fc · commit 1814fe7
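
The change is easiest to see in the src/runner/tasks.rs diff below: tasks no longer share a single Mutex<BuildDirectory>, but receive a map keyed by toolchain and pick the directory belonging to the toolchain they are about to run. A minimal sketch of that data layout, using stand-in types rather than the real crate::toolchain::Toolchain and rustwide::BuildDirectory (the map itself is built in worker setup code that is not part of this excerpt):

use std::collections::HashMap;
use std::sync::Mutex;

// Stand-ins for crate::toolchain::Toolchain and rustwide::BuildDirectory,
// only to illustrate the per-toolchain layout.
#[derive(Debug, PartialEq, Eq, Hash)]
struct Toolchain(&'static str);

#[derive(Debug)]
struct BuildDirectory(String);

fn main() {
    // The two sides of an experiment, e.g. the baseline and the candidate rustc.
    let toolchains = [Toolchain("stable"), Toolchain("beta")];

    // One build directory per toolchain instead of one shared directory, so
    // build scripts and tests of the two toolchains never run in the same path.
    let build_dirs: HashMap<&Toolchain, Mutex<BuildDirectory>> = toolchains
        .iter()
        .map(|tc| (tc, Mutex::new(BuildDirectory(format!("builds/{}", tc.0)))))
        .collect();

    // A task locks only the directory of the toolchain it is testing.
    let dir = build_dirs[&toolchains[0]].lock().unwrap();
    println!("building with {:?} in {:?}", toolchains[0], *dir);
}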

5 files changed (+151, -169 lines)

Cargo.lock

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default.

Cargo.toml

Lines changed: 0 additions & 1 deletion
@@ -13,7 +13,6 @@ base64 = "0.20.0"
 bytes = "1"
 chrono = { version = "0.4", features = ["serde"] }
 crates-index = "0.18"
-crossbeam-utils = "0.8"
 crossbeam-channel = "0.5"
 csv = "1.0.2"
 docsrs-metadata = { git = "https://github.com/rust-lang/docs.rs/" }

src/runner/mod.rs

Lines changed: 19 additions & 55 deletions
@@ -9,11 +9,11 @@ use crate::experiments::{Experiment, Mode};
 use crate::prelude::*;
 use crate::results::{TestResult, WriteResults};
 use crate::runner::worker::{DiskSpaceWatcher, Worker};
-use crossbeam_utils::thread::{scope, ScopedJoinHandle};
 use rustwide::logging::LogStorage;
 use rustwide::Workspace;
 use std::collections::HashMap;
 use std::sync::Mutex;
+use std::thread::scope;
 use std::time::Duration;

 const DISK_SPACE_WATCHER_INTERVAL: Duration = Duration::from_secs(30);
@@ -107,7 +107,6 @@ pub fn run_ex<DB: WriteResults + Sync>(
     info!("running tasks in {} threads...", threads_count);

     let state = RunnerState::new();
-
     let workers = (0..threads_count)
         .map(|i| {
             Worker::new(
@@ -128,68 +127,33 @@ pub fn run_ex<DB: WriteResults + Sync>(
         &workers,
     );

-    let r = scope(|scope| -> Fallible<()> {
-        let mut threads = Vec::new();
-
-        for worker in &workers {
-            let join =
-                scope
-                    .builder()
+    scope(|scope1| {
+        std::thread::Builder::new()
+            .name("disk-space-watcher".into())
+            .spawn_scoped(scope1, || {
+                disk_watcher.run();
+            })
+            .unwrap();
+
+        scope(|scope| {
+            for worker in &workers {
+                std::thread::Builder::new()
                     .name(worker.name().into())
-                    .spawn(move |_| -> Fallible<()> {
+                    .spawn_scoped(scope, move || -> Fallible<()> {
                         match worker.run() {
                             Ok(()) => Ok(()),
                             Err(r) => {
                                 log::warn!("worker {} failed: {:?}", worker.name(), r);
                                 Err(r)
                             }
                         }
-                    })?;
-            threads.push(join);
-        }
-        let disk_watcher_thread =
-            scope
-                .builder()
-                .name("disk-space-watcher".into())
-                .spawn(|_| {
-                    disk_watcher.run();
-                    Ok(())
-                })?;
-
-        let clean_exit = join_threads(threads.into_iter());
-        disk_watcher.stop();
-        let disk_watcher_clean_exit = join_threads(std::iter::once(disk_watcher_thread));
+                    })
+                    .unwrap();
+            }
+        });

-        if clean_exit && disk_watcher_clean_exit {
-            Ok(())
-        } else {
-            bail!("some threads returned an error");
-        }
+        disk_watcher.stop();
     });

-    match r {
-        Ok(r) => r,
-        Err(panic) => std::panic::resume_unwind(panic),
-    }
-}
-
-fn join_threads<'a, I>(iter: I) -> bool
-where
-    I: Iterator<Item = ScopedJoinHandle<'a, Fallible<()>>>,
-{
-    let mut clean_exit = true;
-    for thread in iter {
-        match thread.join() {
-            Ok(Ok(())) => {}
-            Ok(Err(err)) => {
-                crate::utils::report_failure(&err);
-                clean_exit = false;
-            }
-            Err(panic) => {
-                crate::utils::report_panic(&panic);
-                clean_exit = false;
-            }
-        }
-    }
-    clean_exit
+    Ok(())
 }
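
The src/runner/mod.rs hunk above also replaces crossbeam-utils' scoped threads with std::thread::scope, stable since Rust 1.63. Standard scoped threads are joined automatically when the scope ends and re-raise any panic from a joined thread, which is presumably why the hand-written join_threads helper and the resume_unwind call could be dropped. A self-contained sketch of the pattern, with placeholder worker names and bodies rather than Crater's actual workers:

use std::thread;

fn main() {
    let names = ["worker-0", "worker-1"];

    // thread::scope joins every scoped thread before returning and propagates
    // any panic from them, so no manual join bookkeeping is needed.
    thread::scope(|scope| {
        for name in &names {
            thread::Builder::new()
                .name(name.to_string())
                .spawn_scoped(scope, move || {
                    println!("{name} running");
                })
                .unwrap();
        }
    });

    println!("all workers joined");
}

Note that in the new run_ex, an Err returned by a worker closure is only logged from inside the thread; unlike the old join_threads bookkeeping it is no longer folded into the function's return value, and only panics escape the scope.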

src/runner/tasks.rs

Lines changed: 117 additions & 109 deletions
@@ -8,6 +8,7 @@ use crate::runner::{test, RunnerState};
 use crate::toolchain::Toolchain;
 use crate::utils;
 use rustwide::{Build, BuildDirectory, Workspace};
+use std::collections::HashMap;
 use std::sync::Mutex;

 use rustwide::logging::{self, LogStorage};
@@ -143,126 +144,133 @@ impl Task {
         &'s self,
         config: &'ctx Config,
         workspace: &Workspace,
-        build_dir: &'ctx Mutex<BuildDirectory>,
+        build_dir: &'ctx HashMap<&'ctx crate::toolchain::Toolchain, Mutex<BuildDirectory>>,
         ex: &'ctx Experiment,
         db: &'ctx DB,
         state: &'ctx RunnerState,
    ) -> Fallible<()> {
-        let (action, test, toolchain, quiet): (_, fn(&TaskCtx<_>, &Build, &_) -> _, _, _) =
-            match self.step {
-                TaskStep::BuildAndTest { ref tc, quiet } => {
-                    ("testing", test::test_build_and_test, tc, quiet)
-                }
-                TaskStep::BuildOnly { ref tc, quiet } => {
-                    ("building", test::test_build_only, tc, quiet)
-                }
-                TaskStep::CheckOnly { ref tc, quiet } => {
-                    ("checking", test::test_check_only, tc, quiet)
-                }
-                TaskStep::Clippy { ref tc, quiet } => {
-                    ("linting", test::test_clippy_only, tc, quiet)
-                }
-                TaskStep::Rustdoc { ref tc, quiet } => {
-                    ("documenting", test::test_rustdoc, tc, quiet)
-                }
-                TaskStep::UnstableFeatures { ref tc } => (
-                    "checking unstable",
-                    crate::runner::unstable_features::find_unstable_features,
-                    tc,
-                    false,
-                ),
-                TaskStep::Cleanup => {
-                    // Remove stored logs
-                    state.lock().prepare_logs.remove(&self.krate);
-                    return Ok(());
-                }
-                TaskStep::Prepare => {
-                    let storage = LogStorage::from(config);
-                    state
-                        .lock()
-                        .prepare_logs
-                        .insert(self.krate.clone(), storage.clone());
-                    logging::capture(&storage, || {
-                        let rustwide_crate = self.krate.to_rustwide();
-                        for attempt in 1..=15 {
-                            match detect_broken(rustwide_crate.fetch(workspace)) {
-                                Ok(()) => break,
-                                Err(e) => {
-                                    if storage.to_string().contains("No space left on device") {
-                                        if attempt == 15 {
-                                            // If we've failed 15 times, then
-                                            // just give up. It's been at least
-                                            // 45 seconds, which is enough that
-                                            // our disk space check should
-                                            // have run at least once in this
-                                            // time. If that's not helped, then
-                                            // maybe this git repository *is*
-                                            // actually too big.
-                                            //
-                                            // Ideally we'd have some kind of
-                                            // per-worker counter and if we hit
-                                            // this too often we'd replace the
-                                            // machine, but it's not very clear
-                                            // what "too often" means here.
-                                            return Err(e);
-                                        } else {
-                                            log::warn!(
-                                                "Retrying crate fetch in 3 seconds (attempt {})",
-                                                attempt
-                                            );
-                                            std::thread::sleep(std::time::Duration::from_secs(3));
-                                        }
-                                    } else {
+        let (build_dir, action, test, toolchain, quiet): (
+            _,
+            _,
+            fn(&TaskCtx<_>, &Build, &_) -> _,
+            _,
+            _,
+        ) = match self.step {
+            TaskStep::BuildAndTest { ref tc, quiet } => (
+                &build_dir[tc],
+                "testing",
+                test::test_build_and_test,
+                tc,
+                quiet,
+            ),
+            TaskStep::BuildOnly { ref tc, quiet } => {
+                (&build_dir[tc], "building", test::test_build_only, tc, quiet)
+            }
+            TaskStep::CheckOnly { ref tc, quiet } => {
+                (&build_dir[tc], "checking", test::test_check_only, tc, quiet)
+            }
+            TaskStep::Clippy { ref tc, quiet } => {
+                (&build_dir[tc], "linting", test::test_clippy_only, tc, quiet)
+            }
+            TaskStep::Rustdoc { ref tc, quiet } => {
+                (&build_dir[tc], "documenting", test::test_rustdoc, tc, quiet)
+            }
+            TaskStep::UnstableFeatures { ref tc } => (
+                &build_dir[tc],
+                "checking unstable",
+                crate::runner::unstable_features::find_unstable_features,
+                tc,
+                false,
+            ),
+            TaskStep::Cleanup => {
+                // Remove stored logs
+                state.lock().prepare_logs.remove(&self.krate);
+                return Ok(());
+            }
+            TaskStep::Prepare => {
+                let storage = LogStorage::from(config);
+                state
+                    .lock()
+                    .prepare_logs
+                    .insert(self.krate.clone(), storage.clone());
+                logging::capture(&storage, || {
+                    let rustwide_crate = self.krate.to_rustwide();
+                    for attempt in 1..=15 {
+                        match detect_broken(rustwide_crate.fetch(workspace)) {
+                            Ok(()) => break,
+                            Err(e) => {
+                                if storage.to_string().contains("No space left on device") {
+                                    if attempt == 15 {
+                                        // If we've failed 15 times, then
+                                        // just give up. It's been at least
+                                        // 45 seconds, which is enough that
+                                        // our disk space check should
+                                        // have run at least once in this
+                                        // time. If that's not helped, then
+                                        // maybe this git repository *is*
+                                        // actually too big.
+                                        //
+                                        // Ideally we'd have some kind of
+                                        // per-worker counter and if we hit
+                                        // this too often we'd replace the
+                                        // machine, but it's not very clear
+                                        // what "too often" means here.
                                        return Err(e);
+                                    } else {
+                                        log::warn!(
+                                            "Retrying crate fetch in 3 seconds (attempt {})",
+                                            attempt
+                                        );
+                                        std::thread::sleep(std::time::Duration::from_secs(3));
                                    }
+                                } else {
+                                    return Err(e);
                                }
                            }
                        }
+                    }

-                        if let Crate::GitHub(repo) = &self.krate {
-                            if let Some(sha) = rustwide_crate.git_commit(workspace) {
-                                let updated = GitHubRepo {
-                                    sha: Some(sha),
-                                    ..repo.clone()
-                                };
-                                db.update_crate_version(
-                                    ex,
-                                    &Crate::GitHub(repo.clone()),
-                                    &Crate::GitHub(updated),
-                                )
-                                .with_context(|_| {
-                                    format!(
-                                        "failed to record the sha of GitHub repo {}",
-                                        repo.slug()
-                                    )
-                                })?;
-                            } else {
-                                bail!("unable to capture sha for {}", repo.slug());
-                            }
+                    if let Crate::GitHub(repo) = &self.krate {
+                        if let Some(sha) = rustwide_crate.git_commit(workspace) {
+                            let updated = GitHubRepo {
+                                sha: Some(sha),
+                                ..repo.clone()
+                            };
+                            db.update_crate_version(
+                                ex,
+                                &Crate::GitHub(repo.clone()),
+                                &Crate::GitHub(updated),
+                            )
+                            .with_context(|_| {
+                                format!("failed to record the sha of GitHub repo {}", repo.slug())
+                            })?;
+                        } else {
+                            bail!("unable to capture sha for {}", repo.slug());
                        }
-                        Ok(())
-                    })?;
-                    return Ok(());
-                }
-                TaskStep::Skip { ref tc } => {
-                    // If a skipped crate is somehow sent to the agent (for example, when a crate was
-                    // added to the experiment and *then* blacklisted) report the crate as skipped
-                    // instead of silently ignoring it.
-                    db.record_result(
-                        ex,
-                        tc,
-                        &self.krate,
-                        None,
-                        config,
-                        EncodingType::Plain,
-                        || {
-                            warn!("crate skipped");
-                            Ok(TestResult::Skipped)
-                        },
-                    )?;
-                    return Ok(());
-                }
-            };
+                    }
+                    Ok(())
+                })?;
+                return Ok(());
+            }
+            TaskStep::Skip { ref tc } => {
+                // If a skipped crate is somehow sent to the agent (for example, when a crate was
+                // added to the experiment and *then* blacklisted) report the crate as skipped
+                // instead of silently ignoring it.
+                db.record_result(
+                    ex,
+                    tc,
+                    &self.krate,
+                    None,
+                    config,
+                    EncodingType::Plain,
+                    || {
+                        warn!("crate skipped");
+                        Ok(TestResult::Skipped)
+                    },
+                )?;
+                return Ok(());
+            }
+        };

        let ctx = TaskCtx::new(
            build_dir,
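
One detail worth noting in the new Task::run above: the build directory is selected with &build_dir[tc], and indexing a HashMap panics if the key is absent, so this relies on the caller having registered a directory for every toolchain in the experiment (which the worker setup presumably guarantees). A tiny illustration of that indexing-versus-get trade-off, using plain std types rather than Crater's:

use std::collections::HashMap;
use std::sync::Mutex;

fn main() {
    let dirs: HashMap<&str, Mutex<String>> =
        HashMap::from([("stable", Mutex::new(String::from("builds/stable")))]);

    // Indexing panics on a missing key; acceptable when every toolchain is
    // guaranteed to have an entry.
    println!("stable builds in {}", dirs["stable"].lock().unwrap());

    // A fallible lookup reports the problem instead of panicking.
    match dirs.get("beta") {
        Some(dir) => println!("beta builds in {}", dir.lock().unwrap()),
        None => eprintln!("no build directory registered for toolchain 'beta'"),
    }
}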

0 commit comments
