[SPARK-5775] BugFix: GenericRow cannot be cast to SpecificMutableRow when nested data and partitioned table #4697

Closed · wants to merge 11 commits
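
When every requested column is primitive, `CatalystConverter.createRootConverter` picks `CatalystPrimitiveRowConverter`, which produces a `SpecificMutableRow`; when the schema contains nested types (structs, arrays, maps) it falls back to `CatalystGroupConverter`, which produces an immutable `GenericRow`. The old partition-filling code in `ParquetTableScan` and `ParquetRelation2` cast every record to `SpecificMutableRow` unconditionally, so scans of partitioned tables with nested columns failed with the `ClassCastException` in the title. This patch branches on whether the output schema is all-primitive and, in the nested case, copies each `GenericRow` into a `GenericMutableRow` before filling in the partition values.

A minimal reproduction, using one of the test tables registered further down in this diff (a rough sketch, not part of the patch itself):

    // Scanning a partitioned Parquet table whose schema contains a struct column
    // used to fail while filling in the partition column `p`:
    //   java.lang.ClassCastException: GenericRow cannot be cast to SpecificMutableRow
    sql("""
      SELECT p, structField.intStructField, structField.stringStructField
      FROM partitioned_parquet_with_complextypes
      WHERE p = 1
    """).collect()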
@@ -124,6 +124,13 @@ case class ParquetTableScan(
conf)

if (requestedPartitionOrdinals.nonEmpty) {
// This check is based on CatalystConverter.createRootConverter.
val primitiveRow = output.forall(a => ParquetTypesConverter.isPrimitiveType(a.dataType))

          // Uses a temporary variable to avoid the whole `ParquetTableScan` object being captured into
// the `mapPartitionsWithInputSplit` closure below.
val outputSize = output.size

baseRDD.mapPartitionsWithInputSplit { case (split, iter) =>
val partValue = "([^=]+)=([^=]+)".r
val partValues =
@@ -141,19 +148,46 @@
relation.partitioningAttributes
.map(a => Cast(Literal(partValues(a.name)), a.dataType).eval(EmptyRow))

new Iterator[Row] {
def hasNext = iter.hasNext
def next() = {
val row = iter.next()._2.asInstanceOf[SpecificMutableRow]

// Parquet will leave partitioning columns empty, so we fill them in here.
var i = 0
while (i < requestedPartitionOrdinals.size) {
row(requestedPartitionOrdinals(i)._2) =
partitionRowValues(requestedPartitionOrdinals(i)._1)
i += 1
// Parquet will leave partitioning columns empty, so we fill them in here.
if (primitiveRow) {
new Iterator[Row] {
def hasNext = iter.hasNext
def next() = {
// We are using CatalystPrimitiveRowConverter and it returns a SpecificMutableRow.
val row = iter.next()._2.asInstanceOf[SpecificMutableRow]

var i = 0
while (i < requestedPartitionOrdinals.size) {
row(requestedPartitionOrdinals(i)._2) =
partitionRowValues(requestedPartitionOrdinals(i)._1)
i += 1
}
row
}
}
} else {
// Create a mutable row since we need to fill in values from partition columns.
val mutableRow = new GenericMutableRow(outputSize)
new Iterator[Row] {
def hasNext = iter.hasNext
def next() = {
// We are using CatalystGroupConverter and it returns a GenericRow.
// Since GenericRow is not mutable, we just cast it to a Row.
val row = iter.next()._2.asInstanceOf[Row]

var i = 0
while (i < row.size) {
mutableRow(i) = row(i)
i += 1
}
i = 0
while (i < requestedPartitionOrdinals.size) {
mutableRow(requestedPartitionOrdinals(i)._2) =
partitionRowValues(requestedPartitionOrdinals(i)._1)
i += 1
}
mutableRow
}
row
}
}
}
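
(The `outputSize` local above is the usual Spark closure-capture precaution: referring to `output.size` directly inside `mapPartitionsWithInputSplit` would drag the whole `ParquetTableScan` operator into the serialized task closure. A rough sketch of the pattern with a hypothetical class, not taken from this PR:)

    import org.apache.spark.rdd.RDD

    class Scan(output: Seq[String]) {        // not Serializable
      def bad(rdd: RDD[Int]): RDD[Int] =
        rdd.map(_ + output.size)             // lambda captures `this` -> Task not serializable

      def good(rdd: RDD[Int]): RDD[Int] = {
        val outputSize = output.size         // copy only the value that is needed
        rdd.map(_ + outputSize)              // lambda captures just an Int
      }
    }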
@@ -22,9 +22,8 @@ import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapreduce.{JobContext, InputSplit, Job}
import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate

import parquet.hadoop.ParquetInputFormat
import parquet.hadoop.{ParquetInputSplit, ParquetInputFormat}
import parquet.hadoop.util.ContextUtil

import org.apache.spark.annotation.DeveloperApi
@@ -40,7 +39,7 @@ import scala.collection.JavaConversions._

/**
* Allows creation of parquet based tables using the syntax
 * `CREATE TEMPORARY TABLE ... USING org.apache.spark.sql.parquet`. Currently the only option
* required is `path`, which should be the location of a collection of, optionally partitioned,
* parquet files.
*/
@@ -265,23 +264,45 @@ case class ParquetRelation2(path: String)(@transient val sqlContext: SQLContext)
// When the data does not include the key and the key is requested then we must fill it in
// based on information from the input split.
if (!dataIncludesKey && partitionKeyLocation != -1) {
baseRDD.mapPartitionsWithInputSplit { case (split, iter) =>
val primitiveRow =
requestedSchema.toAttributes.forall(a => ParquetTypesConverter.isPrimitiveType(a.dataType))

baseRDD.mapPartitionsWithInputSplit { case (split, iterator) =>
val partValue = "([^=]+)=([^=]+)".r
val partValues =
split.asInstanceOf[parquet.hadoop.ParquetInputSplit]
.getPath
.toString
.split("/")
.flatMap {
case partValue(key, value) => Some(key -> value)
case _ => None
}.toMap

val currentValue = partValues.values.head.toInt
iter.map { pair =>
val res = pair._2.asInstanceOf[SpecificMutableRow]
res.setInt(partitionKeyLocation, currentValue)
res
case partValue(key, value) => Some(key -> value)
case _ => None }
.toMap

if (primitiveRow) {
iterator.map { pair =>
// We are using CatalystPrimitiveRowConverter and it returns a SpecificMutableRow.
val mutableRow = pair._2.asInstanceOf[SpecificMutableRow]
mutableRow.update(partitionKeyLocation, partValues.values.head.toInt)
mutableRow
}
} else {
// Create a mutable row since we need to fill in values from partition columns.
val mutableRow = new GenericMutableRow(requestedSchema.toAttributes.size)

iterator.map { pair =>
// We are using CatalystGroupConverter and it returns a GenericRow.
// Since GenericRow is not mutable, we just cast it to a Row.
val row = pair._2.asInstanceOf[Row]
var i = 0
while (i < row.size) {
mutableRow(i) = row(i)
i += 1
}
mutableRow.update(partitionKeyLocation, partValues.values.head.toInt)
mutableRow
}
}
Review comment (Contributor): Same as above.
}
} else {
@@ -31,7 +31,20 @@ import org.apache.spark.sql.hive.test.TestHive._
case class ParquetData(intField: Int, stringField: String)
// The data that also includes the partitioning key
case class ParquetDataWithKey(p: Int, intField: Int, stringField: String)
case class StructContainer(intStructField: Int, stringStructField: String)

case class ParquetDataWithComplexTypes(
intField: Int,
stringField: String,
structField: StructContainer,
arrayField: Seq[Int])

case class ParquetDataWithKeyAndComplexTypes(
p: Int,
intField: Int,
stringField: String,
structField: StructContainer,
arrayField: Seq[Int])

/**
* A suite to test the automatic conversion of metastore tables with parquet data to use the
@@ -69,6 +82,38 @@ class ParquetMetastoreSuite extends ParquetTest {
location '${partitionedTableDirWithKey.getCanonicalPath}'
""")

sql(s"""
create external table partitioned_parquet_with_complextypes
(
intField INT,
stringField STRING,
structField STRUCT<intStructField: INT, stringStructField: STRING>,
arrayField ARRAY<INT>
)
PARTITIONED BY (p int)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS
INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
location '${partitionedTableDirWithComplexTypes.getCanonicalPath}'
""")

sql(s"""
create external table partitioned_parquet_with_key_and_complextypes
(
intField INT,
stringField STRING,
structField STRUCT<intStructField: INT, stringStructField: STRING>,
arrayField ARRAY<INT>
)
PARTITIONED BY (p int)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS
INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
location '${partitionedTableDirWithKeyAndComplexTypes.getCanonicalPath}'
""")

sql(s"""
create external table normal_parquet
(
@@ -90,10 +135,24 @@ class ParquetMetastoreSuite extends ParquetTest {
sql(s"ALTER TABLE partitioned_parquet_with_key ADD PARTITION (p=$p)")
}

(1 to 10).foreach { p =>
sql(s"ALTER TABLE partitioned_parquet_with_key_and_complextypes ADD PARTITION (p=$p)")
}

(1 to 10).foreach { p =>
sql(s"ALTER TABLE partitioned_parquet_with_complextypes ADD PARTITION (p=$p)")
}

setConf("spark.sql.hive.convertMetastoreParquet", "true")
}

override def afterAll(): Unit = {
sql("DROP TABLE IF EXISTS partitioned_parquet")
sql("DROP TABLE IF EXISTS partitioned_parquet_with_key")
sql("DROP TABLE IF EXISTS partitioned_parquet_with_complextypes")
sql("DROP TABLE IF EXISTS partitioned_parquet_with_key_and_complextypes")
sql("DROP TABLE IF EXISTS normal_parquet")

setConf("spark.sql.hive.convertMetastoreParquet", "false")
}

@@ -139,6 +198,22 @@ class ParquetSourceSuite extends ParquetTest {
path '${new File(partitionedTableDir, "p=1").getCanonicalPath}'
)
""")

sql( s"""
CREATE TEMPORARY TABLE partitioned_parquet_with_key_and_complextypes
USING org.apache.spark.sql.parquet
OPTIONS (
path '${partitionedTableDirWithKeyAndComplexTypes.getCanonicalPath}'
)
""")

sql( s"""
CREATE TEMPORARY TABLE partitioned_parquet_with_complextypes
USING org.apache.spark.sql.parquet
OPTIONS (
path '${partitionedTableDirWithComplexTypes.getCanonicalPath}'
)
""")
}
}

@@ -147,7 +222,10 @@ class ParquetSourceSuite extends ParquetTest {
*/
abstract class ParquetTest extends QueryTest with BeforeAndAfterAll {
var partitionedTableDir: File = null
var normalTableDir: File = null
var partitionedTableDirWithKey: File = null
var partitionedTableDirWithKeyAndComplexTypes: File = null
var partitionedTableDirWithComplexTypes: File = null

override def beforeAll(): Unit = {
partitionedTableDir = File.createTempFile("parquettests", "sparksql")
@@ -161,6 +239,15 @@ abstract class ParquetTest extends QueryTest with BeforeAndAfterAll {
.saveAsParquetFile(partDir.getCanonicalPath)
}

normalTableDir = File.createTempFile("parquettests", "sparksql")
normalTableDir.delete()
normalTableDir.mkdir()

sparkContext
.makeRDD(1 to 10)
.map(i => ParquetData(i, s"part-1"))
.saveAsParquetFile(new File(normalTableDir, "normal").getCanonicalPath)

partitionedTableDirWithKey = File.createTempFile("parquettests", "sparksql")
partitionedTableDirWithKey.delete()
partitionedTableDirWithKey.mkdir()
@@ -171,9 +258,46 @@ abstract class ParquetTest extends QueryTest with BeforeAndAfterAll {
.map(i => ParquetDataWithKey(p, i, s"part-$p"))
.saveAsParquetFile(partDir.getCanonicalPath)
}

partitionedTableDirWithKeyAndComplexTypes = File.createTempFile("parquettests", "sparksql")
partitionedTableDirWithKeyAndComplexTypes.delete()
partitionedTableDirWithKeyAndComplexTypes.mkdir()

(1 to 10).foreach { p =>
val partDir = new File(partitionedTableDirWithKeyAndComplexTypes, s"p=$p")
sparkContext.makeRDD(1 to 10)
.map(i => ParquetDataWithKeyAndComplexTypes(
p, i, s"part-$p", StructContainer(i, f"${i}_string"), 1 to i))
.saveAsParquetFile(partDir.getCanonicalPath)
}

partitionedTableDirWithComplexTypes = File.createTempFile("parquettests", "sparksql")
partitionedTableDirWithComplexTypes.delete()
partitionedTableDirWithComplexTypes.mkdir()

(1 to 10).foreach { p =>
val partDir = new File(partitionedTableDirWithComplexTypes, s"p=$p")
sparkContext.makeRDD(1 to 10)
.map(i => ParquetDataWithComplexTypes(
i, s"part-$p", StructContainer(i, f"${i}_string"), 1 to i))
.saveAsParquetFile(partDir.getCanonicalPath)
}
}

Seq("partitioned_parquet", "partitioned_parquet_with_key").foreach { table =>
override protected def afterAll(): Unit = {
    // Delete temporary files.
partitionedTableDir.delete()
normalTableDir.delete()
partitionedTableDirWithKey.delete()
partitionedTableDirWithKeyAndComplexTypes.delete()
partitionedTableDirWithComplexTypes.delete()
}

Seq(
"partitioned_parquet",
"partitioned_parquet_with_key",
"partitioned_parquet_with_complextypes",
"partitioned_parquet_with_key_and_complextypes").foreach { table =>
test(s"ordering of the partitioning columns $table") {
checkAnswer(
sql(s"SELECT p, stringField FROM $table WHERE p = 1"),
Expand All @@ -186,6 +310,8 @@ abstract class ParquetTest extends QueryTest with BeforeAndAfterAll {
)
}



test(s"project the partitioning column $table") {
checkAnswer(
sql(s"SELECT p, count(*) FROM $table group by p"),
@@ -263,6 +389,29 @@ abstract class ParquetTest extends QueryTest with BeforeAndAfterAll {
}
}

Seq(
"partitioned_parquet_with_key_and_complextypes",
"partitioned_parquet_with_complextypes").foreach { table =>
test(s"SPARK-5775 read structure from $table") {
checkAnswer(
sql(s"""
SELECT
p,
structField.intStructField,
structField.stringStructField
FROM $table
WHERE p = 1"""),
(1 to 10).map(i => Row(1, i, f"${i}_string")))
}

// Re-enable this after SPARK-5508 is fixed
ignore(s"SPARK-5775 read array from $table") {
checkAnswer(
sql(s"SELECT arrayField, p FROM $table WHERE p = 1"),
(1 to 10).map(i => Row(1 to i, 1)))
}
}

test("non-part select(*)") {
checkAnswer(
sql("SELECT COUNT(*) FROM normal_parquet"),