[SQL] Support partitioned parquet tables that have the key in both the directory and the file #3272

Closed · wants to merge 1 commit
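
When a Hive-partitioned Parquet table stores the partition key both in the directory name (p=1/) and as a column inside the data files, the metastore-to-native-Parquet conversion would otherwise inject the directory-derived key a second time. This change skips that injection whenever every partitioning attribute already appears in the relation's output, and parameterizes ParquetMetastoreSuite so the existing partitioned-table tests run against both layouts.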
HiveStrategies.scala
@@ -58,12 +58,18 @@ private[hive] trait HiveStrategies {
     def lowerCase =
       new SchemaRDD(s.sqlContext, s.logicalPlan)
 
-    def addPartitioningAttributes(attrs: Seq[Attribute]) =
-      new SchemaRDD(
-        s.sqlContext,
-        s.logicalPlan transform {
-          case p: ParquetRelation => p.copy(partitioningAttributes = attrs)
-        })
+    def addPartitioningAttributes(attrs: Seq[Attribute]) = {
+      // Don't add the partitioning key if it's already present in the data.
+      if (attrs.map(_.name).toSet.subsetOf(s.logicalPlan.output.map(_.name).toSet)) {
+        s
+      } else {
+        new SchemaRDD(
+          s.sqlContext,
+          s.logicalPlan transform {
+            case p: ParquetRelation => p.copy(partitioningAttributes = attrs)
+          })
+      }
+    }
   }
 
   implicit class PhysicalPlanHacks(originalPlan: SparkPlan) {
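
The guard above reduces to a name-set subset test. The following standalone sketch shows that decision rule in isolation; it is illustrative only, and PartitionColumnGuard, effectiveSchema, fileSchema, and partitionCols are invented names, not identifiers from Spark:

object PartitionColumnGuard {
  // Append directory-derived partition columns only when the data files
  // do not already contain them; this is the same subset test the patch uses.
  def effectiveSchema(fileSchema: Seq[String], partitionCols: Seq[String]): Seq[String] =
    if (partitionCols.toSet.subsetOf(fileSchema.toSet)) fileSchema
    else fileSchema ++ partitionCols

  def main(args: Array[String]): Unit = {
    // Key only in the directory structure: the column must be injected.
    println(effectiveSchema(Seq("intField", "stringField"), Seq("p")))
    // List(intField, stringField, p)

    // Key already present in the files (as with ParquetDataWithKey): schema unchanged.
    println(effectiveSchema(Seq("p", "intField", "stringField"), Seq("p")))
    // List(p, intField, stringField)
  }
}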
ParquetMetastoreSuite.scala
@@ -27,7 +27,11 @@ import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.hive.execution.HiveTableScan
 import org.apache.spark.sql.hive.test.TestHive._
 
+// The data where the partitioning key exists only in the directory structure.
 case class ParquetData(intField: Int, stringField: String)
+// The data that also includes the partitioning key
+case class ParquetDataWithKey(p: Int, intField: Int, stringField: String)
+
 
 /**
  * Tests for our SerDe -> Native parquet scan conversion.
@@ -45,6 +49,17 @@ class ParquetMetastoreSuite extends QueryTest with BeforeAndAfterAll {
         .saveAsParquetFile(partDir.getCanonicalPath)
     }
 
+    val partitionedTableDirWithKey = File.createTempFile("parquettests", "sparksql")
+    partitionedTableDirWithKey.delete()
+    partitionedTableDirWithKey.mkdir()
+
+    (1 to 10).foreach { p =>
+      val partDir = new File(partitionedTableDirWithKey, s"p=$p")
+      sparkContext.makeRDD(1 to 10)
+        .map(i => ParquetDataWithKey(p, i, s"part-$p"))
+        .saveAsParquetFile(partDir.getCanonicalPath)
+    }
+
     sql(s"""
       create external table partitioned_parquet
       (
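
Note that the fixture deliberately records the key twice: each ParquetDataWithKey row stores p inside the Parquet file, while the file itself sits under a p=$p directory. Together with the PARTITIONED BY (p int) declaration in the next hunk, this reproduces the layout named in the title, with the key in both the directory and the file.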
@@ -59,6 +74,20 @@ class ParquetMetastoreSuite extends QueryTest with BeforeAndAfterAll {
       location '${partitionedTableDir.getCanonicalPath}'
     """)
 
+    sql(s"""
+      create external table partitioned_parquet_with_key
+      (
+        intField INT,
+        stringField STRING
+      )
+      PARTITIONED BY (p int)
+      ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+      STORED AS
+      INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+      OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      location '${partitionedTableDirWithKey.getCanonicalPath}'
+    """)
+
     sql(s"""
       create external table normal_parquet
       (
@@ -76,82 +105,87 @@ class ParquetMetastoreSuite extends QueryTest with BeforeAndAfterAll {
       sql(s"ALTER TABLE partitioned_parquet ADD PARTITION (p=$p)")
     }
 
+    (1 to 10).foreach { p =>
+      sql(s"ALTER TABLE partitioned_parquet_with_key ADD PARTITION (p=$p)")
+    }
+
     setConf("spark.sql.hive.convertMetastoreParquet", "true")
   }
 
   override def afterAll(): Unit = {
     setConf("spark.sql.hive.convertMetastoreParquet", "false")
   }
 
-  test("project the partitioning column") {
-    checkAnswer(
-      sql("SELECT p, count(*) FROM partitioned_parquet group by p"),
-      (1, 10) ::
-      (2, 10) ::
-      (3, 10) ::
-      (4, 10) ::
-      (5, 10) ::
-      (6, 10) ::
-      (7, 10) ::
-      (8, 10) ::
-      (9, 10) ::
-      (10, 10) :: Nil
-    )
-  }
+  Seq("partitioned_parquet", "partitioned_parquet_with_key").foreach { table =>
+    test(s"project the partitioning column $table") {
+      checkAnswer(
+        sql(s"SELECT p, count(*) FROM $table group by p"),
+        (1, 10) ::
+        (2, 10) ::
+        (3, 10) ::
+        (4, 10) ::
+        (5, 10) ::
+        (6, 10) ::
+        (7, 10) ::
+        (8, 10) ::
+        (9, 10) ::
+        (10, 10) :: Nil
+      )
+    }
 
-  test("project partitioning and non-partitioning columns") {
-    checkAnswer(
-      sql("SELECT stringField, p, count(intField) " +
-        "FROM partitioned_parquet GROUP BY p, stringField"),
-      ("part-1", 1, 10) ::
-      ("part-2", 2, 10) ::
-      ("part-3", 3, 10) ::
-      ("part-4", 4, 10) ::
-      ("part-5", 5, 10) ::
-      ("part-6", 6, 10) ::
-      ("part-7", 7, 10) ::
-      ("part-8", 8, 10) ::
-      ("part-9", 9, 10) ::
-      ("part-10", 10, 10) :: Nil
-    )
-  }
+    test(s"project partitioning and non-partitioning columns $table") {
+      checkAnswer(
+        sql(s"SELECT stringField, p, count(intField) FROM $table GROUP BY p, stringField"),
+        ("part-1", 1, 10) ::
+        ("part-2", 2, 10) ::
+        ("part-3", 3, 10) ::
+        ("part-4", 4, 10) ::
+        ("part-5", 5, 10) ::
+        ("part-6", 6, 10) ::
+        ("part-7", 7, 10) ::
+        ("part-8", 8, 10) ::
+        ("part-9", 9, 10) ::
+        ("part-10", 10, 10) :: Nil
+      )
+    }
 
-  test("simple count") {
-    checkAnswer(
-      sql("SELECT COUNT(*) FROM partitioned_parquet"),
-      100)
-  }
+    test(s"simple count $table") {
+      checkAnswer(
+        sql(s"SELECT COUNT(*) FROM $table"),
+        100)
+    }
 
-  test("pruned count") {
-    checkAnswer(
-      sql("SELECT COUNT(*) FROM partitioned_parquet WHERE p = 1"),
-      10)
-  }
+    test(s"pruned count $table") {
+      checkAnswer(
+        sql(s"SELECT COUNT(*) FROM $table WHERE p = 1"),
+        10)
    }
 
-  test("multi-partition pruned count") {
-    checkAnswer(
-      sql("SELECT COUNT(*) FROM partitioned_parquet WHERE p IN (1,2,3)"),
-      30)
-  }
+    test(s"multi-partition pruned count $table") {
+      checkAnswer(
+        sql(s"SELECT COUNT(*) FROM $table WHERE p IN (1,2,3)"),
+        30)
+    }
 
-  test("non-partition predicates") {
-    checkAnswer(
-      sql("SELECT COUNT(*) FROM partitioned_parquet WHERE intField IN (1,2,3)"),
-      30)
-  }
+    test(s"non-partition predicates $table") {
+      checkAnswer(
+        sql(s"SELECT COUNT(*) FROM $table WHERE intField IN (1,2,3)"),
+        30)
+    }
 
-  test("sum") {
-    checkAnswer(
-      sql("SELECT SUM(intField) FROM partitioned_parquet WHERE intField IN (1,2,3) AND p = 1"),
-      1 + 2 + 3)
-  }
+    test(s"sum $table") {
+      checkAnswer(
+        sql(s"SELECT SUM(intField) FROM $table WHERE intField IN (1,2,3) AND p = 1"),
+        1 + 2 + 3)
+    }
 
-  test("hive udfs") {
-    checkAnswer(
-      sql("SELECT concat(stringField, stringField) FROM partitioned_parquet"),
-      sql("SELECT stringField FROM partitioned_parquet").map {
-        case Row(s: String) => Row(s + s)
-      }.collect().toSeq)
+    test(s"hive udfs $table") {
+      checkAnswer(
+        sql(s"SELECT concat(stringField, stringField) FROM $table"),
+        sql(s"SELECT stringField FROM $table").map {
+          case Row(s: String) => Row(s + s)
+        }.collect().toSeq)
+    }
   }
 
   test("non-part select(*)") {