[SPARK-48649][SQL] Add "ignoreInvalidPartitionPaths" and "spark.sql.files.ignoreInvalidPartitionPaths" configs to allow ignoring invalid partition paths

### What changes were proposed in this pull request?

This PR adds a new data source option `ignoreInvalidPartitionPaths` and a SQL session config `spark.sql.files.ignoreInvalidPartitionPaths` to control whether invalid partition paths (base paths) are skipped.

When the config is enabled, invalid paths such as the following are skipped:
```
table/
  invalid/...
  part=1/...
  part=2/...
  part=3/...
```
In this case, the `table/invalid` path will be ignored.
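
For illustration, a minimal sketch of reading the layout above with the new behaviour enabled (the table path is hypothetical):
```scala
// Enable the behaviour for the whole session via the SQL config.
spark.conf.set("spark.sql.files.ignoreInvalidPartitionPaths", "true")

// Partition discovery now skips table/invalid and only loads
// part=1, part=2 and part=3.
val df = spark.read.format("parquet").load("/path/to/table")
```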

The data source option takes precedence over the SQL config, so with the following code:
```scala
spark.conf.set("spark.sql.files.ignoreInvalidPartitionPaths", "false")

spark.read.format("parquet").option("ignoreInvalidPartitionPaths", "true").load(...)
```

the query ignores invalid partition paths, i.e. the flag is effectively enabled.

The config is disabled by default.
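
The precedence comes from how the flag is resolved in `PartitioningAwareFileIndex`; condensed from the diff below, the per-read option wins when present, otherwise the session config supplies the value:
```scala
// Condensed from PartitioningAwareFileIndex in this PR: the data source
// option overrides the session config when it is set.
protected lazy val ignoreInvalidPartitionPaths: Boolean = {
  caseInsensitiveMap
    .get(FileIndexOptions.IGNORE_INVALID_PARTITION_PATHS)
    .map(_.toBoolean)
    .getOrElse(sparkSession.sessionState.conf.ignoreInvalidPartitionPaths)
}
```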

### Why are the changes needed?

It allows ignoring invalid partition paths that cannot be parsed as `<column>=<value>` during partition discovery.

### Does this PR introduce _any_ user-facing change?

No. The added configs are disabled by default, so the behaviour is exactly the same as before.

### How was this patch tested?

I added a unit test for this.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#47006 from sadikovi/SPARK-48649.

Authored-by: Ivan Sadikov <ivan.sadikov@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
sadikovi authored and cloud-fan committed Jun 19, 2024
1 parent 6ee7c25 commit 5e28e95
Showing 6 changed files with 104 additions and 15 deletions.
@@ -1977,6 +1977,14 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

val IGNORE_INVALID_PARTITION_PATHS = buildConf("spark.sql.files.ignoreInvalidPartitionPaths")
.doc("Whether to ignore invalid partition paths that do not match <column>=<value>. When " +
"the option is enabled, table with two partition directories 'table/invalid' and " +
"'table/col=1' will only load the latter directory and ignore the invalid partition")
.version("4.0.0")
.booleanConf
.createWithDefault(false)

val MAX_RECORDS_PER_FILE = buildConf("spark.sql.files.maxRecordsPerFile")
.doc("Maximum number of records to write out to a single file. " +
"If this value is zero or negative, there is no limit.")
@@ -5275,6 +5283,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf {

def ignoreMissingFiles: Boolean = getConf(IGNORE_MISSING_FILES)

def ignoreInvalidPartitionPaths: Boolean = getConf(IGNORE_INVALID_PARTITION_PATHS)

def maxRecordsPerFile: Long = getConf(MAX_RECORDS_PER_FILE)

def useCompression: Boolean = getConf(COMPRESS_CACHED)
@@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils

object FileIndexOptions extends DataSourceOptions {
val IGNORE_MISSING_FILES = newOption(FileSourceOptions.IGNORE_MISSING_FILES)
val IGNORE_INVALID_PARTITION_PATHS = newOption("ignoreInvalidPartitionPaths")
val TIME_ZONE = newOption(DateTimeUtils.TIMEZONE_OPTION)
val RECURSIVE_FILE_LOOKUP = newOption("recursiveFileLookup")
val BASE_PATH_PARAM = newOption("basePath")
@@ -70,6 +70,13 @@ abstract class PartitioningAwareFileIndex(
caseInsensitiveMap.getOrElse(FileIndexOptions.RECURSIVE_FILE_LOOKUP, "false").toBoolean
}

protected lazy val ignoreInvalidPartitionPaths: Boolean = {
caseInsensitiveMap
.get(FileIndexOptions.IGNORE_INVALID_PARTITION_PATHS)
.map(_.toBoolean)
.getOrElse(sparkSession.sessionState.conf.ignoreInvalidPartitionPaths)
}

override def listFiles(
partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
def isNonEmptyFile(f: FileStatus): Boolean = {
@@ -162,7 +169,8 @@
userSpecifiedSchema = userSpecifiedSchema,
caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis,
validatePartitionColumns = sparkSession.sessionState.conf.validatePartitionColumns,
timeZoneId = timeZoneId)
timeZoneId = timeZoneId,
ignoreInvalidPartitionPaths = ignoreInvalidPartitionPaths)
}
}

@@ -106,9 +106,10 @@ object PartitioningUtils extends SQLConfHelper {
userSpecifiedSchema: Option[StructType],
caseSensitive: Boolean,
validatePartitionColumns: Boolean,
timeZoneId: String): PartitionSpec = {
timeZoneId: String,
ignoreInvalidPartitionPaths: Boolean): PartitionSpec = {
parsePartitions(paths, typeInference, basePaths, userSpecifiedSchema, caseSensitive,
validatePartitionColumns, DateTimeUtils.getZoneId(timeZoneId))
validatePartitionColumns, DateTimeUtils.getZoneId(timeZoneId), ignoreInvalidPartitionPaths)
}

private[datasources] def parsePartitions(
@@ -118,7 +119,8 @@ object PartitioningUtils extends SQLConfHelper {
userSpecifiedSchema: Option[StructType],
caseSensitive: Boolean,
validatePartitionColumns: Boolean,
zoneId: ZoneId): PartitionSpec = {
zoneId: ZoneId,
ignoreInvalidPartitionPaths: Boolean): PartitionSpec = {
val userSpecifiedDataTypes = if (userSpecifiedSchema.isDefined) {
val nameToDataType = userSpecifiedSchema.get.fields.map(f => f.name -> f.dataType).toMap
if (!caseSensitive) {
@@ -171,7 +173,7 @@
// TODO: Selective case sensitivity.
val discoveredBasePaths = optDiscoveredBasePaths.flatten.map(_.toString.toLowerCase())
assert(
discoveredBasePaths.distinct.size == 1,
ignoreInvalidPartitionPaths || discoveredBasePaths.distinct.size == 1,
"Conflicting directory structures detected. Suspicious paths:\b" +
discoveredBasePaths.distinct.mkString("\n\t", "\n\t", "\n\n") +
"If provided paths are partition directories, please set " +
@@ -31,7 +31,7 @@ import org.mockito.Mockito.{mock, when}

import org.apache.spark.{SparkException, SparkRuntimeException}
import org.apache.spark.metrics.source.HiveCatalogMetrics
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
@@ -547,6 +547,66 @@ class FileIndexSuite extends SharedSparkSession {
assert(fileIndex.leafFileStatuses.toSeq == statuses)
}

test("SPARK-48649: Ignore invalid partitions") {
// Table:
// id part_col
// 1 1
// 2 2
val df = spark.range(1, 3, 1, 2).toDF("id")
.withColumn("part_col", col("id"))

withTempPath { directoryPath =>
df.write
.mode("overwrite")
.format("parquet")
.partitionBy("part_col")
.save(directoryPath.getCanonicalPath)

// Rename one of the folders.
new File(directoryPath, "part_col=1").renameTo(new File(directoryPath, "undefined"))

// By default, we expect the invalid path assertion to trigger.
val ex = intercept[AssertionError] {
spark.read
.format("parquet")
.load(directoryPath.getCanonicalPath)
.collect()
}
assert(ex.getMessage.contains("Conflicting directory structures detected"))

// With the config enabled, we should only read the valid partition.
withSQLConf(SQLConf.IGNORE_INVALID_PARTITION_PATHS.key -> "true") {
assert(
spark.read
.format("parquet")
.load(directoryPath.getCanonicalPath)
.collect() === Seq(Row(2, 2)))
}

// Data source option override takes precedence.
withSQLConf(SQLConf.IGNORE_INVALID_PARTITION_PATHS.key -> "true") {
val ex = intercept[AssertionError] {
spark.read
.format("parquet")
.option(FileIndexOptions.IGNORE_INVALID_PARTITION_PATHS, "false")
.load(directoryPath.getCanonicalPath)
.collect()
}
assert(ex.getMessage.contains("Conflicting directory structures detected"))
}

// Data source option override takes precedence.
withSQLConf(SQLConf.IGNORE_INVALID_PARTITION_PATHS.key -> "false") {
assert(
spark.read
.format("parquet")
.option(FileIndexOptions.IGNORE_INVALID_PARTITION_PATHS, "true")
.load(directoryPath.getCanonicalPath)
.collect() === Seq(Row(2, 2)))
}
}
}

test("expire FileStatusCache if TTL is configured") {
val previousValue = SQLConf.get.getConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS)
try {
@@ -585,9 +645,10 @@
}

test("SPARK-40667: validate FileIndex Options") {
assert(FileIndexOptions.getAllOptions.size == 7)
assert(FileIndexOptions.getAllOptions.size == 8)
// Please add validation on any new FileIndex options here
assert(FileIndexOptions.isValidOption("ignoreMissingFiles"))
assert(FileIndexOptions.isValidOption("ignoreInvalidPartitionPaths"))
assert(FileIndexOptions.isValidOption("timeZone"))
assert(FileIndexOptions.isValidOption("recursiveFileLookup"))
assert(FileIndexOptions.isValidOption("basePath"))
@@ -112,7 +112,8 @@ abstract class ParquetPartitionDiscoverySuite
"hdfs://host:9000/path/a=10.5/b=hello")

var exception = intercept[AssertionError] {
parsePartitions(paths.map(new Path(_)), true, Set.empty[Path], None, true, true, timeZoneId)
parsePartitions(
paths.map(new Path(_)), true, Set.empty[Path], None, true, true, timeZoneId, false)
}
assert(exception.getMessage().contains("Conflicting directory structures detected"))

@@ -129,7 +130,8 @@ abstract class ParquetPartitionDiscoverySuite
None,
true,
true,
timeZoneId)
timeZoneId,
false)

// Valid
paths = Seq(
@@ -145,7 +147,8 @@ abstract class ParquetPartitionDiscoverySuite
None,
true,
true,
timeZoneId)
timeZoneId,
false)

// Valid
paths = Seq(
@@ -161,7 +164,8 @@ abstract class ParquetPartitionDiscoverySuite
None,
true,
true,
timeZoneId)
timeZoneId,
false)

// Invalid
paths = Seq(
@@ -177,7 +181,8 @@ abstract class ParquetPartitionDiscoverySuite
None,
true,
true,
timeZoneId)
timeZoneId,
false)
}
assert(exception.getMessage().contains("Conflicting directory structures detected"))

@@ -200,7 +205,8 @@ abstract class ParquetPartitionDiscoverySuite
None,
true,
true,
timeZoneId)
timeZoneId,
false)
}
assert(exception.getMessage().contains("Conflicting directory structures detected"))
}
@@ -296,7 +302,8 @@
None,
true,
true,
timeZoneId)
timeZoneId,
false)
assert(actualSpec.partitionColumns === spec.partitionColumns)
assert(actualSpec.partitions.length === spec.partitions.length)
actualSpec.partitions.zip(spec.partitions).foreach { case (actual, expected) =>
@@ -427,7 +434,7 @@
def check(paths: Seq[String], spec: PartitionSpec): Unit = {
val actualSpec =
parsePartitions(paths.map(new Path(_)), false, Set.empty[Path], None,
true, true, timeZoneId)
true, true, timeZoneId, false)
assert(actualSpec === spec)
}

