@@ -33,7 +33,47 @@ object Optimizer extends RuleExecutor[LogicalPlan] {
33
33
Batch (" Filter Pushdown" , Once ,
34
34
CombineFilters ,
35
35
PushPredicateThroughProject ,
36
- PushPredicateThroughInnerJoin ) :: Nil
36
+ PushPredicateThroughInnerJoin ,
37
+ ColumnPruning ) :: Nil
38
+ }
39
+
40
+ /**
41
+ * Attempts to eliminate the reading of unneeded columns from the query plan using the following
42
+ * transformations:
43
+ *
44
+ * - Inserting Projections beneath the following operators:
45
+ * - Aggregate
46
+ * - Project <- Join
47
+ * - Collapse adjacent projections, performing alias substitution.
48
+ */
49
+ object ColumnPruning extends Rule [LogicalPlan ] {
50
+ def apply (plan : LogicalPlan ): LogicalPlan = plan transform {
51
+ case a @ Aggregate (_, _, child) if (child.outputSet -- a.references).nonEmpty =>
52
+ a.copy(child = Project (a.references.toSeq, child))
53
+
54
+ case Project (projectList, Join (left, right, joinType, condition)) =>
55
+ val allReferences : Set [Attribute ] =
56
+ projectList.flatMap(_.references).toSet ++ condition.map(_.references).getOrElse(Set .empty)
57
+ def prunedChild (c : LogicalPlan ) =
58
+ if ((allReferences.filter(c.outputSet.contains) -- c.outputSet).nonEmpty) {
59
+ Project (allReferences.filter(c.outputSet.contains).toSeq, c)
60
+ } else {
61
+ c
62
+ }
63
+
64
+ Project (projectList, Join (prunedChild(left), prunedChild(right), joinType, condition))
65
+
66
+ case Project (project1, Project (project2, child)) =>
67
+ val aliasMap = project2.collect {
68
+ case a @ Alias (e, _) => (a.toAttribute: Expression , a)
69
+ }.toMap
70
+ // TODO: Fix TransformBase.
71
+ val substitutedProjection = project1.map(_.transform {
72
+ case a if aliasMap.contains(a) => aliasMap(a)
73
+ }).asInstanceOf [Seq [NamedExpression ]]
74
+
75
+ Project (substitutedProjection, child)
76
+ }
37
77
}
38
78
39
79
/**
0 commit comments