Skip to content

Commit cc465fd

Browse files
committed
[SPARK-8138] [SQL] Improves error message when conflicting partition columns are found
This PR improves the error message shown when conflicting partition column names are detected. Such conflicts can be particularly annoying and confusing when there is a large number of partitions and only a handful of them happen to contain unexpected temporary file(s). Now all suspicious directories are listed, as shown below:

```
java.lang.AssertionError: assertion failed: Conflicting partition column names detected:

    Partition column name list #0: b, c, d
    Partition column name list #1: b, c
    Partition column name list #2: b

For partitioned table directories, data files should only live in leaf directories.
Please check the following directories for unexpected files:

    file:/tmp/foo/b=0
    file:/tmp/foo/b=1
    file:/tmp/foo/b=1/c=1
    file:/tmp/foo/b=0/c=0
```

Author: Cheng Lian <lian@databricks.com>

Closes apache#6610 from liancheng/part-errmsg and squashes the following commits:

- 7d05f2c [Cheng Lian] Fixes Scala style issue
- a149250 [Cheng Lian] Adds test case for the error message
- 6b74dd8 [Cheng Lian] Also lists suspicious non-leaf partition directories
- a935eb8 [Cheng Lian] Improves error message when conflicting partition columns are found
1 parent 09fcf96 commit cc465fd

File tree

2 files changed

+82
-10
lines changed

2 files changed

+82
-10
lines changed

sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala

Lines changed: 37 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -84,7 +84,7 @@ private[sql] object PartitioningUtils {
8484
} else {
8585
// This dataset is partitioned. We need to check whether all partitions have the same
8686
// partition columns and resolve potential type conflicts.
87-
val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues.map(_._2))
87+
val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues)
8888

8989
// Creates the StructType which represents the partition columns.
9090
val fields = {
@@ -181,19 +181,18 @@ private[sql] object PartitioningUtils {
181181
* StringType
182182
* }}}
183183
*/
184-
private[sql] def resolvePartitions(values: Seq[PartitionValues]): Seq[PartitionValues] = {
185-
// Column names of all partitions must match
186-
val distinctPartitionsColNames = values.map(_.columnNames).distinct
187-
188-
if (distinctPartitionsColNames.isEmpty) {
184+
private[sql] def resolvePartitions(
185+
pathsWithPartitionValues: Seq[(Path, PartitionValues)]): Seq[PartitionValues] = {
186+
if (pathsWithPartitionValues.isEmpty) {
189187
Seq.empty
190188
} else {
191-
assert(distinctPartitionsColNames.size == 1, {
192-
val list = distinctPartitionsColNames.mkString("\t", "\n\t", "")
193-
s"Conflicting partition column names detected:\n$list"
194-
})
189+
val distinctPartColNames = pathsWithPartitionValues.map(_._2.columnNames).distinct
190+
assert(
191+
distinctPartColNames.size == 1,
192+
listConflictingPartitionColumns(pathsWithPartitionValues))
195193

196194
// Resolves possible type conflicts for each column
195+
val values = pathsWithPartitionValues.map(_._2)
197196
val columnCount = values.head.columnNames.size
198197
val resolvedValues = (0 until columnCount).map { i =>
199198
resolveTypeConflicts(values.map(_.literals(i)))
@@ -206,6 +205,34 @@ private[sql] object PartitioningUtils {
206205
}
207206
}
208207

208+
private[sql] def listConflictingPartitionColumns(
209+
pathWithPartitionValues: Seq[(Path, PartitionValues)]): String = {
210+
val distinctPartColNames = pathWithPartitionValues.map(_._2.columnNames).distinct
211+
212+
def groupByKey[K, V](seq: Seq[(K, V)]): Map[K, Iterable[V]] =
213+
seq.groupBy { case (key, _) => key }.mapValues(_.map { case (_, value) => value })
214+
215+
val partColNamesToPaths = groupByKey(pathWithPartitionValues.map {
216+
case (path, partValues) => partValues.columnNames -> path
217+
})
218+
219+
val distinctPartColLists = distinctPartColNames.map(_.mkString(", ")).zipWithIndex.map {
220+
case (names, index) =>
221+
s"Partition column name list #$index: $names"
222+
}
223+
224+
// Lists all input paths, ordered by ascending number of partition columns
225+
val suspiciousPaths = distinctPartColNames.sortBy(_.length).flatMap(partColNamesToPaths)
226+
227+
s"Conflicting partition column names detected:\n" +
228+
distinctPartColLists.mkString("\n\t", "\n\t", "\n\n") +
229+
"For partitioned table directories, data files should only live in leaf directories.\n" +
230+
"And directories at the same level should have the same partition column name.\n" +
231+
"Please check the following directories for unexpected files or " +
232+
"inconsistent partition column names:\n" +
233+
suspiciousPaths.map("\t" + _).mkString("\n", "\n", "")
234+
}
235+
209236
/**
210237
* Converts a string to a [[Literal]] with automatic type inference. Currently only supports
211238
* [[IntegerType]], [[LongType]], [[DoubleType]], [[DecimalType.Unlimited]], and

sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala

Lines changed: 45 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -538,4 +538,49 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
538538
checkAnswer(sqlContext.read.format("parquet").load(dir.getCanonicalPath), df)
539539
}
540540
}
541+
542+
test("listConflictingPartitionColumns") {
543+
def makeExpectedMessage(colNameLists: Seq[String], paths: Seq[String]): String = {
544+
val conflictingColNameLists = colNameLists.zipWithIndex.map { case (list, index) =>
545+
s"\tPartition column name list #$index: $list"
546+
}.mkString("\n", "\n", "\n")
547+
548+
// scalastyle:off
549+
s"""Conflicting partition column names detected:
550+
|$conflictingColNameLists
551+
|For partitioned table directories, data files should only live in leaf directories.
552+
|And directories at the same level should have the same partition column name.
553+
|Please check the following directories for unexpected files or inconsistent partition column names:
554+
|${paths.map("\t" + _).mkString("\n", "\n", "")}
555+
""".stripMargin.trim
556+
// scalastyle:on
557+
}
558+
559+
assert(
560+
listConflictingPartitionColumns(
561+
Seq(
562+
(new Path("file:/tmp/foo/a=1"), PartitionValues(Seq("a"), Seq(Literal(1)))),
563+
(new Path("file:/tmp/foo/b=1"), PartitionValues(Seq("b"), Seq(Literal(1)))))).trim ===
564+
makeExpectedMessage(Seq("a", "b"), Seq("file:/tmp/foo/a=1", "file:/tmp/foo/b=1")))
565+
566+
assert(
567+
listConflictingPartitionColumns(
568+
Seq(
569+
(new Path("file:/tmp/foo/a=1/_temporary"), PartitionValues(Seq("a"), Seq(Literal(1)))),
570+
(new Path("file:/tmp/foo/a=1"), PartitionValues(Seq("a"), Seq(Literal(1)))))).trim ===
571+
makeExpectedMessage(
572+
Seq("a"),
573+
Seq("file:/tmp/foo/a=1/_temporary", "file:/tmp/foo/a=1")))
574+
575+
assert(
576+
listConflictingPartitionColumns(
577+
Seq(
578+
(new Path("file:/tmp/foo/a=1"),
579+
PartitionValues(Seq("a"), Seq(Literal(1)))),
580+
(new Path("file:/tmp/foo/a=1/b=foo"),
581+
PartitionValues(Seq("a", "b"), Seq(Literal(1), Literal("foo")))))).trim ===
582+
makeExpectedMessage(
583+
Seq("a", "a, b"),
584+
Seq("file:/tmp/foo/a=1", "file:/tmp/foo/a=1/b=foo")))
585+
}
541586
}

0 commit comments

Comments
 (0)