Skip to content

Commit

Permalink
[pipeline](exec) disable shared scan in default and disable shared sc…
Browse files Browse the repository at this point in the history
…an in limit with where scan (apache#25952)
  • Loading branch information
HappenLee authored and seawinde committed Nov 12, 2023
1 parent dae852d commit 76304a9
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1606,9 +1606,6 @@ public class Config extends ConfigBase {
@ConfField(mutable = true, varType = VariableAnnotation.EXPERIMENTAL)
public static boolean enable_cpu_hard_limit = false;

@ConfField(mutable = true)
public static boolean disable_shared_scan = false;

@ConfField(mutable = false, masterOnly = true)
public static int backend_rpc_timeout_ms = 60000; // 1 min

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1267,11 +1267,9 @@ public String getNodeExplainString(String prefix, TExplainLevel detailLevel) {
public int getNumInstances() {
// In pipeline exec engine, the instance num equals be_num * parallel instance.
// so here we need count distinct be_num to do the work. make sure get right instance
if (ConnectContext.get().getSessionVariable().getEnablePipelineEngine()) {
int parallelInstance = ConnectContext.get().getSessionVariable().getParallelExecInstanceNum();
long numBackend = scanRangeLocations.stream().flatMap(rangeLoc -> rangeLoc.getLocations().stream())
.map(loc -> loc.backend_id).distinct().count();
return (int) (parallelInstance * numBackend);
if (ConnectContext.get().getSessionVariable().getEnablePipelineEngine()
&& ConnectContext.get().getSessionVariable().getEnableSharedScan()) {
return ConnectContext.get().getSessionVariable().getParallelExecInstanceNum();
}
return scanRangeLocations.size();
}
Expand Down Expand Up @@ -1328,7 +1326,7 @@ public boolean getShouldColoScan() {
// If scan is key search, should not enable the shared scan opt to prevent the performance problem
// 1. where contain the eq or in expr of key column slot
// 2. key column slot is distribution column and first column
public boolean isKeySearch() {
protected boolean isKeySearch() {
List<SlotRef> whereSlot = Lists.newArrayList();
for (Expr conjunct : conjuncts) {
if (conjunct instanceof BinaryPredicate) {
Expand Down
19 changes: 17 additions & 2 deletions fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.doris.common.UserException;
import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
import org.apache.doris.planner.external.FederationBackendPolicy;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.spi.Split;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.statistics.query.StatsDelta;
Expand Down Expand Up @@ -148,6 +149,13 @@ protected Expr castToSlot(SlotDescriptor slotDesc, Expr expr) throws UserExcepti
*/
public abstract List<TScanRangeLocations> getScanRangeLocations(long maxScanRangeLength);

// If scan is key search, should not enable the shared scan opt to prevent the performance problem
// 1. where contain the eq or in expr of key column slot
// 2. key column slot is distribution column and first column
protected boolean isKeySearch() {
return false;
}

/**
* Update required_slots in scan node contexts. This is called after Nereids planner do the projection.
* In the projection process, some slots may be removed. So call this to update the slots info.
Expand Down Expand Up @@ -653,7 +661,14 @@ public static TScanRangeLocations createSingleScanRangeLocations(FederationBacke
return scanRangeLocation;
}

public boolean isKeySearch() {
return false;
// some scan should not enable the shared scan opt to prevent the performance problem
// 1. is key search
// 2. session variable not enable_shared_scan
public boolean shouldDisableSharedScan(ConnectContext context) {
return isKeySearch() || !context.getSessionVariable().getEnableSharedScan();
}

public boolean haveLimitAndConjunts() {
return hasLimit() && !conjuncts.isEmpty();
}
}
26 changes: 18 additions & 8 deletions fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -1955,27 +1955,37 @@ private void computeFragmentHosts() throws Exception {

// disable shared scan optimization if one of conditions below is met:
// 1. Use non-pipeline or pipelineX engine
// 2. Number of scan ranges is larger than instances
// 3. This fragment has a colocated scan node
// 4. This fragment has a FileScanNode
// 5. Disable shared scan optimization by session variable
if (!enablePipelineEngine || perNodeScanRanges.size() > parallelExecInstanceNum
|| (node.isPresent() && node.get().getShouldColoScan())
// 2. This fragment has a colocated scan node
// 3. This fragment has a FileScanNode
// 4. Disable shared scan optimization by session variable
if (!enablePipelineEngine || (node.isPresent() && node.get().getShouldColoScan())
|| (node.isPresent() && node.get() instanceof FileScanNode)
|| (node.isPresent() && node.get().isKeySearch())
|| Config.disable_shared_scan || enablePipelineXEngine) {
|| (node.isPresent() && node.get().shouldDisableSharedScan(context))
|| enablePipelineXEngine) {
int expectedInstanceNum = 1;
if (parallelExecInstanceNum > 1) {
//the scan instance num should not larger than the tablets num
expectedInstanceNum = Math.min(perNodeScanRanges.size(), parallelExecInstanceNum);
}
// if have limit and conjunts, only need 1 instance to save cpu and
// mem resource
if (node.isPresent() && node.get().haveLimitAndConjunts()) {
expectedInstanceNum = 1;
}

perInstanceScanRanges = ListUtil.splitBySize(perNodeScanRanges,
expectedInstanceNum);
sharedScanOpts = Collections.nCopies(perInstanceScanRanges.size(), false);
} else {
int expectedInstanceNum = Math.min(parallelExecInstanceNum,
leftMostNode.getNumInstances());
expectedInstanceNum = Math.max(expectedInstanceNum, 1);
// if have limit and conjunts, only need 1 instance to save cpu and
// mem resource
if (node.isPresent() && node.get().haveLimitAndConjunts()) {
expectedInstanceNum = 1;
}

perInstanceScanRanges = Collections.nCopies(expectedInstanceNum, perNodeScanRanges);
sharedScanOpts = Collections.nCopies(perInstanceScanRanges.size(), true);
}
Expand Down
11 changes: 11 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,8 @@ public class SessionVariable implements Serializable, Writable {

public static final String ENABLE_PIPELINE_X_ENGINE = "enable_pipeline_x_engine";

public static final String ENABLE_SHARED_SCAN = "enable_shared_scan";

public static final String ENABLE_LOCAL_SHUFFLE = "enable_local_shuffle";

public static final String ENABLE_AGG_STATE = "enable_agg_state";
Expand Down Expand Up @@ -738,6 +740,11 @@ public class SessionVariable implements Serializable, Writable {

@VariableMgr.VarAttr(name = ENABLE_PIPELINE_X_ENGINE, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL)
private boolean enablePipelineXEngine = false;

@VariableMgr.VarAttr(name = ENABLE_SHARED_SCAN, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL,
needForward = true)
private boolean enableSharedScan = false;

@VariableMgr.VarAttr(name = ENABLE_LOCAL_SHUFFLE, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL)
private boolean enableLocalShuffle = false;

Expand Down Expand Up @@ -2832,6 +2839,10 @@ public boolean getEnablePipelineEngine() {
return enablePipelineEngine || enablePipelineXEngine;
}

public boolean getEnableSharedScan() {
return enableSharedScan;
}

public boolean getEnablePipelineXEngine() {
return enablePipelineXEngine;
}
Expand Down

0 comments on commit 76304a9

Please sign in to comment.