Skip to content

Commit 7b16d6b

Browse files
authored
Support csv truncated rows in datafusion (apache#17465)
1 parent 2a8435e commit 7b16d6b

File tree

16 files changed

+300
-5
lines changed

16 files changed

+300
-5
lines changed

datafusion/common/src/config.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2521,6 +2521,10 @@ config_namespace! {
25212521
// The input regex for Nulls when loading CSVs.
25222522
pub null_regex: Option<String>, default = None
25232523
pub comment: Option<u8>, default = None
2524+
// Whether to allow truncated rows when parsing.
2525+
// By default this is set to false and will error if the CSV rows have different lengths.
2526+
// When set to true then it will allow records with less than the expected number of columns
2527+
pub truncated_rows: Option<bool>, default = None
25242528
}
25252529
}
25262530

@@ -2613,6 +2617,15 @@ impl CsvOptions {
26132617
self
26142618
}
26152619

2620+
/// Whether to allow truncated rows when parsing.
2621+
/// By default this is set to false and will error if the CSV rows have different lengths.
2622+
/// When set to true then it will allow records with less than the expected number of columns and fill the missing columns with nulls.
2623+
/// If the record’s schema is not nullable, then it will still return an error.
2624+
pub fn with_truncated_rows(mut self, allow: bool) -> Self {
2625+
self.truncated_rows = Some(allow);
2626+
self
2627+
}
2628+
26162629
/// The delimiter character.
26172630
pub fn delimiter(&self) -> u8 {
26182631
self.delimiter

datafusion/core/src/datasource/file_format/csv.rs

Lines changed: 178 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ mod tests {
4848
use datafusion_physical_plan::{collect, ExecutionPlan};
4949

5050
use arrow::array::{
51-
BooleanArray, Float64Array, Int32Array, RecordBatch, StringArray,
51+
Array, BooleanArray, Float64Array, Int32Array, RecordBatch, StringArray,
5252
};
5353
use arrow::compute::concat_batches;
5454
use arrow::csv::ReaderBuilder;
@@ -1256,4 +1256,181 @@ mod tests {
12561256
.build_decoder();
12571257
DecoderDeserializer::new(CsvDecoder::new(decoder))
12581258
}
1259+
1260+
fn csv_deserializer_with_truncated(
1261+
batch_size: usize,
1262+
schema: &Arc<Schema>,
1263+
) -> impl BatchDeserializer<Bytes> {
1264+
// using Arrow's ReaderBuilder and enabling truncated_rows
1265+
let decoder = ReaderBuilder::new(schema.clone())
1266+
.with_batch_size(batch_size)
1267+
.with_truncated_rows(true) // <- enable runtime truncated_rows
1268+
.build_decoder();
1269+
DecoderDeserializer::new(CsvDecoder::new(decoder))
1270+
}
1271+
1272+
#[tokio::test]
1273+
async fn infer_schema_with_truncated_rows_true() -> Result<()> {
1274+
let session_ctx = SessionContext::new();
1275+
let state = session_ctx.state();
1276+
1277+
// CSV: header has 3 columns, but first data row has only 2 columns, second row has 3
1278+
let csv_data = Bytes::from("a,b,c\n1,2\n3,4,5\n");
1279+
let variable_object_store = Arc::new(VariableStream::new(csv_data, 1));
1280+
let object_meta = ObjectMeta {
1281+
location: Path::parse("/")?,
1282+
last_modified: DateTime::default(),
1283+
size: u64::MAX,
1284+
e_tag: None,
1285+
version: None,
1286+
};
1287+
1288+
// Construct CsvFormat and enable truncated_rows via CsvOptions
1289+
let csv_options = CsvOptions::default().with_truncated_rows(true);
1290+
let csv_format = CsvFormat::default()
1291+
.with_has_header(true)
1292+
.with_options(csv_options)
1293+
.with_schema_infer_max_rec(10);
1294+
1295+
let inferred_schema = csv_format
1296+
.infer_schema(
1297+
&state,
1298+
&(variable_object_store.clone() as Arc<dyn ObjectStore>),
1299+
&[object_meta],
1300+
)
1301+
.await?;
1302+
1303+
// header has 3 columns; inferred schema should also have 3
1304+
assert_eq!(inferred_schema.fields().len(), 3);
1305+
1306+
// inferred columns should be nullable
1307+
for f in inferred_schema.fields() {
1308+
assert!(f.is_nullable());
1309+
}
1310+
1311+
Ok(())
1312+
}
1313+
#[test]
1314+
fn test_decoder_truncated_rows_runtime() -> Result<()> {
1315+
// Synchronous test: Decoder API used here is synchronous
1316+
let schema = csv_schema(); // helper already defined in file
1317+
1318+
// Construct a decoder that enables truncated_rows at runtime
1319+
let mut deserializer = csv_deserializer_with_truncated(10, &schema);
1320+
1321+
// Provide two rows: first row complete, second row missing last column
1322+
let input = Bytes::from("0,0.0,true,0-string\n1,1.0,true\n");
1323+
deserializer.digest(input);
1324+
1325+
// Finish and collect output
1326+
deserializer.finish();
1327+
1328+
let output = deserializer.next()?;
1329+
match output {
1330+
DeserializerOutput::RecordBatch(batch) => {
1331+
// ensure at least two rows present
1332+
assert!(batch.num_rows() >= 2);
1333+
// column 4 (index 3) should be a StringArray where second row is NULL
1334+
let col4 = batch
1335+
.column(3)
1336+
.as_any()
1337+
.downcast_ref::<StringArray>()
1338+
.expect("column 4 should be StringArray");
1339+
1340+
// first row present, second row should be null
1341+
assert!(!col4.is_null(0));
1342+
assert!(col4.is_null(1));
1343+
}
1344+
other => panic!("expected RecordBatch but got {other:?}"),
1345+
}
1346+
Ok(())
1347+
}
1348+
1349+
#[tokio::test]
1350+
async fn infer_schema_truncated_rows_false_error() -> Result<()> {
1351+
let session_ctx = SessionContext::new();
1352+
let state = session_ctx.state();
1353+
1354+
// CSV: header has 4 cols, first data row has 3 cols -> truncated at end
1355+
let csv_data = Bytes::from("id,a,b,c\n1,foo,bar\n2,foo,bar,baz\n");
1356+
let variable_object_store = Arc::new(VariableStream::new(csv_data, 1));
1357+
let object_meta = ObjectMeta {
1358+
location: Path::parse("/")?,
1359+
last_modified: DateTime::default(),
1360+
size: u64::MAX,
1361+
e_tag: None,
1362+
version: None,
1363+
};
1364+
1365+
// CsvFormat without enabling truncated_rows (default behavior = false)
1366+
let csv_format = CsvFormat::default()
1367+
.with_has_header(true)
1368+
.with_schema_infer_max_rec(10);
1369+
1370+
let res = csv_format
1371+
.infer_schema(
1372+
&state,
1373+
&(variable_object_store.clone() as Arc<dyn ObjectStore>),
1374+
&[object_meta],
1375+
)
1376+
.await;
1377+
1378+
// Expect an error due to unequal lengths / incorrect number of fields
1379+
assert!(
1380+
res.is_err(),
1381+
"expected infer_schema to error on truncated rows when disabled"
1382+
);
1383+
1384+
// Optional: check message contains indicative text (two known possibilities)
1385+
if let Err(err) = res {
1386+
let msg = format!("{err}");
1387+
assert!(
1388+
msg.contains("Encountered unequal lengths")
1389+
|| msg.contains("incorrect number of fields"),
1390+
"unexpected error message: {msg}",
1391+
);
1392+
}
1393+
1394+
Ok(())
1395+
}
1396+
1397+
#[tokio::test]
1398+
async fn test_read_csv_truncated_rows_via_tempfile() -> Result<()> {
1399+
use std::io::Write;
1400+
1401+
// create a SessionContext
1402+
let ctx = SessionContext::new();
1403+
1404+
// Create a temp file with a .csv suffix so the reader accepts it
1405+
let mut tmp = tempfile::Builder::new().suffix(".csv").tempfile()?; // ensures path ends with .csv
1406+
// CSV has header "a,b,c". First data row is truncated (only "1,2"), second row is complete.
1407+
write!(tmp, "a,b,c\n1,2\n3,4,5\n")?;
1408+
let path = tmp.path().to_str().unwrap().to_string();
1409+
1410+
// Build CsvReadOptions: header present, enable truncated_rows.
1411+
// (Use the exact builder method your crate exposes: `truncated_rows(true)` here,
1412+
// if the method name differs in your codebase use the appropriate one.)
1413+
let options = CsvReadOptions::default().truncated_rows(true);
1414+
1415+
println!("options: {}, path: {path}", options.truncated_rows);
1416+
1417+
// Call the API under test
1418+
let df = ctx.read_csv(&path, options).await?;
1419+
1420+
// Collect the results and combine batches so we can inspect columns
1421+
let batches = df.collect().await?;
1422+
let combined = concat_batches(&batches[0].schema(), &batches)?;
1423+
1424+
// Column 'c' is the 3rd column (index 2). The first data row was truncated -> should be NULL.
1425+
let col_c = combined.column(2);
1426+
assert!(
1427+
col_c.is_null(0),
1428+
"expected first row column 'c' to be NULL due to truncated row"
1429+
);
1430+
1431+
// Also ensure we read at least one row
1432+
assert!(combined.num_rows() >= 2);
1433+
1434+
Ok(())
1435+
}
12591436
}

datafusion/core/src/datasource/file_format/options.rs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,11 @@ pub struct CsvReadOptions<'a> {
9191
pub file_sort_order: Vec<Vec<SortExpr>>,
9292
/// Optional regex to match null values
9393
pub null_regex: Option<String>,
94+
/// Whether to allow truncated rows when parsing.
95+
/// By default this is set to false and will error if the CSV rows have different lengths.
96+
/// When set to true then it will allow records with less than the expected number of columns and fill the missing columns with nulls.
97+
/// If the record’s schema is not nullable, then it will still return an error.
98+
pub truncated_rows: bool,
9499
}
95100

