Skip to content

Commit

Permalink
Don't try to determine compresssion from input
Browse files Browse the repository at this point in the history
This only works for files, not for stdin, because we can't seek back to
the start of stdin after checking if we determine that there is no
compression, and therefore end up missing out some of the header.

Instead, require the user to explicitly specify flags if uncompressed
input or output is required.
  • Loading branch information
sd2k committed Oct 30, 2018
1 parent 59d3e0a commit 4d9ec87
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 41 deletions.
12 changes: 9 additions & 3 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,14 @@ pub struct Split {

#[structopt(
short = "u",
long = "uncompressed",
help = "Write output files uncompressed"
long = "uncompressed-input",
help = "Treat input as uncompressed"
)]
pub uncompressed: Option<bool>,
pub uncompressed: bool,

#[structopt(
long = "uncompressed-output",
help = "Don't compress output files"
)]
pub uncompressed_output: bool,
}
28 changes: 15 additions & 13 deletions src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,30 +10,32 @@ use crate::error::Result;
pub type InputReader = BufReader<Box<Read>>;
pub type OutputWriter = Box<Write>;

pub fn open_data<P: AsRef<Path>>(path: P) -> Result<InputReader> {
#[derive(Clone, Copy, Debug)]
pub enum Compression {
Uncompressed,
GzipCompression,
}

pub fn open_data<P: AsRef<Path>>(path: P, compression: Compression) -> Result<InputReader> {
// Read from stdin if input is '-', else try to open the provided file.
let reader: Box<Read> = match path.as_ref().to_str() {
Some(p) if p == "-" => Box::new(std::io::stdin()),
Some(p) => Box::new(File::open(p)?),
_ => unreachable!(),
};

let gz = GzDecoder::new(reader);
let is_compressed = gz.header().is_some();
let final_reader: Box<Read> = if is_compressed {
Box::new(gz)
} else {
Box::new(gz.into_inner())
let reader: Box<Read> = match compression {
Compression::Uncompressed => Box::new(reader),
Compression::GzipCompression => Box::new(GzDecoder::new(reader)),
};
Ok(BufReader::with_capacity(1024 * 1024, final_reader))
Ok(BufReader::with_capacity(1024 * 1024, reader))
}

pub fn open_output<P: AsRef<Path>>(path: P, compressed: bool) -> Result<OutputWriter> {
pub fn open_output<P: AsRef<Path>>(path: P, compression: Compression) -> Result<OutputWriter> {
let file = File::create(path)?;
let writer: OutputWriter = if compressed {
Box::new(GzEncoder::new(file, Default::default()))
} else {
Box::new(file)
let writer: OutputWriter = match compression {
Compression::GzipCompression => Box::new(GzEncoder::new(file, Default::default())),
Compression::Uncompressed => Box::new(file),
};
Ok(writer)
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@ mod split;

pub use {
crate::error::{Error, Result},
crate::io::Compression,
crate::split::SplitterBuilder,
};
12 changes: 6 additions & 6 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
use env_logger;
use structopt::StructOpt;

use ttv::{cli, Result, SplitterBuilder};
use ttv::{cli, Compression, Result, SplitterBuilder};

fn main() -> Result<()> {
env_logger::init();
let opt = cli::Opt::from_args();
match opt.cmd {
cli::Command::Split(x) => {
let mut splitter = SplitterBuilder::new(&x.input, x.row_splits, x.prop_splits)?;
if let Some(uncompressed) = x.uncompressed {
splitter = splitter.compressed(!uncompressed);
} else {
let should_compress = x.input.ends_with(".gz") | x.input.ends_with(".gzip");
splitter = splitter.compressed(should_compress);
if x.uncompressed {
splitter = splitter.input_compression(Compression::Uncompressed);
}
if x.uncompressed_output {
splitter = splitter.output_compression(Compression::Uncompressed);
}
if let Some(seed) = x.seed {
splitter = splitter.seed(seed);
Expand Down
35 changes: 23 additions & 12 deletions src/split/splitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use log::{debug, info};
use rand::{prelude::*, prng::ChaChaRng};

use crate::error::{Error, Result};
use crate::io::open_data;
use crate::io::{open_data, Compression};
use crate::split::{
single::{ProportionSplit, RowSplit, Split, SplitEnum},
splits::{SplitSelection, Splits},
Expand All @@ -28,8 +28,10 @@ pub struct SplitterBuilder {
chunk_size: Option<u64>,
/// The total number of rows
total_rows: Option<u64>,
/// Whether to compress output files
compressed: bool,
/// Compression for input files
input_compression: Compression,
/// Compression for output files
output_compression: Compression,
}

impl SplitterBuilder {
Expand All @@ -50,7 +52,8 @@ impl SplitterBuilder {
output_prefix: None,
chunk_size: None,
total_rows: None,
compressed: false,
input_compression: Compression::GzipCompression,
output_compression: Compression::GzipCompression,
})
}

Expand Down Expand Up @@ -84,8 +87,13 @@ impl SplitterBuilder {
self
}

pub fn compressed(mut self, compressed: bool) -> Self {
self.compressed = compressed;
pub fn input_compression(mut self, input_compression: Compression) -> Self {
self.input_compression = input_compression;
self
}

pub fn output_compression(mut self, output_compression: Compression) -> Self {
self.output_compression = output_compression;
self
}

Expand All @@ -101,7 +109,8 @@ impl SplitterBuilder {
output_prefix: self.output_prefix,
chunk_size: self.chunk_size,
total_rows: self.total_rows,
compressed: self.compressed,
input_compression: self.input_compression,
output_compression: self.output_compression,
})
}
}
Expand All @@ -119,8 +128,10 @@ pub struct Splitter {
chunk_size: Option<u64>,
/// The total number of rows
total_rows: Option<u64>,
/// Whether to compress output files
compressed: bool,
/// Compression for input files
input_compression: Compression,
/// Compression for output files
output_compression: Compression,
}

impl Splitter {
Expand Down Expand Up @@ -188,7 +199,7 @@ impl Splitter {
&split,
self.chunk_size,
self.total_rows,
self.compressed,
self.output_compression,
)?;
senders.insert(split.name().to_string(), split_sender);
chunk_writers.append(&mut split_chunk_writers);
Expand All @@ -202,7 +213,7 @@ impl Splitter {
&split,
self.chunk_size,
self.total_rows,
self.compressed,
self.output_compression,
)?;
senders.insert(split.name().to_string(), split_sender);
chunk_writers.append(&mut split_chunk_writers);
Expand All @@ -220,7 +231,7 @@ impl Splitter {

pool.scope(move |scope| {
info!("Reading data from {}", self.input.to_str().unwrap());
let reader = open_data(&self.input)?;
let reader = open_data(&self.input, self.input_compression)?;

info!("Writing header to files");
let mut lines = reader.lines();
Expand Down
18 changes: 11 additions & 7 deletions src/split/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ impl SplitWriter {
split: &SplitEnum,
chunk_size: Option<u64>,
total_rows: Option<u64>,
compressed: bool,
compression: io::Compression,
) -> Result<(Self, Vec<ChunkWriter>)> {
let n_chunks = match (split, chunk_size, total_rows) {
// Just use one sender since there is no chunking required.
Expand Down Expand Up @@ -65,7 +65,7 @@ impl SplitWriter {
let chunk_writer = ChunkWriter::new(
path.clone(),
split.name().to_string(),
compressed,
compression,
chunk_id,
chunk_size,
receiver,
Expand Down Expand Up @@ -123,7 +123,7 @@ impl SplitWriter {
pub struct ChunkWriter {
path: PathBuf,
name: String,
compressed: bool,
compression: io::Compression,
pub chunk_id: Option<u64>,
pub chunk_size: Option<u64>,
pub receiver: Receiver<String>,
Expand All @@ -133,15 +133,15 @@ impl ChunkWriter {
fn new(
path: PathBuf,
name: String,
compressed: bool,
compression: io::Compression,
chunk_id: Option<u64>,
chunk_size: Option<u64>,
receiver: Receiver<String>,
) -> Self {
ChunkWriter {
path,
name,
compressed,
compression,
chunk_id,
chunk_size,
receiver,
Expand All @@ -158,14 +158,18 @@ impl ChunkWriter {
None => "".to_string(),
Some(c) => format!(".{}", c.to_string().pad(4, '0', Alignment::Right, false)),
};
let extension = match self.compression {
io::Compression::GzipCompression => ".gz",
io::Compression::Uncompressed => "",
};
filename.push(format!(
"{}.{}{}.csv{}",
original_filename.to_string_lossy(),
&self.name,
chunk_part,
if self.compressed { ".gz" } else { "" },
extension,
));
io::open_output(filename, self.compressed)
io::open_output(filename, self.compression)
}
/// Handle writing of a row to this chunk.
pub fn handle_row(&self, file: &mut io::OutputWriter, row: String) -> Result<()> {
Expand Down

0 comments on commit 4d9ec87

Please sign in to comment.