
Commit b0b74fb

Remove runtime pattern matching.

1 parent ca6e038

2 files changed: 60 additions, 36 deletions

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala

Lines changed: 43 additions & 31 deletions
@@ -126,6 +126,9 @@ private[sql] case class ParquetTableScan(
       conf)
 
     if (requestedPartitionOrdinals.nonEmpty) {
+      // This check is based on CatalystConverter.createRootConverter.
+      val primitiveRow = output.forall(a => ParquetTypesConverter.isPrimitiveType(a.dataType))
+
       baseRDD.mapPartitionsWithInputSplit { case (split, iter) =>
         val partValue = "([^=]+)=([^=]+)".r
         val partValues =
@@ -143,37 +146,46 @@ private[sql] case class ParquetTableScan(
           relation.partitioningAttributes
             .map(a => Cast(Literal(partValues(a.name)), a.dataType).eval(EmptyRow))
 
-        val mutableRow = new GenericMutableRow(output.size)
-
-        new Iterator[Row] {
-          def hasNext = iter.hasNext
-          def next() = {
-            iter.next() match {
-              case (_, row: SpecificMutableRow) =>
-                // Parquet will leave partitioning columns empty, so we fill them in here.
-                var i = 0
-                while (i < requestedPartitionOrdinals.size) {
-                  row(requestedPartitionOrdinals(i)._2) =
-                    partitionRowValues(requestedPartitionOrdinals(i)._1)
-                  i += 1
-                }
-                row
-
-              case (_, row: Row) =>
-                var i = 0
-                while (i < row.size) {
-                  mutableRow(i) = row(i)
-                  i += 1
-                }
-
-                i = 0
-                while (i < requestedPartitionOrdinals.size) {
-                  mutableRow(requestedPartitionOrdinals(i)._2) =
-                    partitionRowValues(requestedPartitionOrdinals(i)._1)
-                  i += 1
-                }
-
-                mutableRow
+        if (primitiveRow) {
+          new Iterator[Row] {
+            def hasNext = iter.hasNext
+            def next() = {
+              // We are using CatalystPrimitiveRowConverter and it returns a SpecificMutableRow.
+              val row = iter.next()._2.asInstanceOf[SpecificMutableRow]
+
+              // Parquet will leave partitioning columns empty, so we fill them in here.
+              var i = 0
+              while (i < requestedPartitionOrdinals.size) {
+                row(requestedPartitionOrdinals(i)._2) =
+                  partitionRowValues(requestedPartitionOrdinals(i)._1)
+                i += 1
+              }
+              row
+            }
+          }
+        } else {
+          // Create a mutable row since we need to fill in values from partition columns.
+          val mutableRow = new GenericMutableRow(output.size)
+          new Iterator[Row] {
+            def hasNext = iter.hasNext
+            def next() = {
+              // We are using CatalystGroupConverter and it returns a GenericRow.
+              // Since GenericRow is not mutable, we just cast it to a Row.
+              val row = iter.next()._2.asInstanceOf[Row]
+
+              var i = 0
+              while (i < row.size) {
+                mutableRow(i) = row(i)
+                i += 1
+              }
+              // Parquet will leave partitioning columns empty, so we fill them in here.
+              i = 0
+              while (i < requestedPartitionOrdinals.size) {
+                mutableRow(requestedPartitionOrdinals(i)._2) =
+                  partitionRowValues(requestedPartitionOrdinals(i)._1)
+                i += 1
+              }
+              mutableRow
             }
           }
         }
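
The substantive change in this file is hoisting a per-row decision out of the hot loop: before, every call to `next()` pattern-matched on the runtime class of the row it received (`SpecificMutableRow` vs. plain `Row`); after, the branch is decided once per partition from the requested schema, and each side gets its own specialized iterator. A minimal, self-contained sketch of that before/after shape — the row types and function names here are hypothetical stand-ins, not Spark's Catalyst classes:

```scala
object HoistedBranchSketch {
  sealed trait TestRow
  final case class FastRow(values: Array[Any]) extends TestRow
  final case class SlowRow(values: Vector[Any]) extends TestRow

  // Before: the row's runtime class is re-tested for every element.
  def perRowMatch(rows: Iterator[TestRow]): Iterator[Array[Any]] =
    rows.map {
      case FastRow(vs) => vs         // dispatch repeated on every row
      case SlowRow(vs) => vs.toArray // dispatch repeated on every row
    }

  // After: the branch is decided once, from information known up front
  // (in the commit, a schema check), and each path is a specialized loop
  // with no per-element dispatch.
  def hoistedBranch(rows: Iterator[TestRow], allFast: Boolean): Iterator[Array[Any]] =
    if (allFast) rows.map(_.asInstanceOf[FastRow].values)
    else rows.map(_.asInstanceOf[SlowRow].values.toArray)

  def main(args: Array[String]): Unit = {
    val rows: Iterator[TestRow] = Iterator(FastRow(Array[Any](1, 2)), FastRow(Array[Any](3, 4)))
    hoistedBranch(rows, allFast = true).foreach(r => println(r.mkString(", ")))
  }
}
```

The cast is safe for the same reason it is in the commit: the branch condition guarantees which converter, and therefore which concrete row type, every element in the partition will have.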

sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala

Lines changed: 17 additions & 5 deletions
@@ -476,16 +476,21 @@ private[sql] case class ParquetRelation2(
     // When the data does not include the key and the key is requested then we must fill it in
     // based on information from the input split.
     if (!partitionKeysIncludedInDataSchema && partitionKeyLocations.nonEmpty) {
+      // This check is based on CatalystConverter.createRootConverter.
+      val primitiveRow =
+        requestedSchema.forall(a => ParquetTypesConverter.isPrimitiveType(a.dataType))
+
       baseRDD.mapPartitionsWithInputSplit { case (split: ParquetInputSplit, iterator) =>
         val partValues = selectedPartitions.collectFirst {
           case p if split.getPath.getParent.toString == p.path => p.values
         }.get
 
         val requiredPartOrdinal = partitionKeyLocations.keys.toSeq
-        val mutableRow = new GenericMutableRow(requestedSchema.size)
 
-        iterator.map {
-          case (_, row: SpecificMutableRow) =>
+        if (primitiveRow) {
+          iterator.map { pair =>
+            // We are using CatalystPrimitiveRowConverter and it returns a SpecificMutableRow.
+            val row = pair._2.asInstanceOf[SpecificMutableRow]
             var i = 0
             while (i < requiredPartOrdinal.size) {
               // TODO Avoids boxing cost here!
@@ -494,8 +499,14 @@ private[sql] case class ParquetRelation2(
               i += 1
             }
             row
-
-          case (_, row: Row) =>
+          }
+        } else {
+          // Create a mutable row since we need to fill in values from partition columns.
+          val mutableRow = new GenericMutableRow(requestedSchema.size)
+          iterator.map { pair =>
+            // We are using CatalystGroupConverter and it returns a GenericRow.
+            // Since GenericRow is not mutable, we just cast it to a Row.
+            val row = pair._2.asInstanceOf[Row]
             var i = 0
             while (i < row.size) {
               // TODO Avoids boxing cost here!
@@ -511,6 +522,7 @@ private[sql] case class ParquetRelation2(
               i += 1
             }
             mutableRow
+          }
         }
       }
     } else {
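
Both files take the same approach in the non-primitive branch: a single `GenericMutableRow` is allocated outside the loop and refilled for every element, so the converter's immutable `GenericRow` only ever needs a cast and a copy. A small sketch of that buffer-reuse pattern, with plain Scala collections standing in for the Catalyst row types (`fillRows` and its parameters are hypothetical):

```scala
object ReusedBufferSketch {
  def fillRows(
      rows: Iterator[Vector[Any]], // immutable rows from the converter
      width: Int,                  // size of the requested schema
      partValues: Map[Int, Any]    // partition-column ordinal -> value
  ): Iterator[Array[Any]] = {
    val mutableRow = new Array[Any](width) // allocated once per partition
    rows.map { row =>
      var i = 0
      while (i < row.size) { mutableRow(i) = row(i); i += 1 }           // copy data columns
      for ((ordinal, value) <- partValues) mutableRow(ordinal) = value // fill partition columns
      mutableRow // the same buffer is handed back every time, as in the Spark code
    }
  }

  def main(args: Array[String]): Unit = {
    val rows = Iterator(Vector[Any]("a", null), Vector[Any]("b", null))
    fillRows(rows, width = 2, partValues = Map(1 -> "2015-01-01"))
      .foreach(r => println(r.mkString(", ")))
  }
}
```

Because the same buffer is returned for every element, a consumer must finish with (or copy) each row before advancing the iterator; that contract holds for the operators consuming these scans, which is what makes the reuse safe.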