96101
impl Default for CsvReadOptions<'_> {
@@ -117,6 +122,7 @@ impl<'a> CsvReadOptions<'a> {
117122
file_sort_order: vec![],
118123
comment: None,
119124
null_regex: None,
125+
truncated_rows: false,
120126
}
121127
}
122128

@@ -223,6 +229,15 @@ impl<'a> CsvReadOptions<'a> {
223229
self.null_regex = null_regex;
224230
self
225231
}
232+
233+
/// Configure whether to allow truncated rows when parsing.
234+
/// By default this is set to false and will error if the CSV rows have different lengths
235+
/// When set to true then it will allow records with less than the expected number of columns and fill the missing columns with nulls.
236+
/// If the record’s schema is not nullable, then it will still return an error.
237+
pub fn truncated_rows(mut self, truncated_rows: bool) -> Self {
238+
self.truncated_rows = truncated_rows;
239+
self
240+
}
226241
}
227242

228243
/// Options that control the reading of Parquet files.
@@ -558,7 +573,8 @@ impl ReadOptions<'_> for CsvReadOptions<'_> {
558573
.with_newlines_in_values(self.newlines_in_values)
559574
.with_schema_infer_max_rec(self.schema_infer_max_records)
560575
.with_file_compression_type(self.file_compression_type.to_owned())
561-
.with_null_regex(self.null_regex.clone());
576+
.with_null_regex(self.null_regex.clone())
577+
.with_truncated_rows(self.truncated_rows);
562578

563579
ListingOptions::new(Arc::new(file_format))
564580
.with_file_extension(self.file_extension)

datafusion/datasource-csv/src/file_format.rs

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,11 @@ impl CsvFormat {
222222
self
223223
}
224224

225+
pub fn with_truncated_rows(mut self, truncated_rows: bool) -> Self {
226+
self.options.truncated_rows = Some(truncated_rows);
227+
self
228+
}
229+
225230
/// Set the regex to use for null values in the CSV reader.
226231
/// - default to treat empty values as null.
227232
pub fn with_null_regex(mut self, null_regex: Option<String>) -> Self {
@@ -291,6 +296,13 @@ impl CsvFormat {
291296
self
292297
}
293298

299+
/// Set whether rows should be truncated to the column width
300+
/// - defaults to false
301+
pub fn with_truncate_rows(mut self, truncate_rows: bool) -> Self {
302+
self.options.truncated_rows = Some(truncate_rows);
303+
self
304+
}
305+
294306
/// The delimiter character.
295307
pub fn delimiter(&self) -> u8 {
296308
self.options.delimiter
@@ -426,11 +438,13 @@ impl FileFormat for CsvFormat {
426438
.with_file_compression_type(self.options.compression.into())
427439
.with_newlines_in_values(newlines_in_values);
428440

441+
let truncated_rows = self.options.truncated_rows.unwrap_or(false);
429442
let source = Arc::new(
430443
CsvSource::new(has_header, self.options.delimiter, self.options.quote)
431444
.with_escape(self.options.escape)
432445
.with_terminator(self.options.terminator)
433-
.with_comment(self.options.comment),
446+
.with_comment(self.options.comment)
447+
.with_truncate_rows(truncated_rows),
434448
);
435449

436450
let config = conf_builder.with_source(source).build();
@@ -509,7 +523,8 @@ impl CsvFormat {
509523
.unwrap_or_else(|| state.config_options().catalog.has_header),
510524
)
511525
.with_delimiter(self.options.delimiter)
512-
.with_quote(self.options.quote);
526+
.with_quote(self.options.quote)
527+
.with_truncated_rows(self.options.truncated_rows.unwrap_or(false));
513528

514529
if let Some(null_regex) = &self.options.null_regex {
515530
let regex = Regex::new(null_regex.as_str())

datafusion/datasource-csv/src/source.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ pub struct CsvSource {
9494
metrics: ExecutionPlanMetricsSet,
9595
projected_statistics: Option<Statistics>,
9696
schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
97+
truncate_rows: bool,
9798
}
9899

99100
impl CsvSource {
@@ -111,6 +112,11 @@ impl CsvSource {
111112
pub fn has_header(&self) -> bool {
112113
self.has_header
113114
}
115+
116+
// true if rows length support truncate
117+
pub fn truncate_rows(&self) -> bool {
118+
self.truncate_rows
119+
}
114120
/// A column delimiter
115121
pub fn delimiter(&self) -> u8 {
116122
self.delimiter
@@ -156,6 +162,13 @@ impl CsvSource {
156162
conf.comment = comment;
157163
conf
158164
}
165+
166+
/// Whether to support truncate rows when read csv file
167+
pub fn with_truncate_rows(&self, truncate_rows: bool) -> Self {
168+
let mut conf = self.clone();
169+
conf.truncate_rows = truncate_rows;
170+
conf
171+
}
159172
}
160173

161174
impl CsvSource {
@@ -175,7 +188,8 @@ impl CsvSource {
175188
.expect("Batch size must be set before initializing builder"),
176189
)
177190
.with_header(self.has_header)
178-
.with_quote(self.quote);
191+
.with_quote(self.quote)
192+
.with_truncated_rows(self.truncate_rows);
179193
if let Some(terminator) = self.terminator {
180194
builder = builder.with_terminator(terminator);
181195
}
@@ -340,6 +354,7 @@ impl FileOpener for CsvOpener {
340354

341355
let config = CsvSource {
342356
has_header: csv_has_header,
357+
truncate_rows: self.config.truncate_rows,
343358
..(*self.config).clone()
344359
};
345360

datafusion/proto-common/proto/datafusion_common.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -460,6 +460,7 @@ message CsvOptions {
460460
bytes double_quote = 15; // Indicates if quotes are doubled
461461
bytes newlines_in_values = 16; // Indicates if newlines are supported in values
462462
bytes terminator = 17; // Optional terminator character as a byte
463+
bytes truncated_rows = 18; // Indicates if truncated rows are allowed
463464
}
464465

465466
// Options controlling CSV format

datafusion/proto-common/src/from_proto/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -917,6 +917,7 @@ impl TryFrom<&protobuf::CsvOptions> for CsvOptions {
917917
null_regex: (!proto_opts.null_regex.is_empty())
918918
.then(|| proto_opts.null_regex.clone()),
919919
comment: proto_opts.comment.first().copied(),
920+
truncated_rows: proto_opts.truncated_rows.first().map(|h| *h != 0),
920921
})
921922
}
922923
}

0 commit comments

Comments
 (0)