Skip to content

Commit 0959545

Browse files
committed
feat: Support serde for FileScanConfig batch_size
1 parent 65c9af1 commit 0959545

File tree

6 files changed

+40
-2
lines changed

6 files changed

+40
-2
lines changed

datafusion/datasource/src/file_scan_config.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,9 @@ pub struct FileScanConfig {
167167
pub new_lines_in_values: bool,
168168
/// File source such as `ParquetSource`, `CsvSource`, `JsonSource`, etc.
169169
pub file_source: Arc<dyn FileSource>,
170+
/// Batch size to use when creating new batches during the scan.
171+
/// If `None`, defaults to the session's `batch_size` (see [`datafusion_common::config::ExecutionOptions`]).
172+
pub batch_size: Option<usize>,
170173
}
171174

172175
impl DataSource for FileScanConfig {
@@ -176,10 +179,13 @@ impl DataSource for FileScanConfig {
176179
context: Arc<TaskContext>,
177180
) -> Result<SendableRecordBatchStream> {
178181
let object_store = context.runtime_env().object_store(&self.object_store_url)?;
182+
let batch_size = self
183+
.batch_size
184+
.unwrap_or_else(|| context.session_config().batch_size());
179185

180186
let source = self
181187
.file_source
182-
.with_batch_size(context.session_config().batch_size())
188+
.with_batch_size(batch_size)
183189
.with_schema(Arc::clone(&self.file_schema))
184190
.with_projection(self);
185191

@@ -338,6 +344,7 @@ impl FileScanConfig {
338344
file_compression_type: FileCompressionType::UNCOMPRESSED,
339345
new_lines_in_values: false,
340346
file_source: Arc::clone(&file_source),
347+
batch_size: None,
341348
};
342349

343350
config = config.with_source(Arc::clone(&file_source));
@@ -492,6 +499,12 @@ impl FileScanConfig {
492499
self
493500
}
494501

502+
/// Set the batch size; `None` falls back to the session configuration's batch size.
503+
pub fn with_batch_size(mut self, batch_size: Option<usize>) -> Self {
504+
self.batch_size = batch_size;
505+
self
506+
}
507+
495508
/// Specifies whether newlines in (quoted) values are supported.
496509
///
497510
/// Parsing newlines in quoted values may be affected by execution behaviour such as

datafusion/proto/proto/datafusion.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -997,6 +997,7 @@ message FileScanExecConf {
997997
reserved 10;
998998

999999
datafusion_common.Constraints constraints = 11;
1000+
optional uint32 batch_size = 12;
10001001
}
10011002

10021003
message ParquetScanExecNode {

datafusion/proto/src/generated/pbjson.rs

Lines changed: 20 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/src/generated/prost.rs

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/src/physical_plan/from_proto.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -544,7 +544,8 @@ pub fn parse_protobuf_file_scan_config(
544544
.with_projection(projection)
545545
.with_limit(proto.limit.as_ref().map(|sl| sl.limit as usize))
546546
.with_table_partition_cols(table_partition_cols)
547-
.with_output_ordering(output_ordering);
547+
.with_output_ordering(output_ordering)
548+
.with_batch_size(proto.batch_size.map(|s| s as usize));
548549
Ok(config)
549550
}
550551

datafusion/proto/src/physical_plan/to_proto.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -530,6 +530,7 @@ pub fn serialize_file_scan_config(
530530
})
531531
.collect::<Vec<_>>(),
532532
constraints: Some(conf.constraints.clone().into()),
533+
batch_size: conf.batch_size.map(|s| s as u32),
533534
})
534535
}
535536

0 commit comments

Comments
 (0)