From a29f2bec21f92ef56b80b110c93761004d8043f4 Mon Sep 17 00:00:00 2001 From: Mohamed Abdeen <83442793+MohamedAbdeen21@users.noreply.github.com> Date: Fri, 5 Apr 2024 20:43:36 +0200 Subject: [PATCH] Validate partitions columns in `CREATE EXTERNAL TABLE` if table already exists. (#9912) * prevent panic * initial version, bad code * some error handling * Some slt tests * docs and minor refactors * cleaning up * fix tests * clear err message for single-file partitioned tables * typo * test invalid/mixed partitions on disk * ensure order in error msg for testing --- .../core/src/datasource/listing/table.rs | 107 ++++++++++++++++++ .../src/datasource/listing_table_factory.rs | 2 + .../physical_plan/file_scan_config.rs | 12 +- datafusion/core/src/execution/context/mod.rs | 2 +- .../test_files/create_external_table.slt | 93 ++++++++++++++- 5 files changed, 212 insertions(+), 4 deletions(-) diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 5ef7b6241b60..6625abd650d7 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -61,6 +61,7 @@ use datafusion_physical_expr::{ use async_trait::async_trait; use futures::{future, stream, StreamExt, TryStreamExt}; +use itertools::Itertools; use object_store::ObjectStore; /// Configuration for creating a [`ListingTable`] @@ -438,6 +439,112 @@ impl ListingOptions { self.format.infer_schema(state, &store, &files).await } + + /// Infers the partition columns stored in `LOCATION` and compares + /// them with the columns provided in `PARTITIONED BY` to help prevent + /// accidental corruption of partitioned tables. + /// + /// Allows specifying partial partitions. 
+ pub async fn validate_partitions( + &self, + state: &SessionState, + table_path: &ListingTableUrl, + ) -> Result<()> { + if self.table_partition_cols.is_empty() { + return Ok(()); + } + + if !table_path.is_collection() { + return plan_err!( + "Can't create a partitioned table backed by a single file, \ + perhaps the URL is missing a trailing slash?" + ); + } + + let inferred = self.infer_partitions(state, table_path).await?; + + // no partitioned files found on disk + if inferred.is_empty() { + return Ok(()); + } + + let table_partition_names = self + .table_partition_cols + .iter() + .map(|(col_name, _)| col_name.clone()) + .collect_vec(); + + if inferred.len() < table_partition_names.len() { + return plan_err!( + "Inferred partitions to be {:?}, but got {:?}", + inferred, + table_partition_names + ); + } + + // match prefix to allow creating tables with partial partitions + for (idx, col) in table_partition_names.iter().enumerate() { + if &inferred[idx] != col { + return plan_err!( + "Inferred partitions to be {:?}, but got {:?}", + inferred, + table_partition_names + ); + } + } + + Ok(()) + } + + /// Infer the partitioning at the given path on the provided object store. + /// For performance reasons, it doesn't read all the files on disk + /// and therefore may fail to detect invalid partitioning. + async fn infer_partitions( + &self, + state: &SessionState, + table_path: &ListingTableUrl, + ) -> Result> { + let store = state.runtime_env().object_store(table_path)?; + + // only use 10 files for inference + // This can fail to detect inconsistent partition keys + // A DFS traversal approach of the store can help here + let files: Vec<_> = table_path + .list_all_files(state, store.as_ref(), &self.file_extension) + .await? 
+ .take(10) + .try_collect() + .await?; + + let stripped_path_parts = files.iter().map(|file| { + table_path + .strip_prefix(&file.location) + .unwrap() + .collect_vec() + }); + + let partition_keys = stripped_path_parts + .map(|path_parts| { + path_parts + .into_iter() + .rev() + .skip(1) // get parents only; skip the file itself + .rev() + .map(|s| s.split('=').take(1).collect()) + .collect_vec() + }) + .collect_vec(); + + match partition_keys.into_iter().all_equal_value() { + Ok(v) => Ok(v), + Err(None) => Ok(vec![]), + Err(Some(diff)) => { + let mut sorted_diff = [diff.0, diff.1]; + sorted_diff.sort(); + plan_err!("Found mixed partition values on disk {:?}", sorted_diff) + } + } + } } /// Reads data from one or more files via an diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs index cbadf163cecc..1a0eb34d1234 100644 --- a/datafusion/core/src/datasource/listing_table_factory.rs +++ b/datafusion/core/src/datasource/listing_table_factory.rs @@ -137,6 +137,8 @@ impl TableProviderFactory for ListingTableFactory { .with_table_partition_cols(table_partition_cols) .with_file_sort_order(cmd.order_exprs.clone()); + options.validate_partitions(state, &table_path).await?; + let resolved_schema = match provided_schema { None => options.infer_schema(state, &table_path).await?, Some(s) => s, diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs index 370ca91a0b0e..1ea411cb6f59 100644 --- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs +++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs @@ -32,7 +32,7 @@ use arrow::datatypes::{ArrowNativeType, UInt16Type}; use arrow_array::{ArrayRef, DictionaryArray, RecordBatch, RecordBatchOptions}; use arrow_schema::{DataType, Field, Schema, SchemaRef}; use datafusion_common::stats::Precision; -use datafusion_common::{exec_err, 
ColumnStatistics, Statistics}; +use datafusion_common::{exec_err, ColumnStatistics, DataFusionError, Statistics}; use datafusion_physical_expr::LexOrdering; use log::warn; @@ -256,9 +256,17 @@ impl PartitionColumnProjector { file_batch.columns().len() ); } + let mut cols = file_batch.columns().to_vec(); for &(pidx, sidx) in &self.projected_partition_indexes { - let mut partition_value = Cow::Borrowed(&partition_values[pidx]); + let p_value = + partition_values + .get(pidx) + .ok_or(DataFusionError::Execution( + "Invalid partitioning found on disk".to_string(), + ))?; + + let mut partition_value = Cow::Borrowed(p_value); // check if user forgot to dict-encode the partition value let field = self.projected_schema.field(sidx); diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 1a582be3013d..31a474bd217c 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -1113,7 +1113,7 @@ impl SessionContext { table_ref: impl Into, provider: Arc, ) -> Result>> { - let table_ref = table_ref.into(); + let table_ref: TableReference = table_ref.into(); let table = table_ref.table().to_owned(); self.state .read() diff --git a/datafusion/sqllogictest/test_files/create_external_table.slt b/datafusion/sqllogictest/test_files/create_external_table.slt index a200217af6e1..8aeeb06c1909 100644 --- a/datafusion/sqllogictest/test_files/create_external_table.slt +++ b/datafusion/sqllogictest/test_files/create_external_table.slt @@ -113,4 +113,95 @@ statement error DataFusion error: Invalid or Unsupported Configuration: Config v CREATE EXTERNAL TABLE csv_table (column1 int) STORED AS CSV LOCATION 'foo.csv' -OPTIONS ('format.delimiter' ';', 'format.column_index_truncate_length' '123') \ No newline at end of file +OPTIONS ('format.delimiter' ';', 'format.column_index_truncate_length' '123') + +# Partitioned table on a single file +query error DataFusion error: Error during planning: 
Can't create a partitioned table backed by a single file, perhaps the URL is missing a trailing slash\? +CREATE EXTERNAL TABLE single_file_partition(c1 int) +PARTITIONED BY (p2 string, p1 string) +STORED AS CSV +LOCATION 'foo.csv'; + +# Wrong partition order error + +statement ok +CREATE EXTERNAL TABLE partitioned (c1 int) +PARTITIONED BY (p1 string, p2 string) +STORED AS parquet +LOCATION 'test_files/scratch/create_external_table/bad_partitioning/'; + +query ITT +INSERT INTO partitioned VALUES (1, 'x', 'y'); +---- +1 + +query error DataFusion error: Error during planning: Inferred partitions to be \["p1", "p2"\], but got \["p2", "p1"\] +CREATE EXTERNAL TABLE wrong_order_partitioned (c1 int) +PARTITIONED BY (p2 string, p1 string) +STORED AS parquet +LOCATION 'test_files/scratch/create_external_table/bad_partitioning/'; + +statement error DataFusion error: Error during planning: Inferred partitions to be \["p1", "p2"\], but got \["p2"\] +CREATE EXTERNAL TABLE wrong_order_partitioned (c1 int) +PARTITIONED BY (p2 string) +STORED AS parquet +LOCATION 'test_files/scratch/create_external_table/bad_partitioning/'; + +# But allows partial partition selection + +statement ok +CREATE EXTERNAL TABLE partial_partitioned (c1 int) +PARTITIONED BY (p1 string) +STORED AS parquet +LOCATION 'test_files/scratch/create_external_table/bad_partitioning/'; + +query IT +SELECT * FROM partial_partitioned; +---- +1 x + +statement ok +CREATE EXTERNAL TABLE inner_partition (c1 int) +PARTITIONED BY (p2 string) +STORED AS parquet +LOCATION 'test_files/scratch/create_external_table/bad_partitioning/p1=x/'; + +query IT +SELECT * FROM inner_partition; +---- +1 y + +# Simulate manual creation of invalid (mixed) partitions on disk + +statement ok +CREATE EXTERNAL TABLE test(name string) +PARTITIONED BY (year string, month string) +STORED AS parquet +LOCATION 'test_files/scratch/create_external_table/manual_partitioning/'; + +statement ok +-- passes the partition check since the previous statement 
didn't write to disk +CREATE EXTERNAL TABLE test2(name string) +PARTITIONED BY (month string, year string) +STORED AS parquet +LOCATION 'test_files/scratch/create_external_table/manual_partitioning/'; + +query TTT +-- creates year -> month partitions +INSERT INTO test VALUES('name', '2024', '03'); +---- +1 + +query TTT +-- creates month -> year partitions. +-- now the table has both partitions (year -> month and month -> year) +INSERT INTO test2 VALUES('name', '2024', '03'); +---- +1 + +statement error DataFusion error: Error during planning: Found mixed partition values on disk \[\["month", "year"\], \["year", "month"\]\] +-- fails to infer as partitions are not consistent +CREATE EXTERNAL TABLE test3(name string) +PARTITIONED BY (month string, year string) +STORED AS parquet +LOCATION 'test_files/scratch/create_external_table/manual_partitioning/';