-
Notifications
You must be signed in to change notification settings - Fork 28.6k
[SPARK-37670][SQL] Support predicate pushdown and column pruning for de-duped CTEs #34929
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closed
Closed
Changes from all commits
Commits
Show all changes
13 commits
Select commit
Hold shift + click to select a range
386a427
pred push down
maryannxue 8642220
fix id conflict
maryannxue 645d3ac
fix InlineCTE
maryannxue 392d748
add more tests
maryannxue 841aa2a
fix compilation
maryannxue d248f57
Merge remote-tracking branch 'upstream/master' into cte-followup
maryannxue 74e73c9
CTE ID conflict in union of Dataframes leads to wrong results
maryannxue fcd32ef
CTE definitions out of original order when not inlined
maryannxue 80c7712
Inline all CTEs before CheckAnalysis
maryannxue f84dda4
address review comments
maryannxue be33b47
inner CTE relation can refer outer CTE relations
maryannxue 3602589
Merge remote-tracking branch 'origin/master' into help
cloud-fan 2a55dbf
update golden files
cloud-fan File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -28,26 +28,37 @@ import org.apache.spark.sql.catalyst.trees.TreePattern.{CTE, PLAN_EXPRESSION} | |
|
||
/** | ||
* Inlines CTE definitions into corresponding references if either of the conditions satisfies: | ||
* 1. The CTE definition does not contain any non-deterministic expressions. If this CTE | ||
* definition references another CTE definition that has non-deterministic expressions, it | ||
* is still OK to inline the current CTE definition. | ||
* 1. The CTE definition does not contain any non-deterministic expressions or contains attribute | ||
* references to an outer query. If this CTE definition references another CTE definition that | ||
* has non-deterministic expressions, it is still OK to inline the current CTE definition. | ||
* 2. The CTE definition is only referenced once throughout the main query and all the subqueries. | ||
* | ||
* In addition, due to the complexity of correlated subqueries, all CTE references in correlated | ||
* subqueries are inlined regardless of the conditions above. | ||
* CTE definitions that appear in subqueries and are not inlined will be pulled up to the main | ||
* query level. | ||
* | ||
* @param alwaysInline if true, inline all CTEs in the query plan. | ||
*/ | ||
object InlineCTE extends Rule[LogicalPlan] { | ||
case class InlineCTE(alwaysInline: Boolean = false) extends Rule[LogicalPlan] { | ||
|
||
override def apply(plan: LogicalPlan): LogicalPlan = { | ||
if (!plan.isInstanceOf[Subquery] && plan.containsPattern(CTE)) { | ||
val cteMap = mutable.HashMap.empty[Long, (CTERelationDef, Int)] | ||
buildCTEMap(plan, cteMap) | ||
inlineCTE(plan, cteMap, forceInline = false) | ||
val notInlined = mutable.ArrayBuffer.empty[CTERelationDef] | ||
val inlined = inlineCTE(plan, cteMap, notInlined) | ||
// CTEs in SQL Commands have been inlined by `CTESubstitution` already, so it is safe to add | ||
// WithCTE as top node here. | ||
if (notInlined.isEmpty) { | ||
inlined | ||
} else { | ||
WithCTE(inlined, notInlined.toSeq) | ||
} | ||
} else { | ||
plan | ||
} | ||
} | ||
|
||
private def shouldInline(cteDef: CTERelationDef, refCount: Int): Boolean = { | ||
private def shouldInline(cteDef: CTERelationDef, refCount: Int): Boolean = alwaysInline || { | ||
// We do not need to check enclosed `CTERelationRef`s for `deterministic` or `OuterReference`, | ||
// because: | ||
// 1) It is fine to inline a CTE if it references another CTE that is non-deterministic; | ||
|
@@ -93,25 +104,24 @@ object InlineCTE extends Rule[LogicalPlan] { | |
private def inlineCTE( | ||
plan: LogicalPlan, | ||
cteMap: mutable.HashMap[Long, (CTERelationDef, Int)], | ||
forceInline: Boolean): LogicalPlan = { | ||
val (stripped, notInlined) = plan match { | ||
notInlined: mutable.ArrayBuffer[CTERelationDef]): LogicalPlan = { | ||
plan match { | ||
case WithCTE(child, cteDefs) => | ||
val notInlined = mutable.ArrayBuffer.empty[CTERelationDef] | ||
cteDefs.foreach { cteDef => | ||
val (cte, refCount) = cteMap(cteDef.id) | ||
if (refCount > 0) { | ||
val inlined = cte.copy(child = inlineCTE(cte.child, cteMap, forceInline)) | ||
val inlined = cte.copy(child = inlineCTE(cte.child, cteMap, notInlined)) | ||
cteMap.update(cteDef.id, (inlined, refCount)) | ||
if (!forceInline && !shouldInline(inlined, refCount)) { | ||
if (!shouldInline(inlined, refCount)) { | ||
notInlined.append(inlined) | ||
} | ||
} | ||
} | ||
(inlineCTE(child, cteMap, forceInline), notInlined.toSeq) | ||
inlineCTE(child, cteMap, notInlined) | ||
|
||
case ref: CTERelationRef => | ||
val (cteDef, refCount) = cteMap(ref.cteId) | ||
val newRef = if (forceInline || shouldInline(cteDef, refCount)) { | ||
if (shouldInline(cteDef, refCount)) { | ||
if (ref.outputSet == cteDef.outputSet) { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Why do we compare the output set rather than the output? Is it possible that the ref and def have different output column orders? |
||
cteDef.child | ||
} else { | ||
|
@@ -125,24 +135,16 @@ object InlineCTE extends Rule[LogicalPlan] { | |
} else { | ||
ref | ||
} | ||
(newRef, Seq.empty) | ||
|
||
case _ if plan.containsPattern(CTE) => | ||
val newPlan = plan | ||
.withNewChildren(plan.children.map(child => inlineCTE(child, cteMap, forceInline))) | ||
plan | ||
.withNewChildren(plan.children.map(child => inlineCTE(child, cteMap, notInlined))) | ||
.transformExpressionsWithPruning(_.containsAllPatterns(PLAN_EXPRESSION, CTE)) { | ||
case e: SubqueryExpression => | ||
e.withNewPlan(inlineCTE(e.plan, cteMap, forceInline = e.isCorrelated)) | ||
e.withNewPlan(inlineCTE(e.plan, cteMap, notInlined)) | ||
} | ||
(newPlan, Seq.empty) | ||
|
||
case _ => (plan, Seq.empty) | ||
} | ||
|
||
if (notInlined.isEmpty) { | ||
stripped | ||
} else { | ||
WithCTE(stripped, notInlined) | ||
case _ => plan | ||
} | ||
} | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please consider #36146 as an alternative to substituting and changing accumulated `cteDefs` so far.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @peter-toth , this PR contains all the CTE bug fixes we have found so far internally. Can you rebase #36146 after this one gets merged if you think your fix is cleaner? thanks!
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can rebase my #36146, no problem with that.
But I'm more concerned about #32298. This PR seems to contain a mix of improvements and bugfixes and changes a lot in CTE handling and conflicts with my PR. As mine is on the 3.3 whitelist do you think we can merge that first and rebase this on that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This PR will be merged to 3.2 because of the fixes for the correctness bugs and performance regressions. It doesn't really improve the performance compared to Spark 3.1.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But can we merge it after #32298?
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
#32298 won't go to 3.2, right? If we really need a different CTE handling in master/3.3 for the merging scalar subqueries feature, we should still merge this PR first and make a followup PR to change CTE in master/3.3
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, I thought you just made a typo and wanted to merge it to 3.3+ only and maybe backport the bugfix parts to 3.2...