Skip to content

Commit 02e3a68

Browse files
committed
add more comments
1 parent 306cc84 commit 02e3a68

File tree

1 file changed

+136
-32
lines changed

1 file changed

+136
-32
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/merge/MergeScalarSubqueries.scala

Lines changed: 136 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,7 @@ object MergeScalarSubqueries extends Rule[LogicalPlan] {
251251
// - the merged plan,
252252
// - the attribute mapping from the new to the merged version,
253253
// - optional filters of both plans that need to be propagated and merged in an ancestor
254-
// `Aggregate` node if possible.
254+
// `Aggregate` node if possible.
255255
//
256256
// Please note that merging arbitrary plans can be complicated, the current version supports only
257257
// some of the most important nodes.
@@ -348,24 +348,130 @@ object MergeScalarSubqueries extends Rule[LogicalPlan] {
348348
case _ => None
349349
}
350350

351-
// If `Filter`s are not exactly the same we can still try propagating up their differing
352-
// condition because in some cases we will be able to merge them in an `Aggregate` parent
353-
// node.
354-
// E.g.:
355-
// SELECT avg(a) FROM t WHERE c = 1
351+
// If `Filter` conditions are not exactly the same we can still try propagating up their
352+
// differing condition because in some cases we will be able to merge them in an `Aggregate`
353+
// parent node. E.g. we can merge:
354+
//
355+
// SELECT avg(a) FROM t WHERE c = 1
356+
//
356357
// and:
357-
// SELECT sum(b) FROM t WHERE c = 2
358-
// can be merged to:
359-
// SELECT namedStruct(
360-
// 'a', avg(a) FILTER (WHERE c = 1),
361-
// 'b', sum(b) FILTER (WHERE c = 2)) AS mergedValue
358+
//
359+
// SELECT sum(b) FROM t WHERE c = 2
360+
//
361+
// into:
362+
//
363+
// SELECT
364+
// avg(a) FILTER (WHERE c = 1),
365+
// sum(b) FILTER (WHERE c = 2)
362366
// FORM t
363367
// WHERE c = 1 OR c = 2
364368
//
365-
// Please note that depending on where the different `Filter`s reside in the plan and on
366-
// which column the predicates are defined, we need to check the physical plan to make sure
367-
// if `c` is not a partitioning or bucketing column and `c` is not present in pushed down
368-
// filters. Otherwise the merged query can suffer performance degradation.
369+
// But there are some sp2cial cases we need to consider:
370+
//
371+
// - The plans to be merged might contain multiple adjacent `Filter` nodes and the parent
372+
// `Filter` nodes should incorporate the propagated filters from child ones during merge.
373+
//
374+
// E.g. adjacent filters can appear in plans when some of the optimization rules (like
375+
// `PushDownPredicates`) are disabled.
376+
//
377+
// Let's consider we want to merge query 1:
378+
//
379+
// SELECT avg(a)
380+
// FROM (
381+
// SELECT * FROM t WHERE c1 = 1
382+
// ) t
383+
// WHERE c2 = 1
384+
//
385+
// and query 2:
386+
//
387+
// SELECT sum(b)
388+
// FROM (
389+
// SELECT * FROM t WHERE c1 = 2
390+
// ) t
391+
// WHERE c2 = 2
392+
//
393+
// then the optimal merged query is:
394+
//
395+
// SELECT
396+
// avg(a) FILTER (WHERE c2 = 1 AND c1 = 1),
397+
// sum(b) FILTER (WHERE c2 = 2 AND c1 = 2)
398+
// FORM (
399+
// SELECT * FROM t WHERE c1 = 1 OR c1 = 2
400+
// ) t
401+
// WHERE (c2 = 1 AND c1 = 1) OR (c2 = 2 AND c1 = 2)
402+
//
403+
// This is because the `WHERE (c2 = 1 AND c1 = 1) OR (c2 = 2 AND c1 = 2)` parent `Filter`
404+
// condition is more selective than a simple `WHERE c2 = 1 OR c2 = 2` would be as the
405+
// simple condition would let trough rows containing c1 = 1 and c2 = 2, which none of the
406+
// original queries do.
407+
//
408+
// - When we are merging plans to already merged plans the propagated filter conditions
409+
// could grow quickly, which we can avoid with tagging the already propagated filters.
410+
//
411+
// E.g. if we merged the previous optimal merged query and query 3:
412+
//
413+
// SELECT max(b)
414+
// FROM (
415+
// SELECT * FROM t WHERE c1 = 3
416+
// ) t
417+
// WHERE c2 = 3
418+
//
419+
// then a new double-merged query would look like this:
420+
//
421+
// SELECT
422+
// avg(a) FILTER (WHERE
423+
// (c2 = 1 AND c1 = 1) AND
424+
// ((c2 = 1 AND c1 = 1) OR (c2 = 2 AND c1 = 2) AND (c1 = 1 OR c1 = 2))
425+
// ),
426+
// sum(b) FILTER (WHERE
427+
// (c2 = 2 AND c1 = 2) AND
428+
// ((c2 = 1 AND c1 = 1) OR (c2 = 2 AND c1 = 2) AND (c1 = 1 OR c1 = 2))
429+
// ),
430+
// max(b) FILTER (WHERE c2 = 3 AND c1 = 3)
431+
// FORM (
432+
// SELECT * FROM t WHERE (c1 = 1 OR c1 = 2) OR c1 = 3
433+
// ) t
434+
// WHERE
435+
// ((c2 = 1 AND c1 = 1) OR (c2 = 2 AND c1 = 2) AND (c1 = 1 OR c1 = 2)) OR
436+
// (c2 = 3 AND c1 = 3)
437+
//
438+
// which is not optimal and contains unnecessary complex conditions.
439+
//
440+
// Please note that `BooleanSimplification` and other rules could help simplifying filter
441+
// conditions, but when we merge large number if queries in this rule, the plan size can
442+
// increase exponentially and can cause memory issues before `BooleanSimplification` could
443+
// run.
444+
//
445+
// But we can avoid that complexity if we tag already propagated filter conditions with a
446+
// simple `PropagatedFilter` wrapper during merge.
447+
// E.g. the actual merged query of query 1 and query 2 produced by this rule looks like
448+
// this:
449+
//
450+
// SELECT
451+
// avg(a) FILTER (WHERE c2 = 1 AND c1 = 1),
452+
// sum(b) FILTER (WHERE c2 = 2 AND c1 = 2)
453+
// FORM (
454+
// SELECT * FROM t WHERE PropagatedFilter(c1 = 1 OR c1 = 2)
455+
// ) t
456+
// WHERE PropagatedFilter((c2 = 1 AND c1 = 1) OR (c2 = 2 AND c1 = 2))
457+
//
458+
// And so when we merge query 3 we know that filter conditions tagged with
459+
// `PropagatedFilter` can be ignored during filter propagation and thus the we get a much
460+
// simpler double-merged query:
461+
//
462+
// SELECT
463+
// avg(a) FILTER (WHERE c2 = 1 AND c1 = 1),
464+
// sum(b) FILTER (WHERE c2 = 2 AND c1 = 2),
465+
// max(b) FILTER (WHERE c2 = 3 AND c1 = 3)
466+
// FORM (
467+
// SELECT * FROM t WHERE PropagatedFilter(PropagatedFilter(c1 = 1 OR c1 = 2) OR c1 = 3)
468+
// ) t
469+
// WHERE
470+
// PropagatedFilter(
471+
// PropagatedFilter((c2 = 1 AND c1 = 1) OR (c2 = 2 AND c1 = 2) OR
472+
// (c2 = 3 AND c1 = 3))
473+
//
474+
// At the end of the rule we remove the `PropagatedFilter` wrappers.
369475
case (_, np: Filter, cp: Filter) =>
370476
tryMergePlans(np.child, cp.child, scanCheck).flatMap {
371477
case (mergedChild, outputMap, newChildFilter, mergedChildFilter) =>
@@ -459,29 +565,27 @@ object MergeScalarSubqueries extends Rule[LogicalPlan] {
459565
}
460566

461567
/**
462-
* - When we merge projection nodes (`Project` and `Aggregate`) we need to merge the named
463-
* expression list coming from the new plan node into the expressions of the projection node of
464-
* the merged child plan and return a merged list of expressions that will be placed into the
465-
* merged projection node.
568+
* Merges named expression lists of `Project` or `Aggregate` nodes of the new plan into the named
569+
* expression list of a similar node of the cached plan.
570+
*
466571
* - Before we can merge the new expressions, we need to take into account the propagated
467-
* attribute mapping that describes the transformation from the input attributes the new plan's
468-
* projection node to the input attributes of the merged child plan's projection node.
469-
* - While merging the new expressions we need to build a new attribute mapping that describes
470-
* the transformation from the output attributes of the new expressions to the output attributes
471-
* of the merged list of expression.
472-
* - If any filters are propagated from `Filter` nodes below, we need to transform the expressions
473-
* to named expressions and merge them into the cached expressions as we did with new expressions.
572+
* attribute mapping that describes the transformation from the input attributes of the new plan
573+
* node to the output attributes of the already merged child plan node.
574+
* - While merging the new expressions we need to build a new attribute mapping to propagate.
575+
* - If any filters are propagated from `Filter` nodes below then we could add all the referenced
576+
* attributes of filter conditions to the merged expression list, but it is better if we alias
577+
* whole filter conditions and propagate only the new boolean attributes.
474578
*
475-
* @param newExpressions the expressions of the new plan's projection node
476-
* @param outputMap the propagated attribute mapping
477-
* @param cachedExpressions the expressions of the cached plan's projection node
478-
* @param newChildFilter the propagated filters from `Filter` nodes of the new plan
579+
* @param newExpressions the expression list of the new plan node
580+
* @param outputMap the propagated attribute mapping
581+
* @param cachedExpressions the expression list of the cached plan node
582+
* @param newChildFilter the propagated filters from `Filter` nodes of the new plan
479583
* @param mergedChildFilter the propagated filters from `Filter` nodes of the merged child plan
480584
* @return A tuple of:
481585
* - the merged expression list,
482586
* - the new attribute mapping to propagate,
483-
* - the output attributes of the merged newChildFilter to propagate,
484-
* - the output attributes of the merged mergedChildFilter to propagate,
587+
* - the output attribute of the merged newChildFilter to propagate,
588+
* - the output attribute of the merged mergedChildFilter to propagate
485589
*/
486590
private def mergeNamedExpressions(
487591
newExpressions: Seq[NamedExpression],

0 commit comments

Comments
 (0)