Skip to content

[SPARK-37670][SQL] Support predicate pushdown and column pruning for de-duped CTEs #34929

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 13 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,13 @@ object CTESubstitution extends Rule[LogicalPlan] {
if (cteDefs.isEmpty) {
substituted
} else if (substituted eq lastSubstituted.get) {
WithCTE(substituted, cteDefs.toSeq)
WithCTE(substituted, cteDefs.sortBy(_.id).toSeq)
} else {
var done = false
substituted.resolveOperatorsWithPruning(_ => !done) {
case p if p eq lastSubstituted.get =>
done = true
WithCTE(p, cteDefs.toSeq)
WithCTE(p, cteDefs.sortBy(_.id).toSeq)
}
}
}
Expand Down Expand Up @@ -203,6 +203,7 @@ object CTESubstitution extends Rule[LogicalPlan] {
cteDefs: mutable.ArrayBuffer[CTERelationDef]): Seq[(String, CTERelationDef)] = {
val resolvedCTERelations = new mutable.ArrayBuffer[(String, CTERelationDef)](relations.size)
for ((name, relation) <- relations) {
val lastCTEDefCount = cteDefs.length
val innerCTEResolved = if (isLegacy) {
// In legacy mode, outer CTE relations take precedence. Here we don't resolve the inner
// `With` nodes, later we will substitute `UnresolvedRelation`s with outer CTE relations.
Expand All @@ -211,8 +212,33 @@ object CTESubstitution extends Rule[LogicalPlan] {
} else {
// A CTE definition might contain an inner CTE that has a higher priority, so traverse and
// substitute CTE defined in `relation` first.
// NOTE: we must call `traverseAndSubstituteCTE` before `substituteCTE`, as the relations
// in the inner CTE have higher priority over the relations in the outer CTE when resolving
// inner CTE relations. For example:
// WITH t1 AS (SELECT 1)
// t2 AS (
// WITH t1 AS (SELECT 2)
// WITH t3 AS (SELECT * FROM t1)
// )
// t3 should resolve the t1 to `SELECT 2` instead of `SELECT 1`.
traverseAndSubstituteCTE(relation, isCommand, cteDefs)._1
}

if (cteDefs.length > lastCTEDefCount) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please consider #36146 as an alternative to substituting and changing accumulated cteDefs so far.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @peter-toth , this PR contains all the CTE bug fixes we have found so far internally. Can you rebase #36146 after this one gets merged if you think your fix is cleaner? thanks!

Copy link
Contributor

@peter-toth peter-toth Apr 18, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can rebase my #36146, no problem with that.

But I'm more concerned about #32298. This PR seems to contain a mix of improvements and bugfixes and changes a lot in CTE handling and conflicts with my PR. As mine is on the 3.3 whitelist do you think we can merge that first and rebase this on that?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR will be merged to 3.2 because of the fixes for the correctness bugs and performance regressions. It doesn't really improve the performance compared to Spark 3.1.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But can we merge it after #32298?

Copy link
Contributor

@cloud-fan cloud-fan Apr 19, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#32298 won't go to 3.2, right? If we really need a different CTE handling in master/3.3 for the merging scalar subqueries feature, we should still merge this PR first and make a followup PR to change CTE in master/3.3

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I though you just made a typo and want to merge it 3.3+ only and maybe backport the bugfix parts to 3.2...

// We have added more CTE relations to the `cteDefs` from the inner CTE, and these relations
// should also be substituted with `resolvedCTERelations` as inner CTE relation can refer to
// outer CTE relation. For example:
// WITH t1 AS (SELECT 1)
// t2 AS (
// WITH t3 AS (SELECT * FROM t1)
// )
for (i <- lastCTEDefCount until cteDefs.length) {
val substituted =
substituteCTE(cteDefs(i).child, isLegacy || isCommand, resolvedCTERelations.toSeq)
cteDefs(i) = cteDefs(i).copy(child = substituted)
}
}

// CTE definition can reference a previous one
val substituted =
substituteCTE(innerCTEResolved, isLegacy || isCommand, resolvedCTERelations.toSeq)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.SubExprUtils._
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, PercentileCont}
import org.apache.spark.sql.catalyst.optimizer.{BooleanSimplification, DecorrelateInnerQuery}
import org.apache.spark.sql.catalyst.optimizer.{BooleanSimplification, DecorrelateInnerQuery, InlineCTE}
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.trees.TreeNodeTag
Expand Down Expand Up @@ -93,8 +93,10 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {

def checkAnalysis(plan: LogicalPlan): Unit = {
// We transform up and order the rules so as to catch the first possible failure instead
// of the result of cascading resolution failures.
plan.foreachUp {
// of the result of cascading resolution failures. Inline all CTEs in the plan to help check
// query plan structures in subqueries.
val inlineCTE = InlineCTE(alwaysInline = true)
inlineCTE(plan).foreachUp {

case p if p.analyzed => // Skip already analyzed sub-plans

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,26 +28,37 @@ import org.apache.spark.sql.catalyst.trees.TreePattern.{CTE, PLAN_EXPRESSION}

/**
* Inlines CTE definitions into corresponding references if either of the conditions satisfies:
* 1. The CTE definition does not contain any non-deterministic expressions. If this CTE
* definition references another CTE definition that has non-deterministic expressions, it
* is still OK to inline the current CTE definition.
* 1. The CTE definition does not contain any non-deterministic expressions or contains attribute
* references to an outer query. If this CTE definition references another CTE definition that
* has non-deterministic expressions, it is still OK to inline the current CTE definition.
* 2. The CTE definition is only referenced once throughout the main query and all the subqueries.
*
* In addition, due to the complexity of correlated subqueries, all CTE references in correlated
* subqueries are inlined regardless of the conditions above.
* CTE definitions that appear in subqueries and are not inlined will be pulled up to the main
* query level.
*
* @param alwaysInline if true, inline all CTEs in the query plan.
*/
object InlineCTE extends Rule[LogicalPlan] {
case class InlineCTE(alwaysInline: Boolean = false) extends Rule[LogicalPlan] {

override def apply(plan: LogicalPlan): LogicalPlan = {
if (!plan.isInstanceOf[Subquery] && plan.containsPattern(CTE)) {
val cteMap = mutable.HashMap.empty[Long, (CTERelationDef, Int)]
buildCTEMap(plan, cteMap)
inlineCTE(plan, cteMap, forceInline = false)
val notInlined = mutable.ArrayBuffer.empty[CTERelationDef]
val inlined = inlineCTE(plan, cteMap, notInlined)
// CTEs in SQL Commands have been inlined by `CTESubstitution` already, so it is safe to add
// WithCTE as top node here.
if (notInlined.isEmpty) {
inlined
} else {
WithCTE(inlined, notInlined.toSeq)
}
} else {
plan
}
}

private def shouldInline(cteDef: CTERelationDef, refCount: Int): Boolean = {
private def shouldInline(cteDef: CTERelationDef, refCount: Int): Boolean = alwaysInline || {
// We do not need to check enclosed `CTERelationRef`s for `deterministic` or `OuterReference`,
// because:
// 1) It is fine to inline a CTE if it references another CTE that is non-deterministic;
Expand Down Expand Up @@ -93,25 +104,24 @@ object InlineCTE extends Rule[LogicalPlan] {
private def inlineCTE(
plan: LogicalPlan,
cteMap: mutable.HashMap[Long, (CTERelationDef, Int)],
forceInline: Boolean): LogicalPlan = {
val (stripped, notInlined) = plan match {
notInlined: mutable.ArrayBuffer[CTERelationDef]): LogicalPlan = {
plan match {
case WithCTE(child, cteDefs) =>
val notInlined = mutable.ArrayBuffer.empty[CTERelationDef]
cteDefs.foreach { cteDef =>
val (cte, refCount) = cteMap(cteDef.id)
if (refCount > 0) {
val inlined = cte.copy(child = inlineCTE(cte.child, cteMap, forceInline))
val inlined = cte.copy(child = inlineCTE(cte.child, cteMap, notInlined))
cteMap.update(cteDef.id, (inlined, refCount))
if (!forceInline && !shouldInline(inlined, refCount)) {
if (!shouldInline(inlined, refCount)) {
notInlined.append(inlined)
}
}
}
(inlineCTE(child, cteMap, forceInline), notInlined.toSeq)
inlineCTE(child, cteMap, notInlined)

case ref: CTERelationRef =>
val (cteDef, refCount) = cteMap(ref.cteId)
val newRef = if (forceInline || shouldInline(cteDef, refCount)) {
if (shouldInline(cteDef, refCount)) {
if (ref.outputSet == cteDef.outputSet) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we compare output set, not output? is it possible that the ref and def have different output column order?

cteDef.child
} else {
Expand All @@ -125,24 +135,16 @@ object InlineCTE extends Rule[LogicalPlan] {
} else {
ref
}
(newRef, Seq.empty)

case _ if plan.containsPattern(CTE) =>
val newPlan = plan
.withNewChildren(plan.children.map(child => inlineCTE(child, cteMap, forceInline)))
plan
.withNewChildren(plan.children.map(child => inlineCTE(child, cteMap, notInlined)))
.transformExpressionsWithPruning(_.containsAllPatterns(PLAN_EXPRESSION, CTE)) {
case e: SubqueryExpression =>
e.withNewPlan(inlineCTE(e.plan, cteMap, forceInline = e.isCorrelated))
e.withNewPlan(inlineCTE(e.plan, cteMap, notInlined))
}
(newPlan, Seq.empty)

case _ => (plan, Seq.empty)
}

if (notInlined.isEmpty) {
stripped
} else {
WithCTE(stripped, notInlined)
case _ => plan
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,8 @@ abstract class Optimizer(catalogManager: CatalogManager)
OptimizeUpdateFields,
SimplifyExtractValueOps,
OptimizeCsvJsonExprs,
CombineConcats) ++
CombineConcats,
PushdownPredicatesAndPruneColumnsForCTEDef) ++
extendedOperatorOptimizationRules

val operatorOptimizationBatch: Seq[Batch] = {
Expand All @@ -147,22 +148,7 @@ abstract class Optimizer(catalogManager: CatalogManager)
}

val batches = (
// Technically some of the rules in Finish Analysis are not optimizer rules and belong more
// in the analyzer, because they are needed for correctness (e.g. ComputeCurrentTime).
// However, because we also use the analyzer to canonicalized queries (for view definition),
// we do not eliminate subqueries or compute current time in the analyzer.
Batch("Finish Analysis", Once,
EliminateResolvedHint,
EliminateSubqueryAliases,
EliminateView,
InlineCTE,
ReplaceExpressions,
RewriteNonCorrelatedExists,
PullOutGroupingExpressions,
ComputeCurrentTime,
ReplaceCurrentLike(catalogManager),
SpecialDatetimeValues,
RewriteAsOfJoin) ::
Batch("Finish Analysis", Once, FinishAnalysis) ::
//////////////////////////////////////////////////////////////////////////////////////////
// Optimizer rules start here
//////////////////////////////////////////////////////////////////////////////////////////
Expand All @@ -172,6 +158,8 @@ abstract class Optimizer(catalogManager: CatalogManager)
// extra operators between two adjacent Union operators.
// - Call CombineUnions again in Batch("Operator Optimizations"),
// since the other rules might make two separate Unions operators adjacent.
Batch("Inline CTE", Once,
InlineCTE()) ::
Batch("Union", Once,
RemoveNoopOperators,
CombineUnions,
Expand Down Expand Up @@ -208,6 +196,7 @@ abstract class Optimizer(catalogManager: CatalogManager)
RemoveLiteralFromGroupExpressions,
RemoveRepetitionFromGroupExpressions) :: Nil ++
operatorOptimizationBatch) :+
Batch("Clean Up Temporary CTE Info", Once, CleanUpTempCTEInfo) :+
// This batch rewrites plans after the operator optimization and
// before any batches that depend on stats.
Batch("Pre CBO Rules", Once, preCBORules: _*) :+
Expand Down Expand Up @@ -266,14 +255,7 @@ abstract class Optimizer(catalogManager: CatalogManager)
* (defaultBatches - (excludedRules - nonExcludableRules)).
*/
def nonExcludableRules: Seq[String] =
EliminateDistinct.ruleName ::
EliminateResolvedHint.ruleName ::
EliminateSubqueryAliases.ruleName ::
EliminateView.ruleName ::
ReplaceExpressions.ruleName ::
ComputeCurrentTime.ruleName ::
SpecialDatetimeValues.ruleName ::
ReplaceCurrentLike(catalogManager).ruleName ::
FinishAnalysis.ruleName ::
RewriteDistinctAggregates.ruleName ::
ReplaceDeduplicateWithAggregate.ruleName ::
ReplaceIntersectWithSemiJoin.ruleName ::
Expand All @@ -287,10 +269,38 @@ abstract class Optimizer(catalogManager: CatalogManager)
RewritePredicateSubquery.ruleName ::
NormalizeFloatingNumbers.ruleName ::
ReplaceUpdateFieldsExpression.ruleName ::
PullOutGroupingExpressions.ruleName ::
RewriteAsOfJoin.ruleName ::
RewriteLateralSubquery.ruleName :: Nil

/**
* Apply finish-analysis rules for the entire plan including all subqueries.
*/
object FinishAnalysis extends Rule[LogicalPlan] {
// Technically some of the rules in Finish Analysis are not optimizer rules and belong more
// in the analyzer, because they are needed for correctness (e.g. ComputeCurrentTime).
// However, because we also use the analyzer to canonicalized queries (for view definition),
// we do not eliminate subqueries or compute current time in the analyzer.
private val rules = Seq(
EliminateResolvedHint,
EliminateSubqueryAliases,
EliminateView,
ReplaceExpressions,
RewriteNonCorrelatedExists,
PullOutGroupingExpressions,
ComputeCurrentTime,
ReplaceCurrentLike(catalogManager),
SpecialDatetimeValues,
RewriteAsOfJoin)

override def apply(plan: LogicalPlan): LogicalPlan = {
rules.foldLeft(plan) { case (sp, rule) => rule.apply(sp) }
.transformAllExpressionsWithPruning(_.containsPattern(PLAN_EXPRESSION)) {
case s: SubqueryExpression =>
val Subquery(newPlan, _) = apply(Subquery.fromExpression(s))
s.withNewPlan(newPlan)
}
}
}

/**
* Optimize all the subqueries inside expression.
*/
Expand Down
Loading