Skip to content

Commit b55e0fc

Browse files
authored
Merge pull request #76 from cuviper/rayon
Stream the parallel xz/gz tarball generation
2 parents 745a020 + 5926023 commit b55e0fc

File tree

3 files changed

+58
-35
lines changed

3 files changed

+58
-35
lines changed

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,10 @@ path = "src/main.rs"
1111
[dependencies]
1212
error-chain = "0.11.0"
1313
flate2 = "1.0.1"
14+
rayon = "0.9"
1415
tar = "0.4.13"
1516
walkdir = "1.0.7"
16-
xz2 = "0.1.3"
17+
xz2 = "0.1.4"
1718

1819
[dependencies.clap]
1920
features = ["yaml"]

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
#[macro_use]
1212
extern crate error_chain;
1313
extern crate flate2;
14+
extern crate rayon;
1415
extern crate tar;
1516
extern crate walkdir;
1617
extern crate xz2;

src/tarballer.rs

Lines changed: 55 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,12 @@
99
// except according to those terms.
1010

1111
use std::fs::File;
12-
use std::io::Write;
12+
use std::io::{self, Write, BufWriter};
1313
use std::path::Path;
14-
use std::sync::Arc;
15-
use std::thread;
1614

1715
use flate2;
1816
use flate2::write::GzEncoder;
17+
use rayon;
1918
use tar::{Builder, Header};
2019
use walkdir::WalkDir;
2120
use xz2::write::XzEncoder;
@@ -57,41 +56,42 @@ impl Tarballer {
5756
.chain_err(|| "failed to collect file paths")?;
5857
files.sort_by(|a, b| a.bytes().rev().cmp(b.bytes().rev()));
5958

60-
// Write the tar into both encoded files. We write all directories
61-
// first, so files may be directly created. (see rustup.rs#1092)
62-
let mut builder = Builder::new(Vec::new());
63-
for path in dirs {
64-
let src = Path::new(&self.work_dir).join(&path);
65-
builder.append_dir(&path, &src)
66-
.chain_err(|| format!("failed to tar dir '{}'", src.display()))?;
67-
}
68-
for path in files {
69-
let src = Path::new(&self.work_dir).join(&path);
70-
let file = open_file(&src)?;
71-
builder.append_data(&mut header(&src, &file)?, &path, &file)
72-
.chain_err(|| format!("failed to tar file '{}'", src.display()))?;
73-
}
74-
let contents = builder.into_inner()
75-
.chain_err(|| "failed to finish writing .tar stream")?;
76-
let contents = Arc::new(contents);
77-
7859
// Prepare the .tar.gz file
79-
let contents2 = contents.clone();
80-
let t = thread::spawn(move || {
81-
let mut gz = GzEncoder::new(create_new_file(tar_gz)?,
82-
flate2::Compression::best());
83-
gz.write_all(&contents2).chain_err(|| "failed to write .gz")?;
84-
gz.finish().chain_err(|| "failed to finish .gz")
85-
});
60+
let gz = GzEncoder::new(create_new_file(tar_gz)?, flate2::Compression::best());
8661

8762
// Prepare the .tar.xz file
88-
let mut xz = XzEncoder::new(create_new_file(tar_xz)?, 9);
89-
xz.write_all(&contents).chain_err(|| "failed to write .xz")?;
90-
xz.finish().chain_err(|| "failed to finish .xz")?;
63+
let xz = XzEncoder::new(create_new_file(tar_xz)?, 9);
9164

92-
t.join().unwrap()?;
93-
94-
Ok(())
65+
// Write the tar into both encoded files. We write all directories
66+
// first, so files may be directly created. (see rustup.rs#1092)
67+
let tee = RayonTee(xz, gz);
68+
let buf = BufWriter::with_capacity(1024 * 1024, tee);
69+
let mut builder = Builder::new(buf);
70+
71+
let pool = rayon::Configuration::new().num_threads(2).build().unwrap();
72+
pool.install(move || {
73+
for path in dirs {
74+
let src = Path::new(&self.work_dir).join(&path);
75+
builder.append_dir(&path, &src)
76+
.chain_err(|| format!("failed to tar dir '{}'", src.display()))?;
77+
}
78+
for path in files {
79+
let src = Path::new(&self.work_dir).join(&path);
80+
let file = open_file(&src)?;
81+
builder.append_data(&mut header(&src, &file)?, &path, &file)
82+
.chain_err(|| format!("failed to tar file '{}'", src.display()))?;
83+
}
84+
let RayonTee(xz, gz) = builder.into_inner()
85+
.chain_err(|| "failed to finish writing .tar stream")?
86+
.into_inner().ok().unwrap();
87+
88+
// Finish both encoded files
89+
let (rxz, rgz) = rayon::join(
90+
|| xz.finish().chain_err(|| "failed to finish .tar.xz file"),
91+
|| gz.finish().chain_err(|| "failed to finish .tar.gz file"),
92+
);
93+
rxz.and(rgz).and(Ok(()))
94+
})
9595
}
9696
}
9797

@@ -138,3 +138,24 @@ fn get_recursive_paths<P, Q>(root: P, name: Q) -> Result<(Vec<String>, Vec<Strin
138138
}
139139
Ok((dirs, files))
140140
}
141+
142+
struct RayonTee<A, B>(A, B);
143+
144+
impl<A: Write + Send, B: Write + Send> Write for RayonTee<A, B> {
145+
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
146+
self.write_all(buf)?;
147+
Ok(buf.len())
148+
}
149+
150+
fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
151+
let (a, b) = (&mut self.0, &mut self.1);
152+
let (ra, rb) = rayon::join(|| a.write_all(buf), || b.write_all(buf));
153+
ra.and(rb)
154+
}
155+
156+
fn flush(&mut self) -> io::Result<()> {
157+
let (a, b) = (&mut self.0, &mut self.1);
158+
let (ra, rb) = rayon::join(|| a.flush(), || b.flush());
159+
ra.and(rb)
160+
}
161+
}

0 commit comments

Comments
 (0)