[WIP] Support Skew join in FE #50073
base: main
Conversation
Force-pushed from 1312d5d to 42345d9.
public class SplitCastDataSink extends DataSink {
    // SplitCastDataSink will have multiple dest fragments
    private final List<DataStreamSink> dataStreamSinks = Lists.newArrayList();
At present there are only two DataStreamSinks, one for random and the other for shuffle. In the future, is there a possibility that more than two dataStreamSinks are introduced to support other scenarios?
Right now, the left table's exchange node has two DataStreamSinks, one for random and the other for shuffle; the right table's exchange node also has two DataStreamSinks, one for broadcast and the other for shuffle. You can define whatever outputPartition you want, so it's very flexible.
    // every DataStreamSink can have multiple destinations
    private final List<List<TPlanFragmentDestination>> destinations = Lists.newArrayList();
    // if splitExprs[i] is true for [0, i], then data will be sent to dest fragment i
    private final List<Expr> splitExprs = Lists.newArrayList();
Suggest adopting a style like Collectors.groupingBy(), so we can cook a single splitExpr from these splitExprs; that splitExpr always maps the input to 0..n-1, corresponding to n partitions. For example, we can make a case-when expr from these splitExprs:
case
when splitExprs[0] then 0
when splitExprs[1] then 1
...
else n
end
Right now it actually works the way you want: a chunk will first be evaluated by splitExprs[length - 1]; if a row's result is true it is sent to dataStreamSinks[length - 1], otherwise it is evaluated by splitExprs[length - 2], and so on. So every splitExprs[i] cuts the current chunk into two chunks: one is sent to dataStreamSinks[i], the other is fed to splitExprs[i - 1].
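As a minimal sketch of that last-to-first evaluation (names and types are invented for illustration, not StarRocks code), modeling predicates over integer rows:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntPredicate;

// Hypothetical sketch, not StarRocks code: rows are tested against the
// splitExprs from last to first; the first predicate that matches routes
// the row to the sink with the same index, so splitExprs[i] only sees
// the rows left over from splitExprs[i+1..n-1].
public class SplitSketch {
    public static List<List<Integer>> split(List<Integer> rows, List<IntPredicate> splitExprs) {
        List<List<Integer>> sinks = new ArrayList<>();
        for (int i = 0; i < splitExprs.size(); i++) {
            sinks.add(new ArrayList<>());
        }
        for (int row : rows) {
            // evaluate splitExprs[length - 1] first, then splitExprs[length - 2], ...
            for (int i = splitExprs.size() - 1; i >= 0; i--) {
                if (splitExprs.get(i).test(row)) {
                    sinks.get(i).add(row);
                    break; // row goes to dataStreamSinks[i]; lower exprs never see it
                }
            }
        }
        return sinks;
    }
}
```

With splitExprs = [always-true, is-even], the even rows land in sink 1 and the remainder fall through to sink 0, mirroring the case-when mapping suggested above.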
...core/src/main/java/com/starrocks/sql/optimizer/rule/tree/SkewShuffleJoinEliminationRule.java (outdated; resolved)
public void setSkewJoin(boolean skewJoin) {
    isSkewJoin = skewJoin && ConnectContext.get().getSessionVariable().isEnableOptimizerSkewJoinByBroadCastSkewValues();
It's better not to depend on ConnectContext here, and ConnectContext.get() may be null in some cases.
This is because Yangwenbo's optimization also sets the join's skew column, so I need to check the session variable to distinguish them. This flag in HashJoinNode will be used for runtime filters later. After all the PRs are merged, I will consider removing Wenbo's optimization, and then this line can be deleted.
 * |  / \  /
 * |  / \  /
 * child1 child2
 */
- child1's and child2's non-skew data use a shuffle join, while their skew data uses a broadcast join?
- What's the broadcast's distribution type? How do we determine its dop?
I see CTE is used in the code. Can you add more info in the description?
- correct
- For broadcast, the left table's skew data is sent randomly while the right table's corresponding data is broadcast. They are pointed to by the DistributionSpecs of leftSplitConsumerOptForBroadcastJoin and rightSplitConsumerOptForBroadcastJoin: the former is RoundRobinDistributionSpec, the latter is DistributionSpec.createReplicatedDistributionSpec(). PlanFragmentBuilder then translates each DistributionSpec into a DataPartition in visitPhysicalSplitConsumer; that DataPartition is stored via splitProduceFragment.getOutputPartitions().add(dataPartition) and in the exchange node, to tell the exchange sink and source.
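As an illustration of those two distributions (invented names, not the actual StarRocks classes): the left table's skew rows are dealt round-robin across instances, while the right table's matching rows are replicated to every instance, so each instance can join its share of skew rows locally:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration, not StarRocks code: models the two
// DistributionSpecs used for the broadcast side of the skew join.
public class SkewDistributionSketch {
    // like RoundRobinDistributionSpec: deal rows across instances in turn
    public static List<List<Integer>> roundRobin(List<Integer> rows, int instances) {
        List<List<Integer>> out = new ArrayList<>();
        for (int i = 0; i < instances; i++) {
            out.add(new ArrayList<>());
        }
        for (int i = 0; i < rows.size(); i++) {
            out.get(i % instances).add(rows.get(i));
        }
        return out;
    }

    // like a replicated distribution: every instance receives a full copy
    public static List<List<Integer>> replicate(List<Integer> rows, int instances) {
        List<List<Integer>> out = new ArrayList<>();
        for (int i = 0; i < instances; i++) {
            out.add(new ArrayList<>(rows));
        }
        return out;
    }
}
```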
PhysicalCTEAnchorOperator is actually only used by the PlanFragment builder: it first visits the left child, which is the split producer, and then the right child, which is the split consumer. It actually has nothing to do with CTE. You can see the document for details.
    .setInputs(rightExchangeOptExp.getInputs())
    .setLogicalProperty(rightExchangeOptExp.getLogicalProperty())
    .setStatistics(rightExchangeOptExp.getStatistics())
    .setCost(rightExchangeOptExp.getCost()).build();
How about using a single method to create the PhysicalSplitProduceOperator, with a parameter to decide left or right?
    }
}

return new SkewColumnAndValues(skewValues, Pair.create(leftSkewColumn, rightSkewColumn));
- What if the join's on-predicates contain multiple columns (e.g. t1.a = t2.a and t1.b = t2.b)? Can we optimize this case?
- What about multi-way joins (e.g. t1.a = t2.a = t3.a)?
Right now we only support the skew hint. So if you specify that one column in the join's on-predicate, like t1.a, is skewed, we visit the plan tree top-down and check whether the join has a specified skew column and whether it's a shuffle join. If yes, we find the corresponding column in the right table, like t2.a, and rewrite this join using the skew join rule; we won't visit the original join's children any more.
So the answer to 1 is yes, although we only consider one predicate (t1.b = t2.b is not taken into consideration even if t1.b is also skewed) for simplicity.
The answer to 2 is yes, but right now only the top join's skew optimization is supported. In a case like the one below, only join2 can be optimized:
join2(skew)
/ \
join1(skew) t3
/ \
t1 t2
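A minimal sketch of that top-down visit (the node structure is invented for illustration, not StarRocks code): once a skewed shuffle join is rewritten, its children are not visited again, so in the tree above only join2 gets the optimization:

```java
import java.util.List;

// Hypothetical sketch, not StarRocks code: a top-down visit that rewrites
// the first skewed shuffle join it meets and then stops descending.
public class TopDownSkewVisit {
    static class Node {
        final String name;
        final boolean skewShuffleJoin; // has a skew hint and is a shuffle join
        final List<Node> children;

        Node(String name, boolean skewShuffleJoin, List<Node> children) {
            this.name = name;
            this.skewShuffleJoin = skewShuffleJoin;
            this.children = children;
        }
    }

    // collect the names of joins that would be rewritten by the skew rule
    public static void collectRewrites(Node node, List<String> rewritten) {
        if (node.skewShuffleJoin) {
            rewritten.add(node.name);
            return; // children are not visited any more
        }
        for (Node child : node.children) {
            collectRewrites(child, rewritten);
        }
    }
}
```

Running this on the join2-over-join1 tree above collects only "join2", matching the behavior described in the reply.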
originalShuffleJoinOperator.getJoinType(), originalShuffleJoinOperator.getOnPredicate(),
originalShuffleJoinOperator.getJoinHint(), originalShuffleJoinOperator.getLimit(),
originalShuffleJoinOperator.getPredicate(), projectionOnJoin,
originalShuffleJoinOperator.getSkewColumn(), originalShuffleJoinOperator.getSkewValues());
Why use originalShuffleJoinOperator.getSkewValues() rather than skewValues?
Actually, skewValues and skewColumn are only used in this rule, so it's OK to pass originalShuffleJoinOperator.getSkewValues(), and these two new joins won't be optimized by this rule again.
import java.util.List;

public class PhysicalMergeOperator extends PhysicalSetOperation {
Why not use PhysicalSetOperation? What's the difference?
Mostly because the PlanFragment builder's logic for PhysicalSetOperation is not suitable for skew join: PhysicalSetOperation always creates a new fragment and adds an exchange node between the set operator and its child.
...-core/src/main/java/com/starrocks/sql/optimizer/operator/physical/PhysicalMergeOperator.java (outdated; resolved)
@@ -172,7 +170,6 @@ public void init(Analyzer analyzer) {
}

protected void toThrift(TPlanNode msg, TPlanNodeType nodeType) {
    Preconditions.checkState(materializedResultExprLists_.size() == children.size());
Why remove this?
If the set's children are all union-passthrough ops, it won't use materializedResultExprLists_, so I think this check doesn't make any sense.
    .setInputs(List.of(leftSplitConsumerOptExpForShuffleJoin, rightSplitConsumerOptExpForShuffleJoin))
    .setLogicalProperty(opt.getLogicalProperty())
    .setStatistics(opt.getStatistics())
    .setRequiredProperties(opt.getRequiredProperties())
Are the new join OptExpression's requiredProperties for its children the same as the old join OptExpression's? I think they're different.
I think the new shuffle join is the same as the old shuffle join; the broadcast join is different:
OptExpression newBroadcastJoin = OptExpression.builder().setOp(newBroadcastJoinOpt)
.setInputs(
List.of(leftSplitConsumerOptExpForBroadcastJoin, rightSplitConsumerOptExpForBroadcastJoin))
.setLogicalProperty(opt.getLogicalProperty())
.setStatistics(opt.getStatistics())
.setRequiredProperties(requiredPropertiesForBroadcastJoin)
.setCost(opt.getCost()).build();
fe/fe-core/src/main/java/com/starrocks/planner/SplitPlanFragment.java (outdated; resolved)
...-core/src/main/java/com/starrocks/sql/optimizer/operator/physical/PhysicalMergeOperator.java (outdated; resolved)
Quality Gate failed: failed conditions.
Signed-off-by: before-Sunrise <unclejyj@gmail.com>
Force-pushed from 4e0c539 to b5fa6a3.
[Java-Extensions Incremental Coverage Report] ✅ pass : 0 / 0 (0%)
[FE Incremental Coverage Report] ✅ pass : 60 / 68 (88.24%)
[BE Incremental Coverage Report] ✅ pass : 0 / 0 (0%)
Why I'm doing:
Skew values use broadcast join and the other values use shuffle join, for skewed shuffle joins.
What I'm doing:
Support skew join optimization in FE. This is the first PR of #48655.
Fixes #issue
What type of PR is this:
Does this PR entail a change in behavior?
If yes, please specify the type of change:
Checklist:
Bugfix cherry-pick branch check: