Skip to content

Commit 194d952

Browse files
authored
Merge pull request apache#11 from polygon-io/support_csv_truncate_for_read
X-1035 Part-2: support csv scan to read truncated rows
2 parents 5918ef8 + 253e49c commit 194d952

File tree

8 files changed

+110
-3
lines changed

8 files changed

+110
-3
lines changed

datafusion/core/src/datasource/file_format/csv.rs

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1312,4 +1312,44 @@ mod tests {
13121312

13131313
Ok(())
13141314
}
1315+
1316+
#[tokio::test]
1317+
async fn test_read_csv_truncated_rows_via_tempfile() -> Result<()> {
1318+
use std::io::Write;
1319+
1320+
// create a SessionContext
1321+
let ctx = SessionContext::new();
1322+
1323+
// Create a temp file with a .csv suffix so the reader accepts it
1324+
let mut tmp = tempfile::Builder::new().suffix(".csv").tempfile()?; // ensures path ends with .csv
1325+
// CSV has header "a,b,c". First data row is truncated (only "1,2"), second row is complete.
1326+
write!(tmp, "a,b,c\n1,2\n3,4,5\n")?;
1327+
let path = tmp.path().to_str().unwrap().to_string();
1328+
1329+
// Build CsvReadOptions: header present, enable truncated_rows.
1330+
// (Use the exact builder method your crate exposes: `truncated_rows(true)` here,
1331+
// if the method name differs in your codebase use the appropriate one.)
1332+
let options = CsvReadOptions::default().truncated_rows(true);
1333+
1334+
println!("options: {}, path: {path}", options.truncated_rows);
1335+
1336+
// Call the API under test
1337+
let df = ctx.read_csv(&path, options).await?;
1338+
1339+
// Collect the results and combine batches so we can inspect columns
1340+
let batches = df.collect().await?;
1341+
let combined = concat_batches(&batches[0].schema(), &batches)?;
1342+
1343+
// Column 'c' is the 3rd column (index 2). The first data row was truncated -> should be NULL.
1344+
let col_c = combined.column(2);
1345+
assert!(
1346+
col_c.is_null(0),
1347+
"expected first row column 'c' to be NULL due to truncated row"
1348+
);
1349+
1350+
// Also ensure we read at least one row
1351+
assert!(combined.num_rows() >= 2);
1352+
1353+
Ok(())
1354+
}
13151355
}

datafusion/core/src/datasource/file_format/options.rs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,11 @@ pub struct CsvReadOptions<'a> {
9191
pub file_sort_order: Vec<Vec<SortExpr>>,
9292
/// Optional regex to match null values
9393
pub null_regex: Option<String>,
94+
/// Whether to allow truncated rows when parsing.
95+
/// By default this is set to false and will error if the CSV rows have different lengths.
96+
/// When set to true then it will allow records with less than the expected number of columns and fill the missing columns with nulls.
97+
/// If the record’s schema is not nullable, then it will still return an error.
98+
pub truncated_rows: bool,
9499
}
95100

96101
impl Default for CsvReadOptions<'_> {
@@ -117,6 +122,7 @@ impl<'a> CsvReadOptions<'a> {
117122
file_sort_order: vec![],
118123
comment: None,
119124
null_regex: None,
125+
truncated_rows: false,
120126
}
121127
}
122128

@@ -223,6 +229,15 @@ impl<'a> CsvReadOptions<'a> {
223229
self.null_regex = null_regex;
224230
self
225231
}
232+
233+
/// Configure whether to allow truncated rows when parsing.
234+
/// By default this is set to false and will error if the CSV rows have different lengths
235+
/// When set to true then it will allow records with less than the expected number of columns and fill the missing columns with nulls.
236+
/// If the record’s schema is not nullable, then it will still return an error.
237+
pub fn truncated_rows(mut self, truncated_rows: bool) -> Self {
238+
self.truncated_rows = truncated_rows;
239+
self
240+
}
226241
}
227242

228243
/// Options that control the reading of Parquet files.
@@ -546,7 +561,8 @@ impl ReadOptions<'_> for CsvReadOptions<'_> {
546561
.with_newlines_in_values(self.newlines_in_values)
547562
.with_schema_infer_max_rec(self.schema_infer_max_records)
548563
.with_file_compression_type(self.file_compression_type.to_owned())
549-
.with_null_regex(self.null_regex.clone());
564+
.with_null_regex(self.null_regex.clone())
565+
.with_truncated_rows(self.truncated_rows);
550566

551567
ListingOptions::new(Arc::new(file_format))
552568
.with_file_extension(self.file_extension)

datafusion/datasource-csv/src/file_format.rs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,11 @@ impl CsvFormat {
222222
self
223223
}
224224

225+
pub fn with_truncated_rows(mut self, truncated_rows: bool) -> Self {
226+
self.options.truncated_rows = Some(truncated_rows);
227+
self
228+
}
229+
225230
/// Set the regex to use for null values in the CSV reader.
226231
/// - default to treat empty values as null.
227232
pub fn with_null_regex(mut self, null_regex: Option<String>) -> Self {
@@ -291,6 +296,13 @@ impl CsvFormat {
291296
self
292297
}
293298

299+
/// Set whether rows should be truncated to the column width
300+
/// - defaults to false
301+
pub fn with_truncate_rows(mut self, truncate_rows: bool) -> Self {
302+
self.options.truncated_rows = Some(truncate_rows);
303+
self
304+
}
305+
294306
/// The delimiter character.
295307
pub fn delimiter(&self) -> u8 {
296308
self.options.delimiter
@@ -422,11 +434,13 @@ impl FileFormat for CsvFormat {
422434
.with_file_compression_type(self.options.compression.into())
423435
.with_newlines_in_values(newlines_in_values);
424436

437+
let truncated_rows = self.options.truncated_rows.unwrap_or(false);
425438
let source = Arc::new(
426439
CsvSource::new(has_header, self.options.delimiter, self.options.quote)
427440
.with_escape(self.options.escape)
428441
.with_terminator(self.options.terminator)
429-
.with_comment(self.options.comment),
442+
.with_comment(self.options.comment)
443+
.with_truncate_rows(truncated_rows),
430444
);
431445

432446
let config = conf_builder.with_source(source).build();

datafusion/datasource-csv/src/source.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ pub struct CsvSource {
9393
metrics: ExecutionPlanMetricsSet,
9494
projected_statistics: Option<Statistics>,
9595
schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
96+
truncate_rows: bool,
9697
}
9798

9899
impl CsvSource {
@@ -110,6 +111,11 @@ impl CsvSource {
110111
pub fn has_header(&self) -> bool {
111112
self.has_header
112113
}
114+
115+
// true if rows length support truncate
116+
pub fn truncate_rows(&self) -> bool {
117+
self.truncate_rows
118+
}
113119
/// A column delimiter
114120
pub fn delimiter(&self) -> u8 {
115121
self.delimiter
@@ -155,6 +161,13 @@ impl CsvSource {
155161
conf.comment = comment;
156162
conf
157163
}
164+
165+
/// Whether to support truncate rows when read csv file
166+
pub fn with_truncate_rows(&self, truncate_rows: bool) -> Self {
167+
let mut conf = self.clone();
168+
conf.truncate_rows = truncate_rows;
169+
conf
170+
}
158171
}
159172

160173
impl CsvSource {
@@ -174,7 +187,8 @@ impl CsvSource {
174187
.expect("Batch size must be set before initializing builder"),
175188
)
176189
.with_header(self.has_header)
177-
.with_quote(self.quote);
190+
.with_quote(self.quote)
191+
.with_truncated_rows(self.truncate_rows);
178192
if let Some(terminator) = self.terminator {
179193
builder = builder.with_terminator(terminator);
180194
}
@@ -335,6 +349,7 @@ impl FileOpener for CsvOpener {
335349

336350
let config = CsvSource {
337351
has_header: csv_has_header,
352+
truncate_rows: self.config.truncate_rows,
338353
..(*self.config).clone()
339354
};
340355

datafusion/proto/proto/datafusion.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1023,6 +1023,7 @@ message CsvScanExecNode {
10231023
string comment = 6;
10241024
}
10251025
bool newlines_in_values = 7;
1026+
bool truncate_rows = 8;
10261027
}
10271028

10281029
message JsonScanExecNode {

datafusion/proto/src/generated/pbjson.rs

Lines changed: 18 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/src/generated/prost.rs

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/src/physical_plan/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2286,6 +2286,7 @@ impl protobuf::PhysicalPlanNode {
22862286
None
22872287
},
22882288
newlines_in_values: maybe_csv.newlines_in_values(),
2289+
truncate_rows: csv_config.truncate_rows(),
22892290
},
22902291
)),
22912292
}));

0 commit comments

Comments
 (0)