Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion datafusion/datasource/src/file_scan_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,9 @@ pub struct FileScanConfig {
pub new_lines_in_values: bool,
/// File source such as `ParquetSource`, `CsvSource`, `JsonSource`, etc.
pub file_source: Arc<dyn FileSource>,
/// Batch size to use when creating new batches.
/// When `None`, the session's configured batch size is used instead
/// (see [`datafusion_common::config::ExecutionOptions`] `batch_size`).
pub batch_size: Option<usize>,
}

impl DataSource for FileScanConfig {
Expand All @@ -176,10 +179,13 @@ impl DataSource for FileScanConfig {
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
let object_store = context.runtime_env().object_store(&self.object_store_url)?;
let batch_size = self
.batch_size
.unwrap_or_else(|| context.session_config().batch_size());

let source = self
.file_source
.with_batch_size(context.session_config().batch_size())
.with_batch_size(batch_size)
.with_schema(Arc::clone(&self.file_schema))
.with_projection(self);

Expand Down Expand Up @@ -338,6 +344,7 @@ impl FileScanConfig {
file_compression_type: FileCompressionType::UNCOMPRESSED,
new_lines_in_values: false,
file_source: Arc::clone(&file_source),
batch_size: None,
};

config = config.with_source(Arc::clone(&file_source));
Expand Down Expand Up @@ -492,6 +499,12 @@ impl FileScanConfig {
self
}

/// Replaces the `batch_size` override on this config and returns the updated config.
///
/// Pass `Some(n)` to store an explicit batch size, or `None` to clear the
/// override so that no per-scan batch size is recorded.
pub fn with_batch_size(self, batch_size: Option<usize>) -> Self {
    // Rebind the consumed config mutably, overwrite the single field, and hand it back.
    let mut updated = self;
    updated.batch_size = batch_size;
    updated
}

/// Specifies whether newlines in (quoted) values are supported.
///
/// Parsing newlines in quoted values may be affected by execution behaviour such as
Expand Down
1 change: 1 addition & 0 deletions datafusion/proto/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -997,6 +997,7 @@ message FileScanExecConf {
reserved 10;

datafusion_common.Constraints constraints = 11;
optional uint64 batch_size = 12;
}

message ParquetScanExecNode {
Expand Down
22 changes: 22 additions & 0 deletions datafusion/proto/src/generated/pbjson.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions datafusion/proto/src/generated/prost.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion datafusion/proto/src/physical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,8 @@ pub fn parse_protobuf_file_scan_config(
.with_projection(projection)
.with_limit(proto.limit.as_ref().map(|sl| sl.limit as usize))
.with_table_partition_cols(table_partition_cols)
.with_output_ordering(output_ordering);
.with_output_ordering(output_ordering)
.with_batch_size(proto.batch_size.map(|s| s as usize));
Ok(config)
}

Expand Down
1 change: 1 addition & 0 deletions datafusion/proto/src/physical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,7 @@ pub fn serialize_file_scan_config(
})
.collect::<Vec<_>>(),
constraints: Some(conf.constraints.clone().into()),
batch_size: conf.batch_size.map(|s| s as u64),
})
}

Expand Down