Skip to content

Commit ff7b54d

Browse files
committed
refactor: move JobState to job_state.rs
Also add two new methods: - `JobState::new` to create new `JobState` - `JobState::run_to_finish` is extracted from old [`doit`][1]. [1]: https://github.com/rust-lang/cargo/blob/7ddcf0fe3348b7e0f4ad4a730eab60a20638ef28/src/cargo/core/compiler/job_queue.rs#L1122-L1168
1 parent 69f4b5c commit ff7b54d

File tree

2 files changed

+221
-201
lines changed

2 files changed

+221
-201
lines changed
Lines changed: 213 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,213 @@
1+
//! See [`JobState`].
2+
3+
use std::{cell::Cell, marker, sync::Arc};
4+
5+
use cargo_util::ProcessBuilder;
6+
7+
use crate::core::compiler::context::OutputFile;
8+
use crate::core::compiler::future_incompat::FutureBreakageItem;
9+
use crate::util::Queue;
10+
use crate::CargoResult;
11+
12+
use super::{Artifact, DiagDedupe, Job, JobId, Message};
13+
14+
/// A `JobState` is constructed by `JobQueue::run` and passed to `Job::run`. It includes everything
15+
/// necessary to communicate between the main thread and the execution of the job.
16+
///
17+
/// The job may execute on either a dedicated thread or the main thread. If the job executes on the
18+
/// main thread, the `output` field must be set to prevent a deadlock.
19+
pub struct JobState<'a, 'cfg> {
20+
/// Channel back to the main thread to coordinate messages and such.
21+
///
22+
/// When the `output` field is `Some`, care must be taken to avoid calling `push_bounded` on
23+
/// the message queue to prevent a deadlock.
24+
messages: Arc<Queue<Message>>,
25+
26+
/// Normally output is sent to the job queue with backpressure. When the job is fresh
27+
/// however we need to immediately display the output to prevent a deadlock as the
28+
/// output messages are processed on the same thread as they are sent from. `output`
29+
/// defines where to output in this case.
30+
///
31+
/// Currently the `Shell` inside `Config` is wrapped in a `RefCell` and thus can't be passed
32+
/// between threads. This means that it isn't possible for multiple output messages to be
33+
/// interleaved. In the future, it may be wrapped in a `Mutex` instead. In this case
34+
/// interleaving is still prevented as the lock would be held for the whole printing of an
35+
/// output message.
36+
output: Option<&'a DiagDedupe<'cfg>>,
37+
38+
/// The job id that this state is associated with, used when sending
39+
/// messages back to the main thread.
40+
id: JobId,
41+
42+
/// Whether or not we're expected to have a call to `rmeta_produced`. Once
43+
/// that method is called this is dynamically set to `false` to prevent
44+
/// sending a double message later on.
45+
rmeta_required: Cell<bool>,
46+
47+
// Historical versions of Cargo made use of the `'a` argument here, so to
48+
// leave the door open to future refactorings keep it here.
49+
_marker: marker::PhantomData<&'a ()>,
50+
}
51+
52+
impl<'a, 'cfg> JobState<'a, 'cfg> {
53+
pub(super) fn new(
54+
id: JobId,
55+
messages: Arc<Queue<Message>>,
56+
output: Option<&'a DiagDedupe<'cfg>>,
57+
rmeta_required: bool,
58+
) -> Self {
59+
Self {
60+
id,
61+
messages,
62+
output,
63+
rmeta_required: Cell::new(rmeta_required),
64+
_marker: marker::PhantomData,
65+
}
66+
}
67+
68+
pub fn running(&self, cmd: &ProcessBuilder) {
69+
self.messages.push(Message::Run(self.id, cmd.to_string()));
70+
}
71+
72+
pub fn build_plan(
73+
&self,
74+
module_name: String,
75+
cmd: ProcessBuilder,
76+
filenames: Arc<Vec<OutputFile>>,
77+
) {
78+
self.messages
79+
.push(Message::BuildPlanMsg(module_name, cmd, filenames));
80+
}
81+
82+
pub fn stdout(&self, stdout: String) -> CargoResult<()> {
83+
if let Some(dedupe) = self.output {
84+
writeln!(dedupe.config.shell().out(), "{}", stdout)?;
85+
} else {
86+
self.messages.push_bounded(Message::Stdout(stdout));
87+
}
88+
Ok(())
89+
}
90+
91+
pub fn stderr(&self, stderr: String) -> CargoResult<()> {
92+
if let Some(dedupe) = self.output {
93+
let mut shell = dedupe.config.shell();
94+
shell.print_ansi_stderr(stderr.as_bytes())?;
95+
shell.err().write_all(b"\n")?;
96+
} else {
97+
self.messages.push_bounded(Message::Stderr(stderr));
98+
}
99+
Ok(())
100+
}
101+
102+
/// See [`Message::Diagnostic`] and [`Message::WarningCount`].
103+
pub fn emit_diag(&self, level: String, diag: String, fixable: bool) -> CargoResult<()> {
104+
if let Some(dedupe) = self.output {
105+
let emitted = dedupe.emit_diag(&diag)?;
106+
if level == "warning" {
107+
self.messages.push(Message::WarningCount {
108+
id: self.id,
109+
emitted,
110+
fixable,
111+
});
112+
}
113+
} else {
114+
self.messages.push_bounded(Message::Diagnostic {
115+
id: self.id,
116+
level,
117+
diag,
118+
fixable,
119+
});
120+
}
121+
Ok(())
122+
}
123+
124+
/// See [`Message::Warning`].
125+
pub fn warning(&self, warning: String) -> CargoResult<()> {
126+
self.messages.push_bounded(Message::Warning {
127+
id: self.id,
128+
warning,
129+
});
130+
Ok(())
131+
}
132+
133+
/// A method used to signal to the coordinator thread that the rmeta file
134+
/// for an rlib has been produced. This is only called for some rmeta
135+
/// builds when required, and can be called at any time before a job ends.
136+
/// This should only be called once because a metadata file can only be
137+
/// produced once!
138+
pub fn rmeta_produced(&self) {
139+
self.rmeta_required.set(false);
140+
self.messages
141+
.push(Message::Finish(self.id, Artifact::Metadata, Ok(())));
142+
}
143+
144+
/// Drives a [`Job`] to finish. This ensures that a [`Message::Finish`] is
145+
/// sent even if our job panics.
146+
pub(super) fn run_to_finish(self, job: Job) {
147+
let mut sender = FinishOnDrop {
148+
messages: &self.messages,
149+
id: self.id,
150+
result: None,
151+
};
152+
sender.result = Some(job.run(&self));
153+
154+
// If the `rmeta_required` wasn't consumed but it was set
155+
// previously, then we either have:
156+
//
157+
// 1. The `job` didn't do anything because it was "fresh".
158+
// 2. The `job` returned an error and didn't reach the point where
159+
// it called `rmeta_produced`.
160+
// 3. We forgot to call `rmeta_produced` and there's a bug in Cargo.
161+
//
162+
// Ruling out the third, the other two are pretty common for 2
163+
// we'll just naturally abort the compilation operation but for 1
164+
// we need to make sure that the metadata is flagged as produced so
165+
// send a synthetic message here.
166+
if self.rmeta_required.get() && sender.result.as_ref().unwrap().is_ok() {
167+
self.messages
168+
.push(Message::Finish(self.id, Artifact::Metadata, Ok(())));
169+
}
170+
171+
// Use a helper struct with a `Drop` implementation to guarantee
172+
// that a `Finish` message is sent even if our job panics. We
173+
// shouldn't panic unless there's a bug in Cargo, so we just need
174+
// to make sure nothing hangs by accident.
175+
struct FinishOnDrop<'a> {
176+
messages: &'a Queue<Message>,
177+
id: JobId,
178+
result: Option<CargoResult<()>>,
179+
}
180+
181+
impl Drop for FinishOnDrop<'_> {
182+
fn drop(&mut self) {
183+
let result = self
184+
.result
185+
.take()
186+
.unwrap_or_else(|| Err(anyhow::format_err!("worker panicked")));
187+
self.messages
188+
.push(Message::Finish(self.id, Artifact::All, result));
189+
}
190+
}
191+
}
192+
193+
pub fn future_incompat_report(&self, report: Vec<FutureBreakageItem>) {
194+
self.messages
195+
.push(Message::FutureIncompatReport(self.id, report));
196+
}
197+
198+
/// The rustc underlying this Job is about to acquire a jobserver token (i.e., block)
199+
/// on the passed client.
200+
///
201+
/// This should arrange for the associated client to eventually get a token via
202+
/// `client.release_raw()`.
203+
pub fn will_acquire(&self) {
204+
self.messages.push(Message::NeedsToken(self.id));
205+
}
206+
207+
/// The rustc underlying this Job is informing us that it is done with a jobserver token.
208+
///
209+
/// Note that it does *not* write that token back anywhere.
210+
pub fn release_token(&self) {
211+
self.messages.push(Message::ReleaseToken(self.id));
212+
}
213+
}

0 commit comments

Comments
 (0)