Commit 9d15ced

gengliangwang authored and Jackey Lee committed
[SPARK-26263][SQL] Validate partition values with user provided schema
## What changes were proposed in this pull request?

Currently, if the user provides a data schema, partition column values are converted according to it. But if the conversion fails, e.g. converting a string to an int, the column value becomes null. This PR proposes to throw an exception in such cases instead of silently converting to null:

1. Null partition column values don't make sense to users in most cases. It is better to surface the conversion failure so that users can adjust the schema or their ETL jobs to fix it.
2. Conversion failures on non-partition data columns always raise exceptions. Partition columns should behave the same way.

We can reproduce the case above as follows:

```
/tmp/testDir
├── p=bar
└── p=foo
```

If we run:

```
val schema = StructType(Seq(StructField("p", IntegerType, false)))
spark.read.schema(schema).csv("/tmp/testDir/").show()
```

We will get:

```
+----+
|   p|
+----+
|null|
|null|
+----+
```

## How was this patch tested?

Unit test

Closes apache#23215 from gengliangwang/SPARK-26263.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
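With the validation in place, the same read is expected to fail during partition discovery instead of producing nulls. A rough spark-shell sketch, assuming the `/tmp/testDir` layout above (the exact call at which the error surfaces may vary):

```
import org.apache.spark.sql.types._

val schema = StructType(Seq(StructField("p", IntegerType, false)))

// Expected to fail while resolving partitions, roughly with:
//   java.lang.RuntimeException: Failed to cast value `bar` to `IntegerType` for partition column `p`
spark.read.schema(schema).csv("/tmp/testDir/").show()
```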
1 parent e75f43b commit 9d15ced

File tree

6 files changed (+77, -16 lines)

docs/sql-migration-guide-upgrade.md

Lines changed: 2 additions & 0 deletions
@@ -29,6 +29,8 @@ displayTitle: Spark SQL Upgrading Guide

  - In Spark version 2.4 and earlier, users can create a map with duplicated keys via built-in functions like `CreateMap`, `StringToMap`, etc. The behavior of map with duplicated keys is undefined, e.g. map look up respects the duplicated key appears first, `Dataset.collect` only keeps the duplicated key appears last, `MapKeys` returns duplicated keys, etc. Since Spark 3.0, these built-in functions will remove duplicated map keys with last wins policy. Users may still read map values with duplicated keys from data sources which do not enforce it (e.g. Parquet), the behavior will be undefined.

+  - In Spark version 2.4 and earlier, partition column value is converted as null if it can't be casted to corresponding user provided schema. Since 3.0, partition column value is validated with user provided schema. An exception is thrown if the validation fails. You can disable such validation by setting `spark.sql.sources.validatePartitionColumns` to `false`.
+
  - In Spark version 2.4 and earlier, the `SET` command works without any warnings even if the specified key is for `SparkConf` entries and it has no effect because the command does not update `SparkConf`, but the behavior might confuse users. Since 3.0, the command fails if a `SparkConf` key is used. You can disable such a check by setting `spark.sql.legacy.execution.setCommandRejectsSparkConfs` to `false`.

  - Spark applications which are built with Spark version 2.4 and prior, and call methods of `UserDefinedFunction`, need to be re-compiled with Spark 3.0, as they are not binary compatible with Spark 3.0.
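As a rough illustration of the note above, assuming the `/tmp/testDir` layout with string-valued `p=...` directories from the commit message, the legacy null-producing behavior can be restored by turning the new flag off:

```
import org.apache.spark.sql.types._

// Opt back into the Spark 2.4 behavior: failed partition-value casts become null
// instead of raising an exception.
spark.conf.set("spark.sql.sources.validatePartitionColumns", "false")

val schema = StructType(Seq(StructField("p", IntegerType, false)))
spark.read.schema(schema).csv("/tmp/testDir/").show()
// +----+
// |   p|
// +----+
// |null|
// |null|
// +----+
```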

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 12 additions & 0 deletions
@@ -1396,6 +1396,16 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)

+  val VALIDATE_PARTITION_COLUMNS =
+    buildConf("spark.sql.sources.validatePartitionColumns")
+      .internal()
+      .doc("When this option is set to true, partition column values will be validated with " +
+        "user-specified schema. If the validation fails, a runtime exception is thrown. " +
+        "When this option is set to false, the partition column value will be converted to null " +
+        "if it can not be casted to corresponding user-specified schema.")
+      .booleanConf
+      .createWithDefault(true)
+
   val CONTINUOUS_STREAMING_EXECUTOR_QUEUE_SIZE =
     buildConf("spark.sql.streaming.continuous.executorQueueSize")
       .internal()

@@ -2014,6 +2024,8 @@ class SQLConf extends Serializable with Logging {
   def allowCreatingManagedTableUsingNonemptyLocation: Boolean =
     getConf(ALLOW_CREATING_MANAGED_TABLE_USING_NONEMPTY_LOCATION)

+  def validatePartitionColumns: Boolean = getConf(VALIDATE_PARTITION_COLUMNS)
+
   def partitionOverwriteMode: PartitionOverwriteMode.Value =
     PartitionOverwriteMode.withName(getConf(PARTITION_OVERWRITE_MODE))
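A minimal sketch of how the new entry can be exercised, assuming an active `spark` session. Note that `SQLConf` lives in `org.apache.spark.sql.internal` and the entry is marked `.internal()`, so this is not a stable public surface:

```
import org.apache.spark.sql.internal.SQLConf

// Key and default as defined above.
SQLConf.VALIDATE_PARTITION_COLUMNS.key           // "spark.sql.sources.validatePartitionColumns"
SQLConf.VALIDATE_PARTITION_COLUMNS.defaultValue  // Some(true)

// Toggling and reading the flag through the public runtime conf.
spark.conf.set(SQLConf.VALIDATE_PARTITION_COLUMNS.key, "false")
spark.conf.get(SQLConf.VALIDATE_PARTITION_COLUMNS.key)  // "false"
```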

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala

Lines changed: 2 additions & 2 deletions
@@ -127,13 +127,13 @@ abstract class PartitioningAwareFileIndex(
     val timeZoneId = caseInsensitiveOptions.get(DateTimeUtils.TIMEZONE_OPTION)
       .getOrElse(sparkSession.sessionState.conf.sessionLocalTimeZone)

-    val caseSensitive = sparkSession.sqlContext.conf.caseSensitiveAnalysis
     PartitioningUtils.parsePartitions(
       leafDirs,
       typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled,
       basePaths = basePaths,
       userSpecifiedSchema = userSpecifiedSchema,
-      caseSensitive = caseSensitive,
+      caseSensitive = sparkSession.sqlContext.conf.caseSensitiveAnalysis,
+      validatePartitionColumns = sparkSession.sqlContext.conf.validatePartitionColumns,
       timeZoneId = timeZoneId)
   }

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala

Lines changed: 21 additions & 9 deletions
@@ -26,12 +26,13 @@ import scala.util.Try

 import org.apache.hadoop.fs.Path

-import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.{Resolver, TypeCoercion}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal}
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils

@@ -96,9 +97,10 @@ object PartitioningUtils {
       basePaths: Set[Path],
       userSpecifiedSchema: Option[StructType],
       caseSensitive: Boolean,
+      validatePartitionColumns: Boolean,
       timeZoneId: String): PartitionSpec = {
-    parsePartitions(paths, typeInference, basePaths, userSpecifiedSchema,
-      caseSensitive, DateTimeUtils.getTimeZone(timeZoneId))
+    parsePartitions(paths, typeInference, basePaths, userSpecifiedSchema, caseSensitive,
+      validatePartitionColumns, DateTimeUtils.getTimeZone(timeZoneId))
   }

   private[datasources] def parsePartitions(
@@ -107,6 +109,7 @@ object PartitioningUtils {
       basePaths: Set[Path],
       userSpecifiedSchema: Option[StructType],
       caseSensitive: Boolean,
+      validatePartitionColumns: Boolean,
       timeZone: TimeZone): PartitionSpec = {
     val userSpecifiedDataTypes = if (userSpecifiedSchema.isDefined) {
       val nameToDataType = userSpecifiedSchema.get.fields.map(f => f.name -> f.dataType).toMap
@@ -121,7 +124,8 @@ object PartitioningUtils {

     // First, we need to parse every partition's path and see if we can find partition values.
     val (partitionValues, optDiscoveredBasePaths) = paths.map { path =>
-      parsePartition(path, typeInference, basePaths, userSpecifiedDataTypes, timeZone)
+      parsePartition(path, typeInference, basePaths, userSpecifiedDataTypes,
+        validatePartitionColumns, timeZone)
     }.unzip

     // We create pairs of (path -> path's partition value) here
@@ -203,6 +207,7 @@ object PartitioningUtils {
       typeInference: Boolean,
       basePaths: Set[Path],
       userSpecifiedDataTypes: Map[String, DataType],
+      validatePartitionColumns: Boolean,
       timeZone: TimeZone): (Option[PartitionValues], Option[Path]) = {
     val columns = ArrayBuffer.empty[(String, Literal)]
     // Old Hadoop versions don't have `Path.isRoot`
@@ -224,7 +229,8 @@ object PartitioningUtils {
       // Let's say currentPath is a path of "/table/a=1/", currentPath.getName will give us a=1.
       // Once we get the string, we try to parse it and find the partition column and value.
       val maybeColumn =
-        parsePartitionColumn(currentPath.getName, typeInference, userSpecifiedDataTypes, timeZone)
+        parsePartitionColumn(currentPath.getName, typeInference, userSpecifiedDataTypes,
+          validatePartitionColumns, timeZone)
       maybeColumn.foreach(columns += _)

       // Now, we determine if we should stop.
@@ -258,6 +264,7 @@ object PartitioningUtils {
       columnSpec: String,
       typeInference: Boolean,
       userSpecifiedDataTypes: Map[String, DataType],
+      validatePartitionColumns: Boolean,
       timeZone: TimeZone): Option[(String, Literal)] = {
     val equalSignIndex = columnSpec.indexOf('=')
     if (equalSignIndex == -1) {
@@ -272,10 +279,15 @@ object PartitioningUtils {
       val literal = if (userSpecifiedDataTypes.contains(columnName)) {
         // SPARK-26188: if user provides corresponding column schema, get the column value without
         // inference, and then cast it as user specified data type.
-        val columnValue = inferPartitionColumnValue(rawColumnValue, false, timeZone)
-        val castedValue =
-          Cast(columnValue, userSpecifiedDataTypes(columnName), Option(timeZone.getID)).eval()
-        Literal.create(castedValue, userSpecifiedDataTypes(columnName))
+        val dataType = userSpecifiedDataTypes(columnName)
+        val columnValueLiteral = inferPartitionColumnValue(rawColumnValue, false, timeZone)
+        val columnValue = columnValueLiteral.eval()
+        val castedValue = Cast(columnValueLiteral, dataType, Option(timeZone.getID)).eval()
+        if (validatePartitionColumns && columnValue != null && castedValue == null) {
+          throw new RuntimeException(s"Failed to cast value `$columnValue` to `$dataType` " +
+            s"for partition column `$columnName`")
+        }
+        Literal.create(castedValue, dataType)
       } else {
         inferPartitionColumnValue(rawColumnValue, typeInference, timeZone)
       }
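To make the new branch easier to follow, here is a simplified, self-contained model of the check, in plain Scala with a hypothetical helper (the real code evaluates a Catalyst `Literal` and `Cast` as shown above): a raw value that is non-null before the cast but null after it is a validation failure when the flag is on, and is kept as a null literal when it is off.

```
object PartitionValueCheck {
  // Stand-in for Cast(...).eval(): None when the string cannot be cast to Int.
  private def castToInt(raw: String): Option[Int] = scala.util.Try(raw.trim.toInt).toOption

  def parse(columnName: String, raw: String, validatePartitionColumns: Boolean): Option[Int] = {
    val casted = castToInt(raw)
    if (validatePartitionColumns && raw != null && casted.isEmpty) {
      // Mirrors the RuntimeException thrown in parsePartitionColumn.
      throw new RuntimeException(
        s"Failed to cast value `$raw` to `IntegerType` for partition column `$columnName`")
    }
    casted // None stands in for the null literal kept when validation is disabled
  }

  def main(args: Array[String]): Unit = {
    println(parse("p", "42", validatePartitionColumns = true))    // Some(42)
    println(parse("p", "foo", validatePartitionColumns = false))  // None (legacy null behavior)
    parse("p", "foo", validatePartitionColumns = true)            // throws RuntimeException
  }
}
```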

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala

Lines changed: 26 additions & 1 deletion
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
 import org.apache.spark.util.{KnownSizeEstimation, SizeEstimator}

 class FileIndexSuite extends SharedSQLContext {
@@ -95,6 +95,31 @@ class FileIndexSuite extends SharedSQLContext {
     }
   }

+  test("SPARK-26263: Throw exception when partition value can't be casted to user-specified type") {
+    withTempDir { dir =>
+      val partitionDirectory = new File(dir, "a=foo")
+      partitionDirectory.mkdir()
+      val file = new File(partitionDirectory, "text.txt")
+      stringToFile(file, "text")
+      val path = new Path(dir.getCanonicalPath)
+      val schema = StructType(Seq(StructField("a", IntegerType, false)))
+      withSQLConf(SQLConf.VALIDATE_PARTITION_COLUMNS.key -> "true") {
+        val fileIndex = new InMemoryFileIndex(spark, Seq(path), Map.empty, Some(schema))
+        val msg = intercept[RuntimeException] {
+          fileIndex.partitionSpec()
+        }.getMessage
+        assert(msg == "Failed to cast value `foo` to `IntegerType` for partition column `a`")
+      }
+
+      withSQLConf(SQLConf.VALIDATE_PARTITION_COLUMNS.key -> "false") {
+        val fileIndex = new InMemoryFileIndex(spark, Seq(path), Map.empty, Some(schema))
+        val partitionValues = fileIndex.partitionSpec().partitions.map(_.values)
+        assert(partitionValues.length == 1 && partitionValues(0).numFields == 1 &&
+          partitionValues(0).isNullAt(0))
+      }
+    }
+  }
+
   test("InMemoryFileIndex: input paths are converted to qualified paths") {
     withTempDir { dir =>
       val file = new File(dir, "text.txt")

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala

Lines changed: 14 additions & 4 deletions
@@ -101,7 +101,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
       "hdfs://host:9000/path/a=10.5/b=hello")

     var exception = intercept[AssertionError] {
-      parsePartitions(paths.map(new Path(_)), true, Set.empty[Path], None, true, timeZoneId)
+      parsePartitions(paths.map(new Path(_)), true, Set.empty[Path], None, true, true, timeZoneId)
     }
     assert(exception.getMessage().contains("Conflicting directory structures detected"))

@@ -117,6 +117,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
       Set(new Path("hdfs://host:9000/path/")),
       None,
       true,
+      true,
       timeZoneId)

     // Valid
@@ -132,6 +133,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
       Set(new Path("hdfs://host:9000/path/something=true/table")),
       None,
       true,
+      true,
       timeZoneId)

     // Valid
@@ -147,6 +149,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
       Set(new Path("hdfs://host:9000/path/table=true")),
       None,
       true,
+      true,
       timeZoneId)

     // Invalid
@@ -162,6 +165,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
       Set(new Path("hdfs://host:9000/path/")),
       None,
       true,
+      true,
       timeZoneId)
     }
     assert(exception.getMessage().contains("Conflicting directory structures detected"))
@@ -184,20 +188,22 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
       Set(new Path("hdfs://host:9000/tmp/tables/")),
       None,
       true,
+      true,
       timeZoneId)
     }
     assert(exception.getMessage().contains("Conflicting directory structures detected"))
   }

   test("parse partition") {
     def check(path: String, expected: Option[PartitionValues]): Unit = {
-      val actual = parsePartition(new Path(path), true, Set.empty[Path], Map.empty, timeZone)._1
+      val actual = parsePartition(new Path(path), true, Set.empty[Path],
+        Map.empty, true, timeZone)._1
       assert(expected === actual)
     }

     def checkThrows[T <: Throwable: Manifest](path: String, expected: String): Unit = {
       val message = intercept[T] {
-        parsePartition(new Path(path), true, Set.empty[Path], Map.empty, timeZone)
+        parsePartition(new Path(path), true, Set.empty[Path], Map.empty, true, timeZone)
       }.getMessage

       assert(message.contains(expected))
@@ -242,6 +248,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
       typeInference = true,
       basePaths = Set(new Path("file://path/a=10")),
       Map.empty,
+      true,
       timeZone = timeZone)._1

     assert(partitionSpec1.isEmpty)
@@ -252,6 +259,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
       typeInference = true,
       basePaths = Set(new Path("file://path")),
       Map.empty,
+      true,
       timeZone = timeZone)._1

     assert(partitionSpec2 ==
@@ -272,6 +280,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
       rootPaths,
       None,
       true,
+      true,
       timeZoneId)
     assert(actualSpec.partitionColumns === spec.partitionColumns)
     assert(actualSpec.partitions.length === spec.partitions.length)
@@ -384,7 +393,8 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
   test("parse partitions with type inference disabled") {
     def check(paths: Seq[String], spec: PartitionSpec): Unit = {
       val actualSpec =
-        parsePartitions(paths.map(new Path(_)), false, Set.empty[Path], None, true, timeZoneId)
+        parsePartitions(paths.map(new Path(_)), false, Set.empty[Path], None,
+          true, true, timeZoneId)
       assert(actualSpec === spec)
     }
