Skip to content

Commit

Permalink
[BugFix] Cached fragment misuses exogenous runtime filter (StarRocks#…
Browse files Browse the repository at this point in the history
…51150)

Signed-off-by: satanson <ranpanf@gmail.com>
Signed-off-by: zhiminr.ren <1240388654@qq.com>
  • Loading branch information
satanson authored and renzhimin7 committed Nov 7, 2024
1 parent b8beec6 commit 49c0431
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -720,7 +720,7 @@ public boolean normalize() {
// Get leftmost path
List<PlanNode> leftNodesTopDown = Lists.newArrayList();
for (PlanNode currNode = root; currNode != null && currNode.getFragment() == fragment;
currNode = currNode.getChild(0)) {
currNode = currNode.getChild(0)) {
leftNodesTopDown.add(currNode);
}

Expand Down Expand Up @@ -774,15 +774,17 @@ public boolean normalize() {
// Not cacheable unless alien GRF(s) take effects on this PlanFragment.
// The alien GRF(s) mean the GRF(S) that not created by PlanNodes of the subtree rooted at
// the PlanFragment.planRoot.

Set<Integer> grfBuilders =
fragment.getProbeRuntimeFilters().values().stream().filter(RuntimeFilterDescription::isHasRemoteTargets)
.map(RuntimeFilterDescription::getBuildPlanNodeId).collect(Collectors.toSet());
if (!grfBuilders.isEmpty()) {
List<PlanFragment> rightSiblings = Lists.newArrayList();
collectRightSiblingFragments(root, rightSiblings, Sets.newHashSet());
Set<Integer> acceptableGrfBuilders = rightSiblings.stream().flatMap(
frag -> frag.getBuildRuntimeFilters().values().stream().map(
RuntimeFilterDescription::getBuildPlanNodeId)).collect(Collectors.toSet());
Set<Integer> acceptableGrfBuilders = rightSiblings.stream()
.flatMap(frag -> frag.getBuildRuntimeFilters().values().stream())
.map(RuntimeFilterDescription::getBuildPlanNodeId)
.collect(Collectors.toSet());
boolean hasAlienGrf = !Sets.difference(grfBuilders, acceptableGrfBuilders).isEmpty();
if (hasAlienGrf) {
return false;
Expand Down
53 changes: 27 additions & 26 deletions fe/fe-core/src/main/java/com/starrocks/planner/PlanFragment.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Deque;
import java.util.LinkedList;
Expand All @@ -71,6 +72,7 @@
import java.util.Queue;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -156,8 +158,8 @@ public class PlanFragment extends TreeNode<PlanFragment> {

protected double fragmentCost;

protected final Map<Integer, RuntimeFilterDescription> buildRuntimeFilters = Maps.newTreeMap();
protected final Map<Integer, RuntimeFilterDescription> probeRuntimeFilters = Maps.newTreeMap();
protected Map<Integer, RuntimeFilterDescription> buildRuntimeFilters = Maps.newHashMap();
protected Map<Integer, RuntimeFilterDescription> probeRuntimeFilters = Maps.newHashMap();

protected List<Pair<Integer, ColumnDict>> queryGlobalDicts = Lists.newArrayList();
protected Map<Integer, Expr> queryGlobalDictExprs;
Expand Down Expand Up @@ -704,34 +706,33 @@ public boolean isTransferQueryStatisticsWithEveryBatch() {
return transferQueryStatisticsWithEveryBatch;
}

public void collectBuildRuntimeFilters(PlanNode root) {
if (root instanceof ExchangeNode) {
return;
}

if (root instanceof RuntimeFilterBuildNode) {
RuntimeFilterBuildNode rfBuildNode = (RuntimeFilterBuildNode) root;
for (RuntimeFilterDescription description : rfBuildNode.getBuildRuntimeFilters()) {
buildRuntimeFilters.put(description.getFilterId(), description);
}
}

for (PlanNode node : root.getChildren()) {
collectBuildRuntimeFilters(node);
}
public void collectBuildRuntimeFilters() {
Map<Integer, RuntimeFilterDescription> filters = Maps.newHashMap();
collectNodes().stream()
.filter(node -> node instanceof RuntimeFilterBuildNode)
.flatMap(node -> ((RuntimeFilterBuildNode) node).getBuildRuntimeFilters().stream())
.forEach(desc -> filters.put(desc.getFilterId(), desc));
buildRuntimeFilters = filters;
}

public void collectProbeRuntimeFilters(PlanNode root) {
for (RuntimeFilterDescription description : root.getProbeRuntimeFilters()) {
probeRuntimeFilters.put(description.getFilterId(), description);
}
public void collectProbeRuntimeFilters() {
Map<Integer, RuntimeFilterDescription> filters = Maps.newHashMap();
collectNodes().stream()
.flatMap(node -> node.getProbeRuntimeFilters().stream())
.forEach(desc -> filters.put(desc.getFilterId(), desc));
probeRuntimeFilters = filters;
}

if (root instanceof ExchangeNode) {
return;
}
public List<PlanNode> collectNodes() {
List<PlanNode> nodes = Lists.newArrayList();
collectNodesImpl(getPlanRoot(), nodes);
return nodes;
}

for (PlanNode node : root.getChildren()) {
collectProbeRuntimeFilters(node);
private void collectNodesImpl(PlanNode root, List<PlanNode> nodes) {
nodes.add(root);
if (!(root instanceof ExchangeNode)) {
root.getChildren().forEach(child -> collectNodesImpl(child, nodes));
}
}

Expand Down
10 changes: 0 additions & 10 deletions fe/fe-core/src/main/java/com/starrocks/qe/DefaultCoordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -606,14 +606,6 @@ private void prepareResultSink() throws AnalysisException {
setGlobalRuntimeFilterParams(rootExecFragment, worker.getBrpcIpAddress());
boolean isLoadType = !(rootExecFragment.getPlanFragment().getSink() instanceof ResultSink);
if (isLoadType) {
// TODO (by satanson): Other DataSink except ResultSink can not support global
// runtime filter merging at present, we should support it in future.
// pipeline-level runtime filter needs to derive RuntimeFilterLayout, so we collect
// RuntimeFilterDescription
for (ExecutionFragment execFragment : executionDAG.getFragmentsInPreorder()) {
PlanFragment fragment = execFragment.getPlanFragment();
fragment.collectBuildRuntimeFilters(fragment.getPlanRoot());
}
return;
}

Expand Down Expand Up @@ -769,8 +761,6 @@ private void setGlobalRuntimeFilterParams(ExecutionFragment topParams,

for (ExecutionFragment execFragment : executionDAG.getFragmentsInPreorder()) {
PlanFragment fragment = execFragment.getPlanFragment();
fragment.collectBuildRuntimeFilters(fragment.getPlanRoot());
fragment.collectProbeRuntimeFilters(fragment.getPlanRoot());
for (Map.Entry<Integer, RuntimeFilterDescription> kv : fragment.getProbeRuntimeFilters().entrySet()) {
List<TRuntimeFilterProberParams> probeParamList = Lists.newArrayList();
for (final FragmentInstance instance : execFragment.getInstances()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,8 @@ private static ExecPlan finalizeFragments(ExecPlan execPlan, TResultSinkType res
}

fragments.forEach(PlanFragment::removeDictMappingProbeRuntimeFilters);
fragments.forEach(PlanFragment::collectBuildRuntimeFilters);
fragments.forEach(PlanFragment::collectProbeRuntimeFilters);

if (useQueryCache(execPlan)) {
for (PlanFragment fragment : execPlan.getFragments()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -958,4 +958,20 @@ public void testQueryTimeout() {
Assert.assertThrows(StarRocksPlannerException.class,
() -> getPlanFragment(getDumpInfoFromFile("query_dump/query_timeout"), null, TExplainLevel.NORMAL));
}

@Test
public void testQueryCacheMisuseExogenousRuntimeFilter() throws Exception {
String savedSv = connectContext.getSessionVariable().getJsonString();
try {
connectContext.getSessionVariable().setEnableQueryCache(true);
QueryDumpInfo dumpInfo =
getDumpInfoFromJson(getDumpInfoFromFile("query_dump/query_cache_misuse_exogenous_runtime_filter"));
ExecPlan execPlan = UtFrameUtils.getPlanFragmentFromQueryDump(connectContext, dumpInfo);
Assert.assertTrue(execPlan.getFragments().stream().noneMatch(frag -> frag.getCacheParam() != null));
Assert.assertTrue(
execPlan.getFragments().stream().anyMatch(frag -> !frag.getProbeRuntimeFilters().isEmpty()));
} finally {
connectContext.getSessionVariable().replayFromJson(savedSv);
}
}
}

Large diffs are not rendered by default.

0 comments on commit 49c0431

Please sign in to comment.