Skip to content

Commit 8f8d8a2

Browse files
gatorsmile authored and davies committed
[SPARK-13609] [SQL] Support Column Pruning for MapPartitions
#### What changes were proposed in this pull request? This PR prunes unnecessary columns when the operator is `MapPartitions`. The solution is to add an extra `Project` on top of the child node. Pruning for the other two operators, `AppendColumns` and `MapGroups`, sounds doable, but more discussion is required. The major reason is that the current implementation of the `inputPlan` of `groupBy` is based on the child of `AppendColumns`, which might be a bug. Thus, that work will be submitted in a separate PR. #### How was this patch tested? Added a test case in ColumnPruningSuite to verify the rule. Added another test case in DatasetSuite.scala to verify the data. Author: gatorsmile <gatorsmile@gmail.com> Closes #11460 from gatorsmile/datasetPruningNew.
1 parent d8afd45 commit 8f8d8a2

File tree

3 files changed

+28
-2
lines changed

3 files changed

+28
-2
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 4 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -331,7 +331,10 @@ object ColumnPruning extends Rule[LogicalPlan] {
331331
}.unzip._1
332332
}
333333
a.copy(child = Expand(newProjects, newOutput, grandChild))
334-
// TODO: support some logical plan for Dataset
334+
335+
// Prunes the unused columns from child of MapPartitions
336+
case mp @ MapPartitions(_, _, _, child) if (child.outputSet -- mp.references).nonEmpty =>
337+
mp.copy(child = prunedChild(child, mp.references))
335338

336339
// Prunes the unused columns from child of Aggregate/Window/Expand/Generate
337340
case a @ Aggregate(_, _, child) if (child.outputSet -- a.references).nonEmpty =>

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala

Lines changed: 14 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -17,9 +17,12 @@
1717

1818
package org.apache.spark.sql.catalyst.optimizer
1919

20+
import scala.reflect.runtime.universe.TypeTag
21+
2022
import org.apache.spark.sql.catalyst.analysis
2123
import org.apache.spark.sql.catalyst.dsl.expressions._
2224
import org.apache.spark.sql.catalyst.dsl.plans._
25+
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
2326
import org.apache.spark.sql.catalyst.expressions.{Ascending, Explode, Literal, SortOrder}
2427
import org.apache.spark.sql.catalyst.plans.PlanTest
2528
import org.apache.spark.sql.catalyst.plans.logical._
@@ -249,5 +252,16 @@ class ColumnPruningSuite extends PlanTest {
249252
comparePlans(Optimize.execute(query), expected)
250253
}
251254

255+
implicit private def productEncoder[T <: Product : TypeTag] = ExpressionEncoder[T]()
256+
private val func = identity[Iterator[OtherTuple]] _
257+
258+
test("Column pruning on MapPartitions") {
259+
val input = LocalRelation('_1.int, '_2.int, 'c.int)
260+
val plan1 = MapPartitions(func, input)
261+
val correctAnswer1 =
262+
MapPartitions(func, Project(Seq('_1, '_2), input)).analyze
263+
comparePlans(Optimize.execute(plan1.analyze), correctAnswer1)
264+
}
265+
252266
// todo: add more tests for column pruning
253267
}

sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala

Lines changed: 10 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -113,7 +113,7 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
113113
("a", 2), ("b", 3), ("c", 4))
114114
}
115115

116-
test("map with type change") {
116+
test("map with type change with the exact matched number of attributes") {
117117
val ds = Seq(("a", 1), ("b", 2), ("c", 3)).toDS()
118118

119119
checkAnswer(
@@ -123,6 +123,15 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
123123
OtherTuple("a", 1), OtherTuple("b", 2), OtherTuple("c", 3))
124124
}
125125

126+
test("map with type change with less attributes") {
127+
val ds = Seq(("a", 1, 3), ("b", 2, 4), ("c", 3, 5)).toDS()
128+
129+
checkAnswer(
130+
ds.as[OtherTuple]
131+
.map(identity[OtherTuple]),
132+
OtherTuple("a", 1), OtherTuple("b", 2), OtherTuple("c", 3))
133+
}
134+
126135
test("map and group by with class data") {
127136
// We inject a group by here to make sure this test case is future proof
128137
// when we implement better pipelining and local execution mode.

0 commit comments

Comments
 (0)