diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index cde5830c733e0..40e33f757d693 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1102,7 +1102,11 @@ root
 {% endhighlight %}
 
 Notice that the data types of the partitioning columns are automatically inferred. Currently,
-numeric data types and string type are supported.
+numeric data types and string type are supported. Sometimes users may not want to automatically
+infer the data types of the partitioning columns. For these use cases, the automatic type inference
+can be configured by `spark.sql.sources.partitionColumnTypeInference.enabled`, which defaults to
+`true`. When type inference is disabled, string type will be used for the partitioning columns.
+
 
 ### Schema merging
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 77c6af27d1007..c778889045d02 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -71,6 +71,9 @@ private[spark] object SQLConf {
   // Whether to perform partition discovery when loading external data sources. Default to true.
   val PARTITION_DISCOVERY_ENABLED = "spark.sql.sources.partitionDiscovery.enabled"
 
+  // Whether to perform partition column type inference. Defaults to true.
+  val PARTITION_COLUMN_TYPE_INFERENCE = "spark.sql.sources.partitionColumnTypeInference.enabled"
+
   // The output committer class used by FSBasedRelation. The specified class needs to be a
   // subclass of org.apache.hadoop.mapreduce.OutputCommitter.
   val OUTPUT_COMMITTER_CLASS = "spark.sql.sources.outputCommitterClass"
@@ -250,6 +253,9 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
   private[spark] def partitionDiscoveryEnabled() =
     getConf(SQLConf.PARTITION_DISCOVERY_ENABLED, "true").toBoolean
 
+  private[spark] def partitionColumnTypeInferenceEnabled() =
+    getConf(SQLConf.PARTITION_COLUMN_TYPE_INFERENCE, "true").toBoolean
+
   // Do not use a value larger than 4000 as the default value of this property.
   // See the comments of SCHEMA_STRING_LENGTH_THRESHOLD above for more information.
   private[spark] def schemaStringLengthThreshold: Int =
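For reference, a minimal sketch of how the new flag can be toggled from user code (not part of this patch; it assumes a running `sqlContext`, e.g. in spark-shell, and uses the existing `SQLContext.setConf` and SQL `SET` support):

// Disable partition column type inference before loading a partitioned data source.
sqlContext.setConf("spark.sql.sources.partitionColumnTypeInference.enabled", "false")

// Equivalent SQL form:
sqlContext.sql("SET spark.sql.sources.partitionColumnTypeInference.enabled=false")

With the flag off, `discoverPartitions()` below passes `typeInference = false` down to `PartitioningUtils`, so every partition column is read back as a string.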
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
index c4c99de5a38dc..9f6ec2ed8fc8d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
@@ -72,10 +72,11 @@ private[sql] object PartitioningUtils {
    */
   private[sql] def parsePartitions(
       paths: Seq[Path],
-      defaultPartitionName: String): PartitionSpec = {
+      defaultPartitionName: String,
+      typeInference: Boolean): PartitionSpec = {
     // First, we need to parse every partition's path and see if we can find partition values.
     val pathsWithPartitionValues = paths.flatMap { path =>
-      parsePartition(path, defaultPartitionName).map(path -> _)
+      parsePartition(path, defaultPartitionName, typeInference).map(path -> _)
     }
 
     if (pathsWithPartitionValues.isEmpty) {
@@ -124,7 +125,8 @@ private[sql] object PartitioningUtils {
    */
   private[sql] def parsePartition(
       path: Path,
-      defaultPartitionName: String): Option[PartitionValues] = {
+      defaultPartitionName: String,
+      typeInference: Boolean): Option[PartitionValues] = {
     val columns = ArrayBuffer.empty[(String, Literal)]
     // Old Hadoop versions don't have `Path.isRoot`
     var finished = path.getParent == null
@@ -137,7 +139,7 @@ private[sql] object PartitioningUtils {
       return None
     }
 
-    val maybeColumn = parsePartitionColumn(chopped.getName, defaultPartitionName)
+    val maybeColumn = parsePartitionColumn(chopped.getName, defaultPartitionName, typeInference)
     maybeColumn.foreach(columns += _)
     chopped = chopped.getParent
     finished = maybeColumn.isEmpty || chopped.getParent == null
@@ -153,7 +155,8 @@ private[sql] object PartitioningUtils {
 
   private def parsePartitionColumn(
       columnSpec: String,
-      defaultPartitionName: String): Option[(String, Literal)] = {
+      defaultPartitionName: String,
+      typeInference: Boolean): Option[(String, Literal)] = {
     val equalSignIndex = columnSpec.indexOf('=')
     if (equalSignIndex == -1) {
       None
@@ -164,7 +167,7 @@ private[sql] object PartitioningUtils {
       val rawColumnValue = columnSpec.drop(equalSignIndex + 1)
       assert(rawColumnValue.nonEmpty, s"Empty partition column value in '$columnSpec'")
 
-      val literal = inferPartitionColumnValue(rawColumnValue, defaultPartitionName)
+      val literal = inferPartitionColumnValue(rawColumnValue, defaultPartitionName, typeInference)
       Some(columnName -> literal)
     }
   }
@@ -211,19 +214,28 @@ private[sql] object PartitioningUtils {
    */
   private[sql] def inferPartitionColumnValue(
       raw: String,
-      defaultPartitionName: String): Literal = {
-    // First tries integral types
-    Try(Literal.create(Integer.parseInt(raw), IntegerType))
-      .orElse(Try(Literal.create(JLong.parseLong(raw), LongType)))
-      // Then falls back to fractional types
-      .orElse(Try(Literal.create(JFloat.parseFloat(raw), FloatType)))
-      .orElse(Try(Literal.create(JDouble.parseDouble(raw), DoubleType)))
-      .orElse(Try(Literal.create(new JBigDecimal(raw), DecimalType.Unlimited)))
-      // Then falls back to string
-      .getOrElse {
-        if (raw == defaultPartitionName) Literal.create(null, NullType)
-        else Literal.create(unescapePathName(raw), StringType)
+      defaultPartitionName: String,
+      typeInference: Boolean): Literal = {
+    if (typeInference) {
+      // First tries integral types
+      Try(Literal.create(Integer.parseInt(raw), IntegerType))
+        .orElse(Try(Literal.create(JLong.parseLong(raw), LongType)))
+        // Then falls back to fractional types
+        .orElse(Try(Literal.create(JFloat.parseFloat(raw), FloatType)))
+        .orElse(Try(Literal.create(JDouble.parseDouble(raw), DoubleType)))
+        .orElse(Try(Literal.create(new JBigDecimal(raw), DecimalType.Unlimited)))
+        // Then falls back to string
+        .getOrElse {
+          if (raw == defaultPartitionName) Literal.create(null, NullType)
+          else Literal.create(unescapePathName(raw), StringType)
+        }
+    } else {
+      if (raw == defaultPartitionName) {
+        Literal.create(null, NullType)
+      } else {
+        Literal.create(unescapePathName(raw), StringType)
       }
+    }
   }
 
   private val upCastingOrder: Seq[DataType] =
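As a side note, the inference order implemented above (integral types first, then fractional types, then decimal, and finally string or null) can be illustrated with a small self-contained sketch; `inferRawValue` is a hypothetical helper written for illustration only, not Spark code:

import scala.util.Try

// Try the narrowest numeric type first, widen on failure, and fall back to the raw string.
def inferRawValue(raw: String): Any =
  Try(raw.toInt)
    .orElse(Try(raw.toLong))
    .orElse(Try(raw.toDouble))
    .orElse(Try(BigDecimal(raw)))
    .getOrElse(raw)

// inferRawValue("10")    // 10 (Int)
// inferRawValue("1.5")   // 1.5 (Double)
// inferRawValue("hello") // "hello" (String)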
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 25887ba9a15b0..d1547fb1e4abb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -491,9 +491,11 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
   }
 
   private def discoverPartitions(): PartitionSpec = {
+    val typeInference = sqlContext.conf.partitionColumnTypeInferenceEnabled()
     // We use leaf dirs containing data files to discover the schema.
     val leafDirs = fileStatusCache.leafDirToChildrenFiles.keys.toSeq
-    PartitioningUtils.parsePartitions(leafDirs, PartitioningUtils.DEFAULT_PARTITION_NAME)
+    PartitioningUtils.parsePartitions(leafDirs, PartitioningUtils.DEFAULT_PARTITION_NAME,
+      typeInference)
   }
 
   /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
index d9a010a9815a1..c2f1cc8ffd1fb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
@@ -48,7 +48,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
 
   test("column type inference") {
     def check(raw: String, literal: Literal): Unit = {
-      assert(inferPartitionColumnValue(raw, defaultPartitionName) === literal)
+      assert(inferPartitionColumnValue(raw, defaultPartitionName, true) === literal)
     }
 
     check("10", Literal.create(10, IntegerType))
@@ -60,12 +60,12 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
 
   test("parse partition") {
     def check(path: String, expected: Option[PartitionValues]): Unit = {
-      assert(expected === parsePartition(new Path(path), defaultPartitionName))
+      assert(expected === parsePartition(new Path(path), defaultPartitionName, true))
     }
 
     def checkThrows[T <: Throwable: Manifest](path: String, expected: String): Unit = {
       val message = intercept[T] {
-        parsePartition(new Path(path), defaultPartitionName).get
+        parsePartition(new Path(path), defaultPartitionName, true).get
       }.getMessage
 
       assert(message.contains(expected))
@@ -105,7 +105,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
 
   test("parse partitions") {
     def check(paths: Seq[String], spec: PartitionSpec): Unit = {
-      assert(parsePartitions(paths.map(new Path(_)), defaultPartitionName) === spec)
+      assert(parsePartitions(paths.map(new Path(_)), defaultPartitionName, true) === spec)
     }
 
     check(Seq(
@@ -174,6 +174,77 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
       PartitionSpec.emptySpec)
   }
 
+  test("parse partitions with type inference disabled") {
+    def check(paths: Seq[String], spec: PartitionSpec): Unit = {
+      assert(parsePartitions(paths.map(new Path(_)), defaultPartitionName, false) === spec)
+    }
+
+    check(Seq(
+      "hdfs://host:9000/path/a=10/b=hello"),
+      PartitionSpec(
+        StructType(Seq(
+          StructField("a", StringType),
+          StructField("b", StringType))),
+        Seq(Partition(Row("10", "hello"), "hdfs://host:9000/path/a=10/b=hello"))))
+
+    check(Seq(
+      "hdfs://host:9000/path/a=10/b=20",
+      "hdfs://host:9000/path/a=10.5/b=hello"),
+      PartitionSpec(
+        StructType(Seq(
+          StructField("a", StringType),
+          StructField("b", StringType))),
+        Seq(
+          Partition(Row("10", "20"), "hdfs://host:9000/path/a=10/b=20"),
+          Partition(Row("10.5", "hello"), "hdfs://host:9000/path/a=10.5/b=hello"))))
+
+    check(Seq(
+      "hdfs://host:9000/path/_temporary",
+      "hdfs://host:9000/path/a=10/b=20",
"hdfs://host:9000/path/a=10.5/b=hello", + "hdfs://host:9000/path/a=10.5/_temporary", + "hdfs://host:9000/path/a=10.5/_TeMpOrArY", + "hdfs://host:9000/path/a=10.5/b=hello/_temporary", + "hdfs://host:9000/path/a=10.5/b=hello/_TEMPORARY", + "hdfs://host:9000/path/_temporary/path", + "hdfs://host:9000/path/a=11/_temporary/path", + "hdfs://host:9000/path/a=10.5/b=world/_temporary/path"), + PartitionSpec( + StructType(Seq( + StructField("a", StringType), + StructField("b", StringType))), + Seq( + Partition(Row("10", "20"), "hdfs://host:9000/path/a=10/b=20"), + Partition(Row("10.5", "hello"), "hdfs://host:9000/path/a=10.5/b=hello")))) + + check(Seq( + s"hdfs://host:9000/path/a=10/b=20", + s"hdfs://host:9000/path/a=$defaultPartitionName/b=hello"), + PartitionSpec( + StructType(Seq( + StructField("a", StringType), + StructField("b", StringType))), + Seq( + Partition(Row("10", "20"), s"hdfs://host:9000/path/a=10/b=20"), + Partition(Row(null, "hello"), s"hdfs://host:9000/path/a=$defaultPartitionName/b=hello")))) + + check(Seq( + s"hdfs://host:9000/path/a=10/b=$defaultPartitionName", + s"hdfs://host:9000/path/a=10.5/b=$defaultPartitionName"), + PartitionSpec( + StructType(Seq( + StructField("a", StringType), + StructField("b", StringType))), + Seq( + Partition(Row("10", null), s"hdfs://host:9000/path/a=10/b=$defaultPartitionName"), + Partition(Row("10.5", null), s"hdfs://host:9000/path/a=10.5/b=$defaultPartitionName")))) + + check(Seq( + s"hdfs://host:9000/path1", + s"hdfs://host:9000/path2"), + PartitionSpec.emptySpec) + } + test("read partitioned table - normal case") { withTempDir { base => for {