
Commit c0d9b72

Avoid creating a HadoopRDD per partition. Add dirty hacks to retrieve partition values from the InputSplit.
1 parent 8cdc93c commit c0d9b72

3 files changed, +79 -45 lines changed

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala

Lines changed: 5 additions & 2 deletions
@@ -47,7 +47,8 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, LeafNode}
 private[sql] case class ParquetRelation(
     path: String,
     @transient conf: Option[Configuration],
-    @transient sqlContext: SQLContext)
+    @transient sqlContext: SQLContext,
+    partitioningAttributes: Seq[Attribute] = Nil)
   extends LeafNode with MultiInstanceRelation {
 
   self: Product =>
@@ -60,7 +61,9 @@ private[sql] case class ParquetRelation(
     .getSchema
 
   /** Attributes */
-  override val output = ParquetTypesConverter.readSchemaFromFile(new Path(path), conf)
+  override val output =
+    partitioningAttributes ++
+    ParquetTypesConverter.readSchemaFromFile(new Path(path.split(",").head), conf)
 
   override def newInstance = ParquetRelation(path, conf, sqlContext).asInstanceOf[this.type]
 

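With this change, path may carry several partition directories as one comma-separated string; the relation reads the Parquet file schema only from the first directory and prepends the partitioning attributes to its output. A minimal plain-Scala sketch of the path handling, using hypothetical directory names that are not taken from the commit:

object SchemaSourceSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical comma-separated partition directories.
    val path = "/warehouse/events/ds=2014-05-01,/warehouse/events/ds=2014-05-02"
    // Only the head directory is consulted for the Parquet footer schema;
    // the partition columns themselves come from partitioningAttributes.
    val schemaSource = path.split(",").head
    println(schemaSource) // prints /warehouse/events/ds=2014-05-01
  }
}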
sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala

Lines changed: 56 additions & 18 deletions
@@ -42,7 +42,7 @@ import parquet.schema.MessageType
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Row}
+import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution.{LeafNode, SparkPlan, UnaryNode}
 import org.apache.spark.{Logging, SerializableWritable, TaskContext}
 
@@ -59,28 +59,38 @@ case class ParquetTableScan(
   // The resolution of Parquet attributes is case sensitive, so we resolve the original attributes
   // by exprId. note: output cannot be transient, see
   // https://issues.apache.org/jira/browse/SPARK-1367
-  val output = attributes.map { a =>
-    relation.output
-      .find(o => o.exprId == a.exprId)
-      .getOrElse(sys.error(s"Invalid parquet attribute $a in ${relation.output.mkString(",")}"))
-  }
+  val normalOutput =
+    attributes
+      .filterNot(a => relation.partitioningAttributes.map(_.exprId).contains(a.exprId))
+      .flatMap(a => relation.output.find(o => o.exprId == a.exprId))
+
+  val partOutput =
+    attributes.flatMap(a => relation.partitioningAttributes.find(o => o.exprId == a.exprId))
+
+  def output = partOutput ++ normalOutput
+
+  assert(normalOutput.size + partOutput.size == attributes.size,
+    s"$normalOutput + $partOutput != $attributes, ${relation.output}")
 
   override def execute(): RDD[Row] = {
     val sc = sqlContext.sparkContext
     val job = new Job(sc.hadoopConfiguration)
     ParquetInputFormat.setReadSupportClass(job, classOf[RowReadSupport])
 
     val conf: Configuration = ContextUtil.getConfiguration(job)
-    val qualifiedPath = {
-      val path = new Path(relation.path)
-      path.getFileSystem(conf).makeQualified(path)
+
+    relation.path.split(",").foreach { curPath =>
+      val qualifiedPath = {
+        val path = new Path(curPath)
+        path.getFileSystem(conf).makeQualified(path)
+      }
+      NewFileInputFormat.addInputPath(job, qualifiedPath)
     }
-    NewFileInputFormat.addInputPath(job, qualifiedPath)
 
     // Store both requested and original schema in `Configuration`
     conf.set(
       RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
-      ParquetTypesConverter.convertToString(output))
+      ParquetTypesConverter.convertToString(normalOutput))
     conf.set(
       RowWriteSupport.SPARK_ROW_SCHEMA,
       ParquetTypesConverter.convertToString(relation.output))
@@ -96,13 +106,41 @@ case class ParquetTableScan(
       ParquetFilters.serializeFilterExpressions(columnPruningPred, conf)
     }
 
-    sc.newAPIHadoopRDD(
-      conf,
-      classOf[FilteringParquetRowInputFormat],
-      classOf[Void],
-      classOf[Row])
-      .map(_._2)
-      .filter(_ != null) // Parquet's record filters may produce null values
+    val baseRDD =
+      new org.apache.spark.rdd.NewHadoopRDD(
+        sc,
+        classOf[FilteringParquetRowInputFormat],
+        classOf[Void],
+        classOf[Row],
+        conf)
+
+    if (partOutput.nonEmpty) {
+      baseRDD.mapPartitionsWithInputSplit { case (split, iter) =>
+        val partValue = "([^=]+)=([^=]+)".r
+        val partValues =
+          split.asInstanceOf[parquet.hadoop.ParquetInputSplit]
+            .getPath
+            .toString
+            .split("/")
+            .flatMap {
+              case partValue(key, value) => Some(key -> value)
+              case _ => None
+            }.toMap
+
+        val partitionRowValues =
+          partOutput.map(a => Cast(Literal(partValues(a.name)), a.dataType).eval(EmptyRow))
+
+        new Iterator[Row] {
+          private[this] val joinedRow = new JoinedRow(Row(partitionRowValues:_*), null)
+
+          def hasNext = iter.hasNext
+
+          def next() = joinedRow.withRight(iter.next()._2)
+        }
+      }
+    } else {
+      baseRDD.map(_._2)
+    }.filter(_ != null) // Parquet's record filters may produce null values
   }
 
   /**

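The scan now builds a single NewHadoopRDD over all partition directories and, when partition columns are requested, recovers their values per input split by matching Hive-style key=value segments in the split's path, casting each value to the column's type and prepending it to every row through a JoinedRow. A standalone sketch of just the path-parsing step, using the same regex, a hypothetical split path, and no Spark or Parquet dependencies:

object PartitionValueSketch {
  // Same pattern as the scan above: a Hive-style "key=value" directory segment.
  private val partValue = "([^=]+)=([^=]+)".r

  // Collects key -> value pairs from the directory segments of a split path.
  def partitionValues(splitPath: String): Map[String, String] =
    splitPath.split("/").flatMap {
      case partValue(key, value) => Some(key -> value)
      case _ => None
    }.toMap

  def main(args: Array[String]): Unit = {
    // Hypothetical split path, not taken from the commit.
    val path = "hdfs://nn/warehouse/events/ds=2014-05-01/hr=12/part-r-0.parquet"
    println(partitionValues(path)) // Map(ds -> 2014-05-01, hr -> 12)
  }
}

In the operator itself the extracted strings are additionally converted to each partition attribute's data type via Cast(Literal(...)).eval(EmptyRow) before being joined onto the data rows.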
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala

Lines changed: 18 additions & 25 deletions
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, LowerCaseSchema
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.hive.execution._
 import org.apache.spark.sql.columnar.InMemoryRelation
-import org.apache.spark.sql.parquet.ParquetTableScan
+import org.apache.spark.sql.parquet.{ParquetRelation, ParquetTableScan}
 
 import scala.collection.JavaConversions._
 
@@ -51,6 +51,13 @@ private[hive] trait HiveStrategies {
   implicit class LogicalPlanHacks(s: SchemaRDD) {
     def lowerCase =
       new SchemaRDD(s.sqlContext, LowerCaseSchema(s.logicalPlan))
+
+    def addPartitioningAttributes(attrs: Seq[Attribute]) =
+      new SchemaRDD(
+        s.sqlContext,
+        s.logicalPlan transform {
+          case p: ParquetRelation => p.copy(partitioningAttributes = attrs)
+        })
   }
 
   implicit class PhysicalPlanHacks(s: SparkPlan) {
@@ -76,8 +83,7 @@ private[hive] trait HiveStrategies {
         }).reduceOption(And).getOrElse(Literal(true))
 
         val unresolvedProjection = projectList.map(_ transform {
-          // Handle non-partitioning columns
-          case a: AttributeReference if !partitionKeyIds.contains(a.exprId) => UnresolvedAttribute(a.name)
+          case a: AttributeReference => UnresolvedAttribute(a.name)
         })
 
         if (relation.hiveQlTable.isPartitioned) {
@@ -109,28 +115,15 @@ private[hive] trait HiveStrategies {
            pruningCondition(inputData)
          }
 
-          org.apache.spark.sql.execution.Union(
-            partitions.par.map { p =>
-              val partValues = p.getValues()
-              val internalProjection = unresolvedProjection.map(_ transform {
-                // Handle partitioning columns
-                case a: AttributeReference if partitionKeyIds.contains(a.exprId) => {
-                  val idx = relation.partitionKeys.indexWhere(a.exprId == _.exprId)
-                  val key = relation.partitionKeys(idx)
-
-                  Alias(Cast(Literal(partValues.get(idx), StringType), key.dataType), a.name)()
-                }
-              })
-
-              hiveContext
-                .parquetFile(p.getLocation)
-                .lowerCase
-                .where(unresolvedOtherPredicates)
-                .select(internalProjection:_*)
-                .queryExecution
-                .executedPlan
-                .fakeOutput(projectList.map(_.toAttribute))
-            }.seq) :: Nil
+          hiveContext
+            .parquetFile(partitions.map(_.getLocation).mkString(","))
+            .addPartitioningAttributes(relation.partitionKeys)
+            .lowerCase
+            .where(unresolvedOtherPredicates)
+            .select(unresolvedProjection:_*)
+            .queryExecution
+            .executedPlan
+            .fakeOutput(projectList.map(_.toAttribute)) :: Nil
         } else {
          hiveContext
            .parquetFile(relation.hiveQlTable.getDataLocation.getPath)

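Instead of planning a Union with one Parquet scan per surviving partition, the strategy now concatenates all partition locations into a single comma-separated path and tags the resulting ParquetRelation with the table's partition keys via addPartitioningAttributes. A small sketch of the path construction, with hypothetical partition locations standing in for partitions.map(_.getLocation):

object CombinedPathSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical partition locations, not taken from the commit.
    val partitionLocations = Seq(
      "/warehouse/events/ds=2014-05-01",
      "/warehouse/events/ds=2014-05-02")
    // One comma-separated path means one ParquetTableScan (and one underlying
    // NewHadoopRDD) instead of a scan per partition.
    println(partitionLocations.mkString(","))
  }
}

This is what lets ParquetTableScan register every directory as an input path on a single Hadoop Job, avoiding the per-partition HadoopRDDs mentioned in the commit message.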