Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1655,7 +1655,10 @@ config_namespace! {
pub timestamp_format: Option<String>, default = None
pub timestamp_tz_format: Option<String>, default = None
pub time_format: Option<String>, default = None
// The output format for Nulls in the CSV writer.
pub null_value: Option<String>, default = None
// The input regex for Nulls when loading CSVs.
pub null_regex: Option<String>, default = None
pub comment: Option<u8>, default = None
}
}
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ object_store = { workspace = true }
parking_lot = { workspace = true }
parquet = { workspace = true, optional = true, default-features = true }
rand = { workspace = true }
regex = { workspace = true }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this is not a new actual dependency as it is already a dependency of arrow-csv

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I remove it I have this error during cargo build:

error[E0432]: unresolved import `regex`
  --> datafusion/core/src/datasource/file_format/csv.rs:59:5
   |
59 | use regex::Regex;
   |     ^^^^^ help: a similar path exists: `datafusion_functions::regex`

For more information about this error, try `rustc --explain E0432`.
error: could not compile `datafusion` (lib) due to 1 previous error

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, the "better" think to do would be for arrow-rs to re-export the RegEx structure it used as it appears in the public API

sqlparser = { workspace = true }
tempfile = { workspace = true }
tokio = { workspace = true }
Expand Down
10 changes: 10 additions & 0 deletions datafusion/core/benches/csv_load.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,16 @@ fn criterion_benchmark(c: &mut Criterion) {
)
})
});

group.bench_function("null regex override", |b| {
b.iter(|| {
load_csv(
ctx.clone(),
test_file.path().to_str().unwrap(),
CsvReadOptions::default().null_regex(Some("^NULL$|^$".to_string())),
)
})
});
}

criterion_group!(benches, criterion_benchmark);
Expand Down
81 changes: 78 additions & 3 deletions datafusion/core/src/datasource/file_format/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ use datafusion_physical_expr_common::sort_expr::LexRequirement;
use futures::stream::BoxStream;
use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
use object_store::{delimited::newline_delimited_stream, ObjectMeta, ObjectStore};
use regex::Regex;

#[derive(Default)]
/// Factory struct used to create [CsvFormatFactory]
Expand Down Expand Up @@ -218,6 +219,13 @@ impl CsvFormat {
self
}

/// Set the regex to use for null values in the CSV reader.
/// - default to treat empty values as null.
pub fn with_null_regex(mut self, null_regex: Option<String>) -> Self {
self.options.null_regex = null_regex;
self
}

/// Returns `Some(true)` if the first line is a header, `Some(false)` if
/// it is not, and `None` if it is not specified.
pub fn has_header(&self) -> Option<bool> {
Expand Down Expand Up @@ -502,6 +510,12 @@ impl CsvFormat {
.with_delimiter(self.options.delimiter)
.with_quote(self.options.quote);

if let Some(null_regex) = &self.options.null_regex {
let regex = Regex::new(null_regex.as_str())
.expect("Unable to parse CSV null regex.");
format = format.with_null_regex(regex);
}

if let Some(escape) = self.options.escape {
format = format.with_escape(escape);
}
Expand Down Expand Up @@ -813,8 +827,67 @@ mod tests {
let state = session_ctx.state();

let projection = None;
let exec =
get_exec(&state, "aggregate_test_100.csv", projection, None, true).await?;
let root = "./tests/data/csv";
let format = CsvFormat::default().with_has_header(true);
let exec = scan_format(
&state,
&format,
root,
"aggregate_test_100_with_nulls.csv",
projection,
None,
)
.await?;

let x: Vec<String> = exec
.schema()
.fields()
.iter()
.map(|f| format!("{}: {:?}", f.name(), f.data_type()))
.collect();
assert_eq!(
vec![
"c1: Utf8",
"c2: Int64",
"c3: Int64",
"c4: Int64",
"c5: Int64",
"c6: Int64",
"c7: Int64",
"c8: Int64",
"c9: Int64",
"c10: Utf8",
"c11: Float64",
"c12: Float64",
"c13: Utf8",
"c14: Null",
"c15: Utf8"
],
x
);

Ok(())
}

#[tokio::test]
async fn infer_schema_with_null_regex() -> Result<()> {
let session_ctx = SessionContext::new();
let state = session_ctx.state();

let projection = None;
let root = "./tests/data/csv";
let format = CsvFormat::default()
.with_has_header(true)
.with_null_regex(Some("^NULL$|^$".to_string()));
let exec = scan_format(
&state,
&format,
root,
"aggregate_test_100_with_nulls.csv",
projection,
None,
)
.await?;

let x: Vec<String> = exec
.schema()
Expand All @@ -836,7 +909,9 @@ mod tests {
"c10: Utf8",
"c11: Float64",
"c12: Float64",
"c13: Utf8"
"c13: Utf8",
"c14: Null",
"c15: Null"
],
x
);
Expand Down
12 changes: 11 additions & 1 deletion datafusion/core/src/datasource/file_format/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ pub struct CsvReadOptions<'a> {
pub file_compression_type: FileCompressionType,
/// Indicates how the file is sorted
pub file_sort_order: Vec<Vec<SortExpr>>,
/// Optional regex to match null values
pub null_regex: Option<String>,
}

impl Default for CsvReadOptions<'_> {
Expand All @@ -112,6 +114,7 @@ impl<'a> CsvReadOptions<'a> {
file_compression_type: FileCompressionType::UNCOMPRESSED,
file_sort_order: vec![],
comment: None,
null_regex: None,
}
}

Expand Down Expand Up @@ -212,6 +215,12 @@ impl<'a> CsvReadOptions<'a> {
self.file_sort_order = file_sort_order;
self
}

/// Configure the null parsing regex.
pub fn null_regex(mut self, null_regex: Option<String>) -> Self {
self.null_regex = null_regex;
self
}
}

/// Options that control the reading of Parquet files.
Expand Down Expand Up @@ -534,7 +543,8 @@ impl ReadOptions<'_> for CsvReadOptions<'_> {
.with_terminator(self.terminator)
.with_newlines_in_values(self.newlines_in_values)
.with_schema_infer_max_rec(self.schema_infer_max_records)
.with_file_compression_type(self.file_compression_type.to_owned());
.with_file_compression_type(self.file_compression_type.to_owned())
.with_null_regex(self.null_regex.clone());

ListingOptions::new(Arc::new(file_format))
.with_file_extension(self.file_extension)
Expand Down
Loading
Loading