Skip to content

Commit 6cc095c

Browse files
Yash Datta
authored and committed
SPARK-5684: Change the way partition values are passed to ParquetScan
1 parent bc2f6b1 commit 6cc095c

File tree

3 files changed

+14
-11
lines changed

3 files changed

+14
-11
lines changed

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -49,7 +49,8 @@ private[sql] case class ParquetRelation(
4949
path: String,
5050
@transient conf: Option[Configuration],
5151
@transient sqlContext: SQLContext,
52-
partitioningAttributes: Seq[Attribute] = Nil)
52+
partitioningAttributes: Seq[Attribute] = Nil,
53+
partitionValues: String = "")
5354
extends LeafNode with MultiInstanceRelation {
5455

5556
self: Product =>
@@ -65,7 +66,7 @@ private[sql] case class ParquetRelation(
6566
override val output =
6667
partitioningAttributes ++
6768
ParquetTypesConverter.readSchemaFromFile(
68-
new Path(path.split(",").head.split("->").head),
69+
new Path(path.split(",").head),
6970
conf,
7071
sqlContext.conf.isParquetBinaryAsString,
7172
sqlContext.conf.isParquetINT96AsTimestamp)

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala

Lines changed: 5 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -88,10 +88,11 @@ private[sql] case class ParquetTableScan(
8888
val conf: Configuration = ContextUtil.getConfiguration(job)
8989

9090
if (requestedPartitionOrdinals.nonEmpty) {
91-
relation.path.split(",").foreach { tcurPath =>
92-
val p = tcurPath.split("->")
93-
val curPath = p.apply(0)
94-
val partition = p.apply(1)
91+
val partVals = relation.partitionValues.split(",")
92+
var i = 0
93+
relation.path.split(",").foreach { curPath =>
94+
val partition = partVals.apply(i)
95+
i += 1
9596
val qualifiedPath = {
9697
val path = new Path(curPath)
9798
path.getFileSystem(conf).makeQualified(path)

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala

Lines changed: 6 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -60,15 +60,16 @@ private[hive] trait HiveStrategies {
6060
implicit class LogicalPlanHacks(s: DataFrame) {
6161
def lowerCase: DataFrame = DataFrame(s.sqlContext, s.logicalPlan)
6262

63-
def addPartitioningAttributes(attrs: Seq[Attribute]): DataFrame = {
63+
def addPartitioningAttributes(attrs: Seq[Attribute], partVals: String): DataFrame = {
6464
// Don't add the partitioning key if its already present in the data.
6565
if (attrs.map(_.name).toSet.subsetOf(s.logicalPlan.output.map(_.name).toSet)) {
6666
s
6767
} else {
6868
DataFrame(
6969
s.sqlContext,
7070
s.logicalPlan transform {
71-
case p: ParquetRelation => p.copy(partitioningAttributes = attrs)
71+
case p: ParquetRelation => p.copy(partitioningAttributes = attrs,
72+
partitionValues = partVals)
7273
})
7374
}
7475
}
@@ -137,15 +138,15 @@ private[hive] trait HiveStrategies {
137138
pruningCondition(inputData)
138139
}
139140

140-
val partitionLocations = partitions.map(part =>
141-
part.getLocation() + "->" + part.getName())
141+
val partitionLocations = partitions.map(part => part.getLocation)
142+
val partitionNames = partitions.map(part => part.getName)
142143

143144
if (partitionLocations.isEmpty) {
144145
PhysicalRDD(plan.output, sparkContext.emptyRDD[Row]) :: Nil
145146
} else {
146147
hiveContext
147148
.parquetFile(partitionLocations: _*)
148-
.addPartitioningAttributes(relation.partitionKeys)
149+
.addPartitioningAttributes(relation.partitionKeys, partitionNames.mkString(","))
149150
.lowerCase
150151
.where(unresolvedOtherPredicates)
151152
.select(unresolvedProjection: _*)

0 commit comments

Comments (0)