From 4d9ec8714d775774c51dd6e62dd5020671dfcd46 Mon Sep 17 00:00:00 2001 From: Ben Sully Date: Tue, 30 Oct 2018 09:28:54 +0000 Subject: [PATCH] Don't try to determine compression from input This only works for files, not for stdin, because we can't seek back to the start of stdin after checking for compression; if we determine that there is no compression, we therefore end up missing some of the header. Instead, require the user to explicitly specify flags if uncompressed input or output is required. --- src/cli.rs | 12 +++++++++--- src/io.rs | 28 +++++++++++++++------------- src/lib.rs | 1 + src/main.rs | 12 ++++++------ src/split/splitter.rs | 35 +++++++++++++++++++++++------------ src/split/writer.rs | 18 +++++++++++------- 6 files changed, 65 insertions(+), 41 deletions(-) diff --git a/src/cli.rs b/src/cli.rs index 4bebfa1..186f71d 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -90,8 +90,14 @@ pub struct Split { #[structopt( short = "u", - long = "uncompressed", - help = "Write output files uncompressed" + long = "uncompressed-input", + help = "Treat input as uncompressed" )] - pub uncompressed: Option, + pub uncompressed: bool, + + #[structopt( + long = "uncompressed-output", + help = "Don't compress output files" + )] + pub uncompressed_output: bool, } diff --git a/src/io.rs b/src/io.rs index 8f7ebd2..676df46 100644 --- a/src/io.rs +++ b/src/io.rs @@ -10,7 +10,13 @@ use crate::error::Result; pub type InputReader = BufReader>; pub type OutputWriter = Box; -pub fn open_data>(path: P) -> Result { +#[derive(Clone, Copy, Debug)] +pub enum Compression { + Uncompressed, + GzipCompression, +} + +pub fn open_data>(path: P, compression: Compression) -> Result { // Read from stdin if input is '-', else try to open the provided file. 
let reader: Box = match path.as_ref().to_str() { Some(p) if p == "-" => Box::new(std::io::stdin()), @@ -18,22 +24,18 @@ pub fn open_data>(path: P) -> Result { _ => unreachable!(), }; - let gz = GzDecoder::new(reader); - let is_compressed = gz.header().is_some(); - let final_reader: Box = if is_compressed { - Box::new(gz) - } else { - Box::new(gz.into_inner()) + let reader: Box = match compression { + Compression::Uncompressed => Box::new(reader), + Compression::GzipCompression => Box::new(GzDecoder::new(reader)), }; - Ok(BufReader::with_capacity(1024 * 1024, final_reader)) + Ok(BufReader::with_capacity(1024 * 1024, reader)) } -pub fn open_output>(path: P, compressed: bool) -> Result { +pub fn open_output>(path: P, compression: Compression) -> Result { let file = File::create(path)?; - let writer: OutputWriter = if compressed { - Box::new(GzEncoder::new(file, Default::default())) - } else { - Box::new(file) + let writer: OutputWriter = match compression { + Compression::GzipCompression => Box::new(GzEncoder::new(file, Default::default())), + Compression::Uncompressed => Box::new(file), }; Ok(writer) } diff --git a/src/lib.rs b/src/lib.rs index 192fab9..13002ee 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -7,5 +7,6 @@ mod split; pub use { crate::error::{Error, Result}, + crate::io::Compression, crate::split::SplitterBuilder, }; diff --git a/src/main.rs b/src/main.rs index 4f000f8..d041e4c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,7 +1,7 @@ use env_logger; use structopt::StructOpt; -use ttv::{cli, Result, SplitterBuilder}; +use ttv::{cli, Compression, Result, SplitterBuilder}; fn main() -> Result<()> { env_logger::init(); @@ -9,11 +9,11 @@ fn main() -> Result<()> { match opt.cmd { cli::Command::Split(x) => { let mut splitter = SplitterBuilder::new(&x.input, x.row_splits, x.prop_splits)?; - if let Some(uncompressed) = x.uncompressed { - splitter = splitter.compressed(!uncompressed); - } else { - let should_compress = x.input.ends_with(".gz") | 
x.input.ends_with(".gzip"); - splitter = splitter.compressed(should_compress); + if x.uncompressed { + splitter = splitter.input_compression(Compression::Uncompressed); + } + if x.uncompressed_output { + splitter = splitter.output_compression(Compression::Uncompressed); } if let Some(seed) = x.seed { splitter = splitter.seed(seed); diff --git a/src/split/splitter.rs b/src/split/splitter.rs index 206d32f..22855ca 100644 --- a/src/split/splitter.rs +++ b/src/split/splitter.rs @@ -8,7 +8,7 @@ use log::{debug, info}; use rand::{prelude::*, prng::ChaChaRng}; use crate::error::{Error, Result}; -use crate::io::open_data; +use crate::io::{open_data, Compression}; use crate::split::{ single::{ProportionSplit, RowSplit, Split, SplitEnum}, splits::{SplitSelection, Splits}, @@ -28,8 +28,10 @@ pub struct SplitterBuilder { chunk_size: Option, /// The total number of rows total_rows: Option, - /// Whether to compress output files - compressed: bool, + /// Compression for input files + input_compression: Compression, + /// Compression for output files + output_compression: Compression, } impl SplitterBuilder { @@ -50,7 +52,8 @@ impl SplitterBuilder { output_prefix: None, chunk_size: None, total_rows: None, - compressed: false, + input_compression: Compression::GzipCompression, + output_compression: Compression::GzipCompression, }) } @@ -84,8 +87,13 @@ impl SplitterBuilder { self } - pub fn compressed(mut self, compressed: bool) -> Self { - self.compressed = compressed; + pub fn input_compression(mut self, input_compression: Compression) -> Self { + self.input_compression = input_compression; + self + } + + pub fn output_compression(mut self, output_compression: Compression) -> Self { + self.output_compression = output_compression; self } @@ -101,7 +109,8 @@ impl SplitterBuilder { output_prefix: self.output_prefix, chunk_size: self.chunk_size, total_rows: self.total_rows, - compressed: self.compressed, + input_compression: self.input_compression, + output_compression: 
self.output_compression, }) } } @@ -119,8 +128,10 @@ pub struct Splitter { chunk_size: Option, /// The total number of rows total_rows: Option, - /// Whether to compress output files - compressed: bool, + /// Compression for input files + input_compression: Compression, + /// Compression for output files + output_compression: Compression, } impl Splitter { @@ -188,7 +199,7 @@ impl Splitter { &split, self.chunk_size, self.total_rows, - self.compressed, + self.output_compression, )?; senders.insert(split.name().to_string(), split_sender); chunk_writers.append(&mut split_chunk_writers); @@ -202,7 +213,7 @@ impl Splitter { &split, self.chunk_size, self.total_rows, - self.compressed, + self.output_compression, )?; senders.insert(split.name().to_string(), split_sender); chunk_writers.append(&mut split_chunk_writers); @@ -220,7 +231,7 @@ impl Splitter { pool.scope(move |scope| { info!("Reading data from {}", self.input.to_str().unwrap()); - let reader = open_data(&self.input)?; + let reader = open_data(&self.input, self.input_compression)?; info!("Writing header to files"); let mut lines = reader.lines(); diff --git a/src/split/writer.rs b/src/split/writer.rs index c9902e6..b5336ed 100644 --- a/src/split/writer.rs +++ b/src/split/writer.rs @@ -30,7 +30,7 @@ impl SplitWriter { split: &SplitEnum, chunk_size: Option, total_rows: Option, - compressed: bool, + compression: io::Compression, ) -> Result<(Self, Vec)> { let n_chunks = match (split, chunk_size, total_rows) { // Just use one sender since there is no chunking required. 
@@ -65,7 +65,7 @@ impl SplitWriter { let chunk_writer = ChunkWriter::new( path.clone(), split.name().to_string(), - compressed, + compression, chunk_id, chunk_size, receiver, @@ -123,7 +123,7 @@ impl SplitWriter { pub struct ChunkWriter { path: PathBuf, name: String, - compressed: bool, + compression: io::Compression, pub chunk_id: Option, pub chunk_size: Option, pub receiver: Receiver, @@ -133,7 +133,7 @@ impl ChunkWriter { fn new( path: PathBuf, name: String, - compressed: bool, + compression: io::Compression, chunk_id: Option, chunk_size: Option, receiver: Receiver, @@ -141,7 +141,7 @@ impl ChunkWriter { ChunkWriter { path, name, - compressed, + compression, chunk_id, chunk_size, receiver, @@ -158,14 +158,18 @@ impl ChunkWriter { None => "".to_string(), Some(c) => format!(".{}", c.to_string().pad(4, '0', Alignment::Right, false)), }; + let extension = match self.compression { + io::Compression::GzipCompression => ".gz", + io::Compression::Uncompressed => "", + }; filename.push(format!( "{}.{}{}.csv{}", original_filename.to_string_lossy(), &self.name, chunk_part, - if self.compressed { ".gz" } else { "" }, + extension, )); - io::open_output(filename, self.compressed) + io::open_output(filename, self.compression) } /// Handle writing of a row to this chunk. pub fn handle_row(&self, file: &mut io::OutputWriter, row: String) -> Result<()> {