ExternalCatalogUtils.scala:

@@ -118,7 +118,7 @@ object ExternalCatalogUtils {
   }

   def getPartitionPathString(col: String, value: String): String = {
-    val partitionString = if (value == null) {
+    val partitionString = if (value == null || value.isEmpty) {
       DEFAULT_PARTITION_NAME
     } else {
       escapePathName(value)
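With this change, an empty string maps to the default partition name just as null does. A minimal sketch of the resulting behavior, assuming the truncated tail of the method returns `escapePathName(col) + "=" + partitionString` and that DEFAULT_PARTITION_NAME is Hive's `__HIVE_DEFAULT_PARTITION__`:

```scala
// Sketch only: return values assume the truncated remainder of
// getPartitionPathString concatenates escapePathName(col) + "=" + partitionString.
ExternalCatalogUtils.getPartitionPathString("b", "p")   // "b=p"
ExternalCatalogUtils.getPartitionPathString("b", null)  // "b=__HIVE_DEFAULT_PARTITION__"
ExternalCatalogUtils.getPartitionPathString("b", "")    // "b=__HIVE_DEFAULT_PARTITION__" (new in this patch)
```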
interface.scala (CatalogTablePartition / CatalogTable):

@@ -115,7 +115,12 @@ case class CatalogTablePartition(
     val caseInsensitiveProperties = CaseInsensitiveMap(storage.properties)
     val timeZoneId = caseInsensitiveProperties.getOrElse("timeZone", defaultTimeZondId)
     InternalRow.fromSeq(partitionSchema.map { field =>
-      Cast(Literal(spec(field.name)), field.dataType, Option(timeZoneId)).eval()
+      val partValue = if (spec(field.name) == ExternalCatalogUtils.DEFAULT_PARTITION_NAME) {
+        null
+      } else {
+        spec(field.name)
+      }
+      Cast(Literal(partValue), field.dataType, Option(timeZoneId)).eval()
     })
   }
 }

@@ -163,7 +168,7 @@ case class BucketSpec(
  * @param tracksPartitionsInCatalog whether this table's partition metadata is stored in the
  *                                  catalog. If false, it is inferred automatically based on file
  *                                  structure.
- * @param schemaPresevesCase Whether or not the schema resolved for this table is case-sensitive.
+ * @param schemaPreservesCase Whether or not the schema resolved for this table is case-sensitive.
  *                           When using a Hive Metastore, this flag is set to false if a case-
  *                           sensitive schema was unable to be read from the table properties.
  *                           Used to trigger case-sensitive schema inference at query time, when
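The first hunk is the read path: a stored value equal to DEFAULT_PARTITION_NAME is turned back into a SQL null before the cast to the column's data type, and `Cast(Literal(null), ...)` then evaluates to null. A plain-Scala sketch of that conversion, assuming `spec` maps partition column names to the raw strings stored in the metastore:

```scala
// Sketch only; DEFAULT_PARTITION_NAME is "__HIVE_DEFAULT_PARTITION__" in Spark.
val spec = Map("b" -> "__HIVE_DEFAULT_PARTITION__", "c" -> "2")

def readBack(raw: String): String =
  if (raw == "__HIVE_DEFAULT_PARTITION__") null else raw

readBack(spec("b"))  // null -- the on-disk placeholder round-trips to a null value
readBack(spec("c"))  // "2"  -- other values are cast to the column type, as before
```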
DataSourceScanExec.scala (FileSourceScanExec):

@@ -319,7 +319,7 @@ case class FileSourceScanExec(
     val input = ctx.freshName("input")
     ctx.addMutableState("scala.collection.Iterator", input, s"$input = inputs[0];")
     val exprRows = output.zipWithIndex.map { case (a, i) =>
-      new BoundReference(i, a.dataType, a.nullable)
+      BoundReference(i, a.dataType, a.nullable)
     }
     val row = ctx.freshName("row")
     ctx.INPUT_ROW = row

(A style-only cleanup: BoundReference is a case class, so the explicit `new` is redundant.)
FileFormatWriter.scala:

@@ -335,14 +335,11 @@ object FileFormatWriter extends Logging {
     /** Expressions that given partition columns build a path string like: col1=val/col2=val/... */
     private def partitionPathExpression: Seq[Expression] = {
       desc.partitionColumns.zipWithIndex.flatMap { case (c, i) =>
-        val escaped = ScalaUDF(
-          ExternalCatalogUtils.escapePathName _,
+        val partitionName = ScalaUDF(
+          ExternalCatalogUtils.getPartitionPathString _,
           StringType,
-          Seq(Cast(c, StringType, Option(desc.timeZoneId))),
-          Seq(StringType))
-        val str = If(IsNull(c), Literal(ExternalCatalogUtils.DEFAULT_PARTITION_NAME), escaped)
-        val partitionName = Literal(ExternalCatalogUtils.escapePathName(c.name) + "=") :: str :: Nil
-        if (i == 0) partitionName else Literal(Path.SEPARATOR) :: partitionName
+          Seq(Literal(c.name), Cast(c, StringType, Option(desc.timeZoneId))))
+        if (i == 0) Seq(partitionName) else Seq(Literal(Path.SEPARATOR), partitionName)
       }
     }
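The write path previously rebuilt the null check (`If(IsNull(c), ...)`) and the `col=` prefix by hand inside the expression tree; it now defers to the same getPartitionPathString used elsewhere, which also covers the new empty-string case. A plain-Scala sketch of the value the rewritten expression computes for one row, under the same assumptions about getPartitionPathString as above:

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils

// Sketch only: the real code builds Catalyst expressions; this models the
// string they produce, given (column name, string-cast value) pairs.
def partitionPath(cols: Seq[(String, String)]): String =
  cols
    .map { case (name, value) => ExternalCatalogUtils.getPartitionPathString(name, value) }
    .mkString(Path.SEPARATOR)

partitionPath(Seq("b" -> "p", "c" -> "1"))   // "b=p/c=1"
partitionPath(Seq("b" -> null, "c" -> "2"))  // "b=__HIVE_DEFAULT_PARTITION__/c=2"
```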
PartitioningUtils.scala:

@@ -33,7 +33,6 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.types._
-import org.apache.spark.unsafe.types.UTF8String

 // TODO: We should tighten up visibility of the classes here once we clean up Hive coupling.

@@ -129,7 +128,7 @@ object PartitioningUtils {
       // "hdfs://host:9000/invalidPath"
       // "hdfs://host:9000/path"
       // TODO: Selective case sensitivity.
-      val discoveredBasePaths = optDiscoveredBasePaths.flatMap(x => x).map(_.toString.toLowerCase())
+      val discoveredBasePaths = optDiscoveredBasePaths.flatten.map(_.toString.toLowerCase())
       assert(
         discoveredBasePaths.distinct.size == 1,
         "Conflicting directory structures detected. Suspicious paths:\b" +
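`flatMap(x => x)` and `flatten` are equivalent on a collection of Options; the new form just states the intent directly. A tiny illustration with hypothetical values:

```scala
// Hypothetical values, for illustration only.
val optDiscoveredBasePaths: Seq[Option[String]] = Seq(Some("hdfs://host:9000/path"), None)
assert(optDiscoveredBasePaths.flatMap(x => x) == optDiscoveredBasePaths.flatten)
// both yield Seq("hdfs://host:9000/path")
```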
HiveExternalCatalog.scala:

@@ -1004,8 +1004,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     val partColNameMap = buildLowerCasePartColNameMap(catalogTable).mapValues(escapePathName)
     val clientPartitionNames =
       client.getPartitionNames(catalogTable, partialSpec.map(lowerCasePartitionSpec))
-    clientPartitionNames.map { partName =>
-      val partSpec = PartitioningUtils.parsePathFragmentAsSeq(partName)
+    clientPartitionNames.map { partitionPath =>
+      val partSpec = PartitioningUtils.parsePathFragmentAsSeq(partitionPath)
       partSpec.map { case (partName, partValue) =>
         partColNameMap(partName.toLowerCase) + "=" + escapePathName(partValue)
       }.mkString("/")

(The outer variable is renamed so it no longer shadows the `partName` bound by the inner closure.)
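For illustration, a sketch of the values flowing through this block; the partition names and values are hypothetical, and parsePathFragmentAsSeq is assumed to split a path fragment into ordered (column, value) pairs, as its use here suggests:

```scala
// Hypothetical values, for illustration only.
val partitionPath = "a=1/b=hello"  // one entry returned by the metastore client
val partSpec: Seq[(String, String)] =
  PartitioningUtils.parsePathFragmentAsSeq(partitionPath)  // Seq(("a", "1"), ("b", "hello"))
// The inner closure then binds each pair as (partName, partValue); before the
// rename, that binding shadowed the outer variable holding the whole fragment.
```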
PartitionProviderCompatibilitySuite.scala:

@@ -22,7 +22,7 @@ import java.io.File
 import org.apache.hadoop.fs.Path

 import org.apache.spark.metrics.source.HiveCatalogMetrics
-import org.apache.spark.sql.{AnalysisException, QueryTest}
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf

@@ -316,6 +316,28 @@ class PartitionProviderCompatibilitySuite
       }
     }
   }
+
+  test(s"SPARK-19887 partition value is null - partition management $enabled") {
+    withTable("test") {
+      Seq((1, "p", 1), (2, null, 2)).toDF("a", "b", "c")
+        .write.partitionBy("b", "c").saveAsTable("test")
+      checkAnswer(spark.table("test"),
+        Row(1, "p", 1) :: Row(2, null, 2) :: Nil)
+
+      Seq((3, null: String, 3)).toDF("a", "b", "c")
+        .write.mode("append").partitionBy("b", "c").saveAsTable("test")
+      checkAnswer(spark.table("test"),
+        Row(1, "p", 1) :: Row(2, null, 2) :: Row(3, null, 3) :: Nil)
+      // make sure partition pruning also works.
+      checkAnswer(spark.table("test").filter($"b".isNotNull), Row(1, "p", 1))
+
+      // empty string is an invalid partition value and we treat it as null when read back.
+      Seq((4, "", 4)).toDF("a", "b", "c")
+        .write.mode("append").partitionBy("b", "c").saveAsTable("test")
+      checkAnswer(spark.table("test"),
+        Row(1, "p", 1) :: Row(2, null, 2) :: Row(3, null, 3) :: Row(4, null, 4) :: Nil)
+    }
+  }
 }

 /**

Review discussion:

Member, on the test name:
Nit: "is null" -> "is null or empty".

Member, on the comment about empty strings being read back as null:
This looks weird: you read back something different from what you wrote. Strictly speaking, "" and null are not the same; I would leave it to users to decide whether "" is read back as null.

Contributor (author):
I can't remember all the details, as this PR is pretty old. This is probably Hive's behavior, and we just followed it. Looking at it now, I agree it's not ideal to treat invalid partition values as null; we'd better fail earlier. Can we leave it as a known bug of v1 tables and fix it in v2?

Contributor:
Yeah, Hive cannot differentiate null and empty string in this case, and we basically followed that for compatibility.
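For reference, a sketch of the partition directories the new test leaves under the table location (the warehouse root is elided; directory names follow from getPartitionPathString as sketched earlier):

```
test/b=p/c=1/
test/b=__HIVE_DEFAULT_PARTITION__/c=2/
test/b=__HIVE_DEFAULT_PARTITION__/c=3/
test/b=__HIVE_DEFAULT_PARTITION__/c=4/   <- the row written with b = ""
```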