Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,13 @@ object CTESubstitution extends Rule[LogicalPlan] {
if (cteDefs.isEmpty) {
substituted
} else if (substituted eq lastSubstituted.get) {
WithCTE(substituted, cteDefs.sortBy(_.id).toSeq)
WithCTE(substituted, cteDefs.toSeq)
} else {
var done = false
substituted.resolveOperatorsWithPruning(_ => !done) {
case p if p eq lastSubstituted.get =>
done = true
WithCTE(p, cteDefs.sortBy(_.id).toSeq)
WithCTE(p, cteDefs.toSeq)
}
}
}
Expand Down Expand Up @@ -200,7 +200,6 @@ object CTESubstitution extends Rule[LogicalPlan] {
cteDefs: mutable.ArrayBuffer[CTERelationDef]): Seq[(String, CTERelationDef)] = {
val resolvedCTERelations = new mutable.ArrayBuffer[(String, CTERelationDef)](relations.size)
for ((name, relation) <- relations) {
val lastCTEDefCount = cteDefs.length
val innerCTEResolved = if (isLegacy) {
// In legacy mode, outer CTE relations take precedence. Here we don't resolve the inner
// `With` nodes, later we will substitute `UnresolvedRelation`s with outer CTE relations.
Expand All @@ -209,33 +208,8 @@ object CTESubstitution extends Rule[LogicalPlan] {
} else {
// A CTE definition might contain an inner CTE that has a higher priority, so traverse and
// substitute CTE defined in `relation` first.
// NOTE: we must call `traverseAndSubstituteCTE` before `substituteCTE`, as the relations
// in the inner CTE have higher priority over the relations in the outer CTE when resolving
// inner CTE relations. For example:
// WITH t1 AS (SELECT 1)
// t2 AS (
// WITH t1 AS (SELECT 2)
// WITH t3 AS (SELECT * FROM t1)
// )
// t3 should resolve the t1 to `SELECT 2` instead of `SELECT 1`.
traverseAndSubstituteCTE(relation, isCommand, cteDefs)._1
}

if (cteDefs.length > lastCTEDefCount) {
// We have added more CTE relations to the `cteDefs` from the inner CTE, and these relations
// should also be substituted with `resolvedCTERelations` as inner CTE relation can refer to
// outer CTE relation. For example:
// WITH t1 AS (SELECT 1)
// t2 AS (
// WITH t3 AS (SELECT * FROM t1)
// )
for (i <- lastCTEDefCount until cteDefs.length) {
val substituted =
substituteCTE(cteDefs(i).child, isLegacy || isCommand, resolvedCTERelations.toSeq)
cteDefs(i) = cteDefs(i).copy(child = substituted)
}
}

// CTE definition can reference a previous one
val substituted =
substituteCTE(innerCTEResolved, isLegacy || isCommand, resolvedCTERelations.toSeq)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.SubExprUtils._
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.optimizer.{BooleanSimplification, DecorrelateInnerQuery, InlineCTE}
import org.apache.spark.sql.catalyst.optimizer.{BooleanSimplification, DecorrelateInnerQuery}
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, TypeUtils}
Expand Down Expand Up @@ -90,10 +90,8 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {

def checkAnalysis(plan: LogicalPlan): Unit = {
// We transform up and order the rules so as to catch the first possible failure instead
// of the result of cascading resolution failures. Inline all CTEs in the plan to help check
// query plan structures in subqueries.
val inlineCTE = InlineCTE(alwaysInline = true)
inlineCTE(plan).foreachUp {
// of the result of cascading resolution failures.
plan.foreachUp {

case p if p.analyzed => // Skip already analyzed sub-plans

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,37 +28,26 @@ import org.apache.spark.sql.catalyst.trees.TreePattern.{CTE, PLAN_EXPRESSION}

/**
* Inlines CTE definitions into corresponding references if either of the conditions satisfies:
* 1. The CTE definition does not contain any non-deterministic expressions or contains attribute
* references to an outer query. If this CTE definition references another CTE definition that
* has non-deterministic expressions, it is still OK to inline the current CTE definition.
* 1. The CTE definition does not contain any non-deterministic expressions. If this CTE
* definition references another CTE definition that has non-deterministic expressions, it
* is still OK to inline the current CTE definition.
* 2. The CTE definition is only referenced once throughout the main query and all the subqueries.
*
* CTE definitions that appear in subqueries and are not inlined will be pulled up to the main
* query level.
*
* @param alwaysInline if true, inline all CTEs in the query plan.
* In addition, due to the complexity of correlated subqueries, all CTE references in correlated
* subqueries are inlined regardless of the conditions above.
*/
case class InlineCTE(alwaysInline: Boolean = false) extends Rule[LogicalPlan] {

object InlineCTE extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = {
if (!plan.isInstanceOf[Subquery] && plan.containsPattern(CTE)) {
val cteMap = mutable.HashMap.empty[Long, (CTERelationDef, Int)]
buildCTEMap(plan, cteMap)
val notInlined = mutable.ArrayBuffer.empty[CTERelationDef]
val inlined = inlineCTE(plan, cteMap, notInlined)
// CTEs in SQL Commands have been inlined by `CTESubstitution` already, so it is safe to add
// WithCTE as top node here.
if (notInlined.isEmpty) {
inlined
} else {
WithCTE(inlined, notInlined.toSeq)
}
inlineCTE(plan, cteMap, forceInline = false)
} else {
plan
}
}

private def shouldInline(cteDef: CTERelationDef, refCount: Int): Boolean = alwaysInline || {
private def shouldInline(cteDef: CTERelationDef, refCount: Int): Boolean = {
// We do not need to check enclosed `CTERelationRef`s for `deterministic` or `OuterReference`,
// because:
// 1) It is fine to inline a CTE if it references another CTE that is non-deterministic;
Expand Down Expand Up @@ -104,24 +93,25 @@ case class InlineCTE(alwaysInline: Boolean = false) extends Rule[LogicalPlan] {
private def inlineCTE(
plan: LogicalPlan,
cteMap: mutable.HashMap[Long, (CTERelationDef, Int)],
notInlined: mutable.ArrayBuffer[CTERelationDef]): LogicalPlan = {
plan match {
forceInline: Boolean): LogicalPlan = {
val (stripped, notInlined) = plan match {
case WithCTE(child, cteDefs) =>
val notInlined = mutable.ArrayBuffer.empty[CTERelationDef]
cteDefs.foreach { cteDef =>
val (cte, refCount) = cteMap(cteDef.id)
if (refCount > 0) {
val inlined = cte.copy(child = inlineCTE(cte.child, cteMap, notInlined))
val inlined = cte.copy(child = inlineCTE(cte.child, cteMap, forceInline))
cteMap.update(cteDef.id, (inlined, refCount))
if (!shouldInline(inlined, refCount)) {
if (!forceInline && !shouldInline(inlined, refCount)) {
notInlined.append(inlined)
}
}
}
inlineCTE(child, cteMap, notInlined)
(inlineCTE(child, cteMap, forceInline), notInlined.toSeq)

case ref: CTERelationRef =>
val (cteDef, refCount) = cteMap(ref.cteId)
if (shouldInline(cteDef, refCount)) {
val newRef = if (forceInline || shouldInline(cteDef, refCount)) {
if (ref.outputSet == cteDef.outputSet) {
cteDef.child
} else {
Expand All @@ -135,16 +125,24 @@ case class InlineCTE(alwaysInline: Boolean = false) extends Rule[LogicalPlan] {
} else {
ref
}
(newRef, Seq.empty)

case _ if plan.containsPattern(CTE) =>
plan
.withNewChildren(plan.children.map(child => inlineCTE(child, cteMap, notInlined)))
val newPlan = plan
.withNewChildren(plan.children.map(child => inlineCTE(child, cteMap, forceInline)))
.transformExpressionsWithPruning(_.containsAllPatterns(PLAN_EXPRESSION, CTE)) {
case e: SubqueryExpression =>
e.withNewPlan(inlineCTE(e.plan, cteMap, notInlined))
e.withNewPlan(inlineCTE(e.plan, cteMap, forceInline = e.isCorrelated))
}
(newPlan, Seq.empty)

case _ => plan
case _ => (plan, Seq.empty)
}

if (notInlined.isEmpty) {
stripped
} else {
WithCTE(stripped, notInlined)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,7 @@ abstract class Optimizer(catalogManager: CatalogManager)
OptimizeUpdateFields,
SimplifyExtractValueOps,
OptimizeCsvJsonExprs,
CombineConcats,
PushdownPredicatesAndPruneColumnsForCTEDef) ++
CombineConcats) ++
extendedOperatorOptimizationRules

val operatorOptimizationBatch: Seq[Batch] = {
Expand All @@ -146,7 +145,21 @@ abstract class Optimizer(catalogManager: CatalogManager)
}

val batches = (Batch("Eliminate Distinct", Once, EliminateDistinct) ::
Batch("Finish Analysis", Once, FinishAnalysis) ::
// Technically some of the rules in Finish Analysis are not optimizer rules and belong more
// in the analyzer, because they are needed for correctness (e.g. ComputeCurrentTime).
// However, because we also use the analyzer to canonicalized queries (for view definition),
// we do not eliminate subqueries or compute current time in the analyzer.
Batch("Finish Analysis", Once,
EliminateResolvedHint,
EliminateSubqueryAliases,
EliminateView,
InlineCTE,
ReplaceExpressions,
RewriteNonCorrelatedExists,
PullOutGroupingExpressions,
ComputeCurrentTime,
ReplaceCurrentLike(catalogManager),
SpecialDatetimeValues) ::
//////////////////////////////////////////////////////////////////////////////////////////
// Optimizer rules start here
//////////////////////////////////////////////////////////////////////////////////////////
Expand All @@ -155,8 +168,6 @@ abstract class Optimizer(catalogManager: CatalogManager)
// extra operators between two adjacent Union operators.
// - Call CombineUnions again in Batch("Operator Optimizations"),
// since the other rules might make two separate Unions operators adjacent.
Batch("Inline CTE", Once,
InlineCTE()) ::
Batch("Union", Once,
RemoveNoopOperators,
CombineUnions,
Expand Down Expand Up @@ -193,7 +204,6 @@ abstract class Optimizer(catalogManager: CatalogManager)
RemoveLiteralFromGroupExpressions,
RemoveRepetitionFromGroupExpressions) :: Nil ++
operatorOptimizationBatch) :+
Batch("Clean Up Temporary CTE Info", Once, CleanUpTempCTEInfo) :+
// This batch rewrites plans after the operator optimization and
// before any batches that depend on stats.
Batch("Pre CBO Rules", Once, preCBORules: _*) :+
Expand Down Expand Up @@ -250,7 +260,14 @@ abstract class Optimizer(catalogManager: CatalogManager)
* (defaultBatches - (excludedRules - nonExcludableRules)).
*/
def nonExcludableRules: Seq[String] =
FinishAnalysis.ruleName ::
EliminateDistinct.ruleName ::
EliminateResolvedHint.ruleName ::
EliminateSubqueryAliases.ruleName ::
EliminateView.ruleName ::
ReplaceExpressions.ruleName ::
ComputeCurrentTime.ruleName ::
SpecialDatetimeValues.ruleName ::
ReplaceCurrentLike(catalogManager).ruleName ::
RewriteDistinctAggregates.ruleName ::
ReplaceDeduplicateWithAggregate.ruleName ::
ReplaceIntersectWithSemiJoin.ruleName ::
Expand All @@ -264,37 +281,9 @@ abstract class Optimizer(catalogManager: CatalogManager)
RewritePredicateSubquery.ruleName ::
NormalizeFloatingNumbers.ruleName ::
ReplaceUpdateFieldsExpression.ruleName ::
PullOutGroupingExpressions.ruleName ::
RewriteLateralSubquery.ruleName :: Nil

/**
* Apply finish-analysis rules for the entire plan including all subqueries.
*/
object FinishAnalysis extends Rule[LogicalPlan] {
// Technically some of the rules in Finish Analysis are not optimizer rules and belong more
// in the analyzer, because they are needed for correctness (e.g. ComputeCurrentTime).
// However, because we also use the analyzer to canonicalized queries (for view definition),
// we do not eliminate subqueries or compute current time in the analyzer.
private val rules = Seq(
EliminateResolvedHint,
EliminateSubqueryAliases,
EliminateView,
ReplaceExpressions,
RewriteNonCorrelatedExists,
PullOutGroupingExpressions,
ComputeCurrentTime,
ReplaceCurrentLike(catalogManager),
SpecialDatetimeValues)

override def apply(plan: LogicalPlan): LogicalPlan = {
rules.foldLeft(plan) { case (sp, rule) => rule.apply(sp) }
.transformAllExpressionsWithPruning(_.containsPattern(PLAN_EXPRESSION)) {
case s: SubqueryExpression =>
val Subquery(newPlan, _) = apply(Subquery.fromExpression(s))
s.withNewPlan(newPlan)
}
}
}

/**
* Optimize all the subqueries inside expression.
*/
Expand Down
Loading