[Enhancement] plan for table on lake to use global dictionary (StarRocks#55346)

Signed-off-by: zombee0 <ewang2027@gmail.com>
zombee0 authored Feb 5, 2025
1 parent a8aeca0 commit 8d5fdef
Showing 23 changed files with 689 additions and 41 deletions.
3 changes: 3 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/common/Config.java
@@ -2056,6 +2056,9 @@ public class Config extends ConfigBase {
     @ConfField
     public static int dict_collect_thread_pool_size = 16;
 
+    @ConfField
+    public static int dict_collect_thread_pool_for_lake_size = 4;
+
     /**
      * The column statistic cache update interval
      */
@@ -84,12 +84,20 @@ public class ThreadPoolManager {
 
     private static final long KEEP_ALIVE_TIME = 60L;
 
-    private static final ThreadPoolExecutor DICT_CACHE_THREAD =
+    private static final ThreadPoolExecutor DICT_CACHE_THREAD_POOL =
             ThreadPoolManager.newDaemonCacheThreadPool(Config.dict_collect_thread_pool_size, "cache-dict",
                     false);
 
+    private static final ThreadPoolExecutor DICT_CACHE_THREAD_POOL_FOR_LAKE =
+            ThreadPoolManager.newDaemonCacheThreadPool(Config.dict_collect_thread_pool_for_lake_size,
+                    "cache-dict-lake", false);
+
     public static ThreadPoolExecutor getDictCacheThread() {
-        return DICT_CACHE_THREAD;
+        return DICT_CACHE_THREAD_POOL;
     }
 
+    public static ThreadPoolExecutor getDictCacheThreadPoolForLake() {
+        return DICT_CACHE_THREAD_POOL_FOR_LAKE;
+    }
+
     public static void registerAllThreadPoolMetric() {
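Note: the lake pool is deliberately small (dict_collect_thread_pool_for_lake_size defaults to 4, versus 16 for the existing cache-dict pool), so dict collection for lake tables cannot crowd out the internal one. A minimal sketch of how a caller is expected to use it, mirroring the submit/reject handling in ConnectorTableTriggerAnalyzeMgr further down (refreshDictForColumn and requeue are hypothetical placeholders, not part of the patch):

    // Sketch: hand a dict-refresh task to the bounded lake pool; a
    // RejectedExecutionException means the pool is saturated and the
    // caller should re-queue the work for a later schedule tick.
    Runnable task = () -> refreshDictForColumn(tableUUID, columnName);  // hypothetical helper
    try {
        ThreadPoolManager.getDictCacheThreadPoolForLake().submit(task);
    } catch (RejectedExecutionException e) {
        requeue(task);  // hypothetical: retry on the next tick
    }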
@@ -91,7 +91,7 @@ public boolean isMaxRunningConcurrencyReached() {
         return runningTasks.size() >= Config.connector_table_query_trigger_analyze_max_running_task_num;
     }
 
-    public void scheduledPendingTask() {
+    public void schedulePendingTask() {
         // do not dispatch task if max running concurrency reached or no pending task
         if (isMaxRunningConcurrencyReached()) {
             LOG.info("Connector Analyze TaskQueue running task num reach limit: {}, current: {}",
@@ -18,7 +18,10 @@
 import com.starrocks.catalog.Database;
 import com.starrocks.catalog.Table;
 import com.starrocks.common.Config;
+import com.starrocks.common.ThreadPoolManager;
 import com.starrocks.common.util.DateUtils;
+import com.starrocks.server.GlobalStateMgr;
+import com.starrocks.statistic.StatisticExecutor;
 import io.trino.hive.$internal.org.apache.commons.lang3.tuple.Triple;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -27,7 +30,9 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -36,16 +41,41 @@ public class ConnectorTableTriggerAnalyzeMgr {
     private static final Logger LOG = LogManager.getLogger(ConnectorTableTriggerAnalyzeMgr.class);
 
     private final ConnectorAnalyzeTaskQueue connectorAnalyzeTaskQueue = new ConnectorAnalyzeTaskQueue();
+    private final Map<ConnectorTableColumnKey, Optional<String>> keyToFileForGlobalDict = new ConcurrentHashMap<>();
     private final ScheduledExecutorService dispatchScheduler = Executors.newScheduledThreadPool(1);
     private final AtomicBoolean isStart = new AtomicBoolean(false);
 
     public void start() {
         if (isStart.compareAndSet(false, true)) {
-            dispatchScheduler.scheduleAtFixedRate(connectorAnalyzeTaskQueue::scheduledPendingTask, 0,
+            dispatchScheduler.scheduleAtFixedRate(this::schedulePendingTask, 0,
                     Config.connector_table_query_trigger_analyze_schedule_interval, TimeUnit.SECONDS);
         }
     }
 
+    private void schedulePendingTask() {
+        if (GlobalStateMgr.getCurrentState().isLeader()) {
+            connectorAnalyzeTaskQueue.schedulePendingTask();
+        }
+        scheduleDictUpdate();
+    }
+
+    private void scheduleDictUpdate() {
+        for (Map.Entry<ConnectorTableColumnKey, Optional<String>> entry : keyToFileForGlobalDict.entrySet()) {
+            Optional<String> value = keyToFileForGlobalDict.remove(entry.getKey());
+            String tableUUID = entry.getKey().tableUUID;
+            String columnName = entry.getKey().column;
+            Runnable task = () -> {
+                StatisticExecutor.updateDictSync(tableUUID, columnName, value);
+            };
+            try {
+                ThreadPoolManager.getDictCacheThreadPoolForLake().submit(task);
+            } catch (RejectedExecutionException e) {
+                keyToFileForGlobalDict.put(entry.getKey(), value);
+                break;
+            }
+        }
+    }
+
     public void checkAndUpdateTableStats(Map<ConnectorTableColumnKey, Optional<ConnectorTableColumnStats>> columnStats) {
         if (columnStats == null || columnStats.isEmpty()) {
             return;
@@ -102,4 +132,12 @@ public void checkAndUpdateTableStats(Map<ConnectorTableColumnKey, Optional<Conne
             }
         }
     }
+
+    public void addDictUpdateTask(ConnectorTableColumnKey key, Optional<String> fileName) {
+        // a subsequent file invalidates previous ones
+        Optional<String> old = keyToFileForGlobalDict.get(key);
+        if (old == null || old.isPresent()) {
+            keyToFileForGlobalDict.put(key, fileName);
+        }
+    }
 }
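Two details worth noting in this class: schedulePendingTask apparently dispatches analyze tasks only on the leader FE but drains the dict-update map on every tick, and addDictUpdateTask never overwrites an entry whose value is Optional.empty(), so an empty value (read here as "refresh the whole column") cannot be downgraded to a single-file update. A hedged usage sketch; the column and file names are illustrative:

    // Sketch: queue a dict refresh for one column after a new data file lands.
    ConnectorTableTriggerAnalyzeMgr mgr =
            GlobalStateMgr.getCurrentState().getConnectorTableTriggerAnalyzeMgr();
    mgr.addDictUpdateTask(
            new ConnectorTableColumnKey(tableUUID, "city"),                // illustrative column
            Optional.of("s3://bucket/table/data/file_001.parquet"));      // illustrative file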
@@ -158,6 +158,8 @@ protected String getNodeExplainString(String prefix, TExplainLevel detailLevel)
         if (detailLevel == TExplainLevel.VERBOSE) {
             HdfsScanNode.appendDataCacheOptionsInExplain(output, prefix, dataCacheOptions);
 
+            output.append(explainColumnDict(prefix));
+
             for (SlotDescriptor slotDescriptor : desc.getSlots()) {
                 Type type = slotDescriptor.getOriginType();
                 if (type.isComplexType()) {
22 changes: 1 addition & 21 deletions fe/fe-core/src/main/java/com/starrocks/planner/OlapScanNode.java
@@ -127,9 +127,6 @@ public class OlapScanNode extends ScanNode {
     private final List<String> selectedPartitionNames = Lists.newArrayList();
     private List<Long> selectedPartitionVersions = Lists.newArrayList();
     private final HashSet<Long> scanBackendIds = new HashSet<>();
-    // The column names applied dict optimization
-    // used for explain
-    private final List<String> appliedDictStringColumns = new ArrayList<>();
     private final List<String> unUsedOutputStringColumns = new ArrayList<>();
     // a bucket seq may map to many tablets, and each tablet has a TScanRangeLocations.
     public ArrayListMultimap<Integer, TScanRangeLocations> bucketSeq2locations = ArrayListMultimap.create();
@@ -309,14 +306,6 @@ public void setBucketExprs(List<Expr> bucketExprs) {
         this.bucketExprs = bucketExprs;
     }
 
-    public void updateAppliedDictStringColumns(Set<Integer> appliedColumnIds) {
-        for (SlotDescriptor slot : desc.getSlots()) {
-            if (appliedColumnIds.contains(slot.getId().asInt())) {
-                appliedDictStringColumns.add(slot.getColumn().getName());
-            }
-        }
-    }
-
     public List<SlotDescriptor> getSlots() {
         return desc.getSlots();
     }
@@ -858,16 +847,7 @@ protected String getNodeExplainString(String prefix, TExplainLevel detailLevel)
                 output.append("\n");
             }
 
-            if (!appliedDictStringColumns.isEmpty()) {
-                int maxSize = Math.min(appliedDictStringColumns.size(), 5);
-                List<String> printList = appliedDictStringColumns.subList(0, maxSize);
-                String format_template = "dict_col=%s";
-                if (dictStringIdToIntIds.size() > 5) {
-                    format_template = format_template + "...";
-                }
-                output.append(prefix).append(String.format(format_template, Joiner.on(",").join(printList)));
-                output.append("\n");
-            }
+            output.append(explainColumnDict(prefix));
         }
 
         if (detailLevel != TExplainLevel.VERBOSE) {
33 changes: 33 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/planner/ScanNode.java
@@ -34,6 +34,7 @@
 
 package com.starrocks.planner;
 
+import com.google.common.base.Joiner;
 import com.google.common.base.MoreObjects;
 import com.starrocks.analysis.Expr;
 import com.starrocks.analysis.SlotDescriptor;
@@ -48,9 +49,11 @@
 import com.starrocks.thrift.TScanRangeLocations;
 import org.jetbrains.annotations.TestOnly;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
@@ -64,6 +67,9 @@ public abstract class ScanNode extends PlanNode {
     protected DataCacheOptions dataCacheOptions = null;
     protected long warehouseId = WarehouseManager.DEFAULT_WAREHOUSE_ID;
     protected ScanOptimzeOption scanOptimzeOption;
+    // The column names that dict optimization was applied to,
+    // used for explain output
+    protected final List<String> appliedDictStringColumns = new ArrayList<>();
 
     public ScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName) {
         super(id, desc.getId().asList(), planNodeName);
@@ -99,6 +105,18 @@ public ScanOptimzeOption getScanOptimzeOption() {
         return scanOptimzeOption;
     }
 
+    public void updateAppliedDictStringColumns(Set<Integer> appliedColumnIds) {
+        for (SlotDescriptor slot : desc.getSlots()) {
+            if (appliedColumnIds.contains(slot.getId().asInt())) {
+                appliedDictStringColumns.add(slot.getColumn().getName());
+            }
+        }
+    }
+
+    public TupleDescriptor getDesc() {
+        return desc;
+    }
+
     public String getTableName() {
         return desc.getTable().getName();
     }
@@ -183,4 +201,19 @@ public boolean isRunningAsConnectorOperator() {
 
     public void setScanSampleStrategy(RemoteFilesSampleStrategy strategy) {
     }
+
+    protected String explainColumnDict(String prefix) {
+        StringBuilder output = new StringBuilder();
+        if (!appliedDictStringColumns.isEmpty()) {
+            int maxSize = Math.min(appliedDictStringColumns.size(), 5);
+            List<String> printList = appliedDictStringColumns.subList(0, maxSize);
+            String format_template = "dict_col=%s";
+            if (appliedDictStringColumns.size() > 5) {
+                format_template = format_template + "...";
+            }
+            output.append(prefix).append(String.format(format_template, Joiner.on(",").join(printList)));
+            output.append("\n");
+        }
+        return output.toString();
+    }
 }
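explainColumnDict is hoisted from OlapScanNode (above) into the ScanNode base class so lake and connector scan nodes can report dict usage too; along the way the truncation check is fixed to test appliedDictStringColumns.size() rather than dictStringIdToIntIds.size(). For a node with six dict-applied columns, the verbose plan would carry a line shaped like this (column names illustrative):

    dict_col=c1,c2,c3,c4,c5...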
@@ -56,6 +56,7 @@
 import com.starrocks.common.util.AuditStatisticsUtil;
 import com.starrocks.common.util.DebugUtil;
 import com.starrocks.common.util.RuntimeProfile;
+import com.starrocks.connector.exception.GlobalDictNotMatchException;
 import com.starrocks.connector.exception.RemoteFileNotFoundException;
 import com.starrocks.datacache.DataCacheSelectMetrics;
 import com.starrocks.mysql.MysqlCommand;
@@ -906,6 +907,10 @@ public RowBatch getNext() throws Exception {
                 throw new RemoteFileNotFoundException(copyStatus.getErrorMsg());
             }
 
+            if (copyStatus.isGlobalDictNotMatch()) {
+                throw new GlobalDictNotMatchException(copyStatus.getErrorMsg());
+            }
+
             if (copyStatus.isRpcError()) {
                 throw new RpcException("unknown", copyStatus.getErrorMsg());
             } else {
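This wires the backend's dict-mismatch status into the coordinator: when a lake scan reports that a data file no longer matches the cached global dictionary, getNext surfaces it as a dedicated GlobalDictNotMatchException rather than a generic RPC error, so the retry layer in the next file can recover. A hedged sketch of the assumed payload, based on how extract() is consumed there (LOG is a hypothetical logger):

    // Sketch (assumed semantics): extract() yields the offending slot id and,
    // when the BE reports one, the data file whose dict no longer matches.
    Pair<Optional<Integer>, Optional<String>> err = e.extract();
    err.first.ifPresent(slot -> LOG.warn("global dict mismatch on slot {}", slot));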
@@ -15,14 +15,18 @@
 package com.starrocks.qe;
 
 import com.google.common.collect.ImmutableSet;
+import com.starrocks.analysis.SlotId;
 import com.starrocks.catalog.HiveTable;
 import com.starrocks.common.Config;
 import com.starrocks.common.InternalErrorCode;
+import com.starrocks.common.Pair;
 import com.starrocks.common.StarRocksException;
 import com.starrocks.common.profile.Tracers;
 import com.starrocks.common.util.DebugUtil;
 import com.starrocks.connector.ConnectorMetadata;
+import com.starrocks.connector.exception.GlobalDictNotMatchException;
 import com.starrocks.connector.exception.RemoteFileNotFoundException;
+import com.starrocks.connector.statistics.ConnectorTableColumnKey;
 import com.starrocks.planner.HdfsScanNode;
 import com.starrocks.planner.ScanNode;
 import com.starrocks.rpc.RpcException;
@@ -31,13 +35,15 @@
 import com.starrocks.sql.StatementPlanner;
 import com.starrocks.sql.ast.QueryStatement;
 import com.starrocks.sql.ast.StatementBase;
+import com.starrocks.sql.optimizer.statistics.IRelaxDictManager;
 import com.starrocks.sql.plan.ExecPlan;
 import com.starrocks.thrift.TExplainLevel;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 
 public class ExecuteExceptionHandler {
@@ -53,6 +59,8 @@ public static void handle(Exception e, RetryContext context) throws Exception {
             handleRpcException((RpcException) e, context);
         } else if (e instanceof StarRocksException) {
             handleUserException((StarRocksException) e, context);
+        } else if (e instanceof GlobalDictNotMatchException) {
+            handleGlobalDictNotMatchException((GlobalDictNotMatchException) e, context);
         } else {
             throw e;
         }
@@ -88,6 +96,36 @@ private static void handleRemoteFileNotFound(RemoteFileNotFoundException e, Retr
         Tracers.record(Tracers.Module.EXTERNAL, "HMS.RETRY", String.valueOf(context.retryTime + 1));
     }
 
+    private static void tryTriggerRefreshDictAsync(GlobalDictNotMatchException e, RetryContext context) {
+        Pair<Optional<Integer>, Optional<String>> err = e.extract();
+        if (err.first.isEmpty()) {
+            return;
+        }
+        SlotId slotId = new SlotId(err.first.get());
+        for (ScanNode scanNode : context.execPlan.getScanNodes()) {
+            if (scanNode.getDesc().getSlots().stream().anyMatch(x -> x.getId().equals(slotId))) {
+                String columnName = scanNode.getDesc().getSlot(slotId.asInt()).getColumn().getName();
+                String tableUUID = scanNode.getDesc().getTable().getUUID();
+                IRelaxDictManager.getInstance().invalidTemporarily(tableUUID, columnName);
+                GlobalStateMgr.getCurrentState().getConnectorTableTriggerAnalyzeMgr().
+                        addDictUpdateTask(new ConnectorTableColumnKey(tableUUID, columnName), err.second);
+                return;
+            }
+        }
+    }
+
+    private static void handleGlobalDictNotMatchException(GlobalDictNotMatchException e, RetryContext context)
+            throws Exception {
+        // trigger async collect dict
+        tryTriggerRefreshDictAsync(e, context);
+
+        // rerun without low cardinality optimization
+        ConnectContext connectContext = context.connectContext;
+        connectContext.getSessionVariable().setUseLowCardinalityOptimizeOnLake(false);
+        rebuildExecPlan(e, context);
+        connectContext.getSessionVariable().setUseLowCardinalityOptimizeOnLake(true);
+    }
+
     private static void handleRpcException(RpcException e, RetryContext context) throws Exception {
         // When enable_collect_query_detail_info is set to true, the plan will be recorded in the query detail,
         // and hence there is no need to log it here.
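The recovery path for a dict mismatch is therefore: invalidate the cached dict for the offending column, queue an asynchronous refresh through ConnectorTableTriggerAnalyzeMgr, replan once with the lake low-cardinality rewrite disabled, and restore the flag afterwards. One design note: the committed code flips the session variable back unconditionally; a try/finally variant (a sketch only, not what the patch does) would also restore it if rebuildExecPlan throws:

    connectContext.getSessionVariable().setUseLowCardinalityOptimizeOnLake(false);
    try {
        rebuildExecPlan(e, context);  // replan without the dict rewrite
    } finally {
        connectContext.getSessionVariable().setUseLowCardinalityOptimizeOnLake(true);
    }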
20 changes: 20 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java
@@ -356,8 +356,10 @@ public class SessionVariable implements Serializable, Writable, Cloneable {
     public static final String CBO_ENABLE_REPLICATED_JOIN = "cbo_enable_replicated_join";
     public static final String CBO_USE_CORRELATED_JOIN_ESTIMATE = "cbo_use_correlated_join_estimate";
     public static final String ALWAYS_COLLECT_LOW_CARD_DICT = "always_collect_low_card_dict";
+    public static final String ALWAYS_COLLECT_LOW_CARD_DICT_ON_LAKE = "always_collect_low_card_dict_on_lake";
     public static final String CBO_ENABLE_LOW_CARDINALITY_OPTIMIZE = "cbo_enable_low_cardinality_optimize";
     public static final String LOW_CARDINALITY_OPTIMIZE_V2 = "low_cardinality_optimize_v2";
+    public static final String LOW_CARDINALITY_OPTIMIZE_ON_LAKE = "low_cardinality_optimize_on_lake";
     public static final String ARRAY_LOW_CARDINALITY_OPTIMIZE = "array_low_cardinality_optimize";
     public static final String CBO_USE_NTH_EXEC_PLAN = "cbo_use_nth_exec_plan";
     public static final String CBO_CTE_REUSE = "cbo_cte_reuse";
@@ -1391,12 +1393,18 @@ public static MaterializedViewRewriteMode parse(String str) {
     @VariableMgr.VarAttr(name = ALWAYS_COLLECT_LOW_CARD_DICT, flag = VariableMgr.INVISIBLE)
     private boolean alwaysCollectDict = false;
 
+    @VariableMgr.VarAttr(name = ALWAYS_COLLECT_LOW_CARD_DICT_ON_LAKE, flag = VariableMgr.INVISIBLE)
+    private boolean alwaysCollectDictOnLake = true;
+
     @VariableMgr.VarAttr(name = CBO_ENABLE_LOW_CARDINALITY_OPTIMIZE)
     private boolean enableLowCardinalityOptimize = true;
 
     @VariableMgr.VarAttr(name = LOW_CARDINALITY_OPTIMIZE_V2)
     private boolean useLowCardinalityOptimizeV2 = true;
 
+    @VariableMgr.VarAttr(name = LOW_CARDINALITY_OPTIMIZE_ON_LAKE)
+    private boolean useLowCardinalityOptimizeOnLake = true;
+
     @VarAttr(name = ARRAY_LOW_CARDINALITY_OPTIMIZE)
     private boolean enableArrayLowCardinalityOptimize = true;
@@ -3549,6 +3557,10 @@ public boolean isAlwaysCollectDict() {
         return alwaysCollectDict;
     }
 
+    public boolean isAlwaysCollectDictOnLake() {
+        return alwaysCollectDictOnLake;
+    }
+
     public boolean isEnableLowCardinalityOptimize() {
         return enableLowCardinalityOptimize;
     }
@@ -3561,6 +3573,14 @@ public void setUseLowCardinalityOptimizeV2(boolean useLowCardinalityOptimizeV2)
         this.useLowCardinalityOptimizeV2 = useLowCardinalityOptimizeV2;
     }
 
+    public boolean isUseLowCardinalityOptimizeOnLake() {
+        return useLowCardinalityOptimizeOnLake;
+    }
+
+    public void setUseLowCardinalityOptimizeOnLake(boolean useLowCardinalityOptimizeOnLake) {
+        this.useLowCardinalityOptimizeOnLake = useLowCardinalityOptimizeOnLake;
+    }
+
     public boolean isEnableRewriteGroupingsetsToUnionAll() {
         return enableRewriteGroupingSetsToUnionAll;
     }
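The new session variables split lake behavior out of the existing low-cardinality switches: low_cardinality_optimize_on_lake gates the rewrite for lake tables (it is the flag the retry handler above toggles), while the invisible always_collect_low_card_dict_on_lake is presumably the lake counterpart of always_collect_low_card_dict and defaults to on. A sketch of how a planner-side check might consult them through the accessors added here (the combined condition is an assumption, not code from the patch):

    SessionVariable sv = connectContext.getSessionVariable();
    if (sv.isUseLowCardinalityOptimizeOnLake() && sv.isAlwaysCollectDictOnLake()) {
        // plan the lake scan with the global-dict (low-cardinality) rewrite
    }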
(Remaining changed files not shown.)