Skip to content

Commit a149250

Browse files
committed
Adds test case for the error message
1 parent 6b74dd8 commit a149250

File tree

2 files changed

+76
-26
lines changed

2 files changed

+76
-26
lines changed

sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala

Lines changed: 31 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -187,32 +187,9 @@ private[sql] object PartitioningUtils {
187187
Seq.empty
188188
} else {
189189
val distinctPartColNames = pathsWithPartitionValues.map(_._2.columnNames).distinct
190-
191-
def listConflictingPartitionColumns: String = {
192-
def groupByKey[K, V](seq: Seq[(K, V)]): Map[K, Iterable[V]] =
193-
seq.groupBy { case (key, _) => key }.mapValues(_.map { case (_, value) => value })
194-
195-
val partColNamesToPaths = groupByKey(pathsWithPartitionValues.map {
196-
case (path, partValues) => partValues.columnNames -> path
197-
})
198-
199-
val distinctPartColLists = distinctPartColNames.map(_.mkString(", ")).zipWithIndex.map {
200-
case (names, index) =>
201-
s"Partition column name list #$index: $names"
202-
}
203-
204-
// Lists out those non-leaf partition directories that also contain files
205-
val suspiciousPaths =
206-
distinctPartColNames.sortBy(_.length).init.flatMap(partColNamesToPaths)
207-
208-
s"Conflicting partition column names detected:\n" +
209-
distinctPartColLists.mkString("\n\t", "\n\t", "\n\n") +
210-
"For partitioned table directories, data files should only live in leaf directories. " +
211-
"Please check the following directories for unexpected files:\n" +
212-
suspiciousPaths.mkString("\n\t", "\n\t", "\n")
213-
}
214-
215-
assert(distinctPartColNames.size == 1, listConflictingPartitionColumns)
190+
assert(
191+
distinctPartColNames.size == 1,
192+
listConflictingPartitionColumns(pathsWithPartitionValues))
216193

217194
// Resolves possible type conflicts for each column
218195
val values = pathsWithPartitionValues.map(_._2)
@@ -228,6 +205,34 @@ private[sql] object PartitioningUtils {
228205
}
229206
}
230207

208+
private[sql] def listConflictingPartitionColumns(
209+
pathWithPartitionValues: Seq[(Path, PartitionValues)]): String = {
210+
val distinctPartColNames = pathWithPartitionValues.map(_._2.columnNames).distinct
211+
212+
def groupByKey[K, V](seq: Seq[(K, V)]): Map[K, Iterable[V]] =
213+
seq.groupBy { case (key, _) => key }.mapValues(_.map { case (_, value) => value })
214+
215+
val partColNamesToPaths = groupByKey(pathWithPartitionValues.map {
216+
case (path, partValues) => partValues.columnNames -> path
217+
})
218+
219+
val distinctPartColLists = distinctPartColNames.map(_.mkString(", ")).zipWithIndex.map {
220+
case (names, index) =>
221+
s"Partition column name list #$index: $names"
222+
}
223+
224+
// Lists out those non-leaf partition directories that also contain files
225+
val suspiciousPaths = distinctPartColNames.sortBy(_.length).flatMap(partColNamesToPaths)
226+
227+
s"Conflicting partition column names detected:\n" +
228+
distinctPartColLists.mkString("\n\t", "\n\t", "\n\n") +
229+
"For partitioned table directories, data files should only live in leaf directories.\n" +
230+
"And directories at the same level should have the same partition column name.\n" +
231+
"Please check the following directories for unexpected files or " +
232+
"inconsistent partition column names:\n" +
233+
suspiciousPaths.map("\t" + _).mkString("\n", "\n", "")
234+
}
235+
231236
/**
232237
* Converts a string to a [[Literal]] with automatic type inference. Currently only supports
233238
* [[IntegerType]], [[LongType]], [[DoubleType]], [[DecimalType.Unlimited]], and

sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -538,4 +538,49 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
538538
checkAnswer(sqlContext.read.format("parquet").load(dir.getCanonicalPath), df)
539539
}
540540
}
541+
542+
test("listConflictingPartitionColumns") {
543+
def makeExpectedMessage(colNameLists: Seq[String], paths: Seq[String]) = {
544+
val conflictingColNameLists = colNameLists.zipWithIndex.map { case (list, index) =>
545+
s"\tPartition column name list #$index: $list"
546+
}.mkString("\n", "\n", "\n")
547+
548+
// scalastyle:off
549+
s"""Conflicting partition column names detected:
550+
|$conflictingColNameLists
551+
|For partitioned table directories, data files should only live in leaf directories.
552+
|And directories at the same level should have the same partition column name.
553+
|Please check the following directories for unexpected files or inconsistent partition column names:
554+
|${paths.map("\t" + _).mkString("\n", "\n", "")}
555+
""".stripMargin.trim
556+
// scalastyle:on
557+
}
558+
559+
assert(
560+
listConflictingPartitionColumns(
561+
Seq(
562+
(new Path("file:/tmp/foo/a=1"), PartitionValues(Seq("a"), Seq(Literal(1)))),
563+
(new Path("file:/tmp/foo/b=1"), PartitionValues(Seq("b"), Seq(Literal(1)))))).trim ===
564+
makeExpectedMessage(Seq("a", "b"), Seq("file:/tmp/foo/a=1", "file:/tmp/foo/b=1")))
565+
566+
assert(
567+
listConflictingPartitionColumns(
568+
Seq(
569+
(new Path("file:/tmp/foo/a=1/_temporary"), PartitionValues(Seq("a"), Seq(Literal(1)))),
570+
(new Path("file:/tmp/foo/a=1"), PartitionValues(Seq("a"), Seq(Literal(1)))))).trim ===
571+
makeExpectedMessage(
572+
Seq("a"),
573+
Seq("file:/tmp/foo/a=1/_temporary", "file:/tmp/foo/a=1")))
574+
575+
assert(
576+
listConflictingPartitionColumns(
577+
Seq(
578+
(new Path("file:/tmp/foo/a=1"),
579+
PartitionValues(Seq("a"), Seq(Literal(1)))),
580+
(new Path("file:/tmp/foo/a=1/b=foo"),
581+
PartitionValues(Seq("a", "b"), Seq(Literal(1), Literal("foo")))))).trim ===
582+
makeExpectedMessage(
583+
Seq("a", "a, b"),
584+
Seq("file:/tmp/foo/a=1", "file:/tmp/foo/a=1/b=foo")))
585+
}
541586
}

0 commit comments

Comments
 (0)