
[SQL] Support partitioned parquet tables that have the key in both the directory and the file

Author: Michael Armbrust <michael@databricks.com>

Closes apache#3272 from marmbrus/keyInPartitionedTable and squashes the following commits:

447f08c [Michael Armbrust] Support partitioned parquet tables that have the key in both the directory and the file
marmbrus committed Nov 18, 2014
1 parent b54c6ab commit 90d72ec
Showing 2 changed files with 108 additions and 68 deletions.
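Why this matters: in a Hive-style layout the partition value is encoded in the directory name (e.g. .../p=1/), but some writers also store that same column inside the Parquet files themselves. Before this change, Spark SQL unconditionally appended the partitioning attribute to the relation's output, producing a duplicate column for such tables. A minimal standalone sketch of the failure mode (illustrative names only, not Spark code):

    // Illustrative only: shows why unconditionally appending the partition
    // column breaks tables whose files already contain it.
    object DuplicateKeySketch extends App {
      val fileSchema    = Seq("p", "intField", "stringField") // key stored in the files
      val partitionCols = Seq("p")                            // key also in the path: .../p=1/

      val naive = fileSchema ++ partitionCols
      // List(p, intField, stringField, p): "p" is now ambiguous, so queries
      // that project or group by it can fail or double-resolve the column.
      println(naive)
    }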
HiveStrategies.scala
@@ -58,12 +58,18 @@ private[hive] trait HiveStrategies {
       def lowerCase =
         new SchemaRDD(s.sqlContext, s.logicalPlan)
 
-      def addPartitioningAttributes(attrs: Seq[Attribute]) =
-        new SchemaRDD(
-          s.sqlContext,
-          s.logicalPlan transform {
-            case p: ParquetRelation => p.copy(partitioningAttributes = attrs)
-          })
+      def addPartitioningAttributes(attrs: Seq[Attribute]) = {
+        // Don't add the partitioning key if it's already present in the data.
+        if (attrs.map(_.name).toSet.subsetOf(s.logicalPlan.output.map(_.name).toSet)) {
+          s
+        } else {
+          new SchemaRDD(
+            s.sqlContext,
+            s.logicalPlan transform {
+              case p: ParquetRelation => p.copy(partitioningAttributes = attrs)
+            })
+        }
+      }
     }
 
     implicit class PhysicalPlanHacks(originalPlan: SparkPlan) {
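The fix is a name-based subset test: if every partitioning attribute already appears in the plan's output, the SchemaRDD is returned untouched. A self-contained sketch of the same guard, using hypothetical stand-in types rather than Spark's internals:

    // Standalone sketch of the guard above; `Schema` and `addPartitionColumns`
    // are illustrative stand-ins, not Spark classes.
    object PartitionKeyGuard {
      final case class Schema(columns: Seq[String])

      def addPartitionColumns(data: Schema, partitionCols: Seq[String]): Schema =
        // Skip the append when every partition column already exists in the
        // data, mirroring the subsetOf check in addPartitioningAttributes.
        if (partitionCols.toSet.subsetOf(data.columns.toSet)) data
        else Schema(data.columns ++ partitionCols)

      def main(args: Array[String]): Unit = {
        val keyOnlyInDir  = Schema(Seq("intField", "stringField"))
        val keyAlsoInFile = Schema(Seq("p", "intField", "stringField"))
        println(addPartitionColumns(keyOnlyInDir, Seq("p")))  // p appended from the path
        println(addPartitionColumns(keyAlsoInFile, Seq("p"))) // unchanged: no duplicate p
      }
    }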
ParquetMetastoreSuite.scala
@@ -27,7 +27,11 @@ import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.hive.execution.HiveTableScan
 import org.apache.spark.sql.hive.test.TestHive._
 
+// The data where the partitioning key exists only in the directory structure.
 case class ParquetData(intField: Int, stringField: String)
+// The data that also includes the partitioning key.
+case class ParquetDataWithKey(p: Int, intField: Int, stringField: String)
+
 
 /**
  * Tests for our SerDe -> Native parquet scan conversion.
@@ -45,6 +49,17 @@ class ParquetMetastoreSuite extends QueryTest with BeforeAndAfterAll {
         .saveAsParquetFile(partDir.getCanonicalPath)
     }
 
+    val partitionedTableDirWithKey = File.createTempFile("parquettests", "sparksql")
+    partitionedTableDirWithKey.delete()
+    partitionedTableDirWithKey.mkdir()
+
+    (1 to 10).foreach { p =>
+      val partDir = new File(partitionedTableDirWithKey, s"p=$p")
+      sparkContext.makeRDD(1 to 10)
+        .map(i => ParquetDataWithKey(p, i, s"part-$p"))
+        .saveAsParquetFile(partDir.getCanonicalPath)
+    }
+
     sql(s"""
       create external table partitioned_parquet
       (
@@ -59,6 +74,20 @@ class ParquetMetastoreSuite extends QueryTest with BeforeAndAfterAll {
       location '${partitionedTableDir.getCanonicalPath}'
     """)
 
+    sql(s"""
+      create external table partitioned_parquet_with_key
+      (
+        intField INT,
+        stringField STRING
+      )
+      PARTITIONED BY (p int)
+      ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+      STORED AS
+      INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+      OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      location '${partitionedTableDirWithKey.getCanonicalPath}'
+    """)
+
     sql(s"""
       create external table normal_parquet
       (
@@ -76,82 +105,87 @@
sql(s"ALTER TABLE partitioned_parquet ADD PARTITION (p=$p)")
}

(1 to 10).foreach { p =>
sql(s"ALTER TABLE partitioned_parquet_with_key ADD PARTITION (p=$p)")
}

setConf("spark.sql.hive.convertMetastoreParquet", "true")
}

override def afterAll(): Unit = {
setConf("spark.sql.hive.convertMetastoreParquet", "false")
}

test("project the partitioning column") {
checkAnswer(
sql("SELECT p, count(*) FROM partitioned_parquet group by p"),
(1, 10) ::
(2, 10) ::
(3, 10) ::
(4, 10) ::
(5, 10) ::
(6, 10) ::
(7, 10) ::
(8, 10) ::
(9, 10) ::
(10, 10) :: Nil
)
}
Seq("partitioned_parquet", "partitioned_parquet_with_key").foreach { table =>
test(s"project the partitioning column $table") {
checkAnswer(
sql(s"SELECT p, count(*) FROM $table group by p"),
(1, 10) ::
(2, 10) ::
(3, 10) ::
(4, 10) ::
(5, 10) ::
(6, 10) ::
(7, 10) ::
(8, 10) ::
(9, 10) ::
(10, 10) :: Nil
)
}

test("project partitioning and non-partitioning columns") {
checkAnswer(
sql("SELECT stringField, p, count(intField) " +
"FROM partitioned_parquet GROUP BY p, stringField"),
("part-1", 1, 10) ::
("part-2", 2, 10) ::
("part-3", 3, 10) ::
("part-4", 4, 10) ::
("part-5", 5, 10) ::
("part-6", 6, 10) ::
("part-7", 7, 10) ::
("part-8", 8, 10) ::
("part-9", 9, 10) ::
("part-10", 10, 10) :: Nil
)
}
test(s"project partitioning and non-partitioning columns $table") {
checkAnswer(
sql(s"SELECT stringField, p, count(intField) FROM $table GROUP BY p, stringField"),
("part-1", 1, 10) ::
("part-2", 2, 10) ::
("part-3", 3, 10) ::
("part-4", 4, 10) ::
("part-5", 5, 10) ::
("part-6", 6, 10) ::
("part-7", 7, 10) ::
("part-8", 8, 10) ::
("part-9", 9, 10) ::
("part-10", 10, 10) :: Nil
)
}

test("simple count") {
checkAnswer(
sql("SELECT COUNT(*) FROM partitioned_parquet"),
100)
}
test(s"simple count $table") {
checkAnswer(
sql(s"SELECT COUNT(*) FROM $table"),
100)
}

test("pruned count") {
checkAnswer(
sql("SELECT COUNT(*) FROM partitioned_parquet WHERE p = 1"),
10)
}
test(s"pruned count $table") {
checkAnswer(
sql(s"SELECT COUNT(*) FROM $table WHERE p = 1"),
10)
}

test("multi-partition pruned count") {
checkAnswer(
sql("SELECT COUNT(*) FROM partitioned_parquet WHERE p IN (1,2,3)"),
30)
}
test(s"multi-partition pruned count $table") {
checkAnswer(
sql(s"SELECT COUNT(*) FROM $table WHERE p IN (1,2,3)"),
30)
}

test("non-partition predicates") {
checkAnswer(
sql("SELECT COUNT(*) FROM partitioned_parquet WHERE intField IN (1,2,3)"),
30)
}
test(s"non-partition predicates $table") {
checkAnswer(
sql(s"SELECT COUNT(*) FROM $table WHERE intField IN (1,2,3)"),
30)
}

test("sum") {
checkAnswer(
sql("SELECT SUM(intField) FROM partitioned_parquet WHERE intField IN (1,2,3) AND p = 1"),
1 + 2 + 3)
}
test(s"sum $table") {
checkAnswer(
sql(s"SELECT SUM(intField) FROM $table WHERE intField IN (1,2,3) AND p = 1"),
1 + 2 + 3)
}

test("hive udfs") {
checkAnswer(
sql("SELECT concat(stringField, stringField) FROM partitioned_parquet"),
sql("SELECT stringField FROM partitioned_parquet").map {
case Row(s: String) => Row(s + s)
}.collect().toSeq)
test(s"hive udfs $table") {
checkAnswer(
sql(s"SELECT concat(stringField, stringField) FROM $table"),
sql(s"SELECT stringField FROM $table").map {
case Row(s: String) => Row(s + s)
}.collect().toSeq)
}
}

test("non-part select(*)") {
Expand Down
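A note on the test structure: wrapping the test(...) registrations in Seq(...).foreach stamps out one independent test case per table name, so the directory-only and key-in-file layouts run through identical assertions. A minimal standalone sketch of that registration pattern (written against the modern ScalaTest API; the original suite's base classes differ):

    import org.scalatest.funsuite.AnyFunSuite

    class PerTableSuiteSketch extends AnyFunSuite {
      // One test case is registered per table name at construction time.
      Seq("partitioned_parquet", "partitioned_parquet_with_key").foreach { table =>
        test(s"simple count $table") {
          // Stand-in assertion; the real suite runs SQL against each table.
          assert(table.nonEmpty)
        }
      }
    }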
