Skip to content

[SQL] Improve column pruning in the optimizer. #378

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,56 @@ object Optimizer extends RuleExecutor[LogicalPlan] {
Batch("Filter Pushdown", Once,
CombineFilters,
PushPredicateThroughProject,
PushPredicateThroughInnerJoin) :: Nil
PushPredicateThroughInnerJoin,
ColumnPruning) :: Nil
}

/**
* Attempts to eliminate the reading of unneeded columns from the query plan using the following
* transformations:
*
* - Inserting Projections beneath the following operators:
* - Aggregate
* - Project <- Join
* - Collapse adjacent projections, performing alias substitution.
*/
object ColumnPruning extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case a @ Aggregate(_, _, child) if (child.outputSet -- a.references).nonEmpty =>
// Project away references that are not needed to calculate the required aggregates.
a.copy(child = Project(a.references.toSeq, child))

case Project(projectList, Join(left, right, joinType, condition)) =>
// Collect the list of off references required either above or to evaluate the condition.
val allReferences: Set[Attribute] =
projectList.flatMap(_.references).toSet ++ condition.map(_.references).getOrElse(Set.empty)
/** Applies a projection when the child is producing unnecessary attributes */
def prunedChild(c: LogicalPlan) =
if ((allReferences.filter(c.outputSet.contains) -- c.outputSet).nonEmpty) {
Project(allReferences.filter(c.outputSet.contains).toSeq, c)
} else {
c
}

Project(projectList, Join(prunedChild(left), prunedChild(right), joinType, condition))

case Project(projectList1, Project(projectList2, child)) =>
// Create a map of Aliases to their values from the child projection.
// e.g., 'SELECT ... FROM (SELECT a + b AS c, d ...)' produces Map(c -> Alias(a + b, c)).
val aliasMap = projectList2.collect {
case a @ Alias(e, _) => (a.toAttribute: Expression, a)
}.toMap

// Substitute any attributes that are produced by the child projection, so that we safely
// eliminate it.
// e.g., 'SELECT c + 1 FROM (SELECT a + b AS C ...' produces 'SELECT a + b + 1 ...'
// TODO: Fix TransformBase to avoid the cast below.
val substitutedProjection = projectList1.map(_.transform {
case a if aliasMap.contains(a) => aliasMap(a)
}).asInstanceOf[Seq[NamedExpression]]

Project(substitutedProjection, child)
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ case class Aggregate(
extends UnaryNode {

def output = aggregateExpressions.map(_.toAttribute)
def references = child.references
def references = (groupingExpressions ++ aggregateExpressions).flatMap(_.references).toSet
}

case class Limit(limit: Expression, child: LogicalPlan) extends UnaryNode {
Expand Down