
RFC of WIP: Concurrent Cargo #1764


Closed · 12 commits
21 changes: 21 additions & 0 deletions Cargo.lock

(Generated file; diff not rendered.)

5 changes: 5 additions & 0 deletions Cargo.toml
@@ -35,6 +35,11 @@ time = "0.1"
toml = "0.1"
url = "0.2"
winapi = "0.1"
errno = "0.1.2"

[dependencies.file-lock]
git = "https://github.com/Byron/file-lock"
branch = "cargo-adjustments"

[dev-dependencies]
tempdir = "0.3"
2 changes: 2 additions & 0 deletions src/cargo/lib.rs
@@ -22,6 +22,8 @@ extern crate threadpool;
extern crate time;
extern crate toml;
extern crate url;
extern crate file_lock;
extern crate errno;

use std::env;
use std::error::Error;
37 changes: 29 additions & 8 deletions src/cargo/ops/cargo_rustc/job_queue.rs
@@ -5,9 +5,9 @@ use std::sync::mpsc::{channel, Sender, Receiver};
use threadpool::ThreadPool;
use term::color::YELLOW;

use core::{Package, PackageId, Resolve, PackageSet};
use core::{Package, PackageId, Resolve, PackageSet, Target};
use util::{Config, DependencyQueue, Fresh, Dirty, Freshness};
use util::{CargoResult, Dependency, profile};
use util::{CargoResult, Dependency, profile, CargoLock, human, ChainError, short_hash};

use super::job::Job;

@@ -19,7 +19,7 @@ use super::job::Job;
pub struct JobQueue<'a> {
pool: ThreadPool,
queue: DependencyQueue<(&'a PackageId, Stage),
(&'a Package, Vec<(Job, Freshness)>)>,
(&'a Package, Vec<(Job, Freshness, Option<Target>)>)>,
tx: Sender<Message>,
rx: Receiver<Message>,
resolve: &'a Resolve,
@@ -81,7 +81,7 @@ impl<'a> JobQueue<'a> {
}

pub fn queue(&mut self, pkg: &'a Package, stage: Stage)
-> &mut Vec<(Job, Freshness)> {
-> &mut Vec<(Job, Freshness, Option<Target>)> {
self.pkgids.insert(pkg.package_id());
&mut self.queue.queue(&(self.resolve, self.packages), Fresh,
(pkg.package_id(), stage),
@@ -112,7 +112,7 @@ impl<'a> JobQueue<'a> {

// Now that all possible work has been scheduled, wait for a piece
// of work to finish. If any package fails to build then we stop
// scheduling work as quickly as possibly.
// scheduling work as quickly as possible.
let (id, stage, fresh, result) = self.rx.recv().unwrap();
info!(" end: {} {:?}", id, stage);
let id = *self.pkgids.iter().find(|&k| *k == &id).unwrap();
@@ -149,7 +149,7 @@ impl<'a> JobQueue<'a> {
/// freshness of all upstream dependencies. This function will schedule all
/// work in `jobs` to be executed.
fn run(&mut self, pkg: &'a Package, stage: Stage, fresh: Freshness,
jobs: Vec<(Job, Freshness)>, config: &Config) -> CargoResult<()> {
jobs: Vec<(Job, Freshness, Option<Target>)>, config: &Config) -> CargoResult<()> {
let njobs = jobs.len();
let amt = if njobs == 0 {1} else {njobs as u32};
let id = pkg.package_id().clone();
@@ -163,18 +163,39 @@ impl<'a> JobQueue<'a> {
fresh: fresh,
});

let lock_kind = try!(CargoLock::lock_kind(config));

let mut total_fresh = fresh;
let mut running = Vec::new();
debug!("start {:?} at {:?} for {}", total_fresh, stage, pkg);
for (job, job_freshness) in jobs.into_iter() {
for (job, job_freshness, target) in jobs.into_iter() {
let lock_path = config.target_dir(pkg)
.join("locks").join(&format!("{}-{}-{}-{}.{:?}.lock",
pkg.name(),
pkg.version(),
short_hash(&pkg),
short_hash(&target),
stage));
debug!("job: {:?} ({:?})", job_freshness, total_fresh);
let fresh = job_freshness.combine(fresh);
total_fresh = total_fresh.combine(fresh);
let my_tx = self.tx.clone();
let id = id.clone();
let (desc_tx, desc_rx) = channel();
let lock_kind = lock_kind.clone();
let lock_path = lock_path.clone();
self.pool.execute(move|| {
my_tx.send((id, stage, fresh, job.run(fresh, desc_tx))).unwrap();
let mut fl = CargoLock::new(lock_path, lock_kind);
let lock = fl.lock().chain_error(|| {
human(format!("Failed to lock {} at stage '{:?}'",
id, stage))
});

my_tx.send((id, stage, fresh, lock.and_then(|_| {
let r = job.run(fresh, desc_tx);
drop(fl);
r
}))).unwrap();
});
// only the first message of each job is processed
match desc_rx.recv() {
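As a reading aid: the lock path built in `run()` above keys each lock file on package name, version, package hash, target hash, and stage, so unrelated jobs never contend on the same lock. The sketch below only illustrates that naming scheme; `short_hash` here is a stand-in for cargo's `util::short_hash`, and the string arguments replace the real `Package`/`Target` values.

```rust
use std::path::{Path, PathBuf};

// Stand-in for cargo's util::short_hash(); any stable short digest is enough
// for this illustration.
fn short_hash(input: &str) -> String {
    let h = input.bytes().fold(0u64, |acc, b| acc.wrapping_mul(31).wrapping_add(b as u64));
    format!("{:08x}", h & 0xffff_ffff)
}

/// Mirrors the `locks/{name}-{version}-{pkg hash}-{target hash}.{stage}.lock`
/// scheme used in job_queue.rs: one lock file per (package, target, stage) job.
fn job_lock_path(target_dir: &Path, name: &str, version: &str,
                 pkg_key: &str, target_key: &str, stage: &str) -> PathBuf {
    target_dir.join("locks").join(format!(
        "{}-{}-{}-{}.{}.lock",
        name, version, short_hash(pkg_key), short_hash(target_key), stage))
}

fn main() {
    let path = job_lock_path(Path::new("target/debug"), "foo", "0.1.0",
                             "foo 0.1.0", "lib(foo)", "Libraries");
    println!("{}", path.display());
}
```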
24 changes: 22 additions & 2 deletions src/cargo/ops/cargo_rustc/layout.rs
@@ -50,7 +50,7 @@ use std::io;
use std::path::{PathBuf, Path};

use core::Package;
use util::Config;
use util::{Config, CargoLock, LockKind};
use util::hex::short_hash;

pub struct Layout {
@@ -60,6 +60,7 @@ pub struct Layout {
build: PathBuf,
fingerprint: PathBuf,
examples: PathBuf,
lock_kind: Option<LockKind>
}

pub struct LayoutProxy<'a> {
@@ -78,7 +79,14 @@ impl Layout {
path.push(Path::new(triple).file_stem().unwrap());
}
path.push(dest);
Layout::at(path)

let mut l = Layout::at(path);
// Ignore errors here as we cannot return them.
match CargoLock::lock_kind(config) {
Ok(kind) => { l.lock_kind = Some(kind); },
Err(err) => debug!("Layout::new(): {}", err),
}
l
}

pub fn at(root: PathBuf) -> Layout {
@@ -89,10 +97,22 @@ impl Layout {
fingerprint: root.join(".fingerprint"),
examples: root.join("examples"),
root: root,
lock_kind: None,
}
}

pub fn prepare(&mut self) -> io::Result<()> {

let mut fl = CargoLock::new(self.root.join(".layout-init.lock"),
self.lock_kind.clone()
.expect("Must not call prepare() if Layout was \
instantiated with at()"));

// Too low-level to provide detailed lock information. If locking fails,
// we fall back to the previous, racy behaviour, which should at worst fail
// during fs::create_dir_all().
fl.lock().ok();

if fs::metadata(&self.root).is_err() {
try!(fs::create_dir_all(&self.root));
}
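The `fl.lock().ok()` call above is deliberately best-effort: if the lock cannot be taken, `prepare()` simply falls back to the old racy behaviour, whose worst case is a benign race inside `fs::create_dir_all`. A minimal sketch of that pattern follows; `try_lock_dir_init` is a hypothetical stand-in for `CargoLock` (which in this PR wraps the `file-lock` crate), and merely creating the lock file here does not take a real advisory lock.

```rust
use std::fs::{self, File};
use std::io;
use std::path::Path;

// Hypothetical stand-in for CargoLock: this only creates the lock file; the
// real type acquires an advisory OS lock via the `file-lock` crate.
fn try_lock_dir_init(root: &Path) -> io::Result<File> {
    File::create(root.join(".layout-init.lock"))
}

/// Best-effort serialization of layout initialization: try to take the lock,
/// but ignore failure and proceed, since concurrent directory creation only
/// races benignly.
fn prepare(root: &Path) -> io::Result<()> {
    // Held until the end of the function; failure is tolerated.
    let _lock = try_lock_dir_init(root).ok();
    if fs::metadata(root).is_err() {
        fs::create_dir_all(root)?;
    }
    for dir in ["deps", "native", "build", ".fingerprint", "examples"] {
        fs::create_dir_all(root.join(dir))?;
    }
    Ok(())
}

fn main() -> io::Result<()> {
    prepare(Path::new("target/debug"))
}
```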
19 changes: 15 additions & 4 deletions src/cargo/ops/cargo_rustc/mod.rs
@@ -232,7 +232,7 @@ fn compile<'a, 'cfg>(targets: &[(&'a Target, &'a Profile)],
}
(false, false, _) => jobs.queue(pkg, Stage::Binaries),
};
dst.push((Job::new(dirty, fresh), freshness));
dst.push((Job::new(dirty, fresh), freshness, Some(target.clone())));
}
drop(profiling_marker);

@@ -279,7 +279,7 @@ fn compile<'a, 'cfg>(targets: &[(&'a Target, &'a Profile)],
let (dirty, fresh, freshness) =
try!(custom_build::prepare(pkg, target, req, cx));
let run_custom = jobs.queue(pkg, Stage::RunCustomBuild);
run_custom.push((Job::new(dirty, fresh), freshness));
run_custom.push((Job::new(dirty, fresh), freshness, Some(target.clone())));
}

// If we didn't actually run the custom build command, then there's no
@@ -319,9 +319,9 @@ fn prepare_init<'a, 'cfg>(cx: &mut Context<'a, 'cfg>,
if cx.requested_target().is_some() {
let (plugin1, plugin2) = fingerprint::prepare_init(cx, pkg,
Kind::Host);
init.push((Job::new(plugin1, plugin2), Fresh));
init.push((Job::new(plugin1, plugin2), Fresh, None));
}
init.push((Job::new(target1, target2), Fresh));
init.push((Job::new(target1, target2), Fresh, None));
}

fn rustc(package: &Package, target: &Target, profile: &Profile,
@@ -376,6 +376,17 @@ fn rustc(package: &Package, target: &Target, profile: &Profile,
let cwd = cx.config.cwd().to_path_buf();

Ok((Work::new(move |desc_tx| {
// Check if all the files exist. If so, another collaborating process will have
// created them. Note that we only get to do this if we either waited on the lock
// or got it ourselves without waiting. In the latter case, we wouldn't have to do
// anything, but can't differentiate one case from the other.
// TODO(ST): is there a way to verify that the contents of the files are valid?
// If the previous operation that left them there aborted, this might just
// not be the case
if filenames.iter().all(|filename| fs::metadata(&root.join(filename)).is_ok()) {
return Ok(())
}

debug!("about to run: {}", rustc);

// Only at runtime have we discovered what the extra -L and -l
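The early return added to the rustc work closure above boils down to one check: if every expected output already exists, a collaborating process that held the per-target lock before us has already built this target, so rustc can be skipped. A self-contained sketch of that check, with illustrative names rather than cargo's own:

```rust
use std::fs;
use std::path::Path;

/// Returns true when every expected output file already exists under `root`.
/// As the TODO in the diff notes, existence alone does not prove the files
/// are intact if the earlier process aborted mid-write.
fn outputs_already_built(root: &Path, filenames: &[String]) -> bool {
    filenames.iter().all(|f| fs::metadata(root.join(f)).is_ok())
}

fn main() {
    let files = vec!["libfoo.rlib".to_string(), "foo.d".to_string()];
    let skip = outputs_already_built(Path::new("target/debug"), &files);
    println!("skip rustc: {}", skip);
}
```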
108 changes: 81 additions & 27 deletions src/cargo/sources/registry.rs
@@ -174,7 +174,7 @@ use url::Url;
use core::{Source, SourceId, PackageId, Package, Summary, Registry};
use core::dependency::{Dependency, Kind};
use sources::{PathSource, git};
use util::{CargoResult, Config, internal, ChainError, ToUrl, human};
use util::{CargoResult, Config, internal, ChainError, ToUrl, human, CargoLock};
use util::{hex, Sha256};
use ops;

@@ -278,6 +278,12 @@ impl<'cfg> RegistrySource<'cfg> {
/// initialize a fresh new directory and git checkout. No remotes will be
/// configured by default.
fn open(&self) -> CargoResult<git2::Repository> {
let mut fl = CargoLock::new(self.config.home().join(".registry-open-or-init.lock"),
try!(CargoLock::lock_kind(self.config)));

try!(fl.lock().chain_error(|| {
human("Failed to lock registry for opening or initialization")
}));
match git2::Repository::open(&self.checkout_path) {
Ok(repo) => return Ok(repo),
Err(..) => {}
@@ -302,37 +308,75 @@ impl<'cfg> RegistrySource<'cfg> {
let filename = format!("{}-{}.crate", pkg.name(), pkg.version());
let dst = self.cache_path.join(&filename);
if fs::metadata(&dst).is_ok() { return Ok(dst) }
try!(self.config.shell().status("Downloading", pkg));

try!(fs::create_dir_all(dst.parent().unwrap()));
let expected_hash = try!(self.hash(pkg));
let handle = match self.handle {
Some(ref mut handle) => handle,
None => {
self.handle = Some(try!(ops::http_handle(self.config)));
self.handle.as_mut().unwrap()

let mut lfp = dst.parent().unwrap().to_path_buf();
lfp.set_file_name(format!(".{}.lock", filename));
let mut fl = CargoLock::new(lfp, try!(CargoLock::lock_kind(self.config)));

try!(fl.lock().chain_error(|| {
human(format!("Failed to lock download for crate {}", pkg))
}));

if let Ok(mut file) = fs::File::open(&dst) {
// Somebody downloaded the file for us - we just verify it is what
// we expect it to be; otherwise we fail.
// In any case, we drop the lock now to allow others to do the same
// thing we are doing right now.
drop(fl);

let actual = {
// TODO: don't load it into memory. Ideally, Sha256 would support
// `Write` so io::copy() can be used
let mut buf = Vec::<u8>::new();
try!(file.read_to_end(&mut buf));
drop(file);

let mut state = Sha256::new();
state.update(&buf);
state.finish()
};

if actual.to_hex() != expected_hash {
return Err(human(format!("Failed to verify the checksum of `{}`\
at '{}'",
pkg, dst.display())))
}
};
// TODO: don't download into memory (curl-rust doesn't expose it)
let resp = try!(handle.get(url.to_string()).follow_redirects(true).exec());
if resp.get_code() != 200 && resp.get_code() != 0 {
return Err(internal(format!("Failed to get 200 response from {}\n{}",
url, resp)))
}

// Verify what we just downloaded
let actual = {
let mut state = Sha256::new();
state.update(resp.get_body());
state.finish()
};
if actual.to_hex() != expected_hash {
return Err(human(format!("Failed to verify the checksum of `{}`",
pkg)))
}
Ok(dst)
} else {
try!(self.config.shell().status("Downloading", pkg));

try!(try!(File::create(&dst)).write_all(resp.get_body()));
Ok(dst)
let handle = match self.handle {
Some(ref mut handle) => handle,
None => {
self.handle = Some(try!(ops::http_handle(self.config)));
self.handle.as_mut().unwrap()
}
};
// TODO: don't download into memory (curl-rust doesn't expose it)
let resp = try!(handle.get(url.to_string()).follow_redirects(true).exec());
if resp.get_code() != 200 && resp.get_code() != 0 {
return Err(internal(format!("Failed to get 200 response from {}\n{}",
url, resp)))
}

// Verify what we just downloaded
let actual = {
let mut state = Sha256::new();
state.update(resp.get_body());
state.finish()
};
if actual.to_hex() != expected_hash {
return Err(human(format!("Failed to verify the checksum of `{}`",
pkg)))
}

try!(try!(File::create(&dst)).write_all(resp.get_body()));
Ok(dst)
}
}

/// Return the hash listed for a specified PackageId.
@@ -449,9 +493,19 @@ impl<'cfg> RegistrySource<'cfg> {
fn do_update(&mut self) -> CargoResult<()> {
if self.updated { return Ok(()) }

let repo = try!(self.open());

let mut fl = CargoLock::new(self.checkout_path.join(".registry-update.lock"),
try!(CargoLock::lock_kind(self.config)));

try!(fl.lock().chain_error(|| {
human("Failed to lock registry for update")
}));

// NOTE(ST): The repo might just have been updated by a collaborating process. We will
// just do it again, which is fast and safe.
try!(self.config.shell().status("Updating",
format!("registry `{}`", self.source_id.url())));
let repo = try!(self.open());

// git fetch origin
let url = self.source_id.url().to_string();
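Taken together, the new download path above is: take a per-crate lock, and if the `.crate` file appeared while we waited, only verify its checksum; otherwise download, verify, and then write it out. The sketch below compresses that decision. It assumes the `sha2` crate for hashing (cargo uses its own `Sha256` wrapper), `fetch_crate` is a hypothetical placeholder for the curl-based download, and the surrounding `CargoLock` on `.{filename}.lock` is omitted.

```rust
use std::fs;
use std::io;
use std::path::Path;

use sha2::{Digest, Sha256}; // assumption: `sha2` is available as a dependency

fn hex_digest(data: &[u8]) -> String {
    Sha256::digest(data).iter().map(|b| format!("{:02x}", b)).collect()
}

// Hypothetical placeholder for the curl-based download done via
// ops::http_handle() in registry.rs.
fn fetch_crate(url: &str) -> io::Result<Vec<u8>> {
    Err(io::Error::new(io::ErrorKind::Other,
                       format!("download of {} elided in this sketch", url)))
}

fn checksum_error(dst: &Path) -> io::Error {
    io::Error::new(io::ErrorKind::InvalidData,
                   format!("failed to verify the checksum of `{}`", dst.display()))
}

/// If the crate file already exists (a collaborating process holding the lock
/// downloaded it first), verify its checksum and return; otherwise download,
/// verify, and only then write it to `dst`.
fn download_or_verify(dst: &Path, url: &str, expected_hash: &str) -> io::Result<()> {
    if let Ok(existing) = fs::read(dst) {
        if hex_digest(&existing) != expected_hash {
            return Err(checksum_error(dst));
        }
        return Ok(());
    }
    let body = fetch_crate(url)?;
    if hex_digest(&body) != expected_hash {
        return Err(checksum_error(dst));
    }
    fs::write(dst, &body)
}

fn main() -> io::Result<()> {
    // Exercise only the "already downloaded" branch.
    let dst = Path::new("demo.crate");
    fs::write(dst, b"demo contents")?;
    let expected = hex_digest(b"demo contents");
    download_or_verify(dst, "https://example.invalid/demo.crate", &expected)
}
```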