From 943272952f187b46ed20e3d34cc415cdf547513f Mon Sep 17 00:00:00 2001 From: Alex Rudy Date: Tue, 24 Apr 2018 16:02:08 -0400 Subject: [PATCH] Add a simple forward fill command. --- src/cmd/fill.rs | 361 +++++++++++++++++++++++++++++++++++++++++++++ src/cmd/mod.rs | 1 + src/main.rs | 3 + tests/test_fill.rs | 173 ++++++++++++++++++++++ tests/tests.rs | 1 + 5 files changed, 539 insertions(+) create mode 100644 src/cmd/fill.rs create mode 100644 tests/test_fill.rs diff --git a/src/cmd/fill.rs b/src/cmd/fill.rs new file mode 100644 index 000000000..5b144fbdd --- /dev/null +++ b/src/cmd/fill.rs @@ -0,0 +1,361 @@ +use std::collections::hash_map::HashMap; +use std::io; +use std::iter; +use std::ops; + +use csv; + +use CliResult; +use config::{Config, Delimiter}; +use select::{SelectColumns, Selection}; +use util; + +static USAGE: &'static str = " +Fill empty fields in selected columns of a CSV. + +This command fills empty fields in the selected column +using the last seen non-empty field in the CSV. This is +useful to forward-fill values which may only be included +the first time they are encountered. + +The option `--default ` fills all empty values +in the selected columns with the provided default value. + +The option `--first` fills empty values using the first +seen non-empty value in that column, instead of the most +recent non-empty value in that column. + +The option `--backfill` fills empty values at the start of +the CSV with the first valid value in that column. This +requires buffering rows with empty values in the target +column which appear before the first valid value. + +The option `--groupby` groups the rows by the specified +columns before filling in the empty values. Using this +option, empty values are only filled with values which +belong to the same group of rows, as determined by the +columns selected in the `--groupby` option. + +When both `--groupby` and `--backfill` are specified, and the +CSV is not sorted by the `--groupby` columns, rows may be +re-ordered during output due to the buffering of rows +collected before the first valid value. + +Usage: + xsv fill [options] [--] [] + xsv fill --help + +fill options: + -g --groupby Group by specified columns. + -f --first Fill using the first valid value of a column, instead of the latest. + -b --backfill Fill initial empty values with the first valid value. + -v --default Fill using this default value. + +Common options: + -h, --help Display this message + -o, --output Write output to instead of stdout. + -n, --no-headers When set, the first row will not be interpreted + as headers. (i.e., They are not searched, analyzed, + sliced, etc.) + -d, --delimiter The field delimiter for reading CSV data. + Must be a single character. (default: ,) +"; + +type ByteString = Vec; + +type BoxedWriter = csv::Writer>; +type BoxedReader = csv::Reader>; + +#[derive(Deserialize)] +struct Args { + arg_input: Option, + arg_selection: SelectColumns, + flag_output: Option, + flag_no_headers: bool, + flag_delimiter: Option, + flag_groupby: Option, + flag_first: bool, + flag_backfill: bool, + flag_default: Option, +} + +pub fn run(argv: &[&str]) -> CliResult<()> { + let args: Args = util::get_args(USAGE, argv)?; + + let rconfig = Config::new(&args.arg_input) + .delimiter(args.flag_delimiter) + .no_headers(args.flag_no_headers) + .select(args.arg_selection); + + let wconfig = Config::new(&args.flag_output); + + let mut rdr = rconfig.reader()?; + let mut wtr = wconfig.writer()?; + + let headers = rdr.byte_headers()?.clone(); + let select = rconfig.selection(&headers)?; + let groupby = match args.flag_groupby { + Some(value) => Some(value.selection(&headers, !rconfig.no_headers)?), + None => None, + }; + + if !rconfig.no_headers { + rconfig.write_headers(&mut rdr, &mut wtr)?; + } + + let filler = Filler::new(groupby, select) + .use_first_value(args.flag_first) + .backfill_empty_values(args.flag_backfill) + .use_default_value(args.flag_default); + filler.fill(&mut rdr, &mut wtr) +} + +#[derive(PartialEq, Eq, Hash, Clone, Debug)] +struct ByteRecord(Vec); + +impl<'a> From<&'a csv::ByteRecord> for ByteRecord { + fn from(record: &'a csv::ByteRecord) -> Self { + ByteRecord(record.iter().map(|f| f.to_vec()).collect()) + } +} + +impl iter::FromIterator for ByteRecord { + fn from_iter>(iter: T) -> Self { + ByteRecord(Vec::from_iter(iter)) + } +} + +impl iter::IntoIterator for ByteRecord { + type Item = ByteString; + type IntoIter = ::std::vec::IntoIter; + fn into_iter(self) -> Self::IntoIter { + self.0.into_iter() + } +} + +impl ops::Deref for ByteRecord { + type Target = [ByteString]; + fn deref(&self) -> &[ByteString] { + &self.0 + } +} + +type GroupKey = Option; +type GroupBuffer = HashMap>; +type Grouper = HashMap; +type GroupKeySelection = Option; + +trait GroupKeyConstructor { + fn key(&self, record: &csv::ByteRecord) -> Result; +} + +impl GroupKeyConstructor for GroupKeySelection { + fn key(&self, record: &csv::ByteRecord) -> Result { + match *self { + Some(ref value) => Ok(Some(value.iter().map(|&i| record[i].to_vec()).collect())), + None => Ok(None), + } + } +} + +#[derive(Debug)] +struct GroupValues { + map: HashMap, + default: Option, +} + +impl GroupValues { + fn new(default: Option) -> Self { + Self { + map: HashMap::new(), + default: default, + } + } +} + +trait GroupMemorizer { + fn fill(&self, selection: &Selection, record: ByteRecord) -> ByteRecord; + fn memorize(&mut self, selection: &Selection, record: &csv::ByteRecord); + fn memorize_first(&mut self, selection: &Selection, record: &csv::ByteRecord); +} + +impl GroupMemorizer for GroupValues { + fn memorize(&mut self, selection: &Selection, record: &csv::ByteRecord) { + for &col in selection.iter().filter(|&col| !record[*col].is_empty()) { + self.map.insert(col, record[col].to_vec()); + } + } + + fn memorize_first(&mut self, selection: &Selection, record: &csv::ByteRecord) { + for &col in selection.iter().filter(|&col| !record[*col].is_empty()) { + self.map.entry(col).or_insert(record[col].to_vec()); + } + } + + fn fill(&self, selection: &Selection, record: ByteRecord) -> ByteRecord { + record + .into_iter() + .enumerate() + .map_selected(selection, |(col, field)| { + ( + col, + if field.is_empty() { + self.default + .clone() + .or_else(|| self.map.get(&col).cloned()) + .unwrap_or_else(|| field.to_vec()) + } else { + field + }, + ) + }) + .map(|(_, field)| field) + .collect() + } +} + +struct Filler { + grouper: Grouper, + groupby: GroupKeySelection, + select: Selection, + buffer: GroupBuffer, + first: bool, + backfill: bool, + default_value: Option, +} + +impl Filler { + fn new(groupby: GroupKeySelection, select: Selection) -> Self { + Self { + grouper: Grouper::new(), + groupby: groupby, + select: select, + buffer: GroupBuffer::new(), + first: false, + backfill: false, + default_value: None, + } + } + + fn use_first_value(mut self, first: bool) -> Self { + self.first = first; + self + } + + fn backfill_empty_values(mut self, backfill: bool) -> Self { + self.backfill = backfill; + self + } + + fn use_default_value(mut self, value: Option) -> Self { + self.default_value = value.map(|v| v.as_bytes().to_vec()); + self + } + + fn fill(mut self, rdr: &mut BoxedReader, wtr: &mut BoxedWriter) -> CliResult<()> { + let mut record = csv::ByteRecord::new(); + + while rdr.read_byte_record(&mut record)? { + // Precompute groupby key + let key = self.groupby.key(&record)?; + + // Record valid fields, and fill empty fields + let default_value = self.default_value.clone(); + let group = self.grouper + .entry(key.clone()) + .or_insert_with(|| GroupValues::new(default_value)); + + match (self.default_value.is_some(), self.first) { + (true, _) => {} + (false, true) => group.memorize_first(&self.select, &record), + (false, false) => group.memorize(&self.select, &record), + }; + + let row = group.fill(&self.select, ByteRecord::from(&record)); + + // Handle buffering rows which still have nulls. + if self.backfill && (self.select.iter().any(|&i| row[i] == b"")) { + self.buffer + .entry(key.clone()) + .or_insert_with(Vec::new) + .push(row); + } else { + if let Some(rows) = self.buffer.remove(&key) { + for buffered_row in rows { + wtr.write_record(group.fill(&self.select, buffered_row).iter())?; + } + } + wtr.write_record(row.iter())?; + } + } + + // Ensure any remaining buffers are dumped at the end. + for (key, rows) in self.buffer { + let group = self.grouper.get(&key).unwrap(); + for buffered_row in rows { + wtr.write_record(group.fill(&self.select, buffered_row).iter())?; + } + } + + wtr.flush()?; + Ok(()) + } +} + +struct MapSelected { + selection: Vec, + selection_index: usize, + index: usize, + iterator: I, + predicate: F, +} + +impl iter::Iterator for MapSelected +where + F: FnMut(I::Item) -> I::Item, +{ + type Item = I::Item; + + fn next(&mut self) -> Option { + let item = match self.iterator.next() { + Some(item) => item, + None => return None + }; + let result = match self.selection_index { + ref mut sidx if (self.selection.get(*sidx) == Some(&self.index)) => { + *sidx += 1; + Some((self.predicate)(item)) + } + _ => Some(item), + }; + self.index += 1; + result + } +} + +trait Selectable +where + Self: iter::Iterator + Sized, +{ + fn map_selected(self, selector: &Selection, predicate: F) -> MapSelected + where + F: FnMut(B) -> B; +} + +impl Selectable for C +where + C: iter::Iterator + Sized, +{ + fn map_selected(self, selector: &Selection, predicate: F) -> MapSelected + where + F: FnMut(B) -> B, + { + MapSelected { + selection: selector.iter().map(|&x| x).collect(), + selection_index: 0, + index: 0, + iterator: self, + predicate: predicate, + } + } +} diff --git a/src/cmd/mod.rs b/src/cmd/mod.rs index a3fb63ab6..db35c6412 100644 --- a/src/cmd/mod.rs +++ b/src/cmd/mod.rs @@ -1,5 +1,6 @@ pub mod cat; pub mod count; +pub mod fill; pub mod fixlengths; pub mod flatten; pub mod fmt; diff --git a/src/main.rs b/src/main.rs index 1eec7501e..077881dd3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -45,6 +45,7 @@ macro_rules! command_list { " cat Concatenate by row or column count Count records + fill Fill empty values fixlengths Makes all records have same length flatten Show one field per line fmt Format CSV output (change field delimiter) @@ -139,6 +140,7 @@ Please choose one of the following commands:", enum Command { Cat, Count, + Fill, FixLengths, Flatten, Fmt, @@ -167,6 +169,7 @@ impl Command { match self { Command::Cat => cmd::cat::run(argv), Command::Count => cmd::count::run(argv), + Command::Fill => cmd::fill::run(argv), Command::FixLengths => cmd::fixlengths::run(argv), Command::Flatten => cmd::flatten::run(argv), Command::Fmt => cmd::fmt::run(argv), diff --git a/tests/test_fill.rs b/tests/test_fill.rs new file mode 100644 index 000000000..f713bd52e --- /dev/null +++ b/tests/test_fill.rs @@ -0,0 +1,173 @@ +use CsvRecord; +use workdir::Workdir; + +fn compare_column(got: &[CsvRecord], expected: &[String], column: usize, skip_header: bool) { + for (value, value_expected) in got.iter() + .skip(if skip_header { 1 } else { 0 }) + .map(|row| &row[column]) + .zip(expected.iter()) + { + assert_eq!(value, value_expected) + } +} + +fn example() -> Vec> { + vec![ + svec!["h1", "h2", "h3"], + svec!["", "baz", "egg"], + svec!["", "foo", ""], + svec!["abc", "baz", "foo"], + svec!["", "baz", "egg"], + svec!["zap", "baz", "foo"], + svec!["bar", "foo", ""], + svec!["bongo", "foo", ""], + svec!["", "foo", "jar"], + svec!["", "baz", "jar"], + svec!["", "foo", "jar"], + ] +} + +#[test] +fn fill_forward() { + let wrk = Workdir::new("fill_forward"); + wrk.create("in.csv", example()); + + let mut cmd = wrk.command("fill"); + cmd.arg("--").arg("1").arg("in.csv"); + + let got: Vec = wrk.read_stdout(&mut cmd); + + // Filled target column + let expected = svec![ + "", "", "abc", "abc", "zap", "bar", "bongo", "bongo", "bongo", "bongo" + ]; + compare_column(&got, &expected, 0, true); + + // Left non-target column alone + let expected = svec!["egg", "", "foo", "egg", "foo", "", "", "jar", "jar", "jar"]; + compare_column(&got, &expected, 2, true); +} + +#[test] +fn fill_forward_both() { + let wrk = Workdir::new("fill_forward"); + wrk.create("in.csv", example()); + + let mut cmd = wrk.command("fill"); + cmd.arg("--").arg("1,3").arg("in.csv"); + + let got: Vec = wrk.read_stdout(&mut cmd); + + // Filled target column + let expected = svec![ + "", "", "abc", "abc", "zap", "bar", "bongo", "bongo", "bongo", "bongo" + ]; + compare_column(&got, &expected, 0, true); + + let expected = svec![ + "egg", "egg", "foo", "egg", "foo", "foo", "foo", "jar", "jar", "jar" + ]; + compare_column(&got, &expected, 2, true); +} + +#[test] +fn fill_forward_groupby() { + let wrk = Workdir::new("fill_forward_groupby").flexible(true); + wrk.create("in.csv", example()); + + let mut cmd = wrk.command("fill"); + cmd.args(&vec!["-g", "2"]).arg("--").arg("1").arg("in.csv"); + + let got: Vec = wrk.read_stdout(&mut cmd); + let expected = svec![ + "", "", "abc", "abc", "zap", "bar", "bongo", "bongo", "zap", "bongo" + ]; + compare_column(&got, &expected, 0, true); +} + +#[test] +fn fill_first_groupby() { + let wrk = Workdir::new("fill_first_groupby").flexible(true); + wrk.create("in.csv", example()); + + let mut cmd = wrk.command("fill"); + cmd.args(&vec!["-g", "2"]) + .arg("--first") + .arg("--") + .arg("1") + .arg("in.csv"); + + let got: Vec = wrk.read_stdout(&mut cmd); + let expected = svec![ + "", "", "abc", "abc", "zap", "bar", "bongo", "bar", "abc", "bar" + ]; + compare_column(&got, &expected, 0, true); +} + +#[test] +fn fill_first() { + let wrk = Workdir::new("fill_first").flexible(true); + wrk.create("in.csv", example()); + + let mut cmd = wrk.command("fill"); + cmd.arg("--first").arg("--").arg("1").arg("in.csv"); + + let got: Vec = wrk.read_stdout(&mut cmd); + let expected = svec![ + "", "", "abc", "abc", "zap", "bar", "bongo", "abc", "abc", "abc" + ]; + compare_column(&got, &expected, 0, true); +} + +#[test] +fn fill_backfill() { + let wrk = Workdir::new("fill_backfill").flexible(true); + wrk.create("in.csv", example()); + + let mut cmd = wrk.command("fill"); + cmd.arg("--backfill").arg("--").arg("1").arg("in.csv"); + + let got: Vec = wrk.read_stdout(&mut cmd); + let expected = svec![ + "abc", "abc", "abc", "abc", "zap", "bar", "bongo", "bongo", "bongo", "bongo" + ]; + compare_column(&got, &expected, 0, true); +} + +#[test] +fn fill_backfill_first() { + let wrk = Workdir::new("fill_backfill").flexible(true); + wrk.create("in.csv", example()); + + let mut cmd = wrk.command("fill"); + cmd.arg("--backfill") + .arg("--first") + .arg("--") + .arg("1") + .arg("in.csv"); + + let got: Vec = wrk.read_stdout(&mut cmd); + let expected = svec![ + "abc", "abc", "abc", "abc", "zap", "bar", "bongo", "abc", "abc", "abc" + ]; + compare_column(&got, &expected, 0, true); +} + +#[test] +fn fill_default() { + let wrk = Workdir::new("fill_default").flexible(true); + wrk.create("in.csv", example()); + + let mut cmd = wrk.command("fill"); + cmd.arg("--default") + .arg("dat") + .arg("--") + .arg("1") + .arg("in.csv"); + + let got: Vec = wrk.read_stdout(&mut cmd); + let expected = svec![ + "dat", "dat", "abc", "dat", "zap", "bar", "bongo", "dat", "dat", "dat" + ]; + compare_column(&got, &expected, 0, true); +} diff --git a/tests/tests.rs b/tests/tests.rs index afd283264..a29d285c1 100644 --- a/tests/tests.rs +++ b/tests/tests.rs @@ -35,6 +35,7 @@ mod workdir; mod test_cat; mod test_count; +mod test_fill; mod test_fixlengths; mod test_flatten; mod test_fmt;