Commit 6885f04

fix: infer data type for null columns and empty tables as Utf8
1 parent: d604790 · commit: 6885f04

File tree: 2 files changed, +76 −49 lines

  datafusion/core/src/datasource/file_format/csv.rs
  datafusion/datasource-csv/src/file_format.rs


datafusion/core/src/datasource/file_format/csv.rs

Lines changed: 76 additions & 48 deletions
@@ -253,16 +253,8 @@ mod tests {
         let projection = None;
         let root = "./tests/data/csv";
         let format = CsvFormat::default().with_has_header(true);
-        let exec = scan_format(
-            &state,
-            &format,
-            None,
-            root,
-            "aggregate_test_100_with_nulls.csv",
-            projection,
-            None,
-        )
-        .await?;
+        let exec =
+            get_exec(&state, "aggregate_test_100.csv", projection, None, true).await?;
 
         let x: Vec<String> = exec
             .schema()
@@ -285,8 +277,6 @@ mod tests {
                 "c11: Float64",
                 "c12: Float64",
                 "c13: Utf8",
-                "c14: Null",
-                "c15: Utf8"
             ],
             x
         );
@@ -295,56 +285,95 @@ mod tests {
     }
 
     #[tokio::test]
+    async fn infer_schema_with_null() -> Result<()> {
+        let session_ctx = SessionContext::new();
+        let state = session_ctx.state();
+        let variable_object_store = Arc::new(VariableStream::new(
+            Bytes::from(
+                r#"has_nulls,all_nulls,utf8
+1,,NULL
+,,
+2,,NULL"#,
+            ),
+            1,
+        ));
+        let object_meta = ObjectMeta {
+            location: Path::parse("/")?,
+            last_modified: DateTime::default(),
+            size: u64::MAX,
+            e_tag: None,
+            version: None,
+        };
+
+        let csv_format = CsvFormat::default().with_has_header(true);
+        let inferred_schema = csv_format
+            .infer_schema(
+                &state,
+                &(variable_object_store.clone() as Arc<dyn ObjectStore>),
+                &[object_meta],
+            )
+            .await?;
+
+        let actual_fields: Vec<_> = inferred_schema
+            .fields()
+            .iter()
+            .map(|f| format!("{}: {:?}", f.name(), f.data_type()))
+            .collect();
+        assert_eq!(
+            vec!["has_nulls: Int64", "all_nulls: Utf8", "utf8: Utf8"],
+            actual_fields
+        );
+
+        Ok(())
+    }
+
     async fn infer_schema_with_null_regex() -> Result<()> {
         let session_ctx = SessionContext::new();
         let state = session_ctx.state();
+        let variable_object_store = Arc::new(VariableStream::new(
+            Bytes::from(
+                r#"has_nulls,all_nulls,has_nulls_regex,all_nulls_regex
+1,,1,NULL
+,,NULL,NULL
+2,,2,NULL"#,
+            ),
+            1,
+        ));
+        let object_meta = ObjectMeta {
+            location: Path::parse("/")?,
+            last_modified: DateTime::default(),
+            size: u64::MAX,
+            e_tag: None,
+            version: None,
+        };
 
-        let projection = None;
-        let root = "./tests/data/csv";
-        let format = CsvFormat::default()
+        let csv_format = CsvFormat::default()
             .with_has_header(true)
             .with_null_regex(Some("^NULL$|^$".to_string()));
-        let exec = scan_format(
-            &state,
-            &format,
-            None,
-            root,
-            "aggregate_test_100_with_nulls.csv",
-            projection,
-            None,
-        )
-        .await?;
+        let inferred_schema = csv_format
+            .infer_schema(
+                &state,
+                &(variable_object_store.clone() as Arc<dyn ObjectStore>),
+                &[object_meta],
+            )
+            .await?;
 
-        let x: Vec<String> = exec
-            .schema()
+        let actual_fields: Vec<_> = inferred_schema
             .fields()
             .iter()
             .map(|f| format!("{}: {:?}", f.name(), f.data_type()))
             .collect();
         assert_eq!(
             vec![
-                "c1: Utf8",
-                "c2: Int64",
-                "c3: Int64",
-                "c4: Int64",
-                "c5: Int64",
-                "c6: Int64",
-                "c7: Int64",
-                "c8: Int64",
-                "c9: Int64",
-                "c10: Utf8",
-                "c11: Float64",
-                "c12: Float64",
-                "c13: Utf8",
-                "c14: Null",
-                "c15: Null"
+                "has_nulls: Int64",
+                "all_nulls: Utf8",
+                "has_nulls_regex: Int64",
+                "all_nulls_regex: Utf8"
             ],
-            x
+            actual_fields
         );
-
         Ok(())
     }
-
     #[tokio::test]
     async fn read_char_column() -> Result<()> {
         let session_ctx = SessionContext::new();
@@ -489,8 +518,7 @@ mod tests {
                 Bytes::from(
                     r#"c1,c2,c3
 1,1.0,
-,,
-"#,
+,,"#,
                 ),
                 1,
             )),
@@ -520,7 +548,7 @@ mod tests {
            .collect();
 
        // ensure null chunks don't skew type inference
-        assert_eq!(vec!["c1: Int64", "c2: Float64", "c3: Null"], actual_fields);
+        assert_eq!(vec!["c1: Int64", "c2: Float64", "c3: Utf8"], actual_fields);
        Ok(())
    }
 
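The new tests drive CsvFormat::infer_schema directly against an in-memory object store; through the regular CSV reader the same inference decides the schema a user sees. A minimal sketch of that user-facing effect, not part of this commit (the file name nulls.csv and its contents are assumptions for illustration, and the snippet assumes a tokio runtime):

use datafusion::error::Result;
use datafusion::prelude::{CsvReadOptions, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // Hypothetical file whose `all_nulls` column contains only empty values.
    let df = ctx
        .read_csv("nulls.csv", CsvReadOptions::new().has_header(true))
        .await?;
    // With this fix, an all-null column (or an empty file body) shows up in the
    // inferred schema as Utf8 rather than Null.
    println!("{:?}", df.schema());
    Ok(())
}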

datafusion/datasource-csv/src/file_format.rs

Lines changed: 0 additions & 1 deletion
@@ -600,7 +600,6 @@ fn build_schema_helper(names: Vec<String>, types: Vec<HashSet<DataType>>) -> Schema {
             data_type_possibilities.remove(&DataType::Null);
 
             match data_type_possibilities.len() {
-                0 => Field::new(field_name, DataType::Null, true),
                 1 => Field::new(
                     field_name,
                     data_type_possibilities.iter().next().unwrap().clone(),
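The removed 0 => arm is the core of the fix: build_schema_helper strips DataType::Null from each column's candidate set, so a column that only ever saw nulls (or an empty table) ends up with an empty set, and the match now falls through to the same default arm as columns with conflicting candidates. A rough, self-contained sketch of the resulting behaviour, not the crate's actual code (it assumes the remaining wildcard arm simply picks Utf8, which is what the updated tests assert; the real helper also reconciles mixed numeric candidates, which this sketch skips):

use std::collections::HashSet;

use arrow::datatypes::{DataType, Field};

// Simplified stand-in for the per-column logic in build_schema_helper.
fn infer_field(field_name: &str, mut data_type_possibilities: HashSet<DataType>) -> Field {
    // Null on its own never decides a column's type.
    data_type_possibilities.remove(&DataType::Null);
    match data_type_possibilities.len() {
        // Exactly one non-null candidate: use it.
        1 => Field::new(
            field_name,
            data_type_possibilities.iter().next().unwrap().clone(),
            true,
        ),
        // With the `0 =>` arm gone, an all-null column or an empty table lands
        // here (empty candidate set) together with ambiguous columns, and is
        // read back as Utf8.
        _ => Field::new(field_name, DataType::Utf8, true),
    }
}

fn main() {
    let all_nulls = infer_field("all_nulls", HashSet::from([DataType::Null]));
    assert_eq!(all_nulls.data_type(), &DataType::Utf8);
}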
