From 3c733c44df6c7caabfd68b3fbdd496634573fe27 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Thu, 27 Jun 2024 20:12:21 +0800 Subject: [PATCH] [BugFix] Support force cancel refresh materialized view & optimize some task run strategies (backport #46131) (#47584) Signed-off-by: shuming.li Co-authored-by: shuming.li --- .../java/com/starrocks/common/Config.java | 7 + .../connector/CatalogConnectorMetadata.java | 304 ++++++++++++++++++ .../connector/ConnectorMetadata.java | 3 +- .../com/starrocks/qe/DDLStmtExecutor.java | 5 +- .../java/com/starrocks/qe/StmtExecutor.java | 9 +- .../com/starrocks/scheduler/Constants.java | 9 +- .../starrocks/scheduler/ExecuteOption.java | 7 +- .../PartitionBasedMvRefreshProcessor.java | 12 +- .../scheduler/PendingTaskRunFIFOQueue.java | 23 +- .../com/starrocks/scheduler/TaskManager.java | 32 +- .../starrocks/scheduler/TaskRunExecutor.java | 2 + .../starrocks/scheduler/TaskRunManager.java | 21 +- .../starrocks/scheduler/TaskRunScheduler.java | 8 +- .../persist/MVTaskRunExtraMessage.java | 32 +- .../scheduler/persist/TaskRunStatus.java | 15 + .../com/starrocks/server/LocalMetastore.java | 15 +- .../CancelRefreshMaterializedViewStmt.java | 11 +- .../materialization/MvUtils.java | 24 +- .../com/starrocks/sql/parser/AstBuilder.java | 3 +- .../com/starrocks/sql/parser/StarRocks.g4 | 2 +- .../java/com/starrocks/alter/AlterTest.java | 4 +- .../CancelRefreshMaterializedViewTest.java | 13 + .../CatalogConnectorMetadataTest.java | 228 +++++++++++++ .../scheduler/MockTaskRunProcessor.java | 12 + .../starrocks/scheduler/TaskManagerTest.java | 175 ++++++++++ .../persist/MVTaskRunExtraMessageTest.java | 113 +++++++ 26 files changed, 1023 insertions(+), 66 deletions(-) create mode 100644 fe/fe-core/src/main/java/com/starrocks/connector/CatalogConnectorMetadata.java create mode 100644 fe/fe-core/src/test/java/com/starrocks/connector/CatalogConnectorMetadataTest.java create mode 100644 fe/fe-core/src/test/java/com/starrocks/scheduler/persist/MVTaskRunExtraMessageTest.java diff --git a/fe/fe-core/src/main/java/com/starrocks/common/Config.java b/fe/fe-core/src/main/java/com/starrocks/common/Config.java index cb9ac53a5ce8d0..792d1316bd4315 100644 --- a/fe/fe-core/src/main/java/com/starrocks/common/Config.java +++ b/fe/fe-core/src/main/java/com/starrocks/common/Config.java @@ -2673,6 +2673,13 @@ public class Config extends ConfigBase { @ConfField(mutable = true, comment = "The default try lock timeout for mv refresh to try base table/mv dbs' lock") public static int mv_refresh_try_lock_timeout_ms = 60 * 1000; + @ConfField(mutable = true, comment = "Whether enable to refresh materialized view in sync mode mergeable or not") + public static boolean enable_mv_refresh_sync_refresh_mergeable = false; + + @ConfField(mutable = true, comment = "The max length for mv task run extra message's values(set/map) to avoid " + + "occupying too much meta memory") + public static int max_mv_task_run_meta_message_values_length = 16; + /** * The refresh partition number when refreshing materialized view at once by default. */ diff --git a/fe/fe-core/src/main/java/com/starrocks/connector/CatalogConnectorMetadata.java b/fe/fe-core/src/main/java/com/starrocks/connector/CatalogConnectorMetadata.java new file mode 100644 index 00000000000000..21376a29920263 --- /dev/null +++ b/fe/fe-core/src/main/java/com/starrocks/connector/CatalogConnectorMetadata.java @@ -0,0 +1,304 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.starrocks.connector; + +import com.google.common.collect.ImmutableList; +import com.starrocks.catalog.Column; +import com.starrocks.catalog.Database; +import com.starrocks.catalog.MaterializedIndexMeta; +import com.starrocks.catalog.PartitionKey; +import com.starrocks.catalog.Table; +import com.starrocks.common.AlreadyExistsException; +import com.starrocks.common.AnalysisException; +import com.starrocks.common.DdlException; +import com.starrocks.common.MetaNotFoundException; +import com.starrocks.common.Pair; +import com.starrocks.common.UserException; +import com.starrocks.common.profile.Tracers; +import com.starrocks.connector.informationschema.InformationSchemaMetadata; +import com.starrocks.credential.CloudConfiguration; +import com.starrocks.sql.ast.AddPartitionClause; +import com.starrocks.sql.ast.AlterMaterializedViewStmt; +import com.starrocks.sql.ast.AlterTableCommentClause; +import com.starrocks.sql.ast.AlterTableStmt; +import com.starrocks.sql.ast.AlterViewStmt; +import com.starrocks.sql.ast.CancelRefreshMaterializedViewStmt; +import com.starrocks.sql.ast.CreateMaterializedViewStatement; +import com.starrocks.sql.ast.CreateMaterializedViewStmt; +import com.starrocks.sql.ast.CreateTableLikeStmt; +import com.starrocks.sql.ast.CreateTableStmt; +import com.starrocks.sql.ast.CreateViewStmt; +import com.starrocks.sql.ast.DropMaterializedViewStmt; +import com.starrocks.sql.ast.DropPartitionClause; +import com.starrocks.sql.ast.DropTableStmt; +import com.starrocks.sql.ast.PartitionRenameClause; +import com.starrocks.sql.ast.RefreshMaterializedViewStatement; +import com.starrocks.sql.ast.TableRenameClause; +import com.starrocks.sql.ast.TruncateTableStmt; +import com.starrocks.sql.optimizer.OptimizerContext; +import com.starrocks.sql.optimizer.operator.scalar.ColumnRefOperator; +import com.starrocks.sql.optimizer.operator.scalar.ScalarOperator; +import com.starrocks.sql.optimizer.statistics.Statistics; +import com.starrocks.thrift.TSinkCommitInfo; + +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.starrocks.catalog.system.information.InfoSchemaDb.isInfoSchemaDb; +import static java.util.Objects.requireNonNull; + +// CatalogConnectorMetadata provides a uniform interface to provide normal tables and information schema tables. +// The database name/id is used to route request to specific metadata. +public class CatalogConnectorMetadata implements ConnectorMetadata { + private final ConnectorMetadata normal; + private final ConnectorMetadata informationSchema; + + public CatalogConnectorMetadata(ConnectorMetadata normal, ConnectorMetadata informationSchema) { + requireNonNull(normal, "metadata is null"); + requireNonNull(informationSchema, "infoSchemaDb is null"); + checkArgument(informationSchema instanceof InformationSchemaMetadata); + this.normal = normal; + this.informationSchema = informationSchema; + } + + private ConnectorMetadata metadataOfDb(String dBName) { + if (isInfoSchemaDb(dBName)) { + return informationSchema; + } + return normal; + } + + @Override + public Table.TableType getTableType() { + return normal.getTableType(); + } + + @Override + public List listDbNames() { + return ImmutableList.builder() + .addAll(this.normal.listDbNames()) + .addAll(this.informationSchema.listDbNames()) + .build(); + } + + @Override + public List listTableNames(String dbName) { + ConnectorMetadata metadata = metadataOfDb(dbName); + return metadata.listTableNames(dbName); + } + + @Override + public List listPartitionNames(String databaseName, String tableName) { + return normal.listPartitionNames(databaseName, tableName); + } + + @Override + public List listPartitionNamesByValue(String databaseName, String tableName, + List> partitionValues) { + return normal.listPartitionNamesByValue(databaseName, tableName, partitionValues); + } + + @Override + public Table getTable(String dbName, String tblName) { + ConnectorMetadata metadata = metadataOfDb(dbName); + return metadata.getTable(dbName, tblName); + } + + @Override + public boolean tableExists(String dbName, String tblName) { + ConnectorMetadata metadata = metadataOfDb(dbName); + return metadata.tableExists(dbName, tblName); + } + + @Override + public Pair getMaterializedViewIndex(String dbName, String tblName) { + return normal.getMaterializedViewIndex(dbName, tblName); + } + + @Override + public List getRemoteFileInfos(Table table, List partitionKeys, long snapshotId, + ScalarOperator predicate, List fieldNames, long limit) { + return normal.getRemoteFileInfos(table, partitionKeys, snapshotId, predicate, fieldNames, limit); + } + + @Override + public boolean prepareMetadata(MetaPreparationItem item, Tracers tracers) { + return normal.prepareMetadata(item, tracers); + } + + @Override + public List getPartitions(Table table, List partitionNames) { + return normal.getPartitions(table, partitionNames); + } + + @Override + public Statistics getTableStatistics(OptimizerContext session, Table table, Map columns, + List partitionKeys, ScalarOperator predicate, long limit) { + return normal.getTableStatistics(session, table, columns, partitionKeys, predicate, limit); + } + + @Override + public List getPrunedPartitions(Table table, ScalarOperator predicate, long limit) { + return normal.getPrunedPartitions(table, predicate, limit); + } + + @Override + public void clear() { + normal.clear(); + } + + @Override + public void refreshTable(String srDbName, Table table, List partitionNames, boolean onlyCachedPartitions) { + normal.refreshTable(srDbName, table, partitionNames, onlyCachedPartitions); + } + + @Override + public void createDb(String dbName) throws DdlException, AlreadyExistsException { + normal.createDb(dbName); + } + + @Override + public boolean dbExists(String dbName) { + ConnectorMetadata metadata = metadataOfDb(dbName); + return metadata.dbExists(dbName); + } + + @Override + public void createDb(String dbName, Map properties) throws DdlException, AlreadyExistsException { + normal.createDb(dbName, properties); + } + + @Override + public void dropDb(String dbName, boolean isForceDrop) throws DdlException, MetaNotFoundException { + normal.dropDb(dbName, isForceDrop); + } + + @Override + public Database getDb(String name) { + ConnectorMetadata metadata = metadataOfDb(name); + return metadata.getDb(name); + } + + @Override + public boolean createTable(CreateTableStmt stmt) throws DdlException { + return normal.createTable(stmt); + } + + @Override + public void dropTable(DropTableStmt stmt) throws DdlException { + normal.dropTable(stmt); + } + + @Override + public void finishSink(String dbName, String table, List commitInfos) { + normal.finishSink(dbName, table, commitInfos); + } + + @Override + public void abortSink(String dbName, String table, List commitInfos) { + normal.abortSink(dbName, table, commitInfos); + } + + @Override + public void alterTable(AlterTableStmt stmt) throws UserException { + normal.alterTable(stmt); + } + + @Override + public void renameTable(Database db, Table table, TableRenameClause tableRenameClause) throws DdlException { + normal.renameTable(db, table, tableRenameClause); + } + + @Override + public void alterTableComment(Database db, Table table, AlterTableCommentClause clause) { + normal.alterTableComment(db, table, clause); + } + + @Override + public void truncateTable(TruncateTableStmt truncateTableStmt) throws DdlException { + normal.truncateTable(truncateTableStmt); + } + + @Override + public void createTableLike(CreateTableLikeStmt stmt) throws DdlException { + normal.createTableLike(stmt); + } + + @Override + public void addPartitions(Database db, String tableName, AddPartitionClause addPartitionClause) + throws DdlException, AnalysisException { + normal.addPartitions(db, tableName, addPartitionClause); + } + + @Override + public void dropPartition(Database db, Table table, DropPartitionClause clause) throws DdlException { + normal.dropPartition(db, table, clause); + } + + @Override + public void renamePartition(Database db, Table table, PartitionRenameClause renameClause) throws DdlException { + normal.renamePartition(db, table, renameClause); + } + + @Override + public void createMaterializedView(CreateMaterializedViewStmt stmt) throws AnalysisException, DdlException { + normal.createMaterializedView(stmt); + } + + @Override + public void createMaterializedView(CreateMaterializedViewStatement statement) throws DdlException { + normal.createMaterializedView(statement); + } + + @Override + public void dropMaterializedView(DropMaterializedViewStmt stmt) throws DdlException, MetaNotFoundException { + normal.dropMaterializedView(stmt); + } + + @Override + public void alterMaterializedView(AlterMaterializedViewStmt stmt) + throws DdlException, MetaNotFoundException, AnalysisException { + normal.alterMaterializedView(stmt); + } + + @Override + public String refreshMaterializedView(RefreshMaterializedViewStatement refreshMaterializedViewStatement) + throws DdlException, MetaNotFoundException { + return normal.refreshMaterializedView(refreshMaterializedViewStatement); + } + + @Override + public void cancelRefreshMaterializedView( + CancelRefreshMaterializedViewStmt stmt) throws DdlException, MetaNotFoundException { + normal.cancelRefreshMaterializedView(stmt); + } + + @Override + public void createView(CreateViewStmt stmt) throws DdlException { + normal.createView(stmt); + } + + @Override + public void alterView(AlterViewStmt stmt) throws DdlException, UserException { + normal.alterView(stmt); + } + + @Override + public CloudConfiguration getCloudConfiguration() { + return normal.getCloudConfiguration(); + } +} diff --git a/fe/fe-core/src/main/java/com/starrocks/connector/ConnectorMetadata.java b/fe/fe-core/src/main/java/com/starrocks/connector/ConnectorMetadata.java index d5a108fd93c15b..82fded22e167dd 100644 --- a/fe/fe-core/src/main/java/com/starrocks/connector/ConnectorMetadata.java +++ b/fe/fe-core/src/main/java/com/starrocks/connector/ConnectorMetadata.java @@ -33,6 +33,7 @@ import com.starrocks.sql.ast.AlterTableCommentClause; import com.starrocks.sql.ast.AlterTableStmt; import com.starrocks.sql.ast.AlterViewStmt; +import com.starrocks.sql.ast.CancelRefreshMaterializedViewStmt; import com.starrocks.sql.ast.CreateMaterializedViewStatement; import com.starrocks.sql.ast.CreateMaterializedViewStmt; import com.starrocks.sql.ast.CreateTableLikeStmt; @@ -264,7 +265,7 @@ default String refreshMaterializedView(RefreshMaterializedViewStatement refreshM return null; } - default void cancelRefreshMaterializedView(String dbName, String mvName) + default void cancelRefreshMaterializedView(CancelRefreshMaterializedViewStmt stmt) throws DdlException, MetaNotFoundException { } diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/DDLStmtExecutor.java b/fe/fe-core/src/main/java/com/starrocks/qe/DDLStmtExecutor.java index 22fc8a1ea0b95e..d95b415acfde80 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/DDLStmtExecutor.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/DDLStmtExecutor.java @@ -314,10 +314,7 @@ public ShowResultSet visitRefreshMaterializedViewStatement(RefreshMaterializedVi public ShowResultSet visitCancelRefreshMaterializedViewStatement(CancelRefreshMaterializedViewStmt stmt, ConnectContext context) { ErrorReport.wrapWithRuntimeException(() -> { - context.getGlobalStateMgr().getLocalMetastore() - .cancelRefreshMaterializedView( - stmt.getMvName().getDb(), - stmt.getMvName().getTbl()); + context.getGlobalStateMgr().getLocalMetastore().cancelRefreshMaterializedView(stmt); }); return null; } diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java b/fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java index 7a042c0a221f68..a1e5dc309c4bef 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java @@ -39,6 +39,7 @@ import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import com.google.common.primitives.Ints; import com.google.gson.Gson; import com.starrocks.analysis.Analyzer; import com.starrocks.analysis.Expr; @@ -1732,7 +1733,8 @@ public void handleDMLStmt(ExecPlan execPlan, DmlStmt stmt) throws Exception { long createTime = System.currentTimeMillis(); long loadedRows = 0; - int filteredRows = 0; + // filteredRows is stored in int64_t in the backend, so use long here. + long filteredRows = 0; long loadedBytes = 0; long jobId = -1; long estimateScanRows = -1; @@ -1849,7 +1851,7 @@ public void handleDMLStmt(ExecPlan execPlan, DmlStmt stmt) throws Exception { loadedRows = Long.parseLong(coord.getLoadCounters().get(LoadEtlTask.DPP_NORMAL_ALL)); } if (coord.getLoadCounters().get(LoadEtlTask.DPP_ABNORMAL_ALL) != null) { - filteredRows = Integer.parseInt(coord.getLoadCounters().get(LoadEtlTask.DPP_ABNORMAL_ALL)); + filteredRows = Long.parseLong(coord.getLoadCounters().get(LoadEtlTask.DPP_ABNORMAL_ALL)); } if (coord.getLoadCounters().get(LoadJob.LOADED_BYTES) != null) { @@ -2060,7 +2062,8 @@ public void handleDMLStmt(ExecPlan execPlan, DmlStmt stmt) throws Exception { } sb.append("}"); - context.getState().setOk(loadedRows, filteredRows, sb.toString()); + // filterRows may be overflow when to convert it into int, use `saturatedCast` to avoid overflow + context.getState().setOk(loadedRows, Ints.saturatedCast(filteredRows), sb.toString()); } public String getOriginStmtInString() { diff --git a/fe/fe-core/src/main/java/com/starrocks/scheduler/Constants.java b/fe/fe-core/src/main/java/com/starrocks/scheduler/Constants.java index 5b1a6ae071c3fd..c2e9d0fb92986c 100644 --- a/fe/fe-core/src/main/java/com/starrocks/scheduler/Constants.java +++ b/fe/fe-core/src/main/java/com/starrocks/scheduler/Constants.java @@ -63,7 +63,14 @@ public enum TaskRunState { RUNNING, // The task run is scheduled into running queue and is running FAILED, // The task run is failed SUCCESS, // The task run is finished successfully - MERGED, // The task run is merged + MERGED; // The task run is merged + + /** + * Whether the task run state is a success state + */ + public boolean isSuccessState() { + return this.equals(TaskRunState.SUCCESS) || this.equals(TaskRunState.MERGED); + } } public static boolean isFinishState(TaskRunState state) { diff --git a/fe/fe-core/src/main/java/com/starrocks/scheduler/ExecuteOption.java b/fe/fe-core/src/main/java/com/starrocks/scheduler/ExecuteOption.java index cef2ff9d10bc7e..0ddadca7342813 100644 --- a/fe/fe-core/src/main/java/com/starrocks/scheduler/ExecuteOption.java +++ b/fe/fe-core/src/main/java/com/starrocks/scheduler/ExecuteOption.java @@ -17,6 +17,7 @@ import com.google.common.collect.Maps; import com.google.gson.annotations.SerializedName; +import com.starrocks.common.Config; import java.util.Map; @@ -64,7 +65,11 @@ public void setPriority(int priority) { public boolean isMergeRedundant() { // If old task run is a sync-mode task, skip to merge it to avoid sync-mode task // hanging after removing it. - return !isSync && mergeRedundant; + if (Config.enable_mv_refresh_sync_refresh_mergeable) { + return mergeRedundant; + } else { + return !isSync && mergeRedundant; + } } public Map getTaskRunProperties() { diff --git a/fe/fe-core/src/main/java/com/starrocks/scheduler/PartitionBasedMvRefreshProcessor.java b/fe/fe-core/src/main/java/com/starrocks/scheduler/PartitionBasedMvRefreshProcessor.java index 22b774195b9c14..a55ac84f4c357d 100644 --- a/fe/fe-core/src/main/java/com/starrocks/scheduler/PartitionBasedMvRefreshProcessor.java +++ b/fe/fe-core/src/main/java/com/starrocks/scheduler/PartitionBasedMvRefreshProcessor.java @@ -546,6 +546,16 @@ private void changeDefaultConnectContextIfNeeded(ConnectContext mvConnectCtx) { if (!mvProperty.getProperties().containsKey(MV_SESSION_TIMEOUT)) { mvSessionVariable.setQueryTimeoutS(MV_DEFAULT_QUERY_TIMEOUT); } + + // set enable_insert_strict by default + if (!isMVPropertyContains(SessionVariable.ENABLE_INSERT_STRICT)) { + mvSessionVariable.setEnableInsertStrict(false); + } + } + + private boolean isMVPropertyContains(String key) { + String mvKey = PropertyAnalyzer.PROPERTIES_MATERIALIZED_VIEW_SESSION_PREFIX + key; + return materializedView.getTableProperty().getProperties().containsKey(mvKey); } private void postProcess() { @@ -1389,7 +1399,7 @@ public void refreshMaterializedView(MvTaskRunContext mvContext, ExecPlan execPla } ctx.setStmtId(new AtomicInteger().incrementAndGet()); ctx.setExecutionId(UUIDUtil.toTUniqueId(ctx.getQueryId())); - ctx.getSessionVariable().setEnableInsertStrict(false); + LOG.info("[QueryId:{}] start to refresh materialized view {}", ctx.getQueryId(), materializedView.getName()); try { executor.handleDMLStmtWithProfile(execPlan, insertStmt); } catch (Exception e) { diff --git a/fe/fe-core/src/main/java/com/starrocks/scheduler/PendingTaskRunFIFOQueue.java b/fe/fe-core/src/main/java/com/starrocks/scheduler/PendingTaskRunFIFOQueue.java index f204a2820287e0..a860c40050677a 100644 --- a/fe/fe-core/src/main/java/com/starrocks/scheduler/PendingTaskRunFIFOQueue.java +++ b/fe/fe-core/src/main/java/com/starrocks/scheduler/PendingTaskRunFIFOQueue.java @@ -121,21 +121,30 @@ public boolean add(TaskRun taskRun) { /** * Remove a specific task run from the queue. * @param taskRun: task run to remove + * @param state: complete or cancel task run and set it with the state if the task run's future is not null */ - public boolean remove(TaskRun taskRun) { + public boolean remove(TaskRun taskRun, Constants.TaskRunState state) { if (taskRun == null) { return false; } wLock.lock(); try { - // make sure future is canceled. - CompletableFuture future = taskRun.getFuture(); - boolean isCancel = future.cancel(true); - if (!isCancel) { - LOG.warn("fail to cancel scheduler for task [{}]", taskRun); + CompletableFuture future = taskRun.getFuture(); + // make sure the future is canceled or completed + if (future != null) { + if (state != null && state.isSuccessState()) { + boolean isComplete = future.complete(state); + if (!isComplete) { + LOG.warn("fail to complete scheduler for task [{}]", taskRun); + } + } else { + boolean isCancel = future.cancel(true); + if (!isCancel) { + LOG.warn("fail to cancel scheduler for task [{}]", taskRun); + } + } } - // remove it from pending map. removeFromMapUnlock(taskRun); // remove it from pending queue. diff --git a/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskManager.java b/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskManager.java index e736423c4c0727..d2e10b241dfff0 100644 --- a/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskManager.java +++ b/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskManager.java @@ -186,7 +186,7 @@ private void clearUnfinishedTaskRun() { GlobalStateMgr.getCurrentState().getEditLog().logUpdateTaskRun(statusChange); // remove pending task run - taskRunScheduler.removePendingTaskRun(taskRun); + taskRunScheduler.removePendingTaskRun(taskRun, Constants.TaskRunState.FAILED); } // clear running task runs @@ -268,24 +268,22 @@ private boolean stopScheduler(String taskName) { return isCancel; } - public boolean killTask(String taskName, boolean clearPending) { + public boolean killTask(String taskName, boolean force) { Task task = nameToTaskMap.get(taskName); if (task == null) { return false; } - if (clearPending) { - if (!taskRunManager.tryTaskRunLock()) { - return false; - } - try { - taskRunScheduler.removePendingTask(task); - } catch (Exception ex) { - LOG.warn("failed to kill task.", ex); - } finally { - taskRunManager.taskRunUnlock(); - } + if (!taskRunManager.tryTaskRunLock()) { + return false; + } + try { + taskRunScheduler.removePendingTask(task); + } catch (Exception ex) { + LOG.warn("failed to kill task.", ex); + } finally { + taskRunManager.taskRunUnlock(); } - return taskRunManager.killTaskRun(task.getId()); + return taskRunManager.killTaskRun(task.getId(), force); } public SubmitResult executeTask(String taskName) { @@ -335,7 +333,7 @@ public SubmitResult executeTaskSync(Task task, ExecuteOption option) { } try { Constants.TaskRunState taskRunState = taskRun.getFuture().get(); - if (taskRunState != Constants.TaskRunState.SUCCESS) { + if (!taskRunState.isSuccessState()) { String msg = taskRun.getStatus().getErrorMessage(); throw new DmlException("execute task %s failed: %s", task.getName(), msg); } @@ -373,7 +371,7 @@ public void dropTasks(List taskIdList, boolean isReplay) { } periodFutureMap.remove(task.getId()); } - if (!killTask(task.getName(), true)) { + if (!killTask(task.getName(), false)) { LOG.error("kill task failed: " + task.getName()); } idToTaskMap.remove(task.getId()); @@ -756,7 +754,7 @@ public void replayUpdateTaskRun(TaskRunStatusChange statusChange) { return; } // remove it from pending task queue - taskRunScheduler.removePendingTaskRun(pendingTaskRun); + taskRunScheduler.removePendingTaskRun(pendingTaskRun, toStatus); TaskRunStatus status = pendingTaskRun.getStatus(); diff --git a/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskRunExecutor.java b/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskRunExecutor.java index 8883a52355ce4e..fdc89bc1465daf 100644 --- a/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskRunExecutor.java +++ b/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskRunExecutor.java @@ -49,6 +49,8 @@ public boolean executeTaskRun(TaskRun taskRun) { CompletableFuture future = CompletableFuture.supplyAsync(() -> { status.setState(Constants.TaskRunState.RUNNING); + // set process start time + status.setProcessStartTime(System.currentTimeMillis()); try { boolean isSuccess = taskRun.executeTaskRun(); if (isSuccess) { diff --git a/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskRunManager.java b/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskRunManager.java index aa304ba8733b99..adbfd55c83bd51 100644 --- a/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskRunManager.java +++ b/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskRunManager.java @@ -78,16 +78,23 @@ public SubmitResult submitTaskRun(TaskRun taskRun, ExecuteOption option) { return new SubmitResult(queryId, SubmitResult.SubmitStatus.SUBMITTED); } - public boolean killTaskRun(Long taskId) { + public boolean killTaskRun(Long taskId, boolean force) { TaskRun taskRun = taskRunScheduler.getRunningTaskRun(taskId); if (taskRun == null) { return false; } - taskRun.kill(); - ConnectContext runCtx = taskRun.getRunCtx(); - if (runCtx != null) { - runCtx.kill(false); - return true; + try { + taskRun.kill(); + ConnectContext runCtx = taskRun.getRunCtx(); + if (runCtx != null) { + runCtx.kill(false); + return true; + } + } finally { + // if it's force, remove it from running TaskRun map no matter it's killed or not + if (force) { + taskRunScheduler.removeRunningTask(taskRun.getTaskId()); + } } return false; } @@ -161,7 +168,7 @@ public boolean arrangeTaskRun(TaskRun taskRun, boolean isReplay) { GlobalStateMgr.getCurrentState().getEditLog().logUpdateTaskRun(statusChange); // update the state of the old TaskRun to MERGED in LEADER oldTaskRun.getStatus().setState(Constants.TaskRunState.MERGED); - taskRunScheduler.removePendingTaskRun(oldTaskRun); + taskRunScheduler.removePendingTaskRun(oldTaskRun, Constants.TaskRunState.MERGED); taskRunHistory.addHistory(oldTaskRun.getStatus()); } } diff --git a/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskRunScheduler.java b/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskRunScheduler.java index fed6e231b86833..cd74b5ffbdd43f 100644 --- a/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskRunScheduler.java +++ b/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskRunScheduler.java @@ -77,8 +77,12 @@ public boolean addPendingTaskRun(TaskRun taskRun) { return pendingTaskRunQueue.add(taskRun); } - public void removePendingTaskRun(TaskRun taskRun) { - pendingTaskRunQueue.remove(taskRun); + public void removePendingTaskRun(TaskRun taskRun, Constants.TaskRunState state) { + if (taskRun == null) { + return; + } + LOG.info("remove pending task run: {}", taskRun); + pendingTaskRunQueue.remove(taskRun, state); } public void removePendingTask(Task task) { diff --git a/fe/fe-core/src/main/java/com/starrocks/scheduler/persist/MVTaskRunExtraMessage.java b/fe/fe-core/src/main/java/com/starrocks/scheduler/persist/MVTaskRunExtraMessage.java index 02bae5318312e9..14cd368af48b86 100644 --- a/fe/fe-core/src/main/java/com/starrocks/scheduler/persist/MVTaskRunExtraMessage.java +++ b/fe/fe-core/src/main/java/com/starrocks/scheduler/persist/MVTaskRunExtraMessage.java @@ -19,10 +19,12 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.gson.annotations.SerializedName; +import com.starrocks.common.Config; import com.starrocks.common.io.Text; import com.starrocks.common.io.Writable; import com.starrocks.persist.gson.GsonUtils; import com.starrocks.scheduler.ExecuteOption; +import com.starrocks.sql.optimizer.rule.transformation.materialization.MvUtils; import org.apache.commons.lang3.StringUtils; import java.io.DataInput; @@ -32,6 +34,7 @@ import java.util.Set; public class MVTaskRunExtraMessage implements Writable { + @SerializedName("forceRefresh") private boolean forceRefresh; @SerializedName("partitionStart") @@ -49,6 +52,11 @@ public class MVTaskRunExtraMessage implements Writable { @SerializedName("basePartitionsToRefreshMap") private Map> basePartitionsToRefreshMap = Maps.newHashMap(); + // task run starts to process time + // NOTE: finishTime - processStartTime = process task run time(exclude pending time) + @SerializedName("processStartTime") + private long processStartTime = 0; + @SerializedName("executeOption") private ExecuteOption executeOption = new ExecuteOption(true); @@ -84,7 +92,8 @@ public Set getMvPartitionsToRefresh() { } public void setMvPartitionsToRefresh(Set mvPartitionsToRefresh) { - this.mvPartitionsToRefresh = mvPartitionsToRefresh; + this.mvPartitionsToRefresh = MvUtils.shrinkToSize(mvPartitionsToRefresh, + Config.max_mv_task_run_meta_message_values_length); } public Map> getBasePartitionsToRefreshMap() { @@ -95,9 +104,10 @@ public Map> getRefBasePartitionsToRefreshMap() { return refBasePartitionsToRefreshMap; } - public void setRefBasePartitionsToRefreshMap( - Map> refBasePartitionsToRefreshMap) { - this.refBasePartitionsToRefreshMap = refBasePartitionsToRefreshMap; + + public void setRefBasePartitionsToRefreshMap(Map> refBasePartitionsToRefreshMap) { + this.refBasePartitionsToRefreshMap = MvUtils.shrinkToSize(refBasePartitionsToRefreshMap, + Config.max_mv_task_run_meta_message_values_length); } public String getMvPartitionsToRefreshString() { @@ -118,9 +128,9 @@ public String getBasePartitionsToRefreshMapString() { } } - public void setBasePartitionsToRefreshMap( - Map> basePartitionsToRefreshMap) { - this.basePartitionsToRefreshMap = basePartitionsToRefreshMap; + public void setBasePartitionsToRefreshMap(Map> basePartitionsToRefreshMap) { + this.basePartitionsToRefreshMap = MvUtils.shrinkToSize(basePartitionsToRefreshMap, + Config.max_mv_task_run_meta_message_values_length); } public ExecuteOption getExecuteOption() { @@ -137,6 +147,14 @@ public static MVTaskRunExtraMessage read(DataInput in) throws IOException { } + public long getProcessStartTime() { + return processStartTime; + } + + public void setProcessStartTime(long processStartTime) { + this.processStartTime = processStartTime; + } + @Override public void write(DataOutput out) throws IOException { String json = GsonUtils.GSON.toJson(this); diff --git a/fe/fe-core/src/main/java/com/starrocks/scheduler/persist/TaskRunStatus.java b/fe/fe-core/src/main/java/com/starrocks/scheduler/persist/TaskRunStatus.java index 14fb24146b3d34..2525d8df46fd2d 100644 --- a/fe/fe-core/src/main/java/com/starrocks/scheduler/persist/TaskRunStatus.java +++ b/fe/fe-core/src/main/java/com/starrocks/scheduler/persist/TaskRunStatus.java @@ -39,6 +39,9 @@ public class TaskRunStatus implements Writable { @SerializedName("taskName") private String taskName; + @SerializedName("processStartTime") + private long processStartTime; + @SerializedName("createTime") private long createTime; @@ -256,6 +259,18 @@ public void setExtraMessage(String extraMessage) { } } + public long getProcessStartTime() { + return processStartTime; + } + + public void setProcessStartTime(long processStartTime) { + this.processStartTime = processStartTime; + // update process start time in mvTaskRunExtraMessage to display in the web page + if (mvTaskRunExtraMessage != null) { + mvTaskRunExtraMessage.setProcessStartTime(processStartTime); + } + } + public Map getProperties() { return properties; } diff --git a/fe/fe-core/src/main/java/com/starrocks/server/LocalMetastore.java b/fe/fe-core/src/main/java/com/starrocks/server/LocalMetastore.java index a999b4d8572088..1a111cc7a1b145 100644 --- a/fe/fe-core/src/main/java/com/starrocks/server/LocalMetastore.java +++ b/fe/fe-core/src/main/java/com/starrocks/server/LocalMetastore.java @@ -187,6 +187,7 @@ import com.starrocks.sql.ast.AlterViewStmt; import com.starrocks.sql.ast.AsyncRefreshSchemeDesc; import com.starrocks.sql.ast.CancelAlterTableStmt; +import com.starrocks.sql.ast.CancelRefreshMaterializedViewStmt; import com.starrocks.sql.ast.ColumnDef; import com.starrocks.sql.ast.ColumnRenameClause; import com.starrocks.sql.ast.CreateMaterializedViewStatement; @@ -3503,20 +3504,22 @@ public String refreshMaterializedView(RefreshMaterializedViewStatement refreshMa String dbName = refreshMaterializedViewStatement.getMvName().getDb(); String mvName = refreshMaterializedViewStatement.getMvName().getTbl(); boolean force = refreshMaterializedViewStatement.isForceRefresh(); - PartitionRangeDesc range = - refreshMaterializedViewStatement.getPartitionRangeDesc(); - + PartitionRangeDesc range = refreshMaterializedViewStatement.getPartitionRangeDesc(); return refreshMaterializedView(dbName, mvName, force, range, Constants.TaskRunPriority.HIGH.value(), - false, true, refreshMaterializedViewStatement.isSync()); + Config.enable_mv_refresh_sync_refresh_mergeable, true, refreshMaterializedViewStatement.isSync()); } @Override - public void cancelRefreshMaterializedView(String dbName, String mvName) throws DdlException, MetaNotFoundException { + public void cancelRefreshMaterializedView( + CancelRefreshMaterializedViewStmt stmt) throws DdlException, MetaNotFoundException { + String dbName = stmt.getMvName().getDb(); + String mvName = stmt.getMvName().getTbl(); MaterializedView materializedView = getMaterializedViewToRefresh(dbName, mvName); TaskManager taskManager = GlobalStateMgr.getCurrentState().getTaskManager(); Task refreshTask = taskManager.getTask(TaskBuilder.getMvTaskName(materializedView.getId())); + boolean isForce = stmt.isForce(); if (refreshTask != null) { - taskManager.killTask(refreshTask.getName(), true); + taskManager.killTask(refreshTask.getName(), isForce); } } diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/ast/CancelRefreshMaterializedViewStmt.java b/fe/fe-core/src/main/java/com/starrocks/sql/ast/CancelRefreshMaterializedViewStmt.java index bf89423bb376f8..3a53f702734b18 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/ast/CancelRefreshMaterializedViewStmt.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/ast/CancelRefreshMaterializedViewStmt.java @@ -19,20 +19,25 @@ public class CancelRefreshMaterializedViewStmt extends DdlStmt { private final TableName mvName; + private final boolean force; - public CancelRefreshMaterializedViewStmt(TableName mvName) { - this(mvName, NodePosition.ZERO); + public CancelRefreshMaterializedViewStmt(TableName mvName, boolean force) { + this(mvName, force, NodePosition.ZERO); } - public CancelRefreshMaterializedViewStmt(TableName mvName, NodePosition pos) { + public CancelRefreshMaterializedViewStmt(TableName mvName, boolean force, NodePosition pos) { super(pos); this.mvName = mvName; + this.force = force; } public TableName getMvName() { return mvName; } + public boolean isForce() { + return force; + } @Override public R accept(AstVisitor visitor, C context) { diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/materialization/MvUtils.java b/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/materialization/MvUtils.java index 5a90b8509d8d11..268da9ce2fb04f 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/materialization/MvUtils.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/materialization/MvUtils.java @@ -1126,4 +1126,26 @@ public static boolean isStr2Date(Expr expr) { return expr instanceof FunctionCallExpr && ((FunctionCallExpr) expr).getFnName().getFunction().equalsIgnoreCase(FunctionSet.STR2DATE); } -} + + /** + * Trim the input set if its size is larger than maxLength. + * @return the trimmed set. + */ + public static Set shrinkToSize(Set set, int maxLength) { + if (set != null && set.size() > maxLength) { + return set.stream().limit(maxLength).collect(Collectors.toSet()); + } + return set; + } + + /** + * Trim the input map if its size is larger than maxLength. + * @return the trimmed map. + */ + public static Map> shrinkToSize(Map> map, int maxLength) { + if (map != null && map.size() > maxLength) { + return map.entrySet().stream().limit(maxLength).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + return map; + } +} \ No newline at end of file diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/parser/AstBuilder.java b/fe/fe-core/src/main/java/com/starrocks/sql/parser/AstBuilder.java index 4fdb62e2b0f174..8ec6fbdc520220 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/parser/AstBuilder.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/parser/AstBuilder.java @@ -1620,7 +1620,8 @@ public ParseNode visitCancelRefreshMaterializedViewStatement( StarRocksParser.CancelRefreshMaterializedViewStatementContext context) { QualifiedName mvQualifiedName = getQualifiedName(context.qualifiedName()); TableName mvName = qualifiedNameToTableName(mvQualifiedName); - return new CancelRefreshMaterializedViewStmt(mvName, createPos(context)); + boolean force = context.FORCE() != null; + return new CancelRefreshMaterializedViewStmt(mvName, force, createPos(context)); } // ------------------------------------------- Catalog Statement --------------------------------------------------- diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/parser/StarRocks.g4 b/fe/fe-core/src/main/java/com/starrocks/sql/parser/StarRocks.g4 index b9fdfe63c73009..f6005df738bfe9 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/parser/StarRocks.g4 +++ b/fe/fe-core/src/main/java/com/starrocks/sql/parser/StarRocks.g4 @@ -624,7 +624,7 @@ refreshMaterializedViewStatement ; cancelRefreshMaterializedViewStatement - : CANCEL REFRESH MATERIALIZED VIEW mvName=qualifiedName + : CANCEL REFRESH MATERIALIZED VIEW mvName=qualifiedName FORCE? ; // ------------------------------------------- Admin Statement --------------------------------------------------------- diff --git a/fe/fe-core/src/test/java/com/starrocks/alter/AlterTest.java b/fe/fe-core/src/test/java/com/starrocks/alter/AlterTest.java index 1a6ea7e4c96686..22679c14822143 100644 --- a/fe/fe-core/src/test/java/com/starrocks/alter/AlterTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/alter/AlterTest.java @@ -328,9 +328,7 @@ private static void cancelRefreshMaterializedView(String sql, boolean expectedEx CancelRefreshMaterializedViewStmt cancelRefresh = (CancelRefreshMaterializedViewStmt) UtFrameUtils.parseStmtWithNewParser(sql, connectContext); try { - GlobalStateMgr.getCurrentState().getLocalMetastore() - .cancelRefreshMaterializedView(cancelRefresh.getMvName().getDb(), - cancelRefresh.getMvName().getTbl()); + GlobalStateMgr.getCurrentState().getLocalMetastore().cancelRefreshMaterializedView(cancelRefresh); if (expectedException) { Assert.fail(); } diff --git a/fe/fe-core/src/test/java/com/starrocks/analysis/CancelRefreshMaterializedViewTest.java b/fe/fe-core/src/test/java/com/starrocks/analysis/CancelRefreshMaterializedViewTest.java index 268480d3ab748a..74507b82960b1d 100644 --- a/fe/fe-core/src/test/java/com/starrocks/analysis/CancelRefreshMaterializedViewTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/analysis/CancelRefreshMaterializedViewTest.java @@ -40,5 +40,18 @@ public void testNormal() throws Exception { String mvName = cancelRefresh.getMvName().getTbl(); Assert.assertEquals("test1", dbName); Assert.assertEquals("mv1", mvName); + Assert.assertFalse(cancelRefresh.isForce()); + } + + @Test + public void testForceRefreshMaterializedView() throws Exception { + String refreshMvSql = "cancel refresh materialized view test1.mv1 force"; + CancelRefreshMaterializedViewStmt cancelRefresh = + (CancelRefreshMaterializedViewStmt) UtFrameUtils.parseStmtWithNewParser(refreshMvSql, connectContext); + String dbName = cancelRefresh.getMvName().getDb(); + String mvName = cancelRefresh.getMvName().getTbl(); + Assert.assertEquals("test1", dbName); + Assert.assertEquals("mv1", mvName); + Assert.assertTrue(cancelRefresh.isForce()); } } diff --git a/fe/fe-core/src/test/java/com/starrocks/connector/CatalogConnectorMetadataTest.java b/fe/fe-core/src/test/java/com/starrocks/connector/CatalogConnectorMetadataTest.java new file mode 100644 index 00000000000000..2db048ed008923 --- /dev/null +++ b/fe/fe-core/src/test/java/com/starrocks/connector/CatalogConnectorMetadataTest.java @@ -0,0 +1,228 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.starrocks.connector; + +import com.google.common.collect.ImmutableList; +import com.starrocks.catalog.Database; +import com.starrocks.catalog.Table; +import com.starrocks.catalog.system.information.InfoSchemaDb; +import com.starrocks.common.UserException; +import com.starrocks.connector.informationschema.InformationSchemaMetadata; +import com.starrocks.connector.jdbc.MockedJDBCMetadata; +import com.starrocks.sql.ast.CreateMaterializedViewStatement; +import com.starrocks.sql.ast.CreateMaterializedViewStmt; +import mockit.Expectations; +import mockit.Mocked; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class CatalogConnectorMetadataTest { + + private final InformationSchemaMetadata informationSchemaMetadata = new InformationSchemaMetadata("test_catalog"); + + @Test + void testListDbNames(@Mocked ConnectorMetadata connectorMetadata) { + new Expectations() { + { + connectorMetadata.listDbNames(); + result = ImmutableList.of("test_db1", "test_db2"); + times = 1; + } + }; + + CatalogConnectorMetadata catalogConnectorMetadata = new CatalogConnectorMetadata( + connectorMetadata, + informationSchemaMetadata + ); + + List dbNames = catalogConnectorMetadata.listDbNames(); + List expected = ImmutableList.of("test_db1", "test_db2", InfoSchemaDb.DATABASE_NAME); + assertEquals(expected, dbNames); + } + + @Test + void testListTableNames(@Mocked ConnectorMetadata connectorMetadata) { + new Expectations() { + { + connectorMetadata.listTableNames("test_db"); + result = ImmutableList.of("test_tbl1", "test_tbl2"); + times = 1; + } + }; + + CatalogConnectorMetadata catalogConnectorMetadata = new CatalogConnectorMetadata( + connectorMetadata, + informationSchemaMetadata + ); + + List tblNames = catalogConnectorMetadata.listTableNames(InfoSchemaDb.DATABASE_NAME); + List expected = ImmutableList.of("tables", "table_privileges", "referential_constraints", + "key_column_usage", "routines", "schemata", "columns", "character_sets", "collations", + "table_constraints", "engines", "user_privileges", "schema_privileges", "statistics", + "triggers", "events", "views", "partitions", "column_privileges" + ); + assertEquals(expected, tblNames); + + tblNames = catalogConnectorMetadata.listTableNames("test_db"); + expected = ImmutableList.of("test_tbl1", "test_tbl2"); + assertEquals(expected, tblNames); + } + + @Test + void testGetDb(@Mocked ConnectorMetadata connectorMetadata) { + new Expectations() { + { + connectorMetadata.getDb("test_db"); + result = null; + times = 1; + } + }; + + CatalogConnectorMetadata catalogConnectorMetadata = new CatalogConnectorMetadata( + connectorMetadata, + informationSchemaMetadata + ); + + Database db = catalogConnectorMetadata.getDb("test_db"); + assertNull(db); + assertNotNull(catalogConnectorMetadata.getDb(InfoSchemaDb.DATABASE_NAME)); + } + + @Test + void testDbExists(@Mocked ConnectorMetadata connectorMetadata) { + new Expectations() { + { + connectorMetadata.dbExists("test_db"); + result = true; + times = 1; + } + }; + + CatalogConnectorMetadata catalogConnectorMetadata = new CatalogConnectorMetadata( + connectorMetadata, + informationSchemaMetadata + ); + + assertTrue(catalogConnectorMetadata.dbExists("test_db")); + assertTrue(catalogConnectorMetadata.dbExists(InfoSchemaDb.DATABASE_NAME)); + } + + @Test + void testTableExists() { + MockedJDBCMetadata mockedJDBCMetadata = new MockedJDBCMetadata(new HashMap<>()); + assertTrue(mockedJDBCMetadata.tableExists("db1", "tbl1")); + } + + @Test + void testGetTable(@Mocked ConnectorMetadata connectorMetadata) { + new Expectations() { + { + connectorMetadata.getTable("test_db", "test_tbl"); + result = null; + times = 1; + } + }; + + CatalogConnectorMetadata catalogConnectorMetadata = new CatalogConnectorMetadata( + connectorMetadata, + informationSchemaMetadata + ); + + Table table = catalogConnectorMetadata.getTable("test_db", "test_tbl"); + assertNull(table); + assertNotNull(catalogConnectorMetadata.getTable(InfoSchemaDb.DATABASE_NAME, "tables")); + } + + @Test + void testMetadataRouting(@Mocked ConnectorMetadata connectorMetadata) throws UserException { + new Expectations() { + { + // the following methods are always routed to normal metadata + // therefore, we test if the normal metadata is called exactly once per method + times = 1; + + connectorMetadata.clear(); + connectorMetadata.listPartitionNames("test_db", "test_tbl"); + connectorMetadata.dropTable(null); + connectorMetadata.refreshTable("test_db", null, null, false); + connectorMetadata.alterMaterializedView(null); + connectorMetadata.addPartitions(null, null, null); + connectorMetadata.dropPartition(null, null, null); + connectorMetadata.renamePartition(null, null, null); + connectorMetadata.createMaterializedView((CreateMaterializedViewStatement) null); + connectorMetadata.createMaterializedView((CreateMaterializedViewStmt) null); + connectorMetadata.dropMaterializedView(null); + connectorMetadata.alterMaterializedView(null); + connectorMetadata.refreshMaterializedView(null); + connectorMetadata.cancelRefreshMaterializedView(null); + connectorMetadata.createView(null); + connectorMetadata.alterView(null); + connectorMetadata.truncateTable(null); + connectorMetadata.alterTableComment(null, null, null); + connectorMetadata.finishSink("test_db", "test_tbl", null); + connectorMetadata.abortSink("test_db", "test_tbl", null); + connectorMetadata.createTableLike(null); + connectorMetadata.createTable(null); + connectorMetadata.createDb("test_db"); + connectorMetadata.dropDb("test_db", false); + connectorMetadata.getRemoteFileInfos(null, null, 0, null, null, -1); + connectorMetadata.getPartitions(null, null); + connectorMetadata.getMaterializedViewIndex("test_db", "test_tbl"); + connectorMetadata.getTableStatistics(null, null, null, null, null, -1); + } + }; + + CatalogConnectorMetadata catalogConnectorMetadata = new CatalogConnectorMetadata( + connectorMetadata, + informationSchemaMetadata + ); + + catalogConnectorMetadata.clear(); + catalogConnectorMetadata.listPartitionNames("test_db", "test_tbl"); + catalogConnectorMetadata.dropTable(null); + catalogConnectorMetadata.refreshTable("test_db", null, null, false); + catalogConnectorMetadata.alterMaterializedView(null); + catalogConnectorMetadata.addPartitions(null, null, null); + catalogConnectorMetadata.dropPartition(null, null, null); + catalogConnectorMetadata.renamePartition(null, null, null); + catalogConnectorMetadata.createMaterializedView((CreateMaterializedViewStatement) null); + catalogConnectorMetadata.createMaterializedView((CreateMaterializedViewStmt) null); + catalogConnectorMetadata.dropMaterializedView(null); + catalogConnectorMetadata.alterMaterializedView(null); + catalogConnectorMetadata.refreshMaterializedView(null); + catalogConnectorMetadata.cancelRefreshMaterializedView(null); + catalogConnectorMetadata.createView(null); + catalogConnectorMetadata.alterView(null); + catalogConnectorMetadata.truncateTable(null); + catalogConnectorMetadata.alterTableComment(null, null, null); + catalogConnectorMetadata.finishSink("test_db", "test_tbl", null); + catalogConnectorMetadata.abortSink("test_db", "test_tbl", null); + catalogConnectorMetadata.createTableLike(null); + catalogConnectorMetadata.createTable(null); + catalogConnectorMetadata.createDb("test_db"); + catalogConnectorMetadata.dropDb("test_db", false); + catalogConnectorMetadata.getRemoteFileInfos(null, null, 0, null, null, -1); + catalogConnectorMetadata.getPartitions(null, null); + catalogConnectorMetadata.getMaterializedViewIndex("test_db", "test_tbl"); + catalogConnectorMetadata.getTableStatistics(null, null, null, null, null, -1); + } +} diff --git a/fe/fe-core/src/test/java/com/starrocks/scheduler/MockTaskRunProcessor.java b/fe/fe-core/src/test/java/com/starrocks/scheduler/MockTaskRunProcessor.java index 7c12850fbab416..d263d76a86b9cf 100644 --- a/fe/fe-core/src/test/java/com/starrocks/scheduler/MockTaskRunProcessor.java +++ b/fe/fe-core/src/test/java/com/starrocks/scheduler/MockTaskRunProcessor.java @@ -22,9 +22,21 @@ public class MockTaskRunProcessor implements TaskRunProcessor { private static final Logger LOG = LogManager.getLogger(TaskManagerTest.class); + private final long sleepTimeMs; + + public MockTaskRunProcessor() { + this.sleepTimeMs = 0; + } + + public MockTaskRunProcessor(long sleepTime) { + this.sleepTimeMs = sleepTime; + } @Override public void processTaskRun(TaskRunContext context) throws Exception { + if (sleepTimeMs > 0) { + Thread.sleep(sleepTimeMs); + } LOG.info("running a task. currentTime:" + LocalDateTime.now()); } diff --git a/fe/fe-core/src/test/java/com/starrocks/scheduler/TaskManagerTest.java b/fe/fe-core/src/test/java/com/starrocks/scheduler/TaskManagerTest.java index 71d626979b6462..bc34b2f494229b 100644 --- a/fe/fe-core/src/test/java/com/starrocks/scheduler/TaskManagerTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/scheduler/TaskManagerTest.java @@ -30,6 +30,8 @@ import com.starrocks.utframe.StarRocksAssert; import com.starrocks.utframe.UtFrameUtils; import mockit.Expectations; +import mockit.Mock; +import mockit.MockUp; import org.apache.hadoop.util.ThreadUtil; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -615,4 +617,177 @@ public void testTaskRunMergeRedundant1() { Assert.assertEquals(Config.task_runs_queue_length, taskRunScheduler.getPendingQueueCount()); Assert.assertEquals(Config.task_runs_queue_length, taskRunScheduler.getPendingTaskRunsByTaskId(taskId).size()); } + + @Test + public void testTaskEquality() { + Task task1 = new Task("test"); + task1.setDefinition("select 1"); + task1.setId(1); + + TaskRun taskRun1 = TaskRunBuilder + .newBuilder(task1) + .setExecuteOption(DEFAULT_MERGE_OPTION) + .build(); + { + long now = System.currentTimeMillis(); + taskRun1.setTaskId(task1.getId()); + taskRun1.initStatus("1", now + 10); + taskRun1.getStatus().setPriority(0); + } + + TaskRun taskRun2 = TaskRunBuilder + .newBuilder(task1) + .setExecuteOption(DEFAULT_MERGE_OPTION) + .build(); + { + long now = System.currentTimeMillis(); + taskRun2.setTaskId(task1.getId()); + taskRun2.initStatus("1", now + 10); + taskRun2.getStatus().setPriority(0); + Assert.assertFalse(taskRun1.equals(taskRun2)); + } + + { + long now = System.currentTimeMillis(); + taskRun2.setTaskId(task1.getId()); + taskRun2.initStatus("2", now + 10); + taskRun2.getStatus().setPriority(10); + Assert.assertFalse(taskRun1.equals(taskRun2)); + } + { + long now = System.currentTimeMillis(); + taskRun2.setTaskId(task1.getId()); + taskRun2.initStatus("2", now + 10); + taskRun2.getStatus().setPriority(10); + taskRun2.setExecuteOption(DEFAULT_NO_MERGE_OPTION); + Assert.assertFalse(taskRun1.equals(taskRun2)); + } + + { + long now = System.currentTimeMillis(); + taskRun2.setTaskId(2); + taskRun2.initStatus("2", now + 10); + taskRun2.getStatus().setPriority(10); + taskRun2.setExecuteOption(DEFAULT_NO_MERGE_OPTION); + Assert.assertFalse(taskRun1.equals(taskRun2)); + } + + { + long now = System.currentTimeMillis(); + taskRun2.setTaskId(task1.getId()); + taskRun2.initStatus("2", now + 10); + taskRun2.getStatus().setPriority(10); + try { + Field taskRunId = taskRun2.getClass().getDeclaredField("taskRunId"); + taskRunId.setAccessible(true); + taskRunId.set(taskRun2, taskRun1.getTaskRunId()); + } catch (Exception e) { + Assert.fail(); + } + Assert.assertTrue(taskRun1.equals(taskRun2)); + } + + { + Map map1 = Maps.newHashMap(); + map1.put(task1.getId(), taskRun1); + Map map2 = Maps.newHashMap(); + map2.put(task1.getId(), taskRun1); + Assert.assertTrue(map1.equals(map2)); + Map map3 = ImmutableMap.copyOf(map1); + Assert.assertTrue(map1.equals(map3)); + Assert.assertTrue(map1.get(task1.getId()).equals(map3.get(task1.getId()))); + } + } + + @Test + public void testSyncRefreshWithoutMergeable() { + Config.enable_mv_refresh_sync_refresh_mergeable = false; + TaskManager tm = new TaskManager(); + TaskRunScheduler taskRunScheduler = tm.getTaskRunScheduler(); + for (int i = 0; i < 10; i++) { + Task task = new Task("test"); + task.setDefinition("select 1"); + TaskRun taskRun = makeTaskRun(1, task, makeExecuteOption(true, true)); + taskRun.setProcessor(new MockTaskRunProcessor(5000)); + tm.getTaskRunManager().submitTaskRun(taskRun, taskRun.getExecuteOption()); + } + long pendingTaskRunsCount = taskRunScheduler.getPendingQueueCount(); + Assert.assertEquals(pendingTaskRunsCount, 10); + } + + @Test + public void testSyncRefreshWithMergeable1() { + TaskManager tm = new TaskManager(); + TaskRunScheduler taskRunScheduler = tm.getTaskRunScheduler(); + Config.enable_mv_refresh_sync_refresh_mergeable = true; + for (int i = 0; i < 10; i++) { + Task task = new Task("test"); + task.setDefinition("select 1"); + TaskRun taskRun = makeTaskRun(1, task, makeExecuteOption(true, true)); + taskRun.setProcessor(new MockTaskRunProcessor(5000)); + tm.getTaskRunManager().submitTaskRun(taskRun, taskRun.getExecuteOption()); + } + long pendingTaskRunsCount = taskRunScheduler.getPendingQueueCount(); + Assert.assertTrue(pendingTaskRunsCount == 1); + Config.enable_mv_refresh_sync_refresh_mergeable = false; + } + + @Test + public void testSyncRefreshWithMergeable2() { + TaskManager tm = new TaskManager(); + TaskRunScheduler taskRunScheduler = tm.getTaskRunScheduler(); + Config.enable_mv_refresh_sync_refresh_mergeable = true; + for (int i = 0; i < 10; i++) { + Task task = new Task("test"); + task.setDefinition("select 1"); + TaskRun taskRun = makeTaskRun(1, task, makeExecuteOption(true, true)); + taskRun.setProcessor(new MockTaskRunProcessor(5000)); + tm.getTaskRunManager().submitTaskRun(taskRun, taskRun.getExecuteOption()); + taskRunScheduler.scheduledPendingTaskRun(t -> { + try { + t.getProcessor().postTaskRun(null); + } catch (Exception e) { + Assert.fail("Process task run failed:" + e); + } + }); + } + long pendingTaskRunsCount = taskRunScheduler.getPendingQueueCount(); + Assert.assertTrue(pendingTaskRunsCount == 1); + Config.enable_mv_refresh_sync_refresh_mergeable = false; + } + + @Test + public void testKillTaskRun() { + TaskManager tm = new TaskManager(); + TaskRunScheduler taskRunScheduler = tm.getTaskRunScheduler(); + for (int i = 0; i < 10; i++) { + Task task = new Task("test"); + task.setDefinition("select 1"); + TaskRun taskRun = makeTaskRun(1, task, makeExecuteOption(true, false)); + taskRun.setProcessor(new MockTaskRunProcessor(5000)); + tm.getTaskRunManager().submitTaskRun(taskRun, taskRun.getExecuteOption()); + } + taskRunScheduler.scheduledPendingTaskRun(taskRun -> { + try { + taskRun.getProcessor().postTaskRun(null); + } catch (Exception e) { + Assert.fail("Process task run failed:" + e); + } + }); + long runningTaskRunsCount = taskRunScheduler.getRunningTaskCount(); + Assert.assertEquals(1, runningTaskRunsCount); + + new MockUp() { + @Mock + public ConnectContext getRunCtx() { + return null; + } + }; + // running task run will not be removed if force kill is false + TaskRunManager taskRunManager = tm.getTaskRunManager(); + taskRunManager.killTaskRun(1L, false); + Assert.assertEquals(1, taskRunScheduler.getRunningTaskCount()); + taskRunManager.killTaskRun(1L, true); + Assert.assertEquals(0, taskRunScheduler.getRunningTaskCount()); + } } diff --git a/fe/fe-core/src/test/java/com/starrocks/scheduler/persist/MVTaskRunExtraMessageTest.java b/fe/fe-core/src/test/java/com/starrocks/scheduler/persist/MVTaskRunExtraMessageTest.java new file mode 100644 index 00000000000000..2c704687f80893 --- /dev/null +++ b/fe/fe-core/src/test/java/com/starrocks/scheduler/persist/MVTaskRunExtraMessageTest.java @@ -0,0 +1,113 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + + +package com.starrocks.scheduler.persist; + +import com.google.common.collect.Maps; +import com.starrocks.common.Config; +import org.assertj.core.util.Sets; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Map; +import java.util.Set; + +public class MVTaskRunExtraMessageTest { + @Test + public void testMessageWithNormalMVPartitionsToRefresh() { + MVTaskRunExtraMessage extraMessage = new MVTaskRunExtraMessage(); + Set mvPartitionsToRefresh = Sets.newHashSet(); + for (int i = 0; i < 10; i++) { + mvPartitionsToRefresh.add("partition" + i); + } + extraMessage.setMvPartitionsToRefresh(mvPartitionsToRefresh); + Assert.assertTrue(extraMessage.getMvPartitionsToRefresh().size() == + 10); + } + + @Test + public void testMessageWithTooLongMVPartitionsToRefresh() { + MVTaskRunExtraMessage extraMessage = new MVTaskRunExtraMessage(); + Set mvPartitionsToRefresh = Sets.newHashSet(); + for (int i = 0; i < 100; i++) { + mvPartitionsToRefresh.add("partition" + i); + } + extraMessage.setMvPartitionsToRefresh(mvPartitionsToRefresh); + Assert.assertTrue(extraMessage.getMvPartitionsToRefresh().size() == + Config.max_mv_task_run_meta_message_values_length); + } + + @Test + public void testMessageWithNormalRefBasePartitionsToRefreshMap() { + Map> refBasePartitionsToRefreshMap = Maps.newHashMap(); + for (int i = 0; i < 15; i++) { + Set partitions = Sets.newHashSet(); + for (int j = 0; j < 10; j++) { + partitions.add("partition" + j); + } + refBasePartitionsToRefreshMap.put("table" + i, partitions); + } + MVTaskRunExtraMessage message = new MVTaskRunExtraMessage(); + message.setRefBasePartitionsToRefreshMap(refBasePartitionsToRefreshMap); + Assert.assertTrue(message.getRefBasePartitionsToRefreshMap().size() == 15); + } + + @Test + public void testMessageWithTooLongRefBasePartitionsToRefreshMap() { + Map> refBasePartitionsToRefreshMap = Maps.newHashMap(); + for (int i = 0; i < 100; i++) { + Set partitions = Sets.newHashSet(); + for (int j = 0; j < 10; j++) { + partitions.add("partition" + j); + } + refBasePartitionsToRefreshMap.put("table" + i, partitions); + } + MVTaskRunExtraMessage message = new MVTaskRunExtraMessage(); + message.setRefBasePartitionsToRefreshMap(refBasePartitionsToRefreshMap); + Assert.assertTrue(message.getRefBasePartitionsToRefreshMap().size() == + Config.max_mv_task_run_meta_message_values_length); + } + + @Test + public void testMessageWithNormalBasePartitionsToRefreshMap() { + Map> basePartitionsToRefreshMap = Maps.newHashMap(); + for (int i = 0; i < 15; i++) { + Set partitions = Sets.newHashSet(); + for (int j = 0; j < 10; j++) { + partitions.add("partition" + j); + } + basePartitionsToRefreshMap.put("table" + i, partitions); + } + MVTaskRunExtraMessage message = new MVTaskRunExtraMessage(); + message.setBasePartitionsToRefreshMap(basePartitionsToRefreshMap); + Assert.assertTrue(message.getBasePartitionsToRefreshMap().size() == 15); + } + + @Test + public void testMessageWithTooLongBasePartitionsToRefreshMap() { + Map> basePartitionsToRefreshMap = Maps.newHashMap(); + for (int i = 0; i < 100; i++) { + Set partitions = Sets.newHashSet(); + for (int j = 0; j < 10; j++) { + partitions.add("partition" + j); + } + basePartitionsToRefreshMap.put("table" + i, partitions); + } + MVTaskRunExtraMessage message = new MVTaskRunExtraMessage(); + message.setBasePartitionsToRefreshMap(basePartitionsToRefreshMap); + Assert.assertTrue(message.getBasePartitionsToRefreshMap().size() == + Config.max_mv_task_run_meta_message_values_length); + } +}