Commit cd065ee

Use TableSchema in FileScanConfig (apache#18231)
Steps towards apache#14993
1 parent 03def45 commit cd065ee

File tree: 2 files changed (+34 -22 lines)

datafusion/datasource/src/file_scan_config.rs

Lines changed: 20 additions & 22 deletions
@@ -156,6 +156,11 @@ pub struct FileScanConfig {
     /// Schema information including the file schema, table partition columns,
     /// and the combined table schema.
     ///
+    /// The table schema (file schema + partition columns) is the schema exposed
+    /// upstream of [`FileScanConfig`] (e.g. in [`DataSourceExec`]).
+    ///
+    /// See [`TableSchema`] for more information.
+    ///
     /// [`DataSourceExec`]: crate::source::DataSourceExec
     pub table_schema: TableSchema,
     /// List of files to be processed, grouped into partitions
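As a point of reference, here is a conceptual sketch in plain Arrow (no DataFusion APIs, hypothetical column names) of what "file schema + partition columns" means: the partition columns are appended after the file's own fields to form the schema exposed upstream of the scan.

use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema, SchemaRef};

// Hypothetical file schema: the columns physically present in the data files.
fn file_schema() -> SchemaRef {
    Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("value", DataType::Float64, true),
    ]))
}

// The combined table schema appends partition columns (here one hypothetical
// `date` column) after the file columns.
fn combined_table_schema() -> Schema {
    let mut fields: Vec<Field> = file_schema()
        .fields()
        .iter()
        .map(|f| f.as_ref().clone())
        .collect();
    fields.push(Field::new("date", DataType::Utf8, false));
    Schema::new(fields)
}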
@@ -244,23 +249,19 @@ pub struct FileScanConfig {
 #[derive(Clone)]
 pub struct FileScanConfigBuilder {
     object_store_url: ObjectStoreUrl,
-    /// Table schema before any projections or partition columns are applied.
+    /// Schema information including the file schema, table partition columns,
+    /// and the combined table schema.
     ///
-    /// This schema is used to read the files, but is **not** necessarily the
-    /// schema of the physical files. Rather this is the schema that the
+    /// This schema is used to read the files, but the file schema is **not** necessarily
+    /// the schema of the physical files. Rather this is the schema that the
     /// physical file schema will be mapped onto, and the schema that the
     /// [`DataSourceExec`] will return.
     ///
-    /// This is usually the same as the table schema as specified by the `TableProvider` minus any partition columns.
-    ///
-    /// This probably would be better named `table_schema`
-    ///
     /// [`DataSourceExec`]: crate::source::DataSourceExec
-    file_schema: SchemaRef,
+    table_schema: TableSchema,
     file_source: Arc<dyn FileSource>,
     limit: Option<usize>,
     projection: Option<Vec<usize>>,
-    table_partition_cols: Vec<FieldRef>,
     constraints: Option<Constraints>,
     file_groups: Vec<FileGroup>,
     statistics: Option<Statistics>,
@@ -285,7 +286,7 @@ impl FileScanConfigBuilder {
     ) -> Self {
         Self {
             object_store_url,
-            file_schema,
+            table_schema: TableSchema::from_file_schema(file_schema),
            file_source,
            file_groups: vec![],
            statistics: None,
@@ -294,7 +295,6 @@ impl FileScanConfigBuilder {
            new_lines_in_values: None,
            limit: None,
            projection: None,
-            table_partition_cols: vec![],
            constraints: None,
            batch_size: None,
            expr_adapter_factory: None,
@@ -326,10 +326,13 @@ impl FileScanConfigBuilder {
 
     /// Set the partitioning columns
     pub fn with_table_partition_cols(mut self, table_partition_cols: Vec<Field>) -> Self {
-        self.table_partition_cols = table_partition_cols
+        let table_partition_cols: Vec<FieldRef> = table_partition_cols
             .into_iter()
             .map(|f| Arc::new(f) as FieldRef)
             .collect();
+        self.table_schema = self
+            .table_schema
+            .with_table_partition_cols(table_partition_cols);
         self
     }
 
@@ -427,11 +430,10 @@ impl FileScanConfigBuilder {
     pub fn build(self) -> FileScanConfig {
         let Self {
            object_store_url,
-            file_schema,
+            table_schema,
            file_source,
            limit,
            projection,
-            table_partition_cols,
            constraints,
            file_groups,
            statistics,
@@ -443,19 +445,16 @@ impl FileScanConfigBuilder {
         } = self;
 
         let constraints = constraints.unwrap_or_default();
-        let statistics =
-            statistics.unwrap_or_else(|| Statistics::new_unknown(&file_schema));
+        let statistics = statistics
+            .unwrap_or_else(|| Statistics::new_unknown(table_schema.file_schema()));
 
         let file_source = file_source
             .with_statistics(statistics.clone())
-            .with_schema(Arc::clone(&file_schema));
+            .with_schema(Arc::clone(table_schema.file_schema()));
         let file_compression_type =
             file_compression_type.unwrap_or(FileCompressionType::UNCOMPRESSED);
         let new_lines_in_values = new_lines_in_values.unwrap_or(false);
 
-        // Create TableSchema from file_schema and table_partition_cols
-        let table_schema = TableSchema::new(file_schema, table_partition_cols);
-
         FileScanConfig {
            object_store_url,
            table_schema,
@@ -477,7 +476,7 @@ impl From<FileScanConfig> for FileScanConfigBuilder {
     fn from(config: FileScanConfig) -> Self {
         Self {
            object_store_url: config.object_store_url,
-            file_schema: Arc::clone(config.table_schema.file_schema()),
+            table_schema: config.table_schema,
            file_source: Arc::<dyn FileSource>::clone(&config.file_source),
            file_groups: config.file_groups,
            statistics: config.file_source.statistics().ok(),
@@ -486,7 +485,6 @@ impl From<FileScanConfig> for FileScanConfigBuilder {
            new_lines_in_values: Some(config.new_lines_in_values),
            limit: config.limit,
            projection: config.projection,
-            table_partition_cols: config.table_schema.table_partition_cols().clone(),
            constraints: Some(config.constraints),
            batch_size: config.batch_size,
            expr_adapter_factory: config.expr_adapter_factory,
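Taken together, the builder changes in this file mean that partition columns are folded into the builder's TableSchema as soon as with_table_partition_cols is called, instead of living in a separate field that build() had to merge at the end. Below is a minimal usage sketch of that flow; the import paths, the ObjectStoreUrl::local_filesystem() plumbing, and the example column names are assumptions for illustration, not taken from this diff.

use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
// The import paths below are assumptions about the datafusion-datasource /
// datafusion-execution crate layout; they are not shown in this diff.
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
use datafusion_execution::object_store::ObjectStoreUrl;

// `source` stands in for any concrete FileSource (Parquet, CSV, ...).
fn scan_config(source: Arc<dyn FileSource>) -> FileScanConfig {
    let file_schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
        "id",
        DataType::Int64,
        false,
    )]));

    FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_schema, source)
        // After this commit the partition column lands directly in the
        // builder's TableSchema rather than in a separate
        // `table_partition_cols` field merged later in `build()`.
        .with_table_partition_cols(vec![Field::new("date", DataType::Utf8, false)])
        .build()
}

The resulting FileScanConfig should be the same as before the change; only where the partition columns are stored inside the builder differs.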

datafusion/datasource/src/table_schema.rs

Lines changed: 14 additions & 0 deletions
@@ -121,6 +121,20 @@ impl TableSchema {
         }
     }
 
+    /// Create a new TableSchema from a file schema with no partition columns.
+    pub fn from_file_schema(file_schema: SchemaRef) -> Self {
+        Self::new(file_schema, vec![])
+    }
+
+    /// Set the table partition columns and rebuild the table schema.
+    pub fn with_table_partition_cols(
+        mut self,
+        table_partition_cols: Vec<FieldRef>,
+    ) -> TableSchema {
+        self.table_partition_cols = table_partition_cols;
+        self
+    }
+
     /// Get the file schema (without partition columns).
     ///
     /// This is the schema of the actual data files on disk.
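A minimal sketch of the two constructors added above, assuming TableSchema is importable from the datafusion-datasource crate (the exact public path is not shown in this diff) and using hypothetical column names:

use std::sync::Arc;

use arrow::datatypes::{DataType, Field, FieldRef, Schema};
use datafusion_datasource::TableSchema; // import path is an assumption

fn example() {
    let file_schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("value", DataType::Float64, true),
    ]));

    // Equivalent to TableSchema::new(file_schema, vec![]).
    let schema = TableSchema::from_file_schema(file_schema);

    // Fold the partition columns in afterwards, mirroring what
    // FileScanConfigBuilder::with_table_partition_cols now does.
    let partition_cols: Vec<FieldRef> =
        vec![Arc::new(Field::new("date", DataType::Utf8, false))];
    let schema = schema.with_table_partition_cols(partition_cols);

    assert_eq!(schema.file_schema().fields().len(), 2);
    assert_eq!(schema.table_partition_cols().len(), 1);
}

In short, from_file_schema is TableSchema::new with an empty partition column list, and with_table_partition_cols lets FileScanConfigBuilder attach the partition columns later instead of assembling the TableSchema itself in build().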
