Commit 3abd0c1
[SPARK-2406][SQL] Initial support for using ParquetTableScan to read HiveMetaStore tables.

This PR adds an experimental flag, `spark.sql.hive.convertMetastoreParquet`, that, when true, causes the planner to detect tables that use Hive's Parquet SerDe and instead plan them using Spark SQL's native `ParquetTableScan`.

Author: Michael Armbrust <michael@databricks.com>
Author: Yin Huai <huai@cse.ohio-state.edu>

Closes apache#1819 from marmbrus/parquetMetastore and squashes the following commits:

1620079 [Michael Armbrust] Revert "remove hive parquet bundle"
cc30430 [Michael Armbrust] Merge remote-tracking branch 'origin/master' into parquetMetastore
4f3d54f [Michael Armbrust] fix style
41ebc5f [Michael Armbrust] remove hive parquet bundle
a43e0da [Michael Armbrust] Merge remote-tracking branch 'origin/master' into parquetMetastore
4c4dc19 [Michael Armbrust] Fix bug with tree splicing.
ebb267e [Michael Armbrust] include parquet hive to tests pass (Remove this later).
c0d9b72 [Michael Armbrust] Avoid creating a HadoopRDD per partition. Add dirty hacks to retrieve partition values from the InputSplit.
8cdc93c [Michael Armbrust] Merge pull request #8 from yhuai/parquetMetastore
a0baec7 [Yin Huai] Partitioning columns can be resolved.
1161338 [Michael Armbrust] Add a test to make sure conversion is actually happening
212d5cd [Michael Armbrust] Initial support for using ParquetTableScan to read HiveMetaStore tables.
Parent: 9eb74c7 · Commit: 3abd0c1
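For orientation before the per-file diffs, here is a minimal sketch of how the new flag would be exercised from user code. The SparkContext setup, table name, and query are illustrative assumptions, not part of this commit; only the `spark.sql.hive.convertMetastoreParquet` key and `HiveContext` come from Spark SQL.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext

// Hypothetical local setup; any existing SparkContext/HiveContext works the same way.
val sc = new SparkContext("local[2]", "convertMetastoreParquet-example")
val hiveContext = new HiveContext(sc)

// The conversion is off by default; opt in explicitly.
hiveContext.setConf("spark.sql.hive.convertMetastoreParquet", "true")

// Queries against a metastore table whose SerDe class name contains "Parquet" are now
// planned with Spark SQL's native ParquetTableScan instead of a Hive SerDe-based scan.
hiveContext.hql("SELECT key, value FROM some_parquet_table WHERE key > 10").collect()
```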

File tree: 8 files changed, +427 −23 lines

project/SparkBuild.scala

Lines changed: 0 additions & 1 deletion
```diff
@@ -228,7 +228,6 @@ object SQL {
 object Hive {
 
   lazy val settings = Seq(
-
     javaOptions += "-XX:MaxPermSize=1g",
     // Multiple queries rely on the TestHive singleton. See comments there for more details.
     parallelExecution in Test := false,
```

sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala

Lines changed: 12 additions & 0 deletions
```diff
@@ -303,3 +303,15 @@ case class Intersect(left: SparkPlan, right: SparkPlan) extends BinaryNode {
     left.execute().map(_.copy()).intersection(right.execute().map(_.copy()))
   }
 }
+
+/**
+ * :: DeveloperApi ::
+ * A plan node that does nothing but lie about the output of its child.  Used to splice a
+ * (hopefully structurally equivalent) tree from a different optimization sequence into an already
+ * resolved tree.
+ */
+@DeveloperApi
+case class OutputFaker(output: Seq[Attribute], child: SparkPlan) extends SparkPlan {
+  def children = child :: Nil
+  def execute() = child.execute()
+}
```
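To make the intent of `OutputFaker` concrete, here is a hedged sketch of how a re-planned subtree might be spliced back under an already-resolved parent. The `originalOutput` and `replannedPlan` names are hypothetical placeholders; only `OutputFaker` itself comes from this patch.

```scala
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.{OutputFaker, SparkPlan}

// The faker advertises the attributes (and exprIds) the parent operators were resolved
// against, while execution is delegated entirely to the structurally equivalent new child.
def splice(originalOutput: Seq[Attribute], replannedPlan: SparkPlan): SparkPlan =
  OutputFaker(originalOutput, replannedPlan)
```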

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala

Lines changed: 5 additions & 3 deletions
```diff
@@ -47,7 +47,8 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, LeafNode}
 private[sql] case class ParquetRelation(
     path: String,
     @transient conf: Option[Configuration],
-    @transient sqlContext: SQLContext)
+    @transient sqlContext: SQLContext,
+    partitioningAttributes: Seq[Attribute] = Nil)
   extends LeafNode with MultiInstanceRelation {
 
   self: Product =>
@@ -61,12 +62,13 @@ private[sql] case class ParquetRelation(
 
   /** Attributes */
   override val output =
+    partitioningAttributes ++
     ParquetTypesConverter.readSchemaFromFile(
-      new Path(path),
+      new Path(path.split(",").head),
       conf,
       sqlContext.isParquetBinaryAsString)
 
-  override def newInstance = ParquetRelation(path, conf, sqlContext).asInstanceOf[this.type]
+  override def newInstance() = ParquetRelation(path, conf, sqlContext).asInstanceOf[this.type]
 
   // Equals must also take into account the output attributes so that we can distinguish between
   // different instances of the same relation,
```
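With this change the relation's `path` can carry several partition directories as one comma-separated string, only the first entry is used for schema discovery, and any partitioning columns are prepended to the output. A hedged sketch of the calling convention follows; the paths and the `part` attribute are illustrative, and `sqlContext` is assumed to already exist.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.types.StringType
import org.apache.spark.sql.parquet.ParquetRelation

// Several partition directories, joined into the single comma-separated `path` string;
// the Parquet schema is read from the footer of the first entry only.
val paths = Seq(
  "hdfs://nn/warehouse/sales/part=a",
  "hdfs://nn/warehouse/sales/part=b").mkString(",")

val partCol = AttributeReference("part", StringType, nullable = true)()

// relation.output == partCol +: <columns read from part=a's Parquet schema>.
// (ParquetRelation is private[sql], so this only compiles from inside org.apache.spark.sql.)
val relation = ParquetRelation(paths, Some(new Configuration()), sqlContext, partCol :: Nil)
```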

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala

Lines changed: 57 additions & 17 deletions
```diff
@@ -34,6 +34,7 @@ import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
 import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
+
 import parquet.hadoop._
 import parquet.hadoop.api.{InitContext, ReadSupport}
 import parquet.hadoop.metadata.GlobalMetaData
@@ -42,6 +43,7 @@ import parquet.io.ParquetDecodingException
 import parquet.schema.MessageType
 
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.SQLConf
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Row}
 import org.apache.spark.sql.execution.{LeafNode, SparkPlan, UnaryNode}
@@ -60,28 +62,38 @@ case class ParquetTableScan(
   // The resolution of Parquet attributes is case sensitive, so we resolve the original attributes
   // by exprId. note: output cannot be transient, see
   // https://issues.apache.org/jira/browse/SPARK-1367
-  val output = attributes.map { a =>
-    relation.output
-      .find(o => o.exprId == a.exprId)
-      .getOrElse(sys.error(s"Invalid parquet attribute $a in ${relation.output.mkString(",")}"))
-  }
+  val normalOutput =
+    attributes
+      .filterNot(a => relation.partitioningAttributes.map(_.exprId).contains(a.exprId))
+      .flatMap(a => relation.output.find(o => o.exprId == a.exprId))
+
+  val partOutput =
+    attributes.flatMap(a => relation.partitioningAttributes.find(o => o.exprId == a.exprId))
+
+  def output = partOutput ++ normalOutput
+
+  assert(normalOutput.size + partOutput.size == attributes.size,
+    s"$normalOutput + $partOutput != $attributes, ${relation.output}")
 
   override def execute(): RDD[Row] = {
     val sc = sqlContext.sparkContext
     val job = new Job(sc.hadoopConfiguration)
     ParquetInputFormat.setReadSupportClass(job, classOf[RowReadSupport])
 
     val conf: Configuration = ContextUtil.getConfiguration(job)
-    val qualifiedPath = {
-      val path = new Path(relation.path)
-      path.getFileSystem(conf).makeQualified(path)
+
+    relation.path.split(",").foreach { curPath =>
+      val qualifiedPath = {
+        val path = new Path(curPath)
+        path.getFileSystem(conf).makeQualified(path)
+      }
+      NewFileInputFormat.addInputPath(job, qualifiedPath)
     }
-    NewFileInputFormat.addInputPath(job, qualifiedPath)
 
     // Store both requested and original schema in `Configuration`
     conf.set(
       RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
-      ParquetTypesConverter.convertToString(output))
+      ParquetTypesConverter.convertToString(normalOutput))
     conf.set(
       RowWriteSupport.SPARK_ROW_SCHEMA,
       ParquetTypesConverter.convertToString(relation.output))
@@ -102,13 +114,41 @@ case class ParquetTableScan(
       SQLConf.PARQUET_CACHE_METADATA,
       sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, "false"))
 
-    sc.newAPIHadoopRDD(
-      conf,
-      classOf[FilteringParquetRowInputFormat],
-      classOf[Void],
-      classOf[Row])
-      .map(_._2)
-      .filter(_ != null) // Parquet's record filters may produce null values
+    val baseRDD =
+      new org.apache.spark.rdd.NewHadoopRDD(
+        sc,
+        classOf[FilteringParquetRowInputFormat],
+        classOf[Void],
+        classOf[Row],
+        conf)
+
+    if (partOutput.nonEmpty) {
+      baseRDD.mapPartitionsWithInputSplit { case (split, iter) =>
+        val partValue = "([^=]+)=([^=]+)".r
+        val partValues =
+          split.asInstanceOf[parquet.hadoop.ParquetInputSplit]
+            .getPath
+            .toString
+            .split("/")
+            .flatMap {
+              case partValue(key, value) => Some(key -> value)
+              case _ => None
+            }.toMap
+
+        val partitionRowValues =
+          partOutput.map(a => Cast(Literal(partValues(a.name)), a.dataType).eval(EmptyRow))
+
+        new Iterator[Row] {
+          private[this] val joinedRow = new JoinedRow(Row(partitionRowValues:_*), null)
+
+          def hasNext = iter.hasNext
+
+          def next() = joinedRow.withRight(iter.next()._2)
+        }
+      }
+    } else {
+      baseRDD.map(_._2)
+    }.filter(_ != null) // Parquet's record filters may produce null values
   }
 
   /**
```
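The partition-value recovery above parses the input split's path, treating every `key=value` path segment as a partition column value. A self-contained illustration of that step, using the same regex as the scan (the example path is hypothetical):

```scala
// Mirrors the per-split partition-value extraction added to ParquetTableScan.
val partValue = "([^=]+)=([^=]+)".r

val splitPath = "hdfs://nn/warehouse/sales/year=2014/month=08/part-r-00000.parquet"

// Every "key=value" path segment becomes a partition column value; other segments are dropped.
val partValues: Map[String, String] =
  splitPath.split("/").flatMap {
    case partValue(key, value) => Some(key -> value)
    case _ => None
  }.toMap

assert(partValues == Map("year" -> "2014", "month" -> "08"))
```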

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala

Lines changed: 9 additions & 0 deletions
```diff
@@ -79,6 +79,14 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
   // Change the default SQL dialect to HiveQL
   override private[spark] def dialect: String = getConf(SQLConf.DIALECT, "hiveql")
 
+  /**
+   * When true, enables an experimental feature where metastore tables that use the parquet SerDe
+   * are automatically converted to use the Spark SQL parquet table scan, instead of the Hive
+   * SerDe.
+   */
+  private[spark] def convertMetastoreParquet: Boolean =
+    getConf("spark.sql.hive.convertMetastoreParquet", "false") == "true"
+
   override protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution =
     new this.QueryExecution { val logical = plan }
 
@@ -326,6 +334,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
       TakeOrdered,
       ParquetOperations,
       InMemoryScans,
+      ParquetConversion, // Must be before HiveTableScans
       HiveTableScans,
       DataSinks,
       Scripts,
```
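The "Must be before HiveTableScans" comment matters because the planner tries strategies in order and keeps the first physical plan produced. A hedged, self-contained toy of that first-match-wins behaviour; the strategy names and string-matching below are stand-ins, not the real Catalyst classes.

```scala
// Toy model of ordered strategy dispatch: an earlier strategy can claim a plan
// before a later, more general one ever sees it.
object StrategyOrdering extends App {
  type Plan = String
  type Strategy = Plan => Seq[String]

  val parquetConversion: Strategy =
    plan => if (plan.contains("parquet")) Seq("ParquetTableScan") else Nil
  val hiveTableScans: Strategy =
    plan => Seq("HiveTableScan")

  // ParquetConversion is listed first, so it wins whenever it matches.
  val strategies = Seq(parquetConversion, hiveTableScans)

  def planFirst(plan: Plan): String =
    strategies.view.flatMap(s => s(plan)).head

  println(planFirst("metastore parquet table"))  // ParquetTableScan
  println(planFirst("orc table"))                // HiveTableScan
}
```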

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala

Lines changed: 117 additions & 2 deletions
```diff
@@ -17,21 +17,136 @@
 
 package org.apache.spark.sql.hive
 
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate
 import org.apache.spark.sql.catalyst.planning._
 import org.apache.spark.sql.catalyst.plans._
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, LowerCaseSchema}
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.hive.execution._
 import org.apache.spark.sql.columnar.InMemoryRelation
+import org.apache.spark.sql.parquet.{ParquetRelation, ParquetTableScan}
+
+import scala.collection.JavaConversions._
 
 private[hive] trait HiveStrategies {
   // Possibly being too clever with types here... or not clever enough.
   self: SQLContext#SparkPlanner =>
 
   val hiveContext: HiveContext
 
+  /**
+   * :: Experimental ::
+   * Finds table scans that would use the Hive SerDe and replaces them with our own native parquet
+   * table scan operator.
+   *
+   * TODO: Much of this logic is duplicated in HiveTableScan.  Ideally we would do some refactoring
+   * but since this is after the code freeze for 1.1 all logic is here to minimize disruption.
+   *
+   * Other issues:
+   *  - Much of this logic assumes case insensitive resolution.
+   */
+  @Experimental
+  object ParquetConversion extends Strategy {
+    implicit class LogicalPlanHacks(s: SchemaRDD) {
+      def lowerCase =
+        new SchemaRDD(s.sqlContext, LowerCaseSchema(s.logicalPlan))
+
+      def addPartitioningAttributes(attrs: Seq[Attribute]) =
+        new SchemaRDD(
+          s.sqlContext,
+          s.logicalPlan transform {
+            case p: ParquetRelation => p.copy(partitioningAttributes = attrs)
+          })
+    }
+
+    implicit class PhysicalPlanHacks(originalPlan: SparkPlan) {
+      def fakeOutput(newOutput: Seq[Attribute]) =
+        OutputFaker(
+          originalPlan.output.map(a =>
+            newOutput.find(a.name.toLowerCase == _.name.toLowerCase)
+              .getOrElse(
+                sys.error(s"Can't find attribute $a to fake in set ${newOutput.mkString(",")}"))),
+          originalPlan)
+    }
+
+    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+      case PhysicalOperation(projectList, predicates, relation: MetastoreRelation)
+          if relation.tableDesc.getSerdeClassName.contains("Parquet") &&
+             hiveContext.convertMetastoreParquet =>
+
+        // Filter out all predicates that only deal with partition keys
+        val partitionKeyIds = relation.partitionKeys.map(_.exprId).toSet
+        val (pruningPredicates, otherPredicates) = predicates.partition {
+          _.references.map(_.exprId).subsetOf(partitionKeyIds)
+        }
+
+        // We are going to throw the predicates and projection back at the whole optimization
+        // sequence so lets unresolve all the attributes, allowing them to be rebound to the
+        // matching parquet attributes.
+        val unresolvedOtherPredicates = otherPredicates.map(_ transform {
+          case a: AttributeReference => UnresolvedAttribute(a.name)
+        }).reduceOption(And).getOrElse(Literal(true))
+
+        val unresolvedProjection = projectList.map(_ transform {
+          case a: AttributeReference => UnresolvedAttribute(a.name)
+        })
+
+        if (relation.hiveQlTable.isPartitioned) {
+          val rawPredicate = pruningPredicates.reduceOption(And).getOrElse(Literal(true))
+          // Translate the predicate so that it automatically casts the input values to the correct
+          // data types during evaluation
+          val castedPredicate = rawPredicate transform {
+            case a: AttributeReference =>
+              val idx = relation.partitionKeys.indexWhere(a.exprId == _.exprId)
+              val key = relation.partitionKeys(idx)
+              Cast(BoundReference(idx, StringType, nullable = true), key.dataType)
+          }
+
+          val inputData = new GenericMutableRow(relation.partitionKeys.size)
+          val pruningCondition =
+            if (codegenEnabled) {
+              GeneratePredicate(castedPredicate)
+            } else {
+              InterpretedPredicate(castedPredicate)
+            }
+
+          val partitions = relation.hiveQlPartitions.filter { part =>
+            val partitionValues = part.getValues
+            var i = 0
+            while (i < partitionValues.size()) {
+              inputData(i) = partitionValues(i)
+              i += 1
+            }
+            pruningCondition(inputData)
+          }
+
+          hiveContext
+            .parquetFile(partitions.map(_.getLocation).mkString(","))
+            .addPartitioningAttributes(relation.partitionKeys)
+            .lowerCase
+            .where(unresolvedOtherPredicates)
+            .select(unresolvedProjection:_*)
+            .queryExecution
+            .executedPlan
+            .fakeOutput(projectList.map(_.toAttribute)) :: Nil
+        } else {
+          hiveContext
+            .parquetFile(relation.hiveQlTable.getDataLocation.getPath)
+            .lowerCase
+            .where(unresolvedOtherPredicates)
+            .select(unresolvedProjection:_*)
+            .queryExecution
+            .executedPlan
+            .fakeOutput(projectList.map(_.toAttribute)) :: Nil
+        }
+      case _ => Nil
+    }
+  }
+
   object Scripts extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case logical.ScriptTransformation(input, script, output, child) =>
```
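The partitioned branch prunes partitions before any file is read: metastore partition values arrive as strings, so the pruning predicate casts a string-typed `BoundReference` to the partition key's real type and evaluates it against each partition's values. A hedged, self-contained sketch of that step using the same Catalyst expression APIs the strategy calls; the `year > 2013` predicate and the partition value are made up for illustration.

```scala
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.types.{IntegerType, StringType}

// Predicate over a hypothetical partition key `year` (an int column whose partition value is
// stored as a string), bound to ordinal 0 of the partition-value row.
val castedPredicate =
  GreaterThan(Cast(BoundReference(0, StringType, nullable = true), IntegerType), Literal(2013))

// The non-codegen path; the strategy picks GeneratePredicate instead when codegen is enabled.
val pruningCondition = InterpretedPredicate(castedPredicate)

val inputData = new GenericMutableRow(1)
inputData(0) = "2014"                 // raw metastore partition value
assert(pruningCondition(inputData))   // partition year=2014 survives pruning
```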
sql/hive/src/main/scala/org/apache/spark/sql/hive/parquet/FakeParquetSerDe.scala

Lines changed: 56 additions & 0 deletions (new file)

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.hive.parquet

import java.util.Properties

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category
import org.apache.hadoop.hive.serde2.{SerDeStats, SerDe}
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector
import org.apache.hadoop.io.Writable

/**
 * A placeholder that allows SparkSQL users to create metastore tables that are stored as
 * parquet files.  It is only intended to pass the checks that the serde is valid and exists
 * when a CREATE TABLE is run.  The actual work of decoding will be done by ParquetTableScan
 * when "spark.sql.hive.convertMetastoreParquet" is set to true.
 */
@deprecated("No code should depend on FakeParquetHiveSerDe as it is only intended as a " +
  "placeholder in the Hive MetaStore")
class FakeParquetSerDe extends SerDe {
  override def getObjectInspector: ObjectInspector = new ObjectInspector {
    override def getCategory: Category = Category.PRIMITIVE

    override def getTypeName: String = "string"
  }

  override def deserialize(p1: Writable): AnyRef = throwError

  override def initialize(p1: Configuration, p2: Properties): Unit = {}

  override def getSerializedClass: Class[_ <: Writable] = throwError

  override def getSerDeStats: SerDeStats = throwError

  override def serialize(p1: scala.Any, p2: ObjectInspector): Writable = throwError

  private def throwError =
    sys.error(
      "spark.sql.hive.convertMetastoreParquet must be set to true to use FakeParquetSerDe")
}
```
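The placeholder only has to satisfy Hive's SerDe validation when the table is created; reads then go entirely through the native scan. A hedged sketch, assuming a `hiveContext` with the conversion flag enabled; the table name, columns, location, and the (inert) input/output format classes are illustrative choices, not taken from this commit.

```scala
// Register a Parquet-backed table using the placeholder SerDe. The STORED AS classes are
// placeholders here; Hive only checks that they exist, since Spark never reads through them.
hiveContext.hql("""
  CREATE EXTERNAL TABLE sales_parquet (key INT, value STRING)
  ROW FORMAT SERDE 'org.apache.spark.sql.hive.parquet.FakeParquetSerDe'
  STORED AS
    INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
    OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
  LOCATION '/user/hive/warehouse/sales_parquet'
""")

// "FakeParquetSerDe" contains "Parquet", so with spark.sql.hive.convertMetastoreParquet=true
// the ParquetConversion strategy plans reads of this table with ParquetTableScan; the SerDe
// itself throws if anything ever asks it to deserialize.
hiveContext.hql("SELECT key, value FROM sales_parquet").collect()
```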
