From 5e28e9532978fb80527605ea5cb31fa9125d7949 Mon Sep 17 00:00:00 2001 From: Ivan Sadikov Date: Wed, 19 Jun 2024 09:46:38 +0800 Subject: [PATCH] [SPARK-48649][SQL] Add "ignoreInvalidPartitionPaths" and "spark.sql.files.ignoreInvalidPartitionPaths" configs to allow ignoring invalid partition paths ### What changes were proposed in this pull request? This PR adds a new data source config `ignoreInvalidPartitionPaths` and SQL session configuration flag `spark.sql.files.ignoreInvalidPartitionPaths` to control the behaviour of skipping invalid partition paths (base paths). When the config is enabled, it allows skipping invalid paths such as: ``` table/ invalid/... part=1/... part=2/... part=3/... ``` In this case, `table/invalid` path will be ignored. Data source option takes precedence over the SQL config so with the code: ```scala spark.conf.set("spark.sql.files.ignoreInvalidPartitionPaths", "false") spark.read.format("parquet").option("ignoreInvalidPartitionPaths", "true").load(...) ``` the query would ignore invalid partitions, i.e. the flag will be enabled. The config is disabled by default. ### Why are the changes needed? Allows ignoring invalid partition paths that cannot be parsed. ### Does this PR introduce _any_ user-facing change? No. The added configs are disabled by default to have the exact same behaviour as before. ### How was this patch tested? I added a unit test for this. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #47006 from sadikovi/SPARK-48649. 
Authored-by: Ivan Sadikov Signed-off-by: Wenchen Fan --- .../apache/spark/sql/internal/SQLConf.scala | 10 +++ .../datasources/FileIndexOptions.scala | 1 + .../PartitioningAwareFileIndex.scala | 10 ++- .../datasources/PartitioningUtils.scala | 10 +-- .../datasources/FileIndexSuite.scala | 65 ++++++++++++++++++- .../ParquetPartitionDiscoverySuite.scala | 23 ++++--- 6 files changed, 104 insertions(+), 15 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 25a2441e05fed..fd804bc0e986d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1977,6 +1977,14 @@ object SQLConf { .booleanConf .createWithDefault(false) + val IGNORE_INVALID_PARTITION_PATHS = buildConf("spark.sql.files.ignoreInvalidPartitionPaths") + .doc("Whether to ignore invalid partition paths that do not match <column>=<value>. When " + + "the option is enabled, table with two partition directories 'table/invalid' and " + + "'table/col=1' will only load the latter directory and ignore the invalid partition") + .version("4.0.0") + .booleanConf + .createWithDefault(false) + val MAX_RECORDS_PER_FILE = buildConf("spark.sql.files.maxRecordsPerFile") .doc("Maximum number of records to write out to a single file. 
" + "If this value is zero or negative, there is no limit.") @@ -5275,6 +5283,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf { def ignoreMissingFiles: Boolean = getConf(IGNORE_MISSING_FILES) + def ignoreInvalidPartitionPaths: Boolean = getConf(IGNORE_INVALID_PARTITION_PATHS) + def maxRecordsPerFile: Long = getConf(MAX_RECORDS_PER_FILE) def useCompression: Boolean = getConf(COMPRESS_CACHED) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndexOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndexOptions.scala index 1c352e3748f21..5a300dae4daab 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndexOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndexOptions.scala @@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils object FileIndexOptions extends DataSourceOptions { val IGNORE_MISSING_FILES = newOption(FileSourceOptions.IGNORE_MISSING_FILES) + val IGNORE_INVALID_PARTITION_PATHS = newOption("ignoreInvalidPartitionPaths") val TIME_ZONE = newOption(DateTimeUtils.TIMEZONE_OPTION) val RECURSIVE_FILE_LOOKUP = newOption("recursiveFileLookup") val BASE_PATH_PARAM = newOption("basePath") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala index cc9f0d23bcb6b..07be3f89872cc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala @@ -70,6 +70,13 @@ abstract class PartitioningAwareFileIndex( caseInsensitiveMap.getOrElse(FileIndexOptions.RECURSIVE_FILE_LOOKUP, "false").toBoolean } + protected lazy val ignoreInvalidPartitionPaths: Boolean = { + caseInsensitiveMap + 
.get(FileIndexOptions.IGNORE_INVALID_PARTITION_PATHS) + .map(_.toBoolean) + .getOrElse(sparkSession.sessionState.conf.ignoreInvalidPartitionPaths) + } + override def listFiles( partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = { def isNonEmptyFile(f: FileStatus): Boolean = { @@ -162,7 +169,8 @@ abstract class PartitioningAwareFileIndex( userSpecifiedSchema = userSpecifiedSchema, caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis, validatePartitionColumns = sparkSession.sessionState.conf.validatePartitionColumns, - timeZoneId = timeZoneId) + timeZoneId = timeZoneId, + ignoreInvalidPartitionPaths = ignoreInvalidPartitionPaths) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala index 56cba0e0561d1..3b2d601b81fb5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala @@ -106,9 +106,10 @@ object PartitioningUtils extends SQLConfHelper { userSpecifiedSchema: Option[StructType], caseSensitive: Boolean, validatePartitionColumns: Boolean, - timeZoneId: String): PartitionSpec = { + timeZoneId: String, + ignoreInvalidPartitionPaths: Boolean): PartitionSpec = { parsePartitions(paths, typeInference, basePaths, userSpecifiedSchema, caseSensitive, - validatePartitionColumns, DateTimeUtils.getZoneId(timeZoneId)) + validatePartitionColumns, DateTimeUtils.getZoneId(timeZoneId), ignoreInvalidPartitionPaths) } private[datasources] def parsePartitions( @@ -118,7 +119,8 @@ object PartitioningUtils extends SQLConfHelper { userSpecifiedSchema: Option[StructType], caseSensitive: Boolean, validatePartitionColumns: Boolean, - zoneId: ZoneId): PartitionSpec = { + zoneId: ZoneId, + ignoreInvalidPartitionPaths: Boolean): PartitionSpec = { val 
userSpecifiedDataTypes = if (userSpecifiedSchema.isDefined) { val nameToDataType = userSpecifiedSchema.get.fields.map(f => f.name -> f.dataType).toMap if (!caseSensitive) { @@ -171,7 +173,7 @@ object PartitioningUtils extends SQLConfHelper { // TODO: Selective case sensitivity. val discoveredBasePaths = optDiscoveredBasePaths.flatten.map(_.toString.toLowerCase()) assert( - discoveredBasePaths.distinct.size == 1, + ignoreInvalidPartitionPaths || discoveredBasePaths.distinct.size == 1, "Conflicting directory structures detected. Suspicious paths:\b" + discoveredBasePaths.distinct.mkString("\n\t", "\n\t", "\n\n") + "If provided paths are partition directories, please set " + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala index 110c330f16956..6399eb6da049f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala @@ -31,7 +31,7 @@ import org.mockito.Mockito.{mock, when} import org.apache.spark.{SparkException, SparkRuntimeException} import org.apache.spark.metrics.source.HiveCatalogMetrics -import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.functions.col import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} @@ -547,6 +547,66 @@ class FileIndexSuite extends SharedSparkSession { assert(fileIndex.leafFileStatuses.toSeq == statuses) } + test("SPARK-48649: Ignore invalid partitions") { + // Table: + // id part_col + // 1 1 + // 2 2 + val df = spark.range(1, 3, 1, 2).toDF("id") + .withColumn("part_col", col("id")) + + withTempPath { directoryPath => + df.write + .mode("overwrite") + .format("parquet") + .partitionBy("part_col") + .save(directoryPath.getCanonicalPath) + + // Rename 
one of the folders. + new File(directoryPath, "part_col=1").renameTo(new File(directoryPath, "undefined")) + + // By default, we expect the invalid path assertion to trigger. + val ex = intercept[AssertionError] { + spark.read + .format("parquet") + .load(directoryPath.getCanonicalPath) + .collect() + } + assert(ex.getMessage.contains("Conflicting directory structures detected")) + + // With the config enabled, we should only read the valid partition. + withSQLConf(SQLConf.IGNORE_INVALID_PARTITION_PATHS.key -> "true") { + assert( + spark.read + .format("parquet") + .load(directoryPath.getCanonicalPath) + .collect() === Seq(Row(2, 2))) + } + + // Data source option override takes precedence. + withSQLConf(SQLConf.IGNORE_INVALID_PARTITION_PATHS.key -> "true") { + val ex = intercept[AssertionError] { + spark.read + .format("parquet") + .option(FileIndexOptions.IGNORE_INVALID_PARTITION_PATHS, "false") + .load(directoryPath.getCanonicalPath) + .collect() + } + assert(ex.getMessage.contains("Conflicting directory structures detected")) + } + + // Data source option override takes precedence. 
+ withSQLConf(SQLConf.IGNORE_INVALID_PARTITION_PATHS.key -> "false") { + assert( + spark.read + .format("parquet") + .option(FileIndexOptions.IGNORE_INVALID_PARTITION_PATHS, "true") + .load(directoryPath.getCanonicalPath) + .collect() === Seq(Row(2, 2))) + } + } + } + test("expire FileStatusCache if TTL is configured") { val previousValue = SQLConf.get.getConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS) try { @@ -585,9 +645,10 @@ class FileIndexSuite extends SharedSparkSession { } test("SPARK-40667: validate FileIndex Options") { - assert(FileIndexOptions.getAllOptions.size == 7) + assert(FileIndexOptions.getAllOptions.size == 8) // Please add validation on any new FileIndex options here assert(FileIndexOptions.isValidOption("ignoreMissingFiles")) + assert(FileIndexOptions.isValidOption("ignoreInvalidPartitionPaths")) assert(FileIndexOptions.isValidOption("timeZone")) assert(FileIndexOptions.isValidOption("recursiveFileLookup")) assert(FileIndexOptions.isValidOption("basePath")) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala index 183c4f71df6c6..a6ad147c865d2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala @@ -112,7 +112,8 @@ abstract class ParquetPartitionDiscoverySuite "hdfs://host:9000/path/a=10.5/b=hello") var exception = intercept[AssertionError] { - parsePartitions(paths.map(new Path(_)), true, Set.empty[Path], None, true, true, timeZoneId) + parsePartitions( + paths.map(new Path(_)), true, Set.empty[Path], None, true, true, timeZoneId, false) } assert(exception.getMessage().contains("Conflicting directory structures detected")) @@ -129,7 +130,8 @@ abstract class 
ParquetPartitionDiscoverySuite None, true, true, - timeZoneId) + timeZoneId, + false) // Valid paths = Seq( @@ -145,7 +147,8 @@ abstract class ParquetPartitionDiscoverySuite None, true, true, - timeZoneId) + timeZoneId, + false) // Valid paths = Seq( @@ -161,7 +164,8 @@ abstract class ParquetPartitionDiscoverySuite None, true, true, - timeZoneId) + timeZoneId, + false) // Invalid paths = Seq( @@ -177,7 +181,8 @@ abstract class ParquetPartitionDiscoverySuite None, true, true, - timeZoneId) + timeZoneId, + false) } assert(exception.getMessage().contains("Conflicting directory structures detected")) @@ -200,7 +205,8 @@ abstract class ParquetPartitionDiscoverySuite None, true, true, - timeZoneId) + timeZoneId, + false) } assert(exception.getMessage().contains("Conflicting directory structures detected")) } @@ -296,7 +302,8 @@ abstract class ParquetPartitionDiscoverySuite None, true, true, - timeZoneId) + timeZoneId, + false) assert(actualSpec.partitionColumns === spec.partitionColumns) assert(actualSpec.partitions.length === spec.partitions.length) actualSpec.partitions.zip(spec.partitions).foreach { case (actual, expected) => @@ -427,7 +434,7 @@ abstract class ParquetPartitionDiscoverySuite def check(paths: Seq[String], spec: PartitionSpec): Unit = { val actualSpec = parsePartitions(paths.map(new Path(_)), false, Set.empty[Path], None, - true, true, timeZoneId) + true, true, timeZoneId, false) assert(actualSpec === spec) }