Commit e6003f0
[SPARK-5775] [SQL] BugFix: GenericRow cannot be cast to SpecificMutableRow when nested data and partitioned table
This PR adapts anselmevignon's apache#4697 to master and branch-1.3. Please refer to the PR description of apache#4697 for details.

Author: Cheng Lian <lian@databricks.com>
Author: Cheng Lian <liancheng@users.noreply.github.com>
Author: Yin Huai <yhuai@databricks.com>

Closes apache#4792 from liancheng/spark-5775 and squashes the following commits:

538f506 [Cheng Lian] Addresses comments
cee55cf [Cheng Lian] Merge pull request #4 from yhuai/spark-5775-yin
b0b74fb [Yin Huai] Remove runtime pattern matching.
ca6e038 [Cheng Lian] Fixes SPARK-5775
1 parent: 9168259 · commit: e6003f0

3 files changed, 217 insertions(+), 24 deletions(-)
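
For context, the sketch below reproduces the failure this commit addresses: reading a struct column from partitioned Parquet data used to fail with a ClassCastException ("GenericRow cannot be cast to SpecificMutableRow"). It is a minimal, hypothetical reproduction modeled on the tests added in parquetSuites.scala below; it assumes Spark 1.3-era APIs (such as `saveAsParquetFile`), and the `Nested`/`Record` case classes, the `partitioned_nested` table, and the `reproduce` helper are illustrative names, not part of the patch.

```scala
// A minimal, hypothetical reproduction sketch of SPARK-5775, modeled on the tests added in
// parquetSuites.scala below. Assumes Spark 1.3-era APIs; case class, table, and directory
// names are illustrative.
import java.io.File
import java.nio.file.Files

import org.apache.spark.sql.SQLContext

case class Nested(a: Int, b: String)
case class Record(intField: Int, structField: Nested)

def reproduce(sqlContext: SQLContext): Unit = {
  import sqlContext.implicits._

  // Write nested (non-primitive) data into a Hive-style partition directory: <dir>/p=1
  val dir = Files.createTempDirectory("spark-5775").toFile
  sqlContext.sparkContext
    .makeRDD(1 to 10)
    .map(i => Record(i, Nested(i, s"str$i")))
    .toDF()
    .saveAsParquetFile(new File(dir, "p=1").getCanonicalPath)

  // Expose the directory through the Parquet data source, so `p` becomes a partition column.
  sqlContext.sql(s"""
    CREATE TEMPORARY TABLE partitioned_nested
    USING org.apache.spark.sql.parquet
    OPTIONS (path '${dir.getCanonicalPath}')
  """)

  // Before this fix, selecting through the struct column of a partitioned table failed with
  // "GenericRow cannot be cast to SpecificMutableRow".
  sqlContext.sql("SELECT p, structField.a FROM partitioned_nested WHERE p = 1").collect()
}
```

The failing cast was the unconditional `asInstanceOf[SpecificMutableRow]` shown removed in the diffs below; nested schemas are decoded by `CatalystGroupConverter`, which produces immutable `GenericRow`s.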

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
47 additions, 12 deletions

```diff
@@ -126,6 +126,13 @@ private[sql] case class ParquetTableScan(
       conf)
 
     if (requestedPartitionOrdinals.nonEmpty) {
+      // This check is based on CatalystConverter.createRootConverter.
+      val primitiveRow = output.forall(a => ParquetTypesConverter.isPrimitiveType(a.dataType))
+
+      // Uses temporary variable to avoid the whole `ParquetTableScan` object being captured into
+      // the `mapPartitionsWithInputSplit` closure below.
+      val outputSize = output.size
+
       baseRDD.mapPartitionsWithInputSplit { case (split, iter) =>
         val partValue = "([^=]+)=([^=]+)".r
         val partValues =
@@ -143,19 +150,47 @@ private[sql] case class ParquetTableScan(
           relation.partitioningAttributes
             .map(a => Cast(Literal(partValues(a.name)), a.dataType).eval(EmptyRow))
 
-        new Iterator[Row] {
-          def hasNext = iter.hasNext
-          def next() = {
-            val row = iter.next()._2.asInstanceOf[SpecificMutableRow]
-
-            // Parquet will leave partitioning columns empty, so we fill them in here.
-            var i = 0
-            while (i < requestedPartitionOrdinals.size) {
-              row(requestedPartitionOrdinals(i)._2) =
-                partitionRowValues(requestedPartitionOrdinals(i)._1)
-              i += 1
+        if (primitiveRow) {
+          new Iterator[Row] {
+            def hasNext = iter.hasNext
+            def next() = {
+              // We are using CatalystPrimitiveRowConverter and it returns a SpecificMutableRow.
+              val row = iter.next()._2.asInstanceOf[SpecificMutableRow]
+
+              // Parquet will leave partitioning columns empty, so we fill them in here.
+              var i = 0
+              while (i < requestedPartitionOrdinals.size) {
+                row(requestedPartitionOrdinals(i)._2) =
+                  partitionRowValues(requestedPartitionOrdinals(i)._1)
+                i += 1
+              }
+              row
+            }
+          }
+        } else {
+          // Create a mutable row since we need to fill in values from partition columns.
+          val mutableRow = new GenericMutableRow(outputSize)
+          new Iterator[Row] {
+            def hasNext = iter.hasNext
+            def next() = {
+              // We are using CatalystGroupConverter and it returns a GenericRow.
+              // Since GenericRow is not mutable, we just cast it to a Row.
+              val row = iter.next()._2.asInstanceOf[Row]
+
+              var i = 0
+              while (i < row.size) {
+                mutableRow(i) = row(i)
+                i += 1
+              }
+              // Parquet will leave partitioning columns empty, so we fill them in here.
+              i = 0
+              while (i < requestedPartitionOrdinals.size) {
+                mutableRow(requestedPartitionOrdinals(i)._2) =
+                  partitionRowValues(requestedPartitionOrdinals(i)._1)
+                i += 1
+              }
+              mutableRow
             }
-            row
           }
         }
       }
```
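
Both halves of this change follow the same pattern when the decoded row is not mutable: copy the fields into a pre-allocated `GenericMutableRow`, then overwrite the partition-column slots that Parquet leaves empty. The standalone sketch below distills that pattern; the helper name is illustrative and not part of the patch, while the tuple layout mirrors `requestedPartitionOrdinals` above.

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow

// Hypothetical helper distilling the non-primitive branch above: copy every field of the
// immutable row into a pre-allocated GenericMutableRow, then overwrite the partition-column
// slots that Parquet leaves empty. Tuple layout follows requestedPartitionOrdinals:
// (index into partitionValues, ordinal in the output row).
def copyAndFillPartitionColumns(
    source: Row,
    target: GenericMutableRow,
    partitionOrdinals: Seq[(Int, Int)],
    partitionValues: Seq[Any]): Row = {
  var i = 0
  while (i < source.size) {
    target(i) = source(i)
    i += 1
  }
  i = 0
  while (i < partitionOrdinals.size) {
    val (valueIndex, outputOrdinal) = partitionOrdinals(i)
    target(outputOrdinal) = partitionValues(valueIndex)
    i += 1
  }
  target
}
```

Note that the patch allocates one `GenericMutableRow` per partition iterator and reuses it across records, which keeps the non-primitive path close to the old fast path in allocation cost.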

sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
39 additions, 9 deletions

```diff
@@ -482,23 +482,53 @@ private[sql] case class ParquetRelation2(
       // When the data does not include the key and the key is requested then we must fill it in
       // based on information from the input split.
       if (!partitionKeysIncludedInDataSchema && partitionKeyLocations.nonEmpty) {
+        // This check is based on CatalystConverter.createRootConverter.
+        val primitiveRow =
+          requestedSchema.forall(a => ParquetTypesConverter.isPrimitiveType(a.dataType))
+
         baseRDD.mapPartitionsWithInputSplit { case (split: ParquetInputSplit, iterator) =>
           val partValues = selectedPartitions.collectFirst {
             case p if split.getPath.getParent.toString == p.path => p.values
           }.get
 
           val requiredPartOrdinal = partitionKeyLocations.keys.toSeq
 
-          iterator.map { pair =>
-            val row = pair._2.asInstanceOf[SpecificMutableRow]
-            var i = 0
-            while (i < requiredPartOrdinal.size) {
-              // TODO Avoids boxing cost here!
-              val partOrdinal = requiredPartOrdinal(i)
-              row.update(partitionKeyLocations(partOrdinal), partValues(partOrdinal))
-              i += 1
+          if (primitiveRow) {
+            iterator.map { pair =>
+              // We are using CatalystPrimitiveRowConverter and it returns a SpecificMutableRow.
+              val row = pair._2.asInstanceOf[SpecificMutableRow]
+              var i = 0
+              while (i < requiredPartOrdinal.size) {
+                // TODO Avoids boxing cost here!
+                val partOrdinal = requiredPartOrdinal(i)
+                row.update(partitionKeyLocations(partOrdinal), partValues(partOrdinal))
+                i += 1
+              }
+              row
+            }
+          } else {
+            // Create a mutable row since we need to fill in values from partition columns.
+            val mutableRow = new GenericMutableRow(requestedSchema.size)
+            iterator.map { pair =>
+              // We are using CatalystGroupConverter and it returns a GenericRow.
+              // Since GenericRow is not mutable, we just cast it to a Row.
+              val row = pair._2.asInstanceOf[Row]
+              var i = 0
+              while (i < row.size) {
+                // TODO Avoids boxing cost here!
+                mutableRow(i) = row(i)
+                i += 1
+              }
+
+              i = 0
+              while (i < requiredPartOrdinal.size) {
+                // TODO Avoids boxing cost here!
+                val partOrdinal = requiredPartOrdinal(i)
+                mutableRow.update(partitionKeyLocations(partOrdinal), partValues(partOrdinal))
+                i += 1
+              }
+              mutableRow
             }
-            row
           }
         }
       } else {
```
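
Both patches gate on the same `primitiveRow` flag because it must agree with the converter that `CatalystConverter.createRootConverter` chooses, which in turn fixes the concrete `Row` class the downstream cast will see. The sketch below is a rough paraphrase of that decision, assuming "primitive" means no struct, array, or map columns; the real check is `ParquetTypesConverter.isPrimitiveType` and differs in detail.

```scala
import org.apache.spark.sql.types._

// Paraphrased sketch of the converter choice mirrored by the `primitiveRow` checks above.
// Not the actual implementation of CatalystConverter.createRootConverter.
def rowsAreMutable(requestedSchema: StructType): Boolean =
  requestedSchema.fields.forall { field =>
    field.dataType match {
      // Nested columns are handled by CatalystGroupConverter, which emits immutable GenericRows.
      case _: StructType | _: ArrayType | _: MapType => false
      // A flat schema is handled by CatalystPrimitiveRowConverter, which reuses a SpecificMutableRow.
      case _ => true
    }
  }
```

When the old code's unconditional `asInstanceOf[SpecificMutableRow]` met a `GenericRow` produced for a nested schema, it threw the ClassCastException reported in SPARK-5775.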

sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
131 additions, 3 deletions

```diff
@@ -36,6 +36,20 @@ case class ParquetData(intField: Int, stringField: String)
 // The data that also includes the partitioning key
 case class ParquetDataWithKey(p: Int, intField: Int, stringField: String)
 
+case class StructContainer(intStructField :Int, stringStructField: String)
+
+case class ParquetDataWithComplexTypes(
+    intField: Int,
+    stringField: String,
+    structField: StructContainer,
+    arrayField: Seq[Int])
+
+case class ParquetDataWithKeyAndComplexTypes(
+    p: Int,
+    intField: Int,
+    stringField: String,
+    structField: StructContainer,
+    arrayField: Seq[Int])
 
 /**
  * A suite to test the automatic conversion of metastore tables with parquet data to use the
@@ -86,6 +100,38 @@ class ParquetMetastoreSuiteBase extends ParquetPartitioningTest {
       location '${new File(normalTableDir, "normal").getCanonicalPath}'
     """)
 
+    sql(s"""
+      CREATE EXTERNAL TABLE partitioned_parquet_with_complextypes
+      (
+        intField INT,
+        stringField STRING,
+        structField STRUCT<intStructField: INT, stringStructField: STRING>,
+        arrayField ARRAY<INT>
+      )
+      PARTITIONED BY (p int)
+      ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+      STORED AS
+      INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+      OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      LOCATION '${partitionedTableDirWithComplexTypes.getCanonicalPath}'
+    """)
+
+    sql(s"""
+      CREATE EXTERNAL TABLE partitioned_parquet_with_key_and_complextypes
+      (
+        intField INT,
+        stringField STRING,
+        structField STRUCT<intStructField: INT, stringStructField: STRING>,
+        arrayField ARRAY<INT>
+      )
+      PARTITIONED BY (p int)
+      ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+      STORED AS
+      INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+      OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      LOCATION '${partitionedTableDirWithKeyAndComplexTypes.getCanonicalPath}'
+    """)
+
     (1 to 10).foreach { p =>
       sql(s"ALTER TABLE partitioned_parquet ADD PARTITION (p=$p)")
     }
@@ -94,7 +140,15 @@ class ParquetMetastoreSuiteBase extends ParquetPartitioningTest {
       sql(s"ALTER TABLE partitioned_parquet_with_key ADD PARTITION (p=$p)")
     }
 
-    val rdd1 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
+    (1 to 10).foreach { p =>
+      sql(s"ALTER TABLE partitioned_parquet_with_key_and_complextypes ADD PARTITION (p=$p)")
+    }
+
+    (1 to 10).foreach { p =>
+      sql(s"ALTER TABLE partitioned_parquet_with_complextypes ADD PARTITION (p=$p)")
+    }
+
+    val rdd1 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str$i"}"""))
     jsonRDD(rdd1).registerTempTable("jt")
     val rdd2 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":[$i, null]}"""))
     jsonRDD(rdd2).registerTempTable("jt_array")
@@ -105,6 +159,8 @@ class ParquetMetastoreSuiteBase extends ParquetPartitioningTest {
   override def afterAll(): Unit = {
     sql("DROP TABLE partitioned_parquet")
     sql("DROP TABLE partitioned_parquet_with_key")
+    sql("DROP TABLE partitioned_parquet_with_complextypes")
+    sql("DROP TABLE partitioned_parquet_with_key_and_complextypes")
     sql("DROP TABLE normal_parquet")
     sql("DROP TABLE IF EXISTS jt")
     sql("DROP TABLE IF EXISTS jt_array")
@@ -409,6 +465,22 @@ class ParquetSourceSuiteBase extends ParquetPartitioningTest {
         path '${new File(partitionedTableDir, "p=1").getCanonicalPath}'
       )
     """)
+
+    sql( s"""
+      CREATE TEMPORARY TABLE partitioned_parquet_with_key_and_complextypes
+      USING org.apache.spark.sql.parquet
+      OPTIONS (
+        path '${partitionedTableDirWithKeyAndComplexTypes.getCanonicalPath}'
+      )
+    """)
+
+    sql( s"""
+      CREATE TEMPORARY TABLE partitioned_parquet_with_complextypes
+      USING org.apache.spark.sql.parquet
+      OPTIONS (
+        path '${partitionedTableDirWithComplexTypes.getCanonicalPath}'
+      )
+    """)
   }
 
   test("SPARK-6016 make sure to use the latest footers") {
@@ -473,7 +545,8 @@ abstract class ParquetPartitioningTest extends QueryTest with BeforeAndAfterAll
   var partitionedTableDir: File = null
   var normalTableDir: File = null
   var partitionedTableDirWithKey: File = null
-
+  var partitionedTableDirWithComplexTypes: File = null
+  var partitionedTableDirWithKeyAndComplexTypes: File = null
 
   override def beforeAll(): Unit = {
     partitionedTableDir = File.createTempFile("parquettests", "sparksql")
@@ -509,9 +582,45 @@ abstract class ParquetPartitioningTest extends QueryTest with BeforeAndAfterAll
         .toDF()
         .saveAsParquetFile(partDir.getCanonicalPath)
     }
+
+    partitionedTableDirWithKeyAndComplexTypes = File.createTempFile("parquettests", "sparksql")
+    partitionedTableDirWithKeyAndComplexTypes.delete()
+    partitionedTableDirWithKeyAndComplexTypes.mkdir()
+
+    (1 to 10).foreach { p =>
+      val partDir = new File(partitionedTableDirWithKeyAndComplexTypes, s"p=$p")
+      sparkContext.makeRDD(1 to 10).map { i =>
+        ParquetDataWithKeyAndComplexTypes(
+          p, i, s"part-$p", StructContainer(i, f"${i}_string"), 1 to i)
+      }.toDF().saveAsParquetFile(partDir.getCanonicalPath)
+    }
+
+    partitionedTableDirWithComplexTypes = File.createTempFile("parquettests", "sparksql")
+    partitionedTableDirWithComplexTypes.delete()
+    partitionedTableDirWithComplexTypes.mkdir()
+
+    (1 to 10).foreach { p =>
+      val partDir = new File(partitionedTableDirWithComplexTypes, s"p=$p")
+      sparkContext.makeRDD(1 to 10).map { i =>
+        ParquetDataWithComplexTypes(i, s"part-$p", StructContainer(i, f"${i}_string"), 1 to i)
+      }.toDF().saveAsParquetFile(partDir.getCanonicalPath)
+    }
+  }
+
+  override protected def afterAll(): Unit = {
+    partitionedTableDir.delete()
+    normalTableDir.delete()
+    partitionedTableDirWithKey.delete()
+    partitionedTableDirWithComplexTypes.delete()
+    partitionedTableDirWithKeyAndComplexTypes.delete()
   }
 
-  Seq("partitioned_parquet", "partitioned_parquet_with_key").foreach { table =>
+  Seq(
+    "partitioned_parquet",
+    "partitioned_parquet_with_key",
+    "partitioned_parquet_with_complextypes",
+    "partitioned_parquet_with_key_and_complextypes").foreach { table =>
+
     test(s"ordering of the partitioning columns $table") {
       checkAnswer(
         sql(s"SELECT p, stringField FROM $table WHERE p = 1"),
@@ -601,6 +710,25 @@ abstract class ParquetPartitioningTest extends QueryTest with BeforeAndAfterAll
     }
   }
 
+  Seq(
+    "partitioned_parquet_with_key_and_complextypes",
+    "partitioned_parquet_with_complextypes").foreach { table =>
+
+    test(s"SPARK-5775 read struct from $table") {
+      checkAnswer(
+        sql(s"SELECT p, structField.intStructField, structField.stringStructField FROM $table WHERE p = 1"),
+        (1 to 10).map(i => Row(1, i, f"${i}_string")))
+    }
+
+    // Re-enable this after SPARK-5508 is fixed
+    ignore(s"SPARK-5775 read array from $table") {
+      checkAnswer(
+        sql(s"SELECT arrayField, p FROM $table WHERE p = 1"),
+        (1 to 10).map(i => Row(1 to i, 1)))
+    }
+  }
+
+
   test("non-part select(*)") {
     checkAnswer(
       sql("SELECT COUNT(*) FROM normal_parquet"),
```
