[SPARK-7939] [SQL] Add conf to enable/disable partition column type inference

JIRA: https://issues.apache.org/jira/browse/SPARK-7939

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes apache#6503 from viirya/disable_partition_type_inference and squashes the following commits:

3e90470 [Liang-Chi Hsieh] Default to enable type inference and update docs.
455edb1 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into disable_partition_type_inference
9a57933 [Liang-Chi Hsieh] Add conf to enable/disable partition column type inference.
viirya authored and liancheng committed Jun 8, 2015
1 parent eacd4a9 commit 03ef6be
Showing 5 changed files with 119 additions and 24 deletions.
6 changes: 5 additions & 1 deletion docs/sql-programming-guide.md
@@ -1102,7 +1102,11 @@ root
{% endhighlight %}

Notice that the data types of the partitioning columns are automatically inferred. Currently,
numeric data types and string type are supported.
numeric data types and string type are supported. Sometimes users may not want to automatically
infer the data types of the partitioning columns. For these use cases, the automatic type inference
can be controlled by `spark.sql.sources.partitionColumnTypeInference.enabled`, which defaults to
`true`. When type inference is disabled, string type will be used for the partitioning columns.
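
As a usage sketch (the session setup and table path below are illustrative; only the conf key
itself comes from this change), disabling inference before reading a partitioned table makes
every partition column come back as a string:

```scala
// Illustrative only: assumes a SQLContext named `sqlContext` is in scope and a
// Parquet table laid out as /tmp/table/a=10/b=hello (hypothetical path).
sqlContext.setConf("spark.sql.sources.partitionColumnTypeInference.enabled", "false")

val df = sqlContext.read.parquet("/tmp/table")
df.printSchema()
// With inference disabled, both partition columns are strings:
//  |-- a: string (nullable = true)
//  |-- b: string (nullable = true)
// With the default ("true"), `a` would instead be inferred as an integral type.
```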


### Schema merging

6 changes: 6 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -71,6 +71,9 @@ private[spark] object SQLConf {
// Whether to perform partition discovery when loading external data sources. Defaults to true.
val PARTITION_DISCOVERY_ENABLED = "spark.sql.sources.partitionDiscovery.enabled"

// Whether to perform partition column type inference. Defaults to true.
val PARTITION_COLUMN_TYPE_INFERENCE = "spark.sql.sources.partitionColumnTypeInference.enabled"

// The output committer class used by FSBasedRelation. The specified class needs to be a
// subclass of org.apache.hadoop.mapreduce.OutputCommitter.
val OUTPUT_COMMITTER_CLASS = "spark.sql.sources.outputCommitterClass"
@@ -250,6 +253,9 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
private[spark] def partitionDiscoveryEnabled() =
getConf(SQLConf.PARTITION_DISCOVERY_ENABLED, "true").toBoolean

private[spark] def partitionColumnTypeInferenceEnabled() =
getConf(SQLConf.PARTITION_COLUMN_TYPE_INFERENCE, "true").toBoolean

// Do not use a value larger than 4000 as the default value of this property.
// See the comments of SCHEMA_STRING_LENGTH_THRESHOLD above for more information.
private[spark] def schemaStringLengthThreshold: Int =
@@ -72,10 +72,11 @@ private[sql] object PartitioningUtils {
*/
private[sql] def parsePartitions(
paths: Seq[Path],
defaultPartitionName: String): PartitionSpec = {
defaultPartitionName: String,
typeInference: Boolean): PartitionSpec = {
// First, we need to parse every partition's path and see if we can find partition values.
val pathsWithPartitionValues = paths.flatMap { path =>
parsePartition(path, defaultPartitionName).map(path -> _)
parsePartition(path, defaultPartitionName, typeInference).map(path -> _)
}

if (pathsWithPartitionValues.isEmpty) {
@@ -124,7 +125,8 @@
*/
private[sql] def parsePartition(
path: Path,
defaultPartitionName: String): Option[PartitionValues] = {
defaultPartitionName: String,
typeInference: Boolean): Option[PartitionValues] = {
val columns = ArrayBuffer.empty[(String, Literal)]
// Old Hadoop versions don't have `Path.isRoot`
var finished = path.getParent == null
@@ -137,7 +139,7 @@
return None
}

val maybeColumn = parsePartitionColumn(chopped.getName, defaultPartitionName)
val maybeColumn = parsePartitionColumn(chopped.getName, defaultPartitionName, typeInference)
maybeColumn.foreach(columns += _)
chopped = chopped.getParent
finished = maybeColumn.isEmpty || chopped.getParent == null
@@ -153,7 +155,8 @@

private def parsePartitionColumn(
columnSpec: String,
defaultPartitionName: String): Option[(String, Literal)] = {
defaultPartitionName: String,
typeInference: Boolean): Option[(String, Literal)] = {
val equalSignIndex = columnSpec.indexOf('=')
if (equalSignIndex == -1) {
None
@@ -164,7 +167,7 @@
val rawColumnValue = columnSpec.drop(equalSignIndex + 1)
assert(rawColumnValue.nonEmpty, s"Empty partition column value in '$columnSpec'")

val literal = inferPartitionColumnValue(rawColumnValue, defaultPartitionName)
val literal = inferPartitionColumnValue(rawColumnValue, defaultPartitionName, typeInference)
Some(columnName -> literal)
}
}
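
Aside: a minimal standalone sketch of the `name=value` split that `parsePartitionColumn`
performs, under simplifying assumptions (the real code asserts that both parts are non-empty
and runs the value through type inference); `splitColumnSpec` is a hypothetical helper, not
Spark API:

```scala
// Simplified sketch of the column-spec split; inference and unescaping omitted.
def splitColumnSpec(columnSpec: String): Option[(String, String)] = {
  val equalSignIndex = columnSpec.indexOf('=')
  if (equalSignIndex == -1) None // not a partition directory, e.g. "_temporary"
  else Some(columnSpec.take(equalSignIndex) -> columnSpec.drop(equalSignIndex + 1))
}

splitColumnSpec("a=10")       // Some(("a", "10"))
splitColumnSpec("_temporary") // None
```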
@@ -211,19 +214,28 @@
*/
private[sql] def inferPartitionColumnValue(
raw: String,
defaultPartitionName: String): Literal = {
// First tries integral types
Try(Literal.create(Integer.parseInt(raw), IntegerType))
.orElse(Try(Literal.create(JLong.parseLong(raw), LongType)))
// Then falls back to fractional types
.orElse(Try(Literal.create(JFloat.parseFloat(raw), FloatType)))
.orElse(Try(Literal.create(JDouble.parseDouble(raw), DoubleType)))
.orElse(Try(Literal.create(new JBigDecimal(raw), DecimalType.Unlimited)))
// Then falls back to string
.getOrElse {
if (raw == defaultPartitionName) Literal.create(null, NullType)
else Literal.create(unescapePathName(raw), StringType)
defaultPartitionName: String,
typeInference: Boolean): Literal = {
if (typeInference) {
// First tries integral types
Try(Literal.create(Integer.parseInt(raw), IntegerType))
.orElse(Try(Literal.create(JLong.parseLong(raw), LongType)))
// Then falls back to fractional types
.orElse(Try(Literal.create(JFloat.parseFloat(raw), FloatType)))
.orElse(Try(Literal.create(JDouble.parseDouble(raw), DoubleType)))
.orElse(Try(Literal.create(new JBigDecimal(raw), DecimalType.Unlimited)))
// Then falls back to string
.getOrElse {
if (raw == defaultPartitionName) Literal.create(null, NullType)
else Literal.create(unescapePathName(raw), StringType)
}
} else {
if (raw == defaultPartitionName) {
Literal.create(null, NullType)
} else {
Literal.create(unescapePathName(raw), StringType)
}
}
}

private val upCastingOrder: Seq[DataType] =
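To illustrate the branch added above, here is a standalone behavior sketch of the inference
fallback chain (no Spark dependencies; `inferValue` is hypothetical and returns type names as
plain strings instead of Catalyst `DataType`s):

```scala
import scala.util.Try

// Mirrors the order above: integral -> fractional -> decimal -> null/string.
def inferValue(raw: String, defaultPartitionName: String, typeInference: Boolean): (Any, String) =
  if (typeInference) {
    Try(raw.toInt -> "IntegerType")
      .orElse(Try(raw.toLong -> "LongType"))
      .orElse(Try(raw.toFloat -> "FloatType"))
      .orElse(Try(raw.toDouble -> "DoubleType"))
      .orElse(Try(BigDecimal(raw) -> "DecimalType"))
      .getOrElse {
        if (raw == defaultPartitionName) (null, "NullType") else (raw, "StringType")
      }
  } else {
    if (raw == defaultPartitionName) (null, "NullType") else (raw, "StringType")
  }

// "__HIVE_DEFAULT_PARTITION__" is Hive's default partition name,
// matching PartitioningUtils.DEFAULT_PARTITION_NAME.
inferValue("10", "__HIVE_DEFAULT_PARTITION__", typeInference = true)  // (10, "IntegerType")
inferValue("10", "__HIVE_DEFAULT_PARTITION__", typeInference = false) // ("10", "StringType")
```
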
@@ -491,9 +491,11 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
}

private def discoverPartitions(): PartitionSpec = {
val typeInference = sqlContext.conf.partitionColumnTypeInferenceEnabled()
// We use leaf dirs containing data files to discover the schema.
val leafDirs = fileStatusCache.leafDirToChildrenFiles.keys.toSeq
PartitioningUtils.parsePartitions(leafDirs, PartitioningUtils.DEFAULT_PARTITION_NAME)
PartitioningUtils.parsePartitions(leafDirs, PartitioningUtils.DEFAULT_PARTITION_NAME,
typeInference)
}

/**
@@ -48,7 +48,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {

test("column type inference") {
def check(raw: String, literal: Literal): Unit = {
assert(inferPartitionColumnValue(raw, defaultPartitionName) === literal)
assert(inferPartitionColumnValue(raw, defaultPartitionName, true) === literal)
}

check("10", Literal.create(10, IntegerType))
@@ -60,12 +60,12 @@

test("parse partition") {
def check(path: String, expected: Option[PartitionValues]): Unit = {
assert(expected === parsePartition(new Path(path), defaultPartitionName))
assert(expected === parsePartition(new Path(path), defaultPartitionName, true))
}

def checkThrows[T <: Throwable: Manifest](path: String, expected: String): Unit = {
val message = intercept[T] {
parsePartition(new Path(path), defaultPartitionName).get
parsePartition(new Path(path), defaultPartitionName, true).get
}.getMessage

assert(message.contains(expected))
@@ -105,7 +105,7 @@

test("parse partitions") {
def check(paths: Seq[String], spec: PartitionSpec): Unit = {
assert(parsePartitions(paths.map(new Path(_)), defaultPartitionName) === spec)
assert(parsePartitions(paths.map(new Path(_)), defaultPartitionName, true) === spec)
}

check(Seq(
@@ -174,6 +174,77 @@
PartitionSpec.emptySpec)
}

test("parse partitions with type inference disabled") {
def check(paths: Seq[String], spec: PartitionSpec): Unit = {
assert(parsePartitions(paths.map(new Path(_)), defaultPartitionName, false) === spec)
}

check(Seq(
"hdfs://host:9000/path/a=10/b=hello"),
PartitionSpec(
StructType(Seq(
StructField("a", StringType),
StructField("b", StringType))),
Seq(Partition(Row("10", "hello"), "hdfs://host:9000/path/a=10/b=hello"))))

check(Seq(
"hdfs://host:9000/path/a=10/b=20",
"hdfs://host:9000/path/a=10.5/b=hello"),
PartitionSpec(
StructType(Seq(
StructField("a", StringType),
StructField("b", StringType))),
Seq(
Partition(Row("10", "20"), "hdfs://host:9000/path/a=10/b=20"),
Partition(Row("10.5", "hello"), "hdfs://host:9000/path/a=10.5/b=hello"))))

check(Seq(
"hdfs://host:9000/path/_temporary",
"hdfs://host:9000/path/a=10/b=20",
"hdfs://host:9000/path/a=10.5/b=hello",
"hdfs://host:9000/path/a=10.5/_temporary",
"hdfs://host:9000/path/a=10.5/_TeMpOrArY",
"hdfs://host:9000/path/a=10.5/b=hello/_temporary",
"hdfs://host:9000/path/a=10.5/b=hello/_TEMPORARY",
"hdfs://host:9000/path/_temporary/path",
"hdfs://host:9000/path/a=11/_temporary/path",
"hdfs://host:9000/path/a=10.5/b=world/_temporary/path"),
PartitionSpec(
StructType(Seq(
StructField("a", StringType),
StructField("b", StringType))),
Seq(
Partition(Row("10", "20"), "hdfs://host:9000/path/a=10/b=20"),
Partition(Row("10.5", "hello"), "hdfs://host:9000/path/a=10.5/b=hello"))))

check(Seq(
s"hdfs://host:9000/path/a=10/b=20",
s"hdfs://host:9000/path/a=$defaultPartitionName/b=hello"),
PartitionSpec(
StructType(Seq(
StructField("a", StringType),
StructField("b", StringType))),
Seq(
Partition(Row("10", "20"), s"hdfs://host:9000/path/a=10/b=20"),
Partition(Row(null, "hello"), s"hdfs://host:9000/path/a=$defaultPartitionName/b=hello"))))

check(Seq(
s"hdfs://host:9000/path/a=10/b=$defaultPartitionName",
s"hdfs://host:9000/path/a=10.5/b=$defaultPartitionName"),
PartitionSpec(
StructType(Seq(
StructField("a", StringType),
StructField("b", StringType))),
Seq(
Partition(Row("10", null), s"hdfs://host:9000/path/a=10/b=$defaultPartitionName"),
Partition(Row("10.5", null), s"hdfs://host:9000/path/a=10.5/b=$defaultPartitionName"))))

check(Seq(
s"hdfs://host:9000/path1",
s"hdfs://host:9000/path2"),
PartitionSpec.emptySpec)
}

test("read partitioned table - normal case") {
withTempDir { base =>
for {
Expand Down
