Skip to content

Commit f615857

Browse files
dhegbergalamb
andcommitted
Support Null regex override in csv parser options.
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
1 parent bfabd48 commit f615857

File tree

14 files changed

+294
-62
lines changed

14 files changed

+294
-62
lines changed

datafusion-cli/Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/common/src/config.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1655,7 +1655,10 @@ config_namespace! {
16551655
pub timestamp_format: Option<String>, default = None
16561656
pub timestamp_tz_format: Option<String>, default = None
16571657
pub time_format: Option<String>, default = None
1658+
// The output format for Nulls in the CSV writer.
16581659
pub null_value: Option<String>, default = None
1660+
// The input regex for Nulls when loading CSVs.
1661+
pub null_regex: Option<String>, default = None
16591662
pub comment: Option<u8>, default = None
16601663
}
16611664
}

datafusion/core/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ object_store = { workspace = true }
121121
parking_lot = { workspace = true }
122122
parquet = { workspace = true, optional = true, default-features = true }
123123
rand = { workspace = true }
124+
regex = { workspace = true }
124125
sqlparser = { workspace = true }
125126
tempfile = { workspace = true }
126127
tokio = { workspace = true }

datafusion/core/benches/csv_load.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,16 @@ fn criterion_benchmark(c: &mut Criterion) {
7575
)
7676
})
7777
});
78+
79+
group.bench_function("null regex override", |b| {
80+
b.iter(|| {
81+
load_csv(
82+
ctx.clone(),
83+
test_file.path().to_str().unwrap(),
84+
CsvReadOptions::default().null_regex(Some("^NULL$|^$".to_string())),
85+
)
86+
})
87+
});
7888
}
7989

8090
criterion_group!(benches, criterion_benchmark);

datafusion/core/src/datasource/file_format/csv.rs

Lines changed: 80 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ use datafusion_physical_expr_common::sort_expr::LexRequirement;
5959
use futures::stream::BoxStream;
6060
use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
6161
use object_store::{delimited::newline_delimited_stream, ObjectMeta, ObjectStore};
62+
use regex::Regex;
6263

6364
#[derive(Default)]
6465
/// Factory struct used to create [CsvFormatFactory]
@@ -218,6 +219,13 @@ impl CsvFormat {
218219
self
219220
}
220221

222+
/// Set the regex to use for null values in the CSV reader.
223+
/// - default to treat empty values as null.
224+
pub fn with_null_regex(mut self, null_regex: Option<String>) -> Self {
225+
self.options.null_regex = null_regex;
226+
self
227+
}
228+
221229
/// Returns `Some(true)` if the first line is a header, `Some(false)` if
222230
/// it is not, and `None` if it is not specified.
223231
pub fn has_header(&self) -> Option<bool> {
@@ -501,6 +509,14 @@ impl CsvFormat {
501509
)
502510
.with_delimiter(self.options.delimiter)
503511
.with_quote(self.options.quote);
512+
// Support literal NULL or empty string as null
513+
// .with_null_regex(Regex::new(r"^NULL$|^$").unwrap());
514+
515+
if let Some(null_regex) = &self.options.null_regex {
516+
let regex = Regex::new(null_regex.as_str())
517+
.expect("Unable to parse CSV null regex.");
518+
format = format.with_null_regex(regex);
519+
}
504520

505521
if let Some(escape) = self.options.escape {
506522
format = format.with_escape(escape);
@@ -813,8 +829,67 @@ mod tests {
813829
let state = session_ctx.state();
814830

815831
let projection = None;
816-
let exec =
817-
get_exec(&state, "aggregate_test_100.csv", projection, None, true).await?;
832+
let root = "./tests/data/csv";
833+
let format = CsvFormat::default().with_has_header(true);
834+
let exec = scan_format(
835+
&state,
836+
&format,
837+
root,
838+
"aggregate_test_100_with_nulls.csv",
839+
projection,
840+
None,
841+
)
842+
.await?;
843+
844+
let x: Vec<String> = exec
845+
.schema()
846+
.fields()
847+
.iter()
848+
.map(|f| format!("{}: {:?}", f.name(), f.data_type()))
849+
.collect();
850+
assert_eq!(
851+
vec![
852+
"c1: Utf8",
853+
"c2: Int64",
854+
"c3: Int64",
855+
"c4: Int64",
856+
"c5: Int64",
857+
"c6: Int64",
858+
"c7: Int64",
859+
"c8: Int64",
860+
"c9: Int64",
861+
"c10: Utf8",
862+
"c11: Float64",
863+
"c12: Float64",
864+
"c13: Utf8",
865+
"c14: Null",
866+
"c15: Utf8"
867+
],
868+
x
869+
);
870+
871+
Ok(())
872+
}
873+
874+
#[tokio::test]
875+
async fn infer_schema_with_null_regex() -> Result<()> {
876+
let session_ctx = SessionContext::new();
877+
let state = session_ctx.state();
878+
879+
let projection = None;
880+
let root = "./tests/data/csv";
881+
let format = CsvFormat::default()
882+
.with_has_header(true)
883+
.with_null_regex(Some("^NULL$|^$".to_string()));
884+
let exec = scan_format(
885+
&state,
886+
&format,
887+
root,
888+
"aggregate_test_100_with_nulls.csv",
889+
projection,
890+
None,
891+
)
892+
.await?;
818893

819894
let x: Vec<String> = exec
820895
.schema()
@@ -836,7 +911,9 @@ mod tests {
836911
"c10: Utf8",
837912
"c11: Float64",
838913
"c12: Float64",
839-
"c13: Utf8"
914+
"c13: Utf8",
915+
"c14: Null",
916+
"c15: Null"
840917
],
841918
x
842919
);

datafusion/core/src/datasource/file_format/options.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,8 @@ pub struct CsvReadOptions<'a> {
8787
pub file_compression_type: FileCompressionType,
8888
/// Indicates how the file is sorted
8989
pub file_sort_order: Vec<Vec<SortExpr>>,
90+
/// Optional regex to match null values
91+
pub null_regex: Option<String>,
9092
}
9193

9294
impl Default for CsvReadOptions<'_> {
@@ -112,6 +114,7 @@ impl<'a> CsvReadOptions<'a> {
112114
file_compression_type: FileCompressionType::UNCOMPRESSED,
113115
file_sort_order: vec![],
114116
comment: None,
117+
null_regex: None,
115118
}
116119
}
117120

@@ -212,6 +215,12 @@ impl<'a> CsvReadOptions<'a> {
212215
self.file_sort_order = file_sort_order;
213216
self
214217
}
218+
219+
/// Configure the null parsing regex.
220+
pub fn null_regex(mut self, null_regex: Option<String>) -> Self {
221+
self.null_regex = null_regex;
222+
self
223+
}
215224
}
216225

217226
/// Options that control the reading of Parquet files.
@@ -534,7 +543,8 @@ impl ReadOptions<'_> for CsvReadOptions<'_> {
534543
.with_terminator(self.terminator)
535544
.with_newlines_in_values(self.newlines_in_values)
536545
.with_schema_infer_max_rec(self.schema_infer_max_records)
537-
.with_file_compression_type(self.file_compression_type.to_owned());
546+
.with_file_compression_type(self.file_compression_type.to_owned())
547+
.with_null_regex(self.null_regex.clone());
538548

539549
ListingOptions::new(Arc::new(file_format))
540550
.with_file_extension(self.file_extension)

0 commit comments

Comments
 (0)