Skip to content

Commit

Permalink
[opt](nereids) generate in-bloom filter if target is local for pipeli…
Browse files Browse the repository at this point in the history
…ne mode (apache#20112)

update in-filter usage in pipeline mode:
1. If the target is local, we use the in-or-bloom filter and let BE choose between in and bloom according to the actual number of distinct values.
2. set default runtime_filter_max_in_num to 1024
  • Loading branch information
englefly authored May 31, 2023
1 parent c03a19e commit 5f591a6
Show file tree
Hide file tree
Showing 8 changed files with 88 additions and 16 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.processor.post;

import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.trees.plans.AbstractPlan;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;

/**
 * Generates fragment ids for a Nereids physical plan.
 *
 * <p>Each {@link PhysicalDistribute} opens a new fragment for the subtree below it. While walking
 * the plan, the id of the fragment currently being visited is stored on every hash join and olap
 * scan under the {@link AbstractPlan#FRAGMENT_ID} mutable-state key.
 */
public class FragmentProcessor extends PlanPostProcessor {
    // highest fragment id handed out so far; it only grows, so every fragment gets a unique id
    private int lastAllocatedFrId = 0;
    // id of the fragment that the node currently being visited belongs to
    private int frId = 0;

    /**
     * Opens a new fragment for the child of {@code distribute}, visits that child, and then
     * switches back to the enclosing fragment.
     *
     * @param distribute the distribute node marking the fragment boundary
     * @param ctx the cascades context forwarded to the child visit
     * @return the same {@code distribute} node
     */
    public PhysicalDistribute visitPhysicalDistribute(PhysicalDistribute<? extends Plan> distribute,
            CascadesContext ctx) {
        int enclosingFrId = frId;
        frId = ++lastAllocatedFrId;
        distribute.child().accept(this, ctx);
        // Nodes visited after this subtree (e.g. the other child of a join) are not below this
        // distribute, so they must be labelled with the enclosing fragment's id again.
        frId = enclosingFrId;
        return distribute;
    }

    /**
     * Records the current fragment id on {@code join}, then visits its left and right children.
     *
     * @param join the hash join to label
     * @param ctx the cascades context forwarded to the child visits
     * @return the same {@code join} node
     */
    public PhysicalHashJoin visitPhysicalHashJoin(PhysicalHashJoin<? extends Plan, ? extends Plan> join,
            CascadesContext ctx) {
        join.setMutableState(AbstractPlan.FRAGMENT_ID, frId);
        join.left().accept(this, ctx);
        join.right().accept(this, ctx);
        return join;
    }

    /**
     * Records the current fragment id on {@code scan}.
     *
     * @param scan the olap scan to label
     * @param ctx the cascades context (unused here)
     * @return the same {@code scan} node
     */
    public PhysicalOlapScan visitPhysicalOlapScan(PhysicalOlapScan scan, CascadesContext ctx) {
        scan.setMutableState(AbstractPlan.FRAGMENT_ID, frId);
        return scan;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public List<PlanPostProcessor> getProcessors() {
Builder<PlanPostProcessor> builder = ImmutableList.builder();
builder.add(new MergeProjectPostProcessor());
builder.add(new PushdownFilterThroughProject());
builder.add(new FragmentProcessor());
if (!cascadesContext.getConnectContext().getSessionVariable().getRuntimeFilterMode()
.toUpperCase().equals(TRuntimeFilterMode.OFF.name())) {
builder.add(new RuntimeFilterGenerator());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation;
import org.apache.doris.nereids.trees.plans.physical.RuntimeFilter;
import org.apache.doris.planner.RuntimeFilterGenerator.FilterSizeLimits;
import org.apache.doris.planner.RuntimeFilterId;
Expand Down Expand Up @@ -71,7 +72,7 @@ public class RuntimeFilterContext {
// alias -> alias's child, if there's a key that is alias's child, the key-value will change by this way
// Alias(A) = B, now B -> A in map, and encounter Alias(B) -> C, the kv will be C -> A.
// See the disjoint-set data structure for the details of this processing.
private final Map<NamedExpression, Pair<ObjectId, Slot>> aliasTransferMap = Maps.newHashMap();
private final Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasTransferMap = Maps.newHashMap();

private final Map<Slot, ScanNode> scanNodeOfLegacyRuntimeFilterTarget = Maps.newHashMap();

Expand Down Expand Up @@ -126,7 +127,7 @@ public Map<ExprId, SlotRef> getExprIdToOlapScanNodeSlotRef() {
return exprIdToOlapScanNodeSlotRef;
}

public Map<NamedExpression, Pair<ObjectId, Slot>> getAliasTransferMap() {
public Map<NamedExpression, Pair<PhysicalRelation, Slot>> getAliasTransferMap() {
return aliasTransferMap;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.doris.nereids.trees.expressions.functions.scalar.BitmapContains;
import org.apache.doris.nereids.trees.plans.AbstractPlan;
import org.apache.doris.nereids.trees.plans.JoinType;
import org.apache.doris.nereids.trees.plans.ObjectId;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
Expand All @@ -44,6 +43,7 @@
import org.apache.doris.statistics.ColumnStatistic;
import org.apache.doris.thrift.TRuntimeFilterType;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;

import java.util.Arrays;
Expand Down Expand Up @@ -85,7 +85,7 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
public PhysicalPlan visitPhysicalHashJoin(PhysicalHashJoin<? extends Plan, ? extends Plan> join,
CascadesContext context) {
RuntimeFilterContext ctx = context.getRuntimeFilterContext();
Map<NamedExpression, Pair<ObjectId, Slot>> aliasTransferMap = ctx.getAliasTransferMap();
Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasTransferMap = ctx.getAliasTransferMap();
join.right().accept(this, context);
join.left().accept(this, context);
if (DENIED_JOIN_TYPES.contains(join.getJoinType()) || join.isMarkJoin()) {
Expand All @@ -105,10 +105,6 @@ public PhysicalPlan visitPhysicalHashJoin(PhysicalHashJoin<? extends Plan, ? ext
if (type == TRuntimeFilterType.BITMAP) {
continue;
}
// in-filter is not friendly to pipeline
if (type == TRuntimeFilterType.IN_OR_BLOOM && ctx.getSessionVariable().enablePipelineEngine()) {
type = TRuntimeFilterType.BLOOM;
}
// currently, we can ensure children in the two side are corresponding to the equal_to's.
// so right maybe an expression and left is a slot
Slot unwrappedSlot = checkTargetChild(equalTo.left());
Expand All @@ -118,18 +114,35 @@ public PhysicalPlan visitPhysicalHashJoin(PhysicalHashJoin<? extends Plan, ? ext
continue;
}
Slot olapScanSlot = aliasTransferMap.get(unwrappedSlot).second;
PhysicalRelation scan = aliasTransferMap.get(unwrappedSlot).first;
// in-filter is not friendly to pipeline
if (type == TRuntimeFilterType.IN_OR_BLOOM
&& ctx.getSessionVariable().enablePipelineEngine()
&& hasRemoteTarget(join, scan)) {
type = TRuntimeFilterType.BLOOM;
}

long buildSideNdv = getBuildSideNdv(join, equalTo);
RuntimeFilter filter = new RuntimeFilter(generator.getNextId(),
equalTo.right(), olapScanSlot, type, i, join, buildSideNdv);
ctx.addJoinToTargetMap(join, olapScanSlot.getExprId());
ctx.setTargetExprIdToFilter(olapScanSlot.getExprId(), filter);
ctx.setTargetsOnScanNode(aliasTransferMap.get(unwrappedSlot).first, olapScanSlot);
ctx.setTargetsOnScanNode(aliasTransferMap.get(unwrappedSlot).first.getId(), olapScanSlot);
}
}
}
return join;
}

/**
 * Tells whether {@code scan} is labelled with a different fragment id than {@code join}.
 *
 * <p>Both nodes must already carry a value under {@link AbstractPlan#FRAGMENT_ID}.
 *
 * @param join the join node that builds the runtime filter
 * @param scan the scan node the runtime filter targets
 * @return {@code true} if the two fragment ids differ, {@code false} if they are equal
 * @throws IllegalArgumentException if either node has no fragment id
 */
private boolean hasRemoteTarget(AbstractPlan join, AbstractPlan scan) {
    Preconditions.checkArgument(join.getMutableState(AbstractPlan.FRAGMENT_ID).isPresent(),
            "cannot find fragment id for Join node");
    Preconditions.checkArgument(scan.getMutableState(AbstractPlan.FRAGMENT_ID).isPresent(),
            "cannot find fragment id for scan node");
    Object joinFragmentId = join.getMutableState(AbstractPlan.FRAGMENT_ID).get();
    Object scanFragmentId = scan.getMutableState(AbstractPlan.FRAGMENT_ID).get();
    // The ids are stored as boxed values, so compare by value: `!=` would compare object
    // identity and report equal ids as different once they fall outside the Integer cache.
    return !joinFragmentId.equals(scanFragmentId);
}

private long getBuildSideNdv(PhysicalHashJoin<? extends Plan, ? extends Plan> join, EqualTo equalTo) {
AbstractPlan right = (AbstractPlan) join.right();
//make ut test friendly
Expand All @@ -151,7 +164,7 @@ public PhysicalPlan visitPhysicalNestedLoopJoin(PhysicalNestedLoopJoin<? extends
return join;
}
RuntimeFilterContext ctx = context.getRuntimeFilterContext();
Map<NamedExpression, Pair<ObjectId, Slot>> aliasTransferMap = ctx.getAliasTransferMap();
Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasTransferMap = ctx.getAliasTransferMap();

if ((ctx.getSessionVariable().getRuntimeFilterType() & TRuntimeFilterType.BITMAP.getValue()) == 0) {
//only generate BITMAP filter for nested loop join
Expand Down Expand Up @@ -185,7 +198,8 @@ public PhysicalPlan visitPhysicalNestedLoopJoin(PhysicalNestedLoopJoin<? extends
bitmapContains.child(1), type, i, join, isNot, -1L);
ctx.addJoinToTargetMap(join, olapScanSlot.getExprId());
ctx.setTargetExprIdToFilter(olapScanSlot.getExprId(), filter);
ctx.setTargetsOnScanNode(aliasTransferMap.get(targetSlot).first, olapScanSlot);
ctx.setTargetsOnScanNode(aliasTransferMap.get(targetSlot).first.getId(),
olapScanSlot);
join.addBitmapRuntimeFilterCondition(bitmapRuntimeFilterCondition);
}
}
Expand All @@ -196,7 +210,7 @@ public PhysicalPlan visitPhysicalNestedLoopJoin(PhysicalNestedLoopJoin<? extends
@Override
public PhysicalPlan visitPhysicalProject(PhysicalProject<? extends Plan> project, CascadesContext context) {
project.child().accept(this, context);
Map<NamedExpression, Pair<ObjectId, Slot>> aliasTransferMap
Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasTransferMap
= context.getRuntimeFilterContext().getAliasTransferMap();
// change key when encounter alias.
for (Expression expression : project.getProjects()) {
Expand All @@ -218,7 +232,7 @@ public PhysicalPlan visitPhysicalProject(PhysicalProject<? extends Plan> project
public PhysicalRelation visitPhysicalScan(PhysicalRelation scan, CascadesContext context) {
// add all the slots in map.
RuntimeFilterContext ctx = context.getRuntimeFilterContext();
scan.getOutput().forEach(slot -> ctx.getAliasTransferMap().put(slot, Pair.of(scan.getId(), slot)));
scan.getOutput().forEach(slot -> ctx.getAliasTransferMap().put(slot, Pair.of(scan, slot)));
return scan;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
* Abstract class for all concrete plan node.
*/
public abstract class AbstractPlan extends AbstractTreeNode<Plan> implements Plan {
public static final String FRAGMENT_ID = "fragment";
private static final EventProducer PLAN_CONSTRUCT_TRACER = new EventProducer(CounterEvent.class,
EventChannel.getDefaultChannel()
.addEnhancers(new AddCounterEventEnhancer())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.doris.nereids.trees.expressions.ExprId;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.MarkJoinSlotReference;
import org.apache.doris.nereids.trees.plans.AbstractPlan;
import org.apache.doris.nereids.trees.plans.JoinHint;
import org.apache.doris.nereids.trees.plans.JoinType;
import org.apache.doris.nereids.trees.plans.Plan;
Expand Down Expand Up @@ -132,7 +133,8 @@ public String toString() {
List<Object> args = Lists.newArrayList("type", joinType,
"hashJoinCondition", hashJoinConjuncts,
"otherJoinCondition", otherJoinConjuncts,
"stats", statistics);
"stats", statistics,
"fr", getMutableState(AbstractPlan.FRAGMENT_ID));
if (markJoinSlotReference.isPresent()) {
args.add("isMarkJoin");
args.add("true");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.plans.AbstractPlan;
import org.apache.doris.nereids.trees.plans.ObjectId;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.PreAggStatus;
Expand Down Expand Up @@ -119,7 +120,7 @@ public List<Slot> getBaseOutputs() {
public String toString() {
return Utils.toSqlString("PhysicalOlapScan[" + id.asInt() + "]" + getGroupIdAsString(),
"qualified", Utils.qualifiedName(qualifier, olapTable.getName()),
"stats", statistics
"stats", statistics, "fr", getMutableState(AbstractPlan.FRAGMENT_ID)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -639,7 +639,7 @@ public class SessionVariable implements Serializable, Writable {
private int runtimeFilterType = 8;

@VariableMgr.VarAttr(name = RUNTIME_FILTER_MAX_IN_NUM)
private int runtimeFilterMaxInNum = 102400;
private int runtimeFilterMaxInNum = 1024;

@VariableMgr.VarAttr(name = USE_RF_DEFAULT)
public boolean useRuntimeFilterDefaultSize = false;
Expand Down

0 comments on commit 5f591a6

Please sign in to comment.