Skip to content

Commit 55bf8e9

Browse files
doki23alamb
andauthored
Support types other than String for partition columns on ListingTables (#4221)
* partition columns config * solve test compile problems * fix ut * add partition column types config for each file reader options * get correct partition column types from partitioned files * remove DEFAULT_PARTITION_COLUMN_TYPE * update pruned_partition_list * change api * add some tests * create partitioned external table with schema * upd * Update datafusion/core/src/datasource/listing_table_factory.rs Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org> * code clean * code clean Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
1 parent 502b7e3 commit 55bf8e9

File tree

13 files changed

+399
-89
lines changed

13 files changed

+399
-89
lines changed

datafusion/core/src/datasource/listing/helpers.rs

Lines changed: 59 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
2020
use std::sync::Arc;
2121

22+
use arrow::array::new_empty_array;
2223
use arrow::{
2324
array::{
2425
Array, ArrayBuilder, ArrayRef, Date64Array, Date64Builder, StringBuilder,
@@ -176,7 +177,7 @@ pub async fn pruned_partition_list<'a>(
176177
table_path: &'a ListingTableUrl,
177178
filters: &'a [Expr],
178179
file_extension: &'a str,
179-
table_partition_cols: &'a [String],
180+
table_partition_cols: &'a [(String, DataType)],
180181
) -> Result<BoxStream<'a, Result<PartitionedFile>>> {
181182
let list = table_path.list_all_files(store, file_extension);
182183

@@ -187,7 +188,15 @@ pub async fn pruned_partition_list<'a>(
187188

188189
let applicable_filters: Vec<_> = filters
189190
.iter()
190-
.filter(|f| expr_applicable_for_cols(table_partition_cols, f))
191+
.filter(|f| {
192+
expr_applicable_for_cols(
193+
&table_partition_cols
194+
.iter()
195+
.map(|x| x.0.clone())
196+
.collect::<Vec<_>>(),
197+
f,
198+
)
199+
})
191200
.collect();
192201

193202
if applicable_filters.is_empty() {
@@ -200,11 +209,26 @@ pub async fn pruned_partition_list<'a>(
200209
let parsed_path = parse_partitions_for_path(
201210
table_path,
202211
&object_meta.location,
203-
table_partition_cols,
212+
&table_partition_cols
213+
.iter()
214+
.map(|x| x.0.clone())
215+
.collect::<Vec<_>>(),
204216
)
205217
.map(|p| {
206218
p.iter()
207-
.map(|&pn| ScalarValue::Utf8(Some(pn.to_owned())))
219+
.zip(table_partition_cols)
220+
.map(|(&part_value, part_column)| {
221+
ScalarValue::try_from_string(
222+
part_value.to_string(),
223+
&part_column.1,
224+
)
225+
.unwrap_or_else(|_| {
226+
panic!(
227+
"Failed to cast str {} to type {}",
228+
part_value, part_column.1
229+
)
230+
})
231+
})
208232
.collect()
209233
});
210234

@@ -221,6 +245,7 @@ pub async fn pruned_partition_list<'a>(
221245
let metas: Vec<_> = list.try_collect().await?;
222246
let batch = paths_to_batch(table_partition_cols, table_path, &metas)?;
223247
let mem_table = MemTable::try_new(batch.schema(), vec![vec![batch]])?;
248+
debug!("get mem_table: {:?}", mem_table);
224249

225250
// Filter the partitions using a local datafusion context
226251
// TODO having the external context would allow us to resolve `Volatility::Stable`
@@ -245,28 +270,35 @@ pub async fn pruned_partition_list<'a>(
245270
///
246271
/// Note: For the last modified date, this looses precisions higher than millisecond.
247272
fn paths_to_batch(
248-
table_partition_cols: &[String],
273+
table_partition_cols: &[(String, DataType)],
249274
table_path: &ListingTableUrl,
250275
metas: &[ObjectMeta],
251276
) -> Result<RecordBatch> {
252277
let mut key_builder = StringBuilder::with_capacity(metas.len(), 1024);
253278
let mut length_builder = UInt64Builder::with_capacity(metas.len());
254279
let mut modified_builder = Date64Builder::with_capacity(metas.len());
255-
let mut partition_builders = table_partition_cols
280+
let mut partition_scalar_values = table_partition_cols
256281
.iter()
257-
.map(|_| StringBuilder::with_capacity(metas.len(), 1024))
282+
.map(|_| Vec::new())
258283
.collect::<Vec<_>>();
259284
for file_meta in metas {
260285
if let Some(partition_values) = parse_partitions_for_path(
261286
table_path,
262287
&file_meta.location,
263-
table_partition_cols,
288+
&table_partition_cols
289+
.iter()
290+
.map(|x| x.0.clone())
291+
.collect::<Vec<_>>(),
264292
) {
265293
key_builder.append_value(file_meta.location.as_ref());
266294
length_builder.append_value(file_meta.size as u64);
267295
modified_builder.append_value(file_meta.last_modified.timestamp_millis());
268296
for (i, part_val) in partition_values.iter().enumerate() {
269-
partition_builders[i].append_value(part_val);
297+
let scalar_val = ScalarValue::try_from_string(
298+
part_val.to_string(),
299+
&table_partition_cols[i].1,
300+
)?;
301+
partition_scalar_values[i].push(scalar_val);
270302
}
271303
} else {
272304
debug!("No partitioning for path {}", file_meta.location);
@@ -279,8 +311,13 @@ fn paths_to_batch(
279311
ArrayBuilder::finish(&mut length_builder),
280312
ArrayBuilder::finish(&mut modified_builder),
281313
];
282-
for mut partition_builder in partition_builders {
283-
col_arrays.push(ArrayBuilder::finish(&mut partition_builder));
314+
for (i, part_scalar_val) in partition_scalar_values.into_iter().enumerate() {
315+
if part_scalar_val.is_empty() {
316+
col_arrays.push(new_empty_array(&table_partition_cols[i].1));
317+
} else {
318+
let partition_val_array = ScalarValue::iter_to_array(part_scalar_val)?;
319+
col_arrays.push(partition_val_array);
320+
}
284321
}
285322

286323
// put the schema together
@@ -289,8 +326,8 @@ fn paths_to_batch(
289326
Field::new(FILE_SIZE_COLUMN_NAME, DataType::UInt64, false),
290327
Field::new(FILE_MODIFIED_COLUMN_NAME, DataType::Date64, true),
291328
];
292-
for pn in table_partition_cols {
293-
fields.push(Field::new(pn, DataType::Utf8, false));
329+
for part_col in table_partition_cols {
330+
fields.push(Field::new(&part_col.0, part_col.1.to_owned(), false));
294331
}
295332

296333
let batch = RecordBatch::try_new(Arc::new(Schema::new(fields)), col_arrays)?;
@@ -366,9 +403,10 @@ fn parse_partitions_for_path<'a>(
366403

367404
#[cfg(test)]
368405
mod tests {
406+
use futures::StreamExt;
407+
369408
use crate::logical_expr::{case, col, lit};
370409
use crate::test::object_store::make_test_store;
371-
use futures::StreamExt;
372410

373411
use super::*;
374412

@@ -424,7 +462,7 @@ mod tests {
424462
&ListingTableUrl::parse("file:///tablepath/").unwrap(),
425463
&[filter],
426464
".parquet",
427-
&[String::from("mypartition")],
465+
&[(String::from("mypartition"), DataType::Utf8)],
428466
)
429467
.await
430468
.expect("partition pruning failed")
@@ -447,7 +485,7 @@ mod tests {
447485
&ListingTableUrl::parse("file:///tablepath/").unwrap(),
448486
&[filter],
449487
".parquet",
450-
&[String::from("mypartition")],
488+
&[(String::from("mypartition"), DataType::Utf8)],
451489
)
452490
.await
453491
.expect("partition pruning failed")
@@ -494,7 +532,10 @@ mod tests {
494532
&ListingTableUrl::parse("file:///tablepath/").unwrap(),
495533
&[filter1, filter2, filter3],
496534
".parquet",
497-
&[String::from("part1"), String::from("part2")],
535+
&[
536+
(String::from("part1"), DataType::Utf8),
537+
(String::from("part2"), DataType::Utf8),
538+
],
498539
)
499540
.await
500541
.expect("partition pruning failed")
@@ -645,7 +686,7 @@ mod tests {
645686
];
646687

647688
let batches = paths_to_batch(
648-
&[String::from("part1")],
689+
&[(String::from("part1"), DataType::Utf8)],
649690
&ListingTableUrl::parse("file:///mybucket/tablepath").unwrap(),
650691
&files,
651692
)

datafusion/core/src/datasource/listing/table.rs

Lines changed: 45 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use std::str::FromStr;
2121
use std::{any::Any, sync::Arc};
2222

2323
use arrow::compute::SortOptions;
24-
use arrow::datatypes::{Field, Schema, SchemaRef};
24+
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
2525
use async_trait::async_trait;
2626
use dashmap::DashMap;
2727
use datafusion_physical_expr::PhysicalSortExpr;
@@ -41,14 +41,14 @@ use crate::datasource::{
4141
};
4242
use crate::logical_expr::TableProviderFilterPushDown;
4343
use crate::physical_plan;
44+
use crate::physical_plan::file_format::partition_type_wrap;
4445
use crate::{
4546
error::{DataFusionError, Result},
4647
execution::context::SessionState,
4748
logical_expr::Expr,
4849
physical_plan::{
49-
empty::EmptyExec,
50-
file_format::{FileScanConfig, DEFAULT_PARTITION_COLUMN_DATATYPE},
51-
project_schema, ExecutionPlan, Statistics,
50+
empty::EmptyExec, file_format::FileScanConfig, project_schema, ExecutionPlan,
51+
Statistics,
5252
},
5353
};
5454

@@ -210,9 +210,7 @@ pub struct ListingOptions {
210210
/// partitioning expected should be named "a" and "b":
211211
/// - If there is a third level of partitioning it will be ignored.
212212
/// - Files that don't follow this partitioning will be ignored.
213-
/// Note that only `DEFAULT_PARTITION_COLUMN_DATATYPE` is currently
214-
/// supported for the column type.
215-
pub table_partition_cols: Vec<String>,
213+
pub table_partition_cols: Vec<(String, DataType)>,
216214
/// Set true to try to guess statistics from the files.
217215
/// This can add a lot of overhead as it will usually require files
218216
/// to be opened and at least partially parsed.
@@ -270,16 +268,19 @@ impl ListingOptions {
270268
///
271269
/// ```
272270
/// use std::sync::Arc;
271+
/// use arrow::datatypes::DataType;
273272
/// use datafusion::datasource::{listing::ListingOptions, file_format::parquet::ParquetFormat};
274273
///
275274
/// let listing_options = ListingOptions::new(Arc::new(ParquetFormat::default()))
276-
/// .with_table_partition_cols(vec!["col_a".to_string(), "col_b".to_string()]);
275+
/// .with_table_partition_cols(vec![("col_a".to_string(), DataType::Utf8),
276+
/// ("col_b".to_string(), DataType::Utf8)]);
277277
///
278-
/// assert_eq!(listing_options.table_partition_cols, vec!["col_a", "col_b"]);
278+
/// assert_eq!(listing_options.table_partition_cols, vec![("col_a".to_string(), DataType::Utf8),
279+
/// ("col_b".to_string(), DataType::Utf8)]);
279280
/// ```
280281
pub fn with_table_partition_cols(
281282
mut self,
282-
table_partition_cols: Vec<String>,
283+
table_partition_cols: Vec<(String, DataType)>,
283284
) -> Self {
284285
self.table_partition_cols = table_partition_cols;
285286
self
@@ -428,10 +429,10 @@ impl ListingTable {
428429

429430
// Add the partition columns to the file schema
430431
let mut table_fields = file_schema.fields().clone();
431-
for part in &options.table_partition_cols {
432+
for (part_col_name, part_col_type) in &options.table_partition_cols {
432433
table_fields.push(Field::new(
433-
part,
434-
DEFAULT_PARTITION_COLUMN_DATATYPE.clone(),
434+
part_col_name,
435+
partition_type_wrap(part_col_type.clone()),
435436
false,
436437
));
437438
}
@@ -536,6 +537,23 @@ impl TableProvider for ListingTable {
536537
return Ok(Arc::new(EmptyExec::new(false, projected_schema)));
537538
}
538539

540+
// extract types of partition columns
541+
let table_partition_cols = self
542+
.options
543+
.table_partition_cols
544+
.iter()
545+
.map(|col| {
546+
(
547+
col.0.to_owned(),
548+
self.table_schema
549+
.field_with_name(&col.0)
550+
.unwrap()
551+
.data_type()
552+
.clone(),
553+
)
554+
})
555+
.collect();
556+
539557
// create the execution plan
540558
self.options
541559
.format
@@ -548,7 +566,7 @@ impl TableProvider for ListingTable {
548566
projection: projection.clone(),
549567
limit,
550568
output_ordering: self.try_create_output_ordering()?,
551-
table_partition_cols: self.options.table_partition_cols.clone(),
569+
table_partition_cols,
552570
config_options: ctx.config.config_options(),
553571
},
554572
filters,
@@ -560,7 +578,15 @@ impl TableProvider for ListingTable {
560578
&self,
561579
filter: &Expr,
562580
) -> Result<TableProviderFilterPushDown> {
563-
if expr_applicable_for_cols(&self.options.table_partition_cols, filter) {
581+
if expr_applicable_for_cols(
582+
&self
583+
.options
584+
.table_partition_cols
585+
.iter()
586+
.map(|x| x.0.clone())
587+
.collect::<Vec<_>>(),
588+
filter,
589+
) {
564590
// if filter can be handled by partiton pruning, it is exact
565591
Ok(TableProviderFilterPushDown::Exact)
566592
} else {
@@ -807,7 +833,10 @@ mod tests {
807833

808834
let opt = ListingOptions::new(Arc::new(AvroFormat {}))
809835
.with_file_extension(FileType::AVRO.get_ext())
810-
.with_table_partition_cols(vec![String::from("p1")])
836+
.with_table_partition_cols(vec![(
837+
String::from("p1"),
838+
partition_type_wrap(DataType::Utf8),
839+
)])
811840
.with_target_partitions(4);
812841

813842
let table_path = ListingTableUrl::parse("test:///table/").unwrap();

datafusion/core/src/datasource/listing_table_factory.rs

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use crate::datasource::listing::{
2929
};
3030
use crate::datasource::TableProvider;
3131
use crate::execution::context::SessionState;
32+
use arrow::datatypes::{DataType, SchemaRef};
3233
use async_trait::async_trait;
3334
use datafusion_common::DataFusionError;
3435
use datafusion_expr::CreateExternalTable;
@@ -88,17 +89,46 @@ impl TableProviderFactory for ListingTableFactory {
8889
),
8990
};
9091

91-
let provided_schema = if cmd.schema.fields().is_empty() {
92-
None
92+
let (provided_schema, table_partition_cols) = if cmd.schema.fields().is_empty() {
93+
(
94+
None,
95+
cmd.table_partition_cols
96+
.iter()
97+
.map(|x| (x.clone(), DataType::Utf8))
98+
.collect::<Vec<_>>(),
99+
)
93100
} else {
94-
Some(Arc::new(cmd.schema.as_ref().to_owned().into()))
101+
let schema: SchemaRef = Arc::new(cmd.schema.as_ref().to_owned().into());
102+
let table_partition_cols = cmd
103+
.table_partition_cols
104+
.iter()
105+
.map(|col| {
106+
schema.field_with_name(col).map_err(|arrow_err| {
107+
DataFusionError::Execution(arrow_err.to_string())
108+
})
109+
})
110+
.collect::<datafusion_common::Result<Vec<_>>>()?
111+
.into_iter()
112+
.map(|f| (f.name().to_owned(), f.data_type().to_owned()))
113+
.collect();
114+
// exclude partition columns to support creating partitioned external table
115+
// with a specified column definition like
116+
// `create external table a(c0 int, c1 int) stored as csv partitioned by (c1)...`
117+
let mut project_idx = Vec::new();
118+
for i in 0..schema.fields().len() {
119+
if !cmd.table_partition_cols.contains(schema.field(i).name()) {
120+
project_idx.push(i);
121+
}
122+
}
123+
let schema = Arc::new(schema.project(&project_idx)?);
124+
(Some(schema), table_partition_cols)
95125
};
96126

97127
let options = ListingOptions::new(file_format)
98128
.with_collect_stat(state.config.collect_statistics)
99129
.with_file_extension(file_extension)
100130
.with_target_partitions(state.config.target_partitions)
101-
.with_table_partition_cols(cmd.table_partition_cols.clone())
131+
.with_table_partition_cols(table_partition_cols)
102132
.with_file_sort_order(None);
103133

104134
let table_path = ListingTableUrl::parse(&cmd.location)?;

datafusion/core/src/datasource/memory.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ use crate::physical_plan::ExecutionPlan;
3737
use crate::physical_plan::{repartition::RepartitionExec, Partitioning};
3838

3939
/// In-memory table
40+
#[derive(Debug)]
4041
pub struct MemTable {
4142
schema: SchemaRef,
4243
batches: Vec<Vec<RecordBatch>>,

0 commit comments

Comments
 (0)