[SPARK-34079][SQL] Merge non-correlated scalar subqueries #32298
cc @cloud-fan, @maropu, @viirya, @wangyum Any feedback is welcome.
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala
case (np, ep: Project) =>
  mergePlans(np, ep.child).map { case (mergedChild, outputMap) =>
    Project(distinctExpressions(ep.projectList ++ outputMap.values), mergedChild) -> outputMap
  }
case (np: Project, ep) =>
  mergePlans(np.child, ep).map { case (mergedChild, outputMap) =>
    val newProjectList = replaceAttributes(np.projectList, outputMap)
    val newOutputMap = createOutputMap(np.projectList, newProjectList)
    Project(distinctExpressions(ep.output ++ newProjectList), mergedChild) -> newOutputMap
  }
Which kind of queries does this case handle? (Does this PR already have any test for this code path?) Is it safe to accept any plan node if the other side is a `Project`?
btw, could we use a white-list here to check if it can merge plans or not?
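The suggested white-list check could look roughly like this (a toy Python sketch with hypothetical node names, not the actual Catalyst implementation): only node types known to merge safely are considered, and everything else bails out early.

```python
# Toy sketch of a white-list check before attempting a plan merge.
# The node-type names are illustrative; the real rule works on
# Catalyst LogicalPlan classes.
MERGEABLE_NODES = {"Project", "Aggregate", "Filter", "Join", "Relation"}

def can_try_merge(node_type: str) -> bool:
    # Only attempt to merge plans whose root node type is known to be safe.
    return node_type in MERGEABLE_NODES

print(can_try_merge("Aggregate"))  # -> True
print(can_try_merge("Window"))     # -> False
```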
A good example is in the description of this PR:
SELECT
(SELECT avg(a) FROM t GROUP BY b),
(SELECT sum(b) FROM t GROUP BY b)
where we have an additional `Project [b#240]` in the second subquery due to column pruning:
Project [scalar-subquery#231 [] AS scalarsubquery()#241, scalar-subquery#232 [] AS scalarsubquery()#242L]
: :- Aggregate [b#234], [avg(a#233) AS avg(a)#236]
: : +- Relation default.t[a#233,b#234] parquet
: +- Aggregate [b#240], [sum(b#240) AS sum(b)#238L]
: +- Project [b#240]
: +- Relation default.t[a#239,b#240] parquet
+- OneRowRelation
and this rule can merge the 2 queries as:
Project [multi-scalar-subquery#231.avg(a) AS scalarsubquery()#241, multi-scalar-subquery#232.sum(b) AS scalarsubquery()#242L]
: :- Aggregate [b#234], [avg(a#233) AS avg(a)#236, sum(b#234) AS sum(b)#238L]
: : +- Project [a#233, b#234]
: : +- Relation default.t[a#233,b#234] parquet
: +- Aggregate [b#234], [avg(a#233) AS avg(a)#236, sum(b#234) AS sum(b)#238L]
: +- Project [a#233, b#234]
: +- Relation default.t[a#233,b#234] parquet
+- OneRowRelation
IMO the above 2 cases are safe and just handle the cases when there is an extra `Project` node in one of the plans, but the child plan under the `Project` and the plan of the other side are mergeable. In these cases the merged plan should contain the `Project` node, but it should also contain the output of the other side transparently.
I will add this test case to `SubquerySuite`.
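The `(np, ep: Project)` case above can be sketched with a toy model (plain Python with hypothetical classes, not Spark's Catalyst API): when the existing side has an extra `Project`, the merge keeps its project list and transparently appends the columns the other side needs.

```python
# Toy sketch of merging a plan that has an extra Project node on one side.
# All names here are hypothetical; Spark's real mergePlans works on
# Catalyst LogicalPlan trees, not these toy classes.

class Relation:
    def __init__(self, columns):
        self.columns = columns
    def output(self):
        return list(self.columns)

class Project:
    def __init__(self, project_list, child):
        self.project_list = project_list
        self.child = child
    def output(self):
        return list(self.project_list)

def merge_plans(new_plan, existing_plan):
    """Return a merged plan covering the outputs of both sides, or None."""
    if isinstance(existing_plan, Relation) and isinstance(new_plan, Relation):
        if new_plan.columns == existing_plan.columns:
            return Relation(existing_plan.columns)
        return None
    # Extra Project on the existing side: keep its project list and
    # transparently expose the columns the new side needs.
    if isinstance(existing_plan, Project):
        merged_child = merge_plans(new_plan, existing_plan.child)
        if merged_child is None:
            return None
        combined = existing_plan.project_list + [
            c for c in new_plan.output() if c not in existing_plan.project_list
        ]
        return Project(combined, merged_child)
    return None

t1 = Relation(["a", "b"])                   # first subquery scans t fully
t2 = Project(["b"], Relation(["a", "b"]))   # second was column-pruned to b
merged = merge_plans(t1, t2)
print(merged.output())  # -> ['b', 'a']
```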
@@ -1353,6 +1353,14 @@ object SQLConf {
    .booleanConf
    .createWithDefault(true)

  val SCALAR_SUBQUERY_MERGE_ENABLED =
    buildConf("spark.sql.scalarSubqueyMerge.enabled")
`spark.sql.optimizer.scalarSubqueyMerging.enabled` (or `spark.sql.optimizer.mergeScalarSubqueries.enabled`) instead?
I think we need to describe more, e.g., to enable this feature, `spark.sql.execution.reuseSubquery` needs to be true, etc.
I'm no longer sure we need this flag actually, as we can disable this rule from the optimizer with `spark.sql.optimizer.excludedRules` too.
I haven't removed this flag yet, but I still think it is not needed.
sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
      val newOutputMap = createOutputMap(np.projectList, newProjectList)
      Project(distinctExpressions(ep.output ++ newProjectList), mergedChild) -> newOutputMap
    }
  case (np: Aggregate, ep: Aggregate) =>
Can we always assume that merging two (or more) aggregates makes performance better? For example, say we have two aggregates in a plan: one side is a hash aggregate and the other side is an object-hash aggregate. In this case, the merged plan node seems to be an object-hash aggregate. If this is true, this rewrite can easily cause high memory pressure.
IMHO, since this rewrite itself is not an optimization but a pre-process to reuse sub-queries, it might be better to implement this logic inside the `ReuseSubquery` side, and merge them if physical plans are the same.
We can always assume merging two (or more) aggregates makes performance better? For example, we have two aggregates in a plan, one side is a hash-aggregate and the other side is an object hash-aggregate. In this case, the merged plan node seems to be an object-hash aggregate. If this is true, this rewrite can easily cause high memory pressure.
Thanks, this is a very good question, let me look into this...
IMHO, since this rewrite itself is not an optimization, but a pre-process to reuse sub-queries, so it might be better to implement this logic inside the ReuseSubquery side, and merge them if physical plans are the same.
The reason why I implemented this feature as an `Optimizer` rule is that merging `LogicalPlan`s seems much easier than merging physical ones. The example in the description has the following physical subquery plans:
*(1) Project [Subquery scalar-subquery#231, [id=#110] AS scalarsubquery()#241, Subquery scalar-subquery#232, [id=#132] AS scalarsubquery()#242L]
: :- Subquery scalar-subquery#231, [id=#110]
: : +- *(2) HashAggregate(keys=[b#234], functions=[avg(a#233)], output=[avg(a)#236])
: : +- Exchange hashpartitioning(b#234, 5), ENSURE_REQUIREMENTS, [id=#106]
: : +- *(1) HashAggregate(keys=[b#234], functions=[partial_avg(a#233)], output=[b#234, sum#247, count#248L])
: : +- *(1) ColumnarToRow
: : +- FileScan parquet default.t[a#233,b#234] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/petertoth/git/apache/spark/spark-warehouse/org.apache.spar..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:int,b:int>
: +- Subquery scalar-subquery#232, [id=#132]
: +- *(2) HashAggregate(keys=[b#240], functions=[sum(b#240)], output=[sum(b)#238L])
: +- Exchange hashpartitioning(b#240, 5), ENSURE_REQUIREMENTS, [id=#128]
: +- *(1) HashAggregate(keys=[b#240], functions=[partial_sum(b#240)], output=[b#240, sum#250L])
: +- *(1) ColumnarToRow
: +- FileScan parquet default.t[b#240] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/petertoth/git/apache/spark/spark-warehouse/org.apache.spar..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<b:int>
Merging these 2 physical subqueries would require a much more complex `mergePlans()` function that can handle `Exchange` and `Scan` nodes.
Fixed the hash/objecthash/sorted aggregate merge issue in 2828345 and I also added a test.
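The idea behind such a fix can be sketched as a compatibility check before merging (toy Python; the function categories are illustrative, not Spark's exact selection rules): two aggregates are merged only if the combined aggregate function list would still pick the same physical aggregate implementation that each side would have picked alone.

```python
# Toy compatibility check before merging two Aggregate nodes: Spark picks
# hash, object-hash or sort-based aggregation depending on the aggregate
# functions, and merging should not silently change that choice.
# The function sets below are illustrative only.

HASH_AGG_FUNCS = {"sum", "avg", "count", "min", "max"}
OBJECT_HASH_AGG_FUNCS = {"collect_list", "collect_set", "percentile"}

def agg_impl(functions):
    # Pick a physical aggregate implementation for a list of functions.
    if all(f in HASH_AGG_FUNCS for f in functions):
        return "hash"
    if all(f in HASH_AGG_FUNCS | OBJECT_HASH_AGG_FUNCS for f in functions):
        return "object_hash"
    return "sort"

def can_merge_aggregates(funcs_a, funcs_b):
    # Merging is only safe if the combined function list still maps to
    # the implementation both sides would have used on their own.
    merged = agg_impl(funcs_a + funcs_b)
    return merged == agg_impl(funcs_a) == agg_impl(funcs_b)

print(can_merge_aggregates(["avg"], ["sum"]))           # -> True
print(can_merge_aggregates(["avg"], ["collect_list"]))  # -> False
```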
val newGroupingExpression = replaceAttributes(np.groupingExpressions, outputMap)
if (ExpressionSet(newGroupingExpression) == ExpressionSet(ep.groupingExpressions)) {
  val newAggregateExpressions = replaceAttributes(np.aggregateExpressions, outputMap)
  val newOutputMap = createOutputMap(np.aggregateExpressions, newAggregateExpressions)
In the tests added in this PR, it seems this map always has the same exprId mapping, e.g., `sum(a)#3` -> `sum(a)#3`. So, could you add more tests?
Sure, added a new test in 6134fa9 to cover this.
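The exprId remapping exercised by that test can be illustrated in a toy form (plain Python; attributes are modeled as `(name, exprId)` tuples, a simplification of Catalyst attributes): expressions from the plan being merged in are rewritten to reference the attribute instances already present in the cached plan.

```python
# Toy illustration of replaceAttributes/createOutputMap: attributes of the
# merged-in plan must be rewritten to the attribute instances (exprIds) of
# the plan already in the cache. Names and ids are hypothetical.

def replace_attributes(expressions, output_map):
    """Rewrite every attribute in `expressions` via `output_map`."""
    return [output_map.get(e, e) for e in expressions]

def create_output_map(old_exprs, new_exprs):
    """Map each original expression to its rewritten counterpart."""
    return dict(zip(old_exprs, new_exprs))

# The second subquery's `b` has a different exprId (#240) than the
# cached plan's `b` (#234), so it must be remapped before merging.
output_map = {("b", 240): ("b", 234), ("a", 239): ("a", 233)}
new_aggregate_exprs = replace_attributes([("b", 240)], output_map)
print(new_aggregate_exprs)  # -> [('b', 234)]
print(create_output_map([("b", 240)], new_aggregate_exprs))
```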
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala
/**
 * A subquery that is capable to return multiple scalar values.
 */
case class MultiScalarSubquery(
Do we have to create a new subquery expression? It seems like we can just use `CreateNamedStruct` in `ScalarSubquery.plan`.
I think you are right.
Dropped `MultiScalarSubquery` in 1f2f75c, will change the docs and the PR description soon.
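The `CreateNamedStruct` approach can be sketched with a toy model (plain Python, with a dict standing in for a named struct; not Spark's API): the merged subquery computes all aggregates in one pass and returns a single struct value, and each original scalar subquery becomes a field access on that struct.

```python
# Toy sketch of the CreateNamedStruct idea: instead of a dedicated
# MultiScalarSubquery expression, the merged subquery returns one row
# whose single value is a struct (a dict here), and each original
# scalar subquery is rewritten into a field access on that struct.

def merged_subquery(rows):
    """One pass over the data computing both aggregates at once."""
    total = sum(r["b"] for r in rows)
    avg = sum(r["a"] for r in rows) / len(rows)
    return {"avg_a": avg, "sum_b": total}  # named_struct(...) analogue

rows = [{"a": 1, "b": 10}, {"a": 3, "b": 20}]
st = merged_subquery(rows)           # evaluated once, reused twice
print(st["avg_a"], st["sum_b"])      # -> 2.0 30
```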
 * : :  +- Relation default.t[a#233,b#234] parquet
 * : +- Aggregate [b#240], [sum(b#240) AS sum(b)#238L]
 * :    +- Project [b#240]
 * :       +- Relation default.t[a#239,b#240] parquet
Is it possible to rewrite them into a single scalar subquery using struct, e.g.,
SELECT
(SELECT avg(a) FROM t ),
(SELECT sum(b) FROM t )
FROM R
=>
SELECT st.avg_a, st.sum_b
FROM (
SELECT (SELECT STRUCT(avg(a) AS avg_a, sum(b) AS sum_b) FROM t) AS st
FROM R
)
This way,
- (1) you don't need to rely on ReuseExchangeAndSubquery;
- (2) the rewrite can then work for any subqueries, regardless of uncorrelated v.s. correlated, reading from tables v.s. reading from an array column?
I think that is a smart rewrite way, but let's check a more complex example. E.g. can we rewrite
SELECT *
FROM r
JOIN r2 ON r2.x = r.x
WHERE r.y = (SELECT sum(b) FROM t) AND r2.y = (SELECT avg(b) FROM t)
? Maybe
SELECT *
FROM (
  SELECT
    (SELECT STRUCT(sum(b) AS sum_b, avg(b) AS avg_b) FROM t) AS st, x, y
  FROM r
) AS r
JOIN r2 ON r2.x = r.x
WHERE r.y = r.st.sum_b AND r2.y = r.st.avg_b
? Does this work with outer joins? And isn't this more complex than the reuse way in this PR?
I was also thinking about "whole plan subquery merge" (similar to my "whole plan reuse" suggestion: #28885) where subqueries at "different level" could be merged (and reused) as a possible improvement to this PR in the future.
BTW, the `ReuseExchangeAndSubquery` rule you mentioned is suggested in my "whole plan reuse" PR, which got stuck a bit due to lack of reviews. Do you also have a similar rule in production or did you just see my PR? If you have some time, any feedback is appreciated there as well. :)
I didn't check how correlated subqueries could benefit from rewriting the query (this PR focuses on uncorrelated ones), but I think at this point in the optimizer those have been transformed to joins.
Can you please elaborate on the "reading from tables v.s. reading from an array column" part?
And isn't this more complex than the reuse way in this PR?
IIUC, I think this PR's complexity is in extra dependencies. The proposed rule augments two subqueries, makes them look identical, and hopes (a) column-pruning doesn't prune too aggressively and (b) physical de-dup could dedup them. In case (a) changes later and the two aggregate trees are not deduped in the physical plan, there could potentially be regressions -- each aggregation then becomes more expensive.
Therefore, I think there're two directions that you can go:
- (1) Add a top plan node that contains the main plan tree as well as CommonSubplan trees. For each merged subquery, put it into a CommonSubplan. In a main plan's node, it can reference a CommonSubplan and converts its output relation into a scalar value. This approach only works for uncorrelated cases.
- (2) Only extract common subqueries within the same logical node, and put merged scalar subqueries into a Project node below the logical node. It may be less ambitious than what you proposed, but it at least does not have extra dependencies. This approach works for both correlated and uncorrelated cases.
Can you please elaborate on the "reading from tables v.s. reading from an array column" part?
This is an example:
SELECT y
FROM LATERAL VIEW explode(ARRAY(ARRAY(1), ARRAY(1, 2), ARRAY(1, 2, 3))) AS y
WHERE
( SELECT COUNT(*) FROM LATERAL VIEW explode(y) AS element ) > 1
AND
( SELECT SUM(element) FROM LATERAL VIEW explode(y) AS element ) > 3
I noticed that such subqueries do not work for now. But they align with the language spec and has well defined semantics. Once we support them, we want your proposed rule to be able to speedup them as well.
? Does this work with outer joins?
I don't think outer joins are relevant to this problem:
- if we go with (1), it doesn't matter;
- if we go with (2), pushing projection expressions through join/outer join is separate topic/rule. A subquery expression doesn't have much difference from an ordinary expression in this regard.
The proposed rule augments two subqueries, makes them look identical, and hopes (a) column-pruning doesn't prune too aggressively and (b) physical de-dup could dedup them. In case (a) changes later and the two aggregate trees are not deduped in the physical plan, there could potentially be regressions -- each aggregation then becomes more expensive.
In this PR the new `MergeScalarSubqueries` rule runs in a separate batch after column pruning, close to the end of optimization. This is by design, to make sure no subsequent rule changes the structure of different instances of a merged subquery plan at different places in the logical plan differently. So the physical planning creates the same physical plan for these instances and there shouldn't be any dedup issues.
Update: I need to recheck this part as the current PR might not do what I wanted to.
I think probably the downside of my current PR is that the physical planning of merged subqueries happens multiple times (as many times as they appear in the logical plan) and physical dedup comes only after that. This could be improved if we had subquery references in the logical plan as well (something like `ReuseSubqueryExec`). But I think that's what your (1) is about: move the merged subqueries to a special top logical plan node and add subquery references at places where they are actually used.
SELECT y
FROM LATERAL VIEW explode(ARRAY(ARRAY(1), ARRAY(1, 2), ARRAY(1, 2, 3))) AS y
WHERE
( SELECT COUNT(*) FROM LATERAL VIEW explode(y) AS element ) > 1
AND
( SELECT SUM(element) FROM LATERAL VIEW explode(y) AS element ) > 3
I noticed that such subqueries do not work for now. But they align with the language spec and has well defined semantics. Once we support them, we want your proposed rule to be able to speedup them as well.
Ah ok, but what should be the optimized plan of that query? This looks like we have 2 correlated subqueries and (2) makes perfect sense to merge them. But I don't think we need lateral views, just take the following query:
SELECT
(SELECT avg(a) FROM t WHERE t.a = outer.a),
(SELECT sum(b) FROM t WHERE t.a = outer.a)
FROM t AS outer
which is
Project [scalar-subquery#231 [a#233] AS scalarsubquery(a)#243, scalar-subquery#232 [a#233] AS scalarsubquery(a)#244L]
: :- Aggregate [avg(a#239) AS avg(a)#236]
: : +- Filter (a#239 = outer(a#233))
: : +- SubqueryAlias spark_catalog.default.t
: : +- Relation default.t[a#239,b#240] parquet
: +- Aggregate [sum(b#242) AS sum(b)#238L]
: +- Filter (a#241 = outer(a#233))
: +- SubqueryAlias spark_catalog.default.t
: +- Relation default.t[a#241,b#242] parquet
+- SubqueryAlias outer
+- SubqueryAlias spark_catalog.default.t
+- Relation default.t[a#233,b#234] parquet
and is decorrelated to:
Project [avg(a)#236 AS scalarsubquery(a)#243, sum(b)#238L AS scalarsubquery(a)#244L]
+- Join LeftOuter, (a#241 = a#233)
:- Project [a#233, avg(a)#236]
: +- Join LeftOuter, (a#239 = a#233)
: :- Project [a#233]
: : +- Relation default.t[a#233,b#234] parquet
: +- Aggregate [a#239], [avg(a#239) AS avg(a)#236, a#239]
: +- Project [a#239]
: +- Filter isnotnull(a#239)
: +- Relation default.t[a#239,b#240] parquet
+- Aggregate [a#241], [sum(b#242) AS sum(b)#238L, a#241]
+- Filter isnotnull(a#241)
+- Relation default.t[a#241,b#242] parquet
This PR doesn't help here at all, but it could be optimized using your (2).
I wonder if the following steps (tickets/PRs) would make sense:
- Finish this PR and support only non-correlated subqueries. Mainly focus on merging plans and keep the physical reuse dependency for simplicity. This supports subquery merging within a plan regardless of whether they are in the same logical node.
- Add a performance improvement to 1. so as to physical plan a merged subquery only once. This is your (1) basically. Move the merged subqueries to a top node and introduce subquery references in logical plan.
- Add support for correlated subqueries using your (2). As you mentioned this will only support subqueries within the same logical node.
Probably we should implement separate rules for 1. + 2. and 3. but the plan merging logic can be common.
In this PR the new MergeScalarSubqueries rule runs in a separate batch after column pruning, close to the end of optimization. This is by design to make sure no subsequent rule changes the structure
I don't see how we can guarantee that. I think the hidden, inter-rule dependency can add complexities for future development and maintenance.
For instance, someone could implement a new Strategy that internally calls ColumnPruning after exploring one logical plan alternative. By the time such a Strategy is implemented, the authors wouldn't be aware of the fact that ColumnPruning should not be called after MergeScalarSubqueries.
- First of all, such issues can hardly be detected by the author's new unit/query tests, as an effective test has to have both effective patterns to trigger the Strategy and MergeScalarSubqueries. However, such a case can happen in prod traffic;
- Second, if they do find that's an issue, they would then have to either (a) add some hacks in the Aggregate to mark that MergeScalarSubqueries has been applied and hence ColumnPruning should not go through it, or (b) re-implement MergeScalarSubqueries per my proposal (1).
I'm wondering whether we can pursue (2) for now, if it meets your need. It's less ambitious but may address most of your issue? If you indeed have to extract subqueries over the entire tree, I don't have a clean approach in mind other than (1).
Add a performance improvement to 1. so as to physical plan a merged subquery only once. This is your (1) basically.
The performance was not my initial concern; rather, I think we'd better make `MergeScalarSubqueries` self-contained so it does not depend on an assumption that could later be changed.
But I don't think we need lateral views
Sub-querying over arrays is an important use case, for which we don't want to de-correlate. In this case, a subquery is more like an ordinary expression and should be evaluated within the operator node.
Doesn't that mean that (2) also assumes that no subsequent transformation changes the 2 instances differently and `ReuseSubquery` does the dedup?
The difference is that transformation (2) is a self-contained transformation that results in a better plan, while this PR's MergeScalarSubqueries is not. If for some reason ReuseSubquery does not trigger, applying (2) does not make the plan worse than not applying (2). However, iiuc, in the same situation, MergeScalarSubqueries would make the plan worse because each aggregate pipeline runs additional computations.
If for some reason ReuseSubquery does not trigger, applying (2) does not make the plan worse than not applying (2).
In the above example the final optimized plan of (2) is the very same as with this PR. There are 2 aggregates in both subqueries so without dedup both (2) and this PR could cause regressions.
I agree that (2) is self-contained and this PR is not, but IMO it looks like there are inter-rule dependencies currently in Spark (like `PushDownPredicates` relies on `ReuseSubquery`) that overall don't make (2) safer than this PR.
I think this means that your (1) suggestion is probably the right approach and we need to move common non-correlated subqueries to a top node and reference to them in logical plan.
I also think that (2) is a good improvement for correlated subqueries, but I would pursue (1) in this PR first and maybe (2) in a separate one. Does this sound acceptable?
@cloud-fan, @maropu do you have any thoughts on this topic?
I would pursue (1) in this PR first and maybe (2) in a separate one. Does this sound acceptable?
Yeah, that sounds great. Thanks a lot, @peter-toth!
There are 2 aggregates in both subqueries so without dedup both (2) and this PR could cause regressions.
IIUC, I think it sounds like an existing bug (or missing feature) for struct subfield pruning, which could be blocking (2) but is orthogonal to (2). For instance, if I write your example join query manually, I'd expect the struct subfield pruning to happen to the struct constructor, regardless of the existence of subqueries.
I've never seen such transformations in SparkStrategys.
It's not uncommon in exploration Strategies such as index selection, common subplan dedup etc., when we substitute the subtree of a tree node T with another subtree (from somewhere else in the plan or a different access path) that may contain unneeded columns for T. Spark doesn't have those strategies for now, but I'll not be surprised if some contributors add them down the road.
Thanks @sigmod. I will try to update the PR by end of this week.
Sorry, I got a bit distracted. But updated the PR to follow your (1) suggestion now. Please let me know your thoughts.
@@ -663,11 +663,13 @@ case class UnresolvedWith(
 * predicates that have been pushed down into `child`. This is
 * a temporary field used by optimization rules for CTE predicate
 * pushdown to help ensure rule idempotency.
 * @param mergedScalarSubquery If this definition is a merged scalar subquery.
I think it's better to clarify what this flag means to CTE relation, instead of linking to another feature. how about
@param underSubquery If true, it means we don't need to add a shuffle for this CTE relation as subquery reuse will be applied to reuse CTE relation output.
Sounds good, fixed in 19128ff.
I finished rebasing this PR on top of #34929. @cloud-fan, @sigmod, please take a look at this PR again and let me know if you have any comments.
Thanks, merging to master/3.3!
### What changes were proposed in this pull request?

This PR adds a new optimizer rule `MergeScalarSubqueries` to merge multiple non-correlated `ScalarSubquery`s to compute multiple scalar values once. E.g. the following query:
```
SELECT
  (SELECT avg(a) FROM t),
  (SELECT sum(b) FROM t)
```
is optimized from:
```
== Optimized Logical Plan ==
Project [scalar-subquery#242 [] AS scalarsubquery()#253, scalar-subquery#243 [] AS scalarsubquery()#254L]
:  :- Aggregate [avg(a#244) AS avg(a)#247]
:  :  +- Project [a#244]
:  :     +- Relation default.t[a#244,b#245] parquet
:  +- Aggregate [sum(a#251) AS sum(a)#250L]
:     +- Project [a#251]
:        +- Relation default.t[a#251,b#252] parquet
+- OneRowRelation
```
to:
```
== Optimized Logical Plan ==
Project [scalar-subquery#242 [].avg(a) AS scalarsubquery()#253, scalar-subquery#243 [].sum(a) AS scalarsubquery()#254L]
:  :- Project [named_struct(avg(a), avg(a)#247, sum(a), sum(a)#250L) AS mergedValue#260]
:  :  +- Aggregate [avg(a#244) AS avg(a)#247, sum(a#244) AS sum(a)#250L]
:  :     +- Project [a#244]
:  :        +- Relation default.t[a#244,b#245] parquet
:  +- Project [named_struct(avg(a), avg(a)#247, sum(a), sum(a)#250L) AS mergedValue#260]
:     +- Aggregate [avg(a#244) AS avg(a)#247, sum(a#244) AS sum(a)#250L]
:        +- Project [a#244]
:           +- Relation default.t[a#244,b#245] parquet
+- OneRowRelation
```
and in the physical plan subqueries are reused:
```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(1) Project [Subquery subquery#242, [id=#113].avg(a) AS scalarsubquery()#253, ReusedSubquery Subquery subquery#242, [id=#113].sum(a) AS scalarsubquery()#254L]
   :  :- Subquery subquery#242, [id=#113]
   :  :  +- AdaptiveSparkPlan isFinalPlan=true
   :  :     +- == Final Plan ==
   :  :        *(2) Project [named_struct(avg(a), avg(a)#247, sum(a), sum(a)#250L) AS mergedValue#260]
   :  :        +- *(2) HashAggregate(keys=[], functions=[avg(a#244), sum(a#244)], output=[avg(a)#247, sum(a)#250L])
   :  :           +- ShuffleQueryStage 0
   :  :              +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#158]
   :  :                 +- *(1) HashAggregate(keys=[], functions=[partial_avg(a#244), partial_sum(a#244)], output=[sum#262, count#263L, sum#264L])
   :  :                    +- *(1) ColumnarToRow
   :  :                       +- FileScan parquet default.t[a#244] Batched: true, DataFilters: [], Format: Parquet, Location: ..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:int>
   :  :     +- == Initial Plan ==
   :  :        Project [named_struct(avg(a), avg(a)#247, sum(a), sum(a)#250L) AS mergedValue#260]
   :  :        +- HashAggregate(keys=[], functions=[avg(a#244), sum(a#244)], output=[avg(a)#247, sum(a)#250L])
   :  :           +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#110]
   :  :              +- HashAggregate(keys=[], functions=[partial_avg(a#244), partial_sum(a#244)], output=[sum#262, count#263L, sum#264L])
   :  :                 +- FileScan parquet default.t[a#244] Batched: true, DataFilters: [], Format: Parquet, Location: ..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:int>
   :  +- ReusedSubquery Subquery subquery#242, [id=#113]
   +- *(1) Scan OneRowRelation[]
+- == Initial Plan ==
   ...
```
Please note that the above simple example could be easily optimized into a common select expression without reuse node, but this PR can handle more complex queries as well.

### Why are the changes needed?

Performance improvement.
```
[info] TPCDS Snappy:                      Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] q9 - MergeScalarSubqueries off             50798          52521        1423         0.0      Infinity       1.0X
[info] q9 - MergeScalarSubqueries on              19484          19675         226         0.0      Infinity       2.6X

[info] TPCDS Snappy:                      Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] q9b - MergeScalarSubqueries off            15430          17803         NaN         0.0      Infinity       1.0X
[info] q9b - MergeScalarSubqueries on              3862           4002         196         0.0      Infinity       4.0X
```
Please find `q9b` in the description of SPARK-34079. It is a variant of [q9.sql](https://github.com/apache/spark/blob/master/sql/core/src/test/resources/tpcds/q9.sql) using CTE. The performance improvement in case of `q9` comes from merging 15 subqueries into 5 and in case of `q9b` it comes from merging 5 subqueries into 1.

### Does this PR introduce _any_ user-facing change?

No. But this optimization can be disabled with the `spark.sql.optimizer.excludedRules` config.

### How was this patch tested?

Existing and new UTs.

Closes #32298 from peter-toth/SPARK-34079-multi-column-scalar-subquery.

Lead-authored-by: Peter Toth <peter.toth@gmail.com>
Co-authored-by: attilapiros <piros.attila.zsolt@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit e00b81e)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Thanks @cloud-fan, @sigmod, @tgravescs, @attilapiros, @maropu, @zinking for the review!
// `MergeScalarSubqueries` can duplicate subqueries in the optimized plan, but the subqueries will
// be reused in the physical plan.
def getNumBloomFilters(plan: LogicalPlan, scalarSubqueryCTEMultiplicator: Int = 1): Integer = {
  print(plan)
ah I missed this one, we should remove the print.
Oops, sorry. I will open a follow-up PR right now.
Opened #36354.
### What changes were proposed in this pull request?

To remove debug logging accidentally left in code after #32298.

### Why are the changes needed?

No need for that logging.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

Closes #36354 from peter-toth/SPARK-34079-multi-column-scalar-subquery-follow-up.

Authored-by: Peter Toth <peter.toth@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit f24c9b0)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
```scala
      assert(bfAgg.numBitsExpression.isInstanceOf[Literal])
      1
    }.sum
// `MergeScalarSubqueries` can duplicate subqueries in the optimized plan, but the subqueries will
```
I'm wondering if we should run `MergeScalarSubqueries` before `InjectRuntimeFilter`, so that we don't need to change this test suite. cc @peter-toth @somani
But, isn't it beneficial that bloom filter subqueries can be merged? Here was an example: https://github.com/apache/spark/pull/32298/files/2590edf74f78096249b31dffae6ff94f293fc78d#r834760177

Actually, after #34929 the final optimized plan will not contain any `WithCTE` nodes (the subqueries will be under the `Filter` node), so we can revert this change, but I think the new `scalarSubqueryCTEMultiplicator` param in the final assert is still needed.
Opened #36361.
…terSuite

### What changes were proposed in this pull request?

To remove unnecessary changes from `InjectRuntimeFilterSuite` after apache#32298. These are not needed after apache#34929 as the final optimized plan doesn't contain any `WithCTE` nodes.

### Why are the changes needed?

No need for those changes.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added new test.

Closes apache#36361 from peter-toth/SPARK-34079-multi-column-scalar-subquery-follow-up-2.

Authored-by: Peter Toth <peter.toth@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?

Unfortunately #32298 introduced a regression from Spark 3.2 to 3.3, as after that change a merged subquery can contain multiple distinct type aggregates. Those aggregates need to be rewritten by the `RewriteDistinctAggregates` rule to get the correct results. This PR fixes that.

### Why are the changes needed?

The following query:

```
SELECT
  (SELECT count(distinct c1) FROM t1),
  (SELECT count(distinct c2) FROM t1)
```

currently fails with:

```
java.lang.IllegalStateException: You hit a query analyzer bug. Please report your query to Spark user mailing list.
  at org.apache.spark.sql.execution.SparkStrategies$Aggregation$.apply(SparkStrategies.scala:538)
```

but works again after this PR.

### Does this PR introduce _any_ user-facing change?

Yes, the above query works again.

### How was this patch tested?

Added new UT.

Closes #39887 from peter-toth/SPARK-42346-rewrite-distinct-aggregates-after-subquery-merge.

Authored-by: Peter Toth <peter.toth@gmail.com>
Signed-off-by: Yuming Wang <yumwang@ebay.com>
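The query shape from SPARK-42346 above evaluates two distinct-type aggregates in a single merged subquery. Outside of Spark, that shape can be illustrated with a minimal, hypothetical Python sketch (function name and data are invented, not Spark code): both distinct counts are accumulated in one pass over the rows, which is what the merged plan has to express correctly.

```python
# Hypothetical sketch: count(distinct c1) and count(distinct c2) computed
# together in a single pass over t1, mirroring the merged-subquery shape.
def two_distinct_counts(rows):
    seen_c1 = set()  # distinct values of c1 seen so far
    seen_c2 = set()  # distinct values of c2 seen so far
    for c1, c2 in rows:
        seen_c1.add(c1)
        seen_c2.add(c2)
    return len(seen_c1), len(seen_c2)

# e.g. rows of (c1, c2)
print(two_distinct_counts([(1, 1), (1, 2), (2, 2)]))
```

Each aggregate keeps its own distinct-tracking state, so merging them is safe as long as the planner rewrites them into a form the execution strategy accepts, which is exactly what routing the merged plan through `RewriteDistinctAggregates` ensures.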
…nt filters

This is a WIP version of apache#37630 at commit 83c59ab5e7e2abfaf83abe7ec418f30a5c7a41ea, but we introduce the `spark.cloudera.sql.advancedSubqueryMerge.enabled` config (default `true`) to disable the feature if needed.

After apache#32298 we were able to merge scalar subquery plans. This PR is a follow-up improvement to the merging logic to be able to combine `Filter` nodes with different conditions if those conditions can be merged in an ancestor `Aggregate` node. Consider the following query with 2 subqueries:

```
SELECT
  (SELECT avg(a) FROM t WHERE c = 1),
  (SELECT sum(a) FROM t WHERE c = 2)
```

where the subqueries can be merged to:

```
SELECT avg(a) FILTER (WHERE c = 1), sum(b) FILTER (WHERE c = 2) FROM t WHERE c = 1 OR c = 2
```

After this PR the 2 subqueries are merged to this optimized form:

```
== Optimized Logical Plan ==
Project [scalar-subquery#260 [].avg(a) AS scalarsubquery()#277, scalar-subquery#261 [].sum(b) AS scalarsubquery()#278L]
:  :- Project [named_struct(avg(a), avg(a)#268, sum(b), sum(b)#271L) AS mergedValue#286]
:  :  +- Aggregate [avg(a#264) FILTER (WHERE propagatedFilter#285) AS avg(a)#268, sum(b#265) FILTER (WHERE propagatedFilter#284) AS sum(b)#271L]
:  :     +- Project [a#264, b#265, (isnotnull(c#266) AND (c#266 = 2)) AS propagatedFilter#284, (isnotnull(c#266) AND (c#266 = 1)) AS propagatedFilter#285]
:  :        +- Filter ((isnotnull(c#266) AND (c#266 = 1)) OR (isnotnull(c#266) AND (c#266 = 2)))
:  :           +- Relation spark_catalog.default.t[a#264,b#265,c#266] parquet
:  +- Project [named_struct(avg(a), avg(a)#268, sum(b), sum(b)#271L) AS mergedValue#286]
:     +- Aggregate [avg(a#264) FILTER (WHERE propagatedFilter#285) AS avg(a)#268, sum(b#265) FILTER (WHERE propagatedFilter#284) AS sum(b)#271L]
:        +- Project [a#264, b#265, (isnotnull(c#266) AND (c#266 = 2)) AS propagatedFilter#284, (isnotnull(c#266) AND (c#266 = 1)) AS propagatedFilter#285]
:           +- Filter ((isnotnull(c#266) AND (c#266 = 1)) OR (isnotnull(c#266) AND (c#266 = 2)))
:              +- Relation spark_catalog.default.t[a#264,b#265,c#266] parquet
+- OneRowRelation
```

and physical form:

```
== Physical Plan ==
*(1) Project [Subquery scalar-subquery#260, [id=#148].avg(a) AS scalarsubquery()#277, ReusedSubquery Subquery scalar-subquery#260, [id=#148].sum(b) AS scalarsubquery()#278L]
:  :- Subquery scalar-subquery#260, [id=#148]
:  :  +- *(2) Project [named_struct(avg(a), avg(a)#268, sum(b), sum(b)#271L) AS mergedValue#286]
:  :     +- *(2) HashAggregate(keys=[], functions=[avg(a#264), sum(b#265)], output=[avg(a)#268, sum(b)#271L])
:  :        +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=143]
:  :           +- *(1) HashAggregate(keys=[], functions=[partial_avg(a#264) FILTER (WHERE propagatedFilter#285), partial_sum(b#265) FILTER (WHERE propagatedFilter#284)], output=[sum#288, count#289L, sum#290L])
:  :              +- *(1) Project [a#264, b#265, (isnotnull(c#266) AND (c#266 = 2)) AS propagatedFilter#284, (isnotnull(c#266) AND (c#266 = 1)) AS propagatedFilter#285]
:  :                 +- *(1) Filter ((isnotnull(c#266) AND (c#266 = 1)) OR (isnotnull(c#266) AND (c#266 = 2)))
:  :                    +- *(1) ColumnarToRow
:  :                       +- FileScan parquet spark_catalog.default.t[a#264,b#265,c#266] Batched: true, DataFilters: [((isnotnull(c#266) AND (c#266 = 1)) OR (isnotnull(c#266) AND (c#266 = 2)))], Format: Parquet, Location: ..., PartitionFilters: [], PushedFilters: [Or(And(IsNotNull(c),EqualTo(c,1)),And(IsNotNull(c),EqualTo(c,2)))], ReadSchema: struct<a:int,b:int,c:int>
:  +- ReusedSubquery Subquery scalar-subquery#260, [id=#148]
+- *(1) Scan OneRowRelation[]
```

The optimization in this PR doesn't kick in if the filters (`c = 1`, `c = 2`) are partition or bucket filters (to avoid possible performance degradation), but allows merging pushed-down data filters depending on a new `spark.sql.planMerge.ignorePushedDataFilters` config value (default `true`).

Performance improvement:

```
[info] TPCDS Snappy:                              Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] q9 - Merge different filters off                    9526           9634          97          0.0   244257993.6       1.0X
[info] q9 - Merge different filters on                     3798           3881         133          0.0    97381735.1       2.5X
```

The performance improvement in case of `q9` comes from merging 15 subqueries into 1 subquery (apache#32298 was able to merge 15 subqueries into 5).

No user-facing change. Existing and new UTs.

Change-Id: Ibeab5772549660ed217707f9b7cdac39491bf096
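The `FILTER (WHERE ...)` rewrite described above can be sketched with a minimal, hypothetical Python example (names and sample rows are invented): the merged scan keeps rows matching `c = 1 OR c = 2`, and each aggregate only consumes the rows matching its own propagated filter.

```python
# Hypothetical sketch of merging two differently filtered subqueries,
# mirroring: SELECT avg(a) FILTER (WHERE c = 1), sum(b) FILTER (WHERE c = 2)
#            FROM t WHERE c = 1 OR c = 2
def merged_filtered_aggregates(rows):
    a_sum = a_cnt = b_sum = 0
    for a, b, c in rows:
        if c not in (1, 2):   # merged scan filter: c = 1 OR c = 2
            continue
        if c == 1:            # propagated filter of the avg(a) subquery
            a_sum += a
            a_cnt += 1
        if c == 2:            # propagated filter of the sum(b) subquery
            b_sum += b
    avg_a = a_sum / a_cnt if a_cnt else None
    return avg_a, b_sum

# rows of (a, b, c); the (7, 70, 3) row is dropped by the merged scan filter
print(merged_filtered_aggregates([(10, 100, 1), (20, 200, 1), (5, 50, 2), (7, 70, 3)]))
```

One scan serves both aggregates, which is where the speedup comes from; the trade-off the config guards against is that the widened `OR` filter may read more rows than either original subquery alone.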
### What changes were proposed in this pull request?

This PR adds a new optimizer rule `MergeScalarSubqueries` to merge multiple non-correlated `ScalarSubquery`s to compute multiple scalar values once. E.g. the following query:
is optimized from:
to:
and in the physical plan subqueries are reused:
Please note that the above simple example could be easily optimized into a common select expression without reuse node, but this PR can handle more complex queries as well.
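The effect of the merge can be sketched outside of Spark with a minimal, hypothetical Python example (function name and data are invented): instead of scanning the table once per scalar subquery (once for `avg(a)`, once for `sum(a)`), a single pass computes both aggregates and returns them as one struct, from which each subquery expression then picks its field.

```python
# Hypothetical sketch of what MergeScalarSubqueries achieves for
# SELECT (SELECT avg(a) FROM t), (SELECT sum(a) FROM t)
def merged_scalar_subqueries(rows):
    total = 0
    count = 0
    for a in rows:        # a single scan of t instead of one scan per subquery
        total += a
        count += 1
    # the "named_struct(avg(a), ..., sum(a), ...)" mergedValue of the new plan
    return {"avg(a)": total / count if count else None, "sum(a)": total}

merged = merged_scalar_subqueries([1, 2, 3, 4])
avg_a = merged["avg(a)"]  # value of the first scalar subquery
sum_a = merged["sum(a)"]  # value of the second scalar subquery
```

In the real rule the struct is produced by a shared `Aggregate` under a single subquery plan, and `ReuseSubquery` makes the second reference a `ReusedSubquery` node at execution time.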
### Why are the changes needed?

Performance improvement.

Please find `q9b` in the description of SPARK-34079. It is a variant of [q9.sql](https://github.com/apache/spark/blob/master/sql/core/src/test/resources/tpcds/q9.sql) using CTE. The performance improvement in case of `q9` comes from merging 15 subqueries into 5 and in case of `q9b` it comes from merging 5 subqueries into 1.

### Does this PR introduce _any_ user-facing change?

No. But this optimization can be disabled with the `spark.sql.optimizer.excludedRules` config.

### How was this patch tested?

Existing and new UTs.