
[WIP] Support Skew join in FE #50073

Open
wants to merge 1 commit into base: main from skew_join_fe_rewrite_without_rf

Conversation

before-Sunrise (Contributor)

Why I'm doing:

For a skewed shuffle join, the skew values use a broadcast join while the other values use a shuffle join.

What I'm doing:

Support skew join optimization in FE. This is the first PR of #48655.

Fixes #issue

What type of PR is this:

  • BugFix
  • Feature
  • Enhancement
  • Refactor
  • UT
  • Doc
  • Tool

Does this PR entail a change in behavior?

  • Yes, this PR will result in a change in behavior.
  • No, this PR will not result in a change in behavior.

If yes, please specify the type of change:

  • Interface/UI changes: syntax, type conversion, expression evaluation, display information
  • Parameter changes: default values, similar parameters but with different default values
  • Policy changes: use new policy to replace old one, functionality automatically enabled
  • Feature removed
  • Miscellaneous: upgrade & downgrade compatibility, etc.

Checklist:

  • I have added test cases for my bug fix or my new feature
  • This PR needs user documentation (for new or modified features or behaviors)
  • I have added documentation for my new feature or new function
  • This is a backport PR

Bugfix cherry-pick branch check:

  • I have checked the version labels to which the PR will be auto-backported
    • 3.3
    • 3.2
    • 3.1
    • 3.0
    • 2.5

@before-Sunrise before-Sunrise requested review from a team as code owners August 21, 2024 07:40
@wanpengfei-git wanpengfei-git requested a review from a team August 21, 2024 07:41
@before-Sunrise before-Sunrise force-pushed the skew_join_fe_rewrite_without_rf branch from 1312d5d to 42345d9 on August 21, 2024 07:41

public class SplitCastDataSink extends DataSink {
    // SplitCastDataSink can have multiple destination fragments
    private final List<DataStreamSink> dataStreamSinks = Lists.newArrayList();
Contributor:

At present there are only two DataStreamSinks, one for random and the other for shuffle. In the future, is there a possibility that more than two dataStreamSinks will be introduced to support other scenarios?

Contributor Author:

Right now the left table's exchange node has two DataStreamSinks, one for random and the other for shuffle, and the right table's exchange node also has two DataStreamSinks, one for broadcast and the other for shuffle. You can define whatever outputPartition you want; it's very flexible.

    // every DataStreamSink can have multiple destinations
    private final List<List<TPlanFragmentDestination>> destinations = Lists.newArrayList();
    // rows for which splitExprs[i] is true (evaluated from the last index down) are sent to dest fragment i
    private final List<Expr> splitExprs = Lists.newArrayList();
Contributor:

Suggest adopting a style like Collectors.groupingBy(): we could cook a single splitExpr from these splitExprs, where the splitExpr always maps an input row to 0..n-1, corresponding to the n partitions. For example, we can build a case-when expr from these splitExprs:

case
  when splitExprs[0] then 0
  when splitExprs[1] then 1
  ...
  else n
end

Contributor Author:

Right now it actually works the way you want. A chunk will first be evaluated by splitExprs[length - 1]; if a row's result is true, it will be sent to dataStreamSinks[length - 1], otherwise it will be evaluated by splitExprs[length - 2], and so on. So every splitExprs[i] cuts the current chunk into two chunks: one is sent to dataStreamSinks[i], the other is fed to splitExprs[i - 1].
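
For concreteness, here is a minimal, self-contained sketch of that routing in plain Java, with a Predicate standing in for a compiled splitExpr. None of these class or method names come from the PR, and rows matching no splitExpr are simply left over, a case the thread does not spell out:

import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

class SplitRoutingSketch {
    // Rows matching splitExprs[i] go to output i (i.e. dataStreamSinks[i]); evaluation
    // starts at the last expr, and each expr's non-matching rows feed the previous one.
    static <R> List<List<R>> route(List<R> chunk, List<Predicate<R>> splitExprs) {
        List<List<R>> outputs = new ArrayList<>();
        for (int i = 0; i < splitExprs.size(); i++) {
            outputs.add(new ArrayList<>());
        }
        List<R> remaining = new ArrayList<>(chunk);
        for (int i = splitExprs.size() - 1; i >= 0; i--) {
            List<R> next = new ArrayList<>();
            for (R row : remaining) {
                if (splitExprs.get(i).test(row)) {
                    outputs.get(i).add(row);   // sent to dataStreamSinks[i]
                } else {
                    next.add(row);             // fed to splitExprs[i - 1]
                }
            }
            remaining = next;
        }
        return outputs;
    }
}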


public void setSkewJoin(boolean skewJoin) {
    isSkewJoin = skewJoin &&
            ConnectContext.get().getSessionVariable().isEnableOptimizerSkewJoinByBroadCastSkewValues();
Contributor:

It's better not to depend on ConnectContext here; ConnectContext.get() may be null in some cases.

Contributor Author:

This is because Yangwenbo's optimization also sets the join's skew column, so I need to check the session variable to distinguish them. This flag in HashJoinNode will be used for the runtime filter later. After all the PRs are merged, I will consider removing Wenbo's optimization, and then this line can be deleted.
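
A null-safe sketch of the current check, using only identifiers already present in the diff (whether the ConnectContext dependency should stay at all is the separate question raised above):

public void setSkewJoin(boolean skewJoin) {
    // Sketch only: fall back to "no skew join" when there is no ConnectContext
    // (e.g. internal statements), instead of risking a NullPointerException.
    ConnectContext ctx = ConnectContext.get();
    isSkewJoin = skewJoin && ctx != null
            && ctx.getSessionVariable().isEnableOptimizerSkewJoinByBroadCastSkewValues();
}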

 *   ... (Javadoc plan diagram over child1 and child2; truncated in this diff view)
 */
Contributor:

  1. child1 and child2's non-skew data use a shuffle join, and child1 and child2's skew data use a broadcast join?
  2. What's the broadcast's distribution type? How is its dop determined?

Contributor:

I see CTE is used in the code. Can you add more info to the description?

Contributor Author:

> 1. child1 and child2's non-skew data use a shuffle join, and child1 and child2's skew data use a broadcast join?
> 2. What's the broadcast's distribution type? How is its dop determined?

  1. Correct.
  2. For broadcast, the left table's skew data will be sent randomly and the right table's corresponding data will be broadcast. These are described by the DistributionSpecs of leftSplitConsumerOptForBroadcastJoin and rightSplitConsumerOptForBroadcastJoin: the former is RoundRobinDistributionSpec, the latter is DistributionSpec.createReplicatedDistributionSpec(). PlanFragmentBuilder translates each DistributionSpec into a DataPartition in visitPhysicalSplitConsumer; this DataPartition is then stored via splitProduceFragment.getOutputPartitions().add(dataPartition) and in the ExchangeNode, to tell the exchange sink and source how to route.
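
A short sketch of that pairing; createReplicatedDistributionSpec() and the getOutputPartitions().add(...) call are quoted from this reply, while the RoundRobinDistributionSpec constructor and the translation step are assumptions about code not shown in this thread:

// Left side of the broadcast join: skew rows are spread round-robin across instances.
DistributionSpec leftSpec = new RoundRobinDistributionSpec();                        // assumed constructor
// Right side: rows matching the skew values are replicated (broadcast) to every instance.
DistributionSpec rightSpec = DistributionSpec.createReplicatedDistributionSpec();

// In PlanFragmentBuilder.visitPhysicalSplitConsumer each spec is translated into a
// DataPartition and registered on the producer fragment:
//     splitProduceFragment.getOutputPartitions().add(dataPartition);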

Contributor Author:

> I see CTE is used in the code. Can you add more info to the description?

PhysicalCTEAnchorOperator is actually only used for the plan fragment builder: it first visits the left child, which is the split producer, and then visits the right child, which is the split consumer. It has nothing to do with CTE. See the document for details.
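
A conceptual sketch of that visiting order; the method and helper names below are illustrative, not the exact PlanFragmentBuilder signature:

// The anchor only orders the build: producer side first, then consumer side.
PlanFragment visitPhysicalCTEAnchor(OptExpression anchor, ExecPlan context) {
    visit(anchor.inputAt(0), context);        // left child: split producer (SplitCastDataSink fragment)
    return visit(anchor.inputAt(1), context); // right child: split consumers / rewritten joins
}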

.setInputs(rightExchangeOptExp.getInputs())
.setLogicalProperty(rightExchangeOptExp.getLogicalProperty())
.setStatistics(rightExchangeOptExp.getStatistics())
.setCost(rightExchangeOptExp.getCost()).build();
Contributor:

How about using a single method to create the PhysicalSplitProduceOperator, with a parameter to decide left or right?
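
One possible shape of this suggestion (sketch only; the PhysicalSplitProduceOperator constructor arguments are assumptions since its fields are not shown here, while the builder chain mirrors the diff above):

private OptExpression buildSplitProducer(OptExpression exchangeOptExp, boolean isLeftSide) {
    // isLeftSide decides which split exprs / columns the producer is created with.
    PhysicalSplitProduceOperator op = new PhysicalSplitProduceOperator(/* side-specific args */);
    return OptExpression.builder().setOp(op)
            .setInputs(exchangeOptExp.getInputs())
            .setLogicalProperty(exchangeOptExp.getLogicalProperty())
            .setStatistics(exchangeOptExp.getStatistics())
            .setCost(exchangeOptExp.getCost())
            .build();
}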

}
}

return new SkewColumnAndValues(skewValues, Pair.create(leftSkewColumn, rightSkewColumn));
Contributor:

  1. What if the join's ON predicates contain multiple columns (e.g. t1.a = t2.a and t1.b = t2.b)? Can we optimize this case?
  2. What about multi-way joins (e.g. t1.a = t2.a = t3.a)?

Contributor Author (Aug 26, 2024):

Right now we only support the skew hint. So if you mark one of the join's ON-predicate columns, like t1.a, as skew, we visit the plan tree top-down and check whether the join has the specified skew column and whether it is a shuffle join. If yes, we find the corresponding column in the right table, like t2.a, and rewrite this join using the skew join rule; we won't visit the original join's children any more.

So the answer to 1 is yes, although for simplicity we only consider one predicate (t1.b = t2.b is not taken into account even if t1.b is also skew).
The answer to 2 is yes, but right now only the top join's skew optimization is supported. In a case like the one below, only join2 can be optimized:

           join2 (skew)
          /            \
   join1 (skew)         t3
    /         \
  t1           t2

originalShuffleJoinOperator.getJoinType(), originalShuffleJoinOperator.getOnPredicate(),
originalShuffleJoinOperator.getJoinHint(), originalShuffleJoinOperator.getLimit(),
originalShuffleJoinOperator.getPredicate(), projectionOnJoin,
originalShuffleJoinOperator.getSkewColumn(), originalShuffleJoinOperator.getSkewValues());
Contributor:

Why use originalShuffleJoinOperator.getSkewValues() rather than skewValues?

Contributor Author (Aug 26, 2024):

Actually skewValues and skewColumn are only used in this rule, so it's fine to pass originalShuffleJoinOperator.getSkewValues(), and these two new joins won't be optimized by this rule again.


import java.util.List;

public class PhysicalMergeOperator extends PhysicalSetOperation {
Contributor:

Why not use PhysicalSetOperation? What's the difference?

Contributor Author:

Mostly because the plan fragment builder's logic for PhysicalSetOperation is not suitable for skew join: PhysicalSetOperation always creates a new fragment and adds an exchange node between the set operator and its children.

@@ -172,7 +170,6 @@ public void init(Analyzer analyzer) {
}

protected void toThrift(TPlanNode msg, TPlanNodeType nodeType) {
Preconditions.checkState(materializedResultExprLists_.size() == children.size());
Contributor:

Why remove this?

Contributor Author:

If the set operator's children are all union passthrough ops, materializedResultExprLists_ isn't used, so I think this check doesn't make sense.
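
If some protection is still wanted, a hypothetical relaxation instead of outright removal could look like this sketch (not verified against every caller):

// Only enforce the size check when materialized result exprs are actually populated,
// i.e. when the children are not all union passthrough ops.
if (!materializedResultExprLists_.isEmpty()) {
    Preconditions.checkState(materializedResultExprLists_.size() == children.size());
}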

.setInputs(List.of(leftSplitConsumerOptExpForShuffleJoin, rightSplitConsumerOptExpForShuffleJoin))
.setLogicalProperty(opt.getLogicalProperty())
.setStatistics(opt.getStatistics())
.setRequiredProperties(opt.getRequiredProperties())
Contributor:

Are the new join OptExpression's requiredProperties for its children the same as the old join OptExpression's? I think they're different.

Contributor Author:

I think the new shuffle join is the same as the old shuffle join; the broadcast join is different:

OptExpression newBroadcastJoin = OptExpression.builder().setOp(newBroadcastJoinOpt)
                    .setInputs(
                            List.of(leftSplitConsumerOptExpForBroadcastJoin, rightSplitConsumerOptExpForBroadcastJoin))
                    .setLogicalProperty(opt.getLogicalProperty())
                    .setStatistics(opt.getStatistics())
                    .setRequiredProperties(requiredPropertiesForBroadcastJoin)
                    .setCost(opt.getCost()).build();

Quality Gate failed

Failed conditions
3.5% Duplication on New Code (required ≤ 3%)

See analysis details on SonarCloud

Signed-off-by: before-Sunrise <unclejyj@gmail.com>
@before-Sunrise before-Sunrise changed the title from [Feature] Support Skew join in FE to [WIP] Support Skew join in FE on Feb 6, 2025
@before-Sunrise before-Sunrise force-pushed the skew_join_fe_rewrite_without_rf branch from 4e0c539 to b5fa6a3 on February 6, 2025 11:38

github-actions bot commented Feb 6, 2025

[Java-Extensions Incremental Coverage Report]

pass : 0 / 0 (0%)


github-actions bot commented Feb 6, 2025

[FE Incremental Coverage Report]

pass : 60 / 68 (88.24%)

file detail

path covered_line new_line coverage not_covered_line_detail
🔵 com/starrocks/planner/UnionNode.java 2 4 50.00% [51, 66]
🔵 com/starrocks/qe/SessionVariable.java 2 3 66.67% [1418]
🔵 com/starrocks/planner/HashJoinNode.java 13 18 72.22% [63, 64, 89, 90, 91]
🔵 com/starrocks/sql/optimizer/operator/physical/PhysicalCTEAnchorOperator.java 2 2 100.00% []
🔵 com/starrocks/sql/optimizer/rule/implementation/HashJoinImplementationRule.java 3 3 100.00% []
🔵 com/starrocks/planner/DataPartition.java 2 2 100.00% []
🔵 com/starrocks/sql/plan/PlanFragmentBuilder.java 15 15 100.00% []
🔵 com/starrocks/planner/JoinNode.java 6 6 100.00% []
🔵 com/starrocks/sql/plan/ExecPlan.java 4 4 100.00% []
🔵 com/starrocks/sql/optimizer/operator/physical/PhysicalHashJoinOperator.java 4 4 100.00% []
🔵 com/starrocks/analysis/SlotDescriptor.java 1 1 100.00% []
🔵 com/starrocks/sql/optimizer/operator/OperatorType.java 3 3 100.00% []
🔵 com/starrocks/sql/optimizer/operator/Projection.java 3 3 100.00% []


github-actions bot commented Feb 6, 2025

[BE Incremental Coverage Report]

pass : 0 / 0 (0%)

Development

Successfully merging this pull request may close these issues.

support New skew join optimization
5 participants