diff --git a/fe/fe-core/src/main/java/com/starrocks/common/Config.java b/fe/fe-core/src/main/java/com/starrocks/common/Config.java index 9883aa68bba11..e84c360d74b42 100644 --- a/fe/fe-core/src/main/java/com/starrocks/common/Config.java +++ b/fe/fe-core/src/main/java/com/starrocks/common/Config.java @@ -2836,6 +2836,13 @@ public class Config extends ConfigBase { @ConfField(mutable = true, comment = "The default try lock timeout for mv refresh to try base table/mv dbs' lock") public static int mv_refresh_try_lock_timeout_ms = 30 * 1000; + @ConfField(mutable = true, comment = "Whether enable to refresh materialized view in sync mode mergeable or not") + public static boolean enable_mv_refresh_sync_refresh_mergeable = false; + + @ConfField(mutable = true, comment = "The max length for mv task run extra message's values(set/map) to avoid " + + "occupying too much meta memory") + public static int max_mv_task_run_meta_message_values_length = 16; + /** * The refresh partition number when refreshing materialized view at once by default. */ diff --git a/fe/fe-core/src/main/java/com/starrocks/connector/CatalogConnectorMetadata.java b/fe/fe-core/src/main/java/com/starrocks/connector/CatalogConnectorMetadata.java index 02a43a70f4ec6..e18478999755b 100644 --- a/fe/fe-core/src/main/java/com/starrocks/connector/CatalogConnectorMetadata.java +++ b/fe/fe-core/src/main/java/com/starrocks/connector/CatalogConnectorMetadata.java @@ -36,6 +36,7 @@ import com.starrocks.sql.ast.AlterTableCommentClause; import com.starrocks.sql.ast.AlterTableStmt; import com.starrocks.sql.ast.AlterViewStmt; +import com.starrocks.sql.ast.CancelRefreshMaterializedViewStmt; import com.starrocks.sql.ast.CreateMaterializedViewStatement; import com.starrocks.sql.ast.CreateMaterializedViewStmt; import com.starrocks.sql.ast.CreateTableLikeStmt; @@ -304,8 +305,9 @@ public String refreshMaterializedView(RefreshMaterializedViewStatement refreshMa } @Override - public void cancelRefreshMaterializedView(String dbName, String mvName) throws DdlException, MetaNotFoundException { - normal.cancelRefreshMaterializedView(dbName, mvName); + public void cancelRefreshMaterializedView( + CancelRefreshMaterializedViewStmt stmt) throws DdlException, MetaNotFoundException { + normal.cancelRefreshMaterializedView(stmt); } @Override diff --git a/fe/fe-core/src/main/java/com/starrocks/connector/ConnectorMetadata.java b/fe/fe-core/src/main/java/com/starrocks/connector/ConnectorMetadata.java index b2b0cca332700..1809e906d6116 100644 --- a/fe/fe-core/src/main/java/com/starrocks/connector/ConnectorMetadata.java +++ b/fe/fe-core/src/main/java/com/starrocks/connector/ConnectorMetadata.java @@ -35,6 +35,7 @@ import com.starrocks.sql.ast.AlterTableCommentClause; import com.starrocks.sql.ast.AlterTableStmt; import com.starrocks.sql.ast.AlterViewStmt; +import com.starrocks.sql.ast.CancelRefreshMaterializedViewStmt; import com.starrocks.sql.ast.CreateMaterializedViewStatement; import com.starrocks.sql.ast.CreateMaterializedViewStmt; import com.starrocks.sql.ast.CreateTableLikeStmt; @@ -313,7 +314,7 @@ default String refreshMaterializedView(RefreshMaterializedViewStatement refreshM return null; } - default void cancelRefreshMaterializedView(String dbName, String mvName) + default void cancelRefreshMaterializedView(CancelRefreshMaterializedViewStmt stmt) throws DdlException, MetaNotFoundException { } diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/DDLStmtExecutor.java b/fe/fe-core/src/main/java/com/starrocks/qe/DDLStmtExecutor.java index 4b9c6e266af3f..26173bae020fe 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/DDLStmtExecutor.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/DDLStmtExecutor.java @@ -383,10 +383,7 @@ public ShowResultSet visitRefreshMaterializedViewStatement(RefreshMaterializedVi public ShowResultSet visitCancelRefreshMaterializedViewStatement(CancelRefreshMaterializedViewStmt stmt, ConnectContext context) { ErrorReport.wrapWithRuntimeException(() -> { - context.getGlobalStateMgr().getLocalMetastore() - .cancelRefreshMaterializedView( - stmt.getMvName().getDb(), - stmt.getMvName().getTbl()); + context.getGlobalStateMgr().getLocalMetastore().cancelRefreshMaterializedView(stmt); }); return null; } diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java b/fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java index fac27453c2a3a..ba2477e6f1589 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java @@ -39,6 +39,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import com.google.common.primitives.Ints; import com.google.gson.Gson; import com.starrocks.analysis.Expr; import com.starrocks.analysis.HintNode; @@ -2009,7 +2010,8 @@ public void handleDMLStmt(ExecPlan execPlan, DmlStmt stmt) throws Exception { long createTime = System.currentTimeMillis(); long loadedRows = 0; - int filteredRows = 0; + // filteredRows is stored in int64_t in the backend, so use long here. + long filteredRows = 0; long loadedBytes = 0; long jobId = -1; long estimateScanRows = -1; @@ -2131,7 +2133,7 @@ public void handleDMLStmt(ExecPlan execPlan, DmlStmt stmt) throws Exception { loadedRows = Long.parseLong(coord.getLoadCounters().get(LoadEtlTask.DPP_NORMAL_ALL)); } if (coord.getLoadCounters().get(LoadEtlTask.DPP_ABNORMAL_ALL) != null) { - filteredRows = Integer.parseInt(coord.getLoadCounters().get(LoadEtlTask.DPP_ABNORMAL_ALL)); + filteredRows = Long.parseLong(coord.getLoadCounters().get(LoadEtlTask.DPP_ABNORMAL_ALL)); } if (coord.getLoadCounters().get(LoadJob.LOADED_BYTES) != null) { @@ -2373,7 +2375,8 @@ public void handleDMLStmt(ExecPlan execPlan, DmlStmt stmt) throws Exception { sb.append("}"); } - context.getState().setOk(loadedRows, filteredRows, sb.toString()); + // filterRows may be overflow when to convert it into int, use `saturatedCast` to avoid overflow + context.getState().setOk(loadedRows, Ints.saturatedCast(filteredRows), sb.toString()); } public String getOriginStmtInString() { diff --git a/fe/fe-core/src/main/java/com/starrocks/scheduler/Constants.java b/fe/fe-core/src/main/java/com/starrocks/scheduler/Constants.java index 49c376cfa85f4..82218d4a348ba 100644 --- a/fe/fe-core/src/main/java/com/starrocks/scheduler/Constants.java +++ b/fe/fe-core/src/main/java/com/starrocks/scheduler/Constants.java @@ -65,7 +65,14 @@ public enum TaskRunState { RUNNING, // The task run is scheduled into running queue and is running FAILED, // The task run is failed SUCCESS, // The task run is finished successfully - MERGED, // The task run is merged + MERGED; // The task run is merged + + /** + * Whether the task run state is a success state + */ + public boolean isSuccessState() { + return this.equals(TaskRunState.SUCCESS) || this.equals(TaskRunState.MERGED); + } } public static boolean isFinishState(TaskRunState state) { diff --git a/fe/fe-core/src/main/java/com/starrocks/scheduler/ExecuteOption.java b/fe/fe-core/src/main/java/com/starrocks/scheduler/ExecuteOption.java index e529738dca5dc..537bd9c0d3c19 100644 --- a/fe/fe-core/src/main/java/com/starrocks/scheduler/ExecuteOption.java +++ b/fe/fe-core/src/main/java/com/starrocks/scheduler/ExecuteOption.java @@ -17,6 +17,7 @@ import com.google.common.collect.Maps; import com.google.gson.annotations.SerializedName; +import com.starrocks.common.Config; import com.starrocks.persist.gson.GsonUtils; import java.util.Map; @@ -67,7 +68,11 @@ public void setPriority(int priority) { public boolean isMergeRedundant() { // If old task run is a sync-mode task, skip to merge it to avoid sync-mode task // hanging after removing it. - return !isSync && isMergeRedundant; + if (Config.enable_mv_refresh_sync_refresh_mergeable) { + return isMergeRedundant; + } else { + return !isSync && isMergeRedundant; + } } public Map getTaskRunProperties() { diff --git a/fe/fe-core/src/main/java/com/starrocks/scheduler/PartitionBasedMvRefreshProcessor.java b/fe/fe-core/src/main/java/com/starrocks/scheduler/PartitionBasedMvRefreshProcessor.java index 856adc3ea2a93..92a27fa8ebafd 100644 --- a/fe/fe-core/src/main/java/com/starrocks/scheduler/PartitionBasedMvRefreshProcessor.java +++ b/fe/fe-core/src/main/java/com/starrocks/scheduler/PartitionBasedMvRefreshProcessor.java @@ -536,6 +536,20 @@ private void changeDefaultConnectContextIfNeeded(ConnectContext mvConnectCtx) { if (!mvProperty.getProperties().containsKey(MV_SESSION_TIMEOUT)) { mvSessionVariable.setQueryTimeoutS(MV_DEFAULT_QUERY_TIMEOUT); } + + // set enable_insert_strict by default + if (!isMVPropertyContains(SessionVariable.ENABLE_INSERT_STRICT)) { + mvSessionVariable.setEnableInsertStrict(false); + } + // enable profile by default for mv refresh task + if (!isMVPropertyContains(SessionVariable.ENABLE_PROFILE)) { + mvSessionVariable.setEnableProfile(true); + } + } + + private boolean isMVPropertyContains(String key) { + String mvKey = PropertyAnalyzer.PROPERTIES_MATERIALIZED_VIEW_SESSION_PREFIX + key; + return materializedView.getTableProperty().getProperties().containsKey(mvKey); } private void postProcess() { @@ -1198,7 +1212,6 @@ public void refreshMaterializedView(MvTaskRunContext mvContext, ExecPlan execPla parentStmtExecutor.registerSubStmtExecutor(executor); } ctx.setStmtId(STMT_ID_GENERATOR.incrementAndGet()); - ctx.getSessionVariable().setEnableInsertStrict(false); LOG.info("[QueryId:{}] start to refresh materialized view {}", ctx.getQueryId(), materializedView.getName()); try { executor.handleDMLStmtWithProfile(execPlan, insertStmt); diff --git a/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskManager.java b/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskManager.java index b3b8ec4e9a25f..d588c2f2262c8 100644 --- a/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskManager.java +++ b/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskManager.java @@ -188,7 +188,7 @@ private void clearUnfinishedTaskRun() { GlobalStateMgr.getCurrentState().getEditLog().logUpdateTaskRun(statusChange); // remove pending task run - taskRunScheduler.removePendingTaskRun(taskRun); + taskRunScheduler.removePendingTaskRun(taskRun, Constants.TaskRunState.FAILED); } // clear running task runs @@ -270,24 +270,22 @@ private boolean stopScheduler(String taskName) { return isCancel; } - public boolean killTask(String taskName, boolean clearPending) { + public boolean killTask(String taskName, boolean force) { Task task = nameToTaskMap.get(taskName); if (task == null) { return false; } - if (clearPending) { - if (!taskRunManager.tryTaskRunLock()) { - return false; - } - try { - taskRunScheduler.removePendingTask(task); - } catch (Exception ex) { - LOG.warn("failed to kill task.", ex); - } finally { - taskRunManager.taskRunUnlock(); - } + if (!taskRunManager.tryTaskRunLock()) { + return false; + } + try { + taskRunScheduler.removePendingTask(task); + } catch (Exception ex) { + LOG.warn("failed to kill task.", ex); + } finally { + taskRunManager.taskRunUnlock(); } - return taskRunManager.killTaskRun(task.getId()); + return taskRunManager.killTaskRun(task.getId(), force); } public SubmitResult executeTask(String taskName) { @@ -337,7 +335,7 @@ public SubmitResult executeTaskSync(Task task, ExecuteOption option) { try { taskRunScheduler.addSyncRunningTaskRun(taskRun); Constants.TaskRunState taskRunState = taskRun.getFuture().get(); - if (taskRunState != Constants.TaskRunState.SUCCESS) { + if (!taskRunState.isSuccessState()) { String msg = taskRun.getStatus().getErrorMessage(); throw new DmlException("execute task %s failed: %s", task.getName(), msg); } @@ -377,7 +375,7 @@ public void dropTasks(List taskIdList, boolean isReplay) { } periodFutureMap.remove(task.getId()); } - if (!killTask(task.getName(), true)) { + if (!killTask(task.getName(), false)) { LOG.error("kill task failed: " + task.getName()); } idToTaskMap.remove(task.getId()); @@ -821,7 +819,7 @@ public void replayUpdateTaskRun(TaskRunStatusChange statusChange) { return; } // remove it from pending task queue - taskRunScheduler.removePendingTaskRun(pendingTaskRun); + taskRunScheduler.removePendingTaskRun(pendingTaskRun, toStatus); TaskRunStatus status = pendingTaskRun.getStatus(); if (toStatus == Constants.TaskRunState.RUNNING) { diff --git a/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskRunFIFOQueue.java b/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskRunFIFOQueue.java index 60ed6e5aa4f62..261c047a4dc19 100644 --- a/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskRunFIFOQueue.java +++ b/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskRunFIFOQueue.java @@ -127,21 +127,30 @@ public boolean add(TaskRun taskRun) { /** * Remove a specific task run from the queue. * @param taskRun: task run to remove + * @param state: complete or cancel task run and set it with the state if the task run's future is not null */ - public boolean remove(TaskRun taskRun) { + public boolean remove(TaskRun taskRun, Constants.TaskRunState state) { if (taskRun == null) { return false; } wLock.lock(); try { - // make sure future is canceled. - CompletableFuture future = taskRun.getFuture(); - boolean isCancel = future.cancel(true); - if (!isCancel) { - LOG.warn("fail to cancel scheduler for task [{}]", taskRun); + CompletableFuture future = taskRun.getFuture(); + // make sure the future is canceled or completed + if (future != null) { + if (state != null && state.isSuccessState()) { + boolean isComplete = future.complete(state); + if (!isComplete) { + LOG.warn("fail to complete scheduler for task [{}]", taskRun); + } + } else { + boolean isCancel = future.cancel(true); + if (!isCancel) { + LOG.warn("fail to cancel scheduler for task [{}]", taskRun); + } + } } - // remove it from pending map. removeFromMapUnlock(taskRun); // remove it from pending queue. diff --git a/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskRunManager.java b/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskRunManager.java index 61388c545cb83..e0a5159d7fd59 100644 --- a/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskRunManager.java +++ b/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskRunManager.java @@ -77,16 +77,23 @@ public SubmitResult submitTaskRun(TaskRun taskRun, ExecuteOption option) { return new SubmitResult(queryId, SubmitResult.SubmitStatus.SUBMITTED, taskRun.getFuture()); } - public boolean killTaskRun(Long taskId) { + public boolean killTaskRun(Long taskId, boolean force) { TaskRun taskRun = taskRunScheduler.getRunningTaskRun(taskId); if (taskRun == null) { return false; } - taskRun.kill(); - ConnectContext runCtx = taskRun.getRunCtx(); - if (runCtx != null) { - runCtx.kill(false, "kill TaskRun"); - return true; + try { + taskRun.kill(); + ConnectContext runCtx = taskRun.getRunCtx(); + if (runCtx != null) { + runCtx.kill(false, "kill TaskRun"); + return true; + } + } finally { + // if it's force, remove it from running TaskRun map no matter it's killed or not + if (force) { + taskRunScheduler.removeRunningTask(taskRun.getTaskId()); + } } return false; } @@ -160,7 +167,7 @@ public boolean arrangeTaskRun(TaskRun taskRun, boolean isReplay) { GlobalStateMgr.getCurrentState().getEditLog().logUpdateTaskRun(statusChange); // update the state of the old TaskRun to MERGED in LEADER oldTaskRun.getStatus().setState(Constants.TaskRunState.MERGED); - taskRunScheduler.removePendingTaskRun(oldTaskRun); + taskRunScheduler.removePendingTaskRun(oldTaskRun, Constants.TaskRunState.MERGED); taskRunHistory.addHistory(oldTaskRun.getStatus()); } } diff --git a/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskRunScheduler.java b/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskRunScheduler.java index 6f55cc20227f6..54cdbd74cd97a 100644 --- a/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskRunScheduler.java +++ b/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskRunScheduler.java @@ -82,12 +82,12 @@ public boolean addPendingTaskRun(TaskRun taskRun) { return pendingTaskRunQueue.add(taskRun); } - public void removePendingTaskRun(TaskRun taskRun) { + public void removePendingTaskRun(TaskRun taskRun, Constants.TaskRunState state) { if (taskRun == null) { return; } LOG.info("remove pending task run: {}", taskRun); - pendingTaskRunQueue.remove(taskRun); + pendingTaskRunQueue.remove(taskRun, state); } public void removePendingTask(Task task) { diff --git a/fe/fe-core/src/main/java/com/starrocks/scheduler/persist/MVTaskRunExtraMessage.java b/fe/fe-core/src/main/java/com/starrocks/scheduler/persist/MVTaskRunExtraMessage.java index 33143df47bd9c..5e08efe941007 100644 --- a/fe/fe-core/src/main/java/com/starrocks/scheduler/persist/MVTaskRunExtraMessage.java +++ b/fe/fe-core/src/main/java/com/starrocks/scheduler/persist/MVTaskRunExtraMessage.java @@ -19,10 +19,12 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.gson.annotations.SerializedName; +import com.starrocks.common.Config; import com.starrocks.common.io.Text; import com.starrocks.common.io.Writable; import com.starrocks.persist.gson.GsonUtils; import com.starrocks.scheduler.ExecuteOption; +import com.starrocks.sql.optimizer.rule.transformation.materialization.MvUtils; import org.apache.commons.lang3.StringUtils; import java.io.DataInput; @@ -32,6 +34,7 @@ import java.util.Set; public class MVTaskRunExtraMessage implements Writable { + @SerializedName("forceRefresh") private boolean forceRefresh; @SerializedName("partitionStart") @@ -54,6 +57,11 @@ public class MVTaskRunExtraMessage implements Writable { @SerializedName("nextPartitionEnd") private String nextPartitionEnd; + // task run starts to process time + // NOTE: finishTime - processStartTime = process task run time(exclude pending time) + @SerializedName("processStartTime") + private long processStartTime = 0; + @SerializedName("executeOption") private ExecuteOption executeOption = new ExecuteOption(true); @@ -89,7 +97,8 @@ public Set getMvPartitionsToRefresh() { } public void setMvPartitionsToRefresh(Set mvPartitionsToRefresh) { - this.mvPartitionsToRefresh = mvPartitionsToRefresh; + this.mvPartitionsToRefresh = MvUtils.shrinkToSize(mvPartitionsToRefresh, + Config.max_mv_task_run_meta_message_values_length); } public Map> getBasePartitionsToRefreshMap() { @@ -100,9 +109,10 @@ public Map> getRefBasePartitionsToRefreshMap() { return refBasePartitionsToRefreshMap; } - public void setRefBasePartitionsToRefreshMap( - Map> refBasePartitionsToRefreshMap) { - this.refBasePartitionsToRefreshMap = refBasePartitionsToRefreshMap; + + public void setRefBasePartitionsToRefreshMap(Map> refBasePartitionsToRefreshMap) { + this.refBasePartitionsToRefreshMap = MvUtils.shrinkToSize(refBasePartitionsToRefreshMap, + Config.max_mv_task_run_meta_message_values_length); } public String getMvPartitionsToRefreshString() { @@ -123,9 +133,9 @@ public String getBasePartitionsToRefreshMapString() { } } - public void setBasePartitionsToRefreshMap( - Map> basePartitionsToRefreshMap) { - this.basePartitionsToRefreshMap = basePartitionsToRefreshMap; + public void setBasePartitionsToRefreshMap(Map> basePartitionsToRefreshMap) { + this.basePartitionsToRefreshMap = MvUtils.shrinkToSize(basePartitionsToRefreshMap, + Config.max_mv_task_run_meta_message_values_length); } public static MVTaskRunExtraMessage read(DataInput in) throws IOException { @@ -157,6 +167,14 @@ public void setNextPartitionEnd(String nextPartitionEnd) { this.nextPartitionEnd = nextPartitionEnd; } + public long getProcessStartTime() { + return processStartTime; + } + + public void setProcessStartTime(long processStartTime) { + this.processStartTime = processStartTime; + } + @Override public void write(DataOutput out) throws IOException { String json = GsonUtils.GSON.toJson(this); diff --git a/fe/fe-core/src/main/java/com/starrocks/scheduler/persist/TaskRunStatus.java b/fe/fe-core/src/main/java/com/starrocks/scheduler/persist/TaskRunStatus.java index 741537e9ab484..f967c4d77f6ea 100644 --- a/fe/fe-core/src/main/java/com/starrocks/scheduler/persist/TaskRunStatus.java +++ b/fe/fe-core/src/main/java/com/starrocks/scheduler/persist/TaskRunStatus.java @@ -312,6 +312,10 @@ public long getProcessStartTime() { public void setProcessStartTime(long processStartTime) { this.processStartTime = processStartTime; + // update process start time in mvTaskRunExtraMessage to display in the web page + if (mvTaskRunExtraMessage != null) { + mvTaskRunExtraMessage.setProcessStartTime(processStartTime); + } } public Map getProperties() { diff --git a/fe/fe-core/src/main/java/com/starrocks/server/LocalMetastore.java b/fe/fe-core/src/main/java/com/starrocks/server/LocalMetastore.java index 2a99a306d17f7..db236a401f5a5 100644 --- a/fe/fe-core/src/main/java/com/starrocks/server/LocalMetastore.java +++ b/fe/fe-core/src/main/java/com/starrocks/server/LocalMetastore.java @@ -198,6 +198,7 @@ import com.starrocks.sql.ast.AlterViewStmt; import com.starrocks.sql.ast.AsyncRefreshSchemeDesc; import com.starrocks.sql.ast.CancelAlterTableStmt; +import com.starrocks.sql.ast.CancelRefreshMaterializedViewStmt; import com.starrocks.sql.ast.ColumnRenameClause; import com.starrocks.sql.ast.CreateMaterializedViewStatement; import com.starrocks.sql.ast.CreateMaterializedViewStmt; @@ -3374,20 +3375,22 @@ public String refreshMaterializedView(RefreshMaterializedViewStatement refreshMa String dbName = refreshMaterializedViewStatement.getMvName().getDb(); String mvName = refreshMaterializedViewStatement.getMvName().getTbl(); boolean force = refreshMaterializedViewStatement.isForceRefresh(); - PartitionRangeDesc range = - refreshMaterializedViewStatement.getPartitionRangeDesc(); - + PartitionRangeDesc range = refreshMaterializedViewStatement.getPartitionRangeDesc(); return refreshMaterializedView(dbName, mvName, force, range, Constants.TaskRunPriority.HIGH.value(), - false, true, refreshMaterializedViewStatement.isSync()); + Config.enable_mv_refresh_sync_refresh_mergeable, true, refreshMaterializedViewStatement.isSync()); } @Override - public void cancelRefreshMaterializedView(String dbName, String mvName) throws DdlException, MetaNotFoundException { + public void cancelRefreshMaterializedView( + CancelRefreshMaterializedViewStmt stmt) throws DdlException, MetaNotFoundException { + String dbName = stmt.getMvName().getDb(); + String mvName = stmt.getMvName().getTbl(); MaterializedView materializedView = getMaterializedViewToRefresh(dbName, mvName); TaskManager taskManager = GlobalStateMgr.getCurrentState().getTaskManager(); Task refreshTask = taskManager.getTask(TaskBuilder.getMvTaskName(materializedView.getId())); + boolean isForce = stmt.isForce(); if (refreshTask != null) { - taskManager.killTask(refreshTask.getName(), true); + taskManager.killTask(refreshTask.getName(), isForce); } } diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/ast/CancelRefreshMaterializedViewStmt.java b/fe/fe-core/src/main/java/com/starrocks/sql/ast/CancelRefreshMaterializedViewStmt.java index bf89423bb376f..3a53f702734b1 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/ast/CancelRefreshMaterializedViewStmt.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/ast/CancelRefreshMaterializedViewStmt.java @@ -19,20 +19,25 @@ public class CancelRefreshMaterializedViewStmt extends DdlStmt { private final TableName mvName; + private final boolean force; - public CancelRefreshMaterializedViewStmt(TableName mvName) { - this(mvName, NodePosition.ZERO); + public CancelRefreshMaterializedViewStmt(TableName mvName, boolean force) { + this(mvName, force, NodePosition.ZERO); } - public CancelRefreshMaterializedViewStmt(TableName mvName, NodePosition pos) { + public CancelRefreshMaterializedViewStmt(TableName mvName, boolean force, NodePosition pos) { super(pos); this.mvName = mvName; + this.force = force; } public TableName getMvName() { return mvName; } + public boolean isForce() { + return force; + } @Override public R accept(AstVisitor visitor, C context) { diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/materialization/MvUtils.java b/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/materialization/MvUtils.java index 9489edbd44124..290c87baed23d 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/materialization/MvUtils.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/materialization/MvUtils.java @@ -1410,4 +1410,26 @@ public static DistributionDesc getDistributionDesc(MaterializedView materialized return new RandomDistributionDesc(); } } + + /** + * Trim the input set if its size is larger than maxLength. + * @return the trimmed set. + */ + public static Set shrinkToSize(Set set, int maxLength) { + if (set != null && set.size() > maxLength) { + return set.stream().limit(maxLength).collect(Collectors.toSet()); + } + return set; + } + + /** + * Trim the input map if its size is larger than maxLength. + * @return the trimmed map. + */ + public static Map> shrinkToSize(Map> map, int maxLength) { + if (map != null && map.size() > maxLength) { + return map.entrySet().stream().limit(maxLength).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + return map; + } } \ No newline at end of file diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/parser/AstBuilder.java b/fe/fe-core/src/main/java/com/starrocks/sql/parser/AstBuilder.java index 981bc7f458b55..0129d62db4a75 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/parser/AstBuilder.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/parser/AstBuilder.java @@ -1862,7 +1862,8 @@ public ParseNode visitCancelRefreshMaterializedViewStatement( StarRocksParser.CancelRefreshMaterializedViewStatementContext context) { QualifiedName mvQualifiedName = getQualifiedName(context.qualifiedName()); TableName mvName = qualifiedNameToTableName(mvQualifiedName); - return new CancelRefreshMaterializedViewStmt(mvName, createPos(context)); + boolean force = context.FORCE() != null; + return new CancelRefreshMaterializedViewStmt(mvName, force, createPos(context)); } // ------------------------------------------- Catalog Statement --------------------------------------------------- diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/parser/StarRocks.g4 b/fe/fe-core/src/main/java/com/starrocks/sql/parser/StarRocks.g4 index 5e89ce6e8d5d4..c780c2f8d08d4 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/parser/StarRocks.g4 +++ b/fe/fe-core/src/main/java/com/starrocks/sql/parser/StarRocks.g4 @@ -643,7 +643,7 @@ refreshMaterializedViewStatement ; cancelRefreshMaterializedViewStatement - : CANCEL REFRESH MATERIALIZED VIEW mvName=qualifiedName + : CANCEL REFRESH MATERIALIZED VIEW mvName=qualifiedName FORCE? ; // ------------------------------------------- Admin Statement --------------------------------------------------------- diff --git a/fe/fe-core/src/test/java/com/starrocks/alter/AlterTest.java b/fe/fe-core/src/test/java/com/starrocks/alter/AlterTest.java index 0a23ead337511..f4a4ef6091742 100644 --- a/fe/fe-core/src/test/java/com/starrocks/alter/AlterTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/alter/AlterTest.java @@ -320,9 +320,7 @@ private static void cancelRefreshMaterializedView(String sql, boolean expectedEx CancelRefreshMaterializedViewStmt cancelRefresh = (CancelRefreshMaterializedViewStmt) UtFrameUtils.parseStmtWithNewParser(sql, connectContext); try { - GlobalStateMgr.getCurrentState().getLocalMetastore() - .cancelRefreshMaterializedView(cancelRefresh.getMvName().getDb(), - cancelRefresh.getMvName().getTbl()); + GlobalStateMgr.getCurrentState().getLocalMetastore().cancelRefreshMaterializedView(cancelRefresh); if (expectedException) { Assert.fail(); } diff --git a/fe/fe-core/src/test/java/com/starrocks/analysis/CancelRefreshMaterializedViewTest.java b/fe/fe-core/src/test/java/com/starrocks/analysis/CancelRefreshMaterializedViewTest.java index 268480d3ab748..74507b82960b1 100644 --- a/fe/fe-core/src/test/java/com/starrocks/analysis/CancelRefreshMaterializedViewTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/analysis/CancelRefreshMaterializedViewTest.java @@ -40,5 +40,18 @@ public void testNormal() throws Exception { String mvName = cancelRefresh.getMvName().getTbl(); Assert.assertEquals("test1", dbName); Assert.assertEquals("mv1", mvName); + Assert.assertFalse(cancelRefresh.isForce()); + } + + @Test + public void testForceRefreshMaterializedView() throws Exception { + String refreshMvSql = "cancel refresh materialized view test1.mv1 force"; + CancelRefreshMaterializedViewStmt cancelRefresh = + (CancelRefreshMaterializedViewStmt) UtFrameUtils.parseStmtWithNewParser(refreshMvSql, connectContext); + String dbName = cancelRefresh.getMvName().getDb(); + String mvName = cancelRefresh.getMvName().getTbl(); + Assert.assertEquals("test1", dbName); + Assert.assertEquals("mv1", mvName); + Assert.assertTrue(cancelRefresh.isForce()); } } diff --git a/fe/fe-core/src/test/java/com/starrocks/connector/CatalogConnectorMetadataTest.java b/fe/fe-core/src/test/java/com/starrocks/connector/CatalogConnectorMetadataTest.java index 027c2337610cb..2661b084a6611 100644 --- a/fe/fe-core/src/test/java/com/starrocks/connector/CatalogConnectorMetadataTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/connector/CatalogConnectorMetadataTest.java @@ -184,7 +184,7 @@ void testMetadataRouting(@Mocked ConnectorMetadata connectorMetadata) throws Use connectorMetadata.dropMaterializedView(null); connectorMetadata.alterMaterializedView(null); connectorMetadata.refreshMaterializedView(null); - connectorMetadata.cancelRefreshMaterializedView("test_db", "test_mv"); + connectorMetadata.cancelRefreshMaterializedView(null); connectorMetadata.createView(null); connectorMetadata.alterView(null); connectorMetadata.truncateTable(null, null); @@ -221,7 +221,7 @@ void testMetadataRouting(@Mocked ConnectorMetadata connectorMetadata) throws Use catalogConnectorMetadata.dropMaterializedView(null); catalogConnectorMetadata.alterMaterializedView(null); catalogConnectorMetadata.refreshMaterializedView(null); - catalogConnectorMetadata.cancelRefreshMaterializedView("test_db", "test_mv"); + catalogConnectorMetadata.cancelRefreshMaterializedView(null); catalogConnectorMetadata.createView(null); catalogConnectorMetadata.alterView(null); catalogConnectorMetadata.truncateTable(null, null); diff --git a/fe/fe-core/src/test/java/com/starrocks/scheduler/MockTaskRunProcessor.java b/fe/fe-core/src/test/java/com/starrocks/scheduler/MockTaskRunProcessor.java index 7c12850fbab41..d263d76a86b9c 100644 --- a/fe/fe-core/src/test/java/com/starrocks/scheduler/MockTaskRunProcessor.java +++ b/fe/fe-core/src/test/java/com/starrocks/scheduler/MockTaskRunProcessor.java @@ -22,9 +22,21 @@ public class MockTaskRunProcessor implements TaskRunProcessor { private static final Logger LOG = LogManager.getLogger(TaskManagerTest.class); + private final long sleepTimeMs; + + public MockTaskRunProcessor() { + this.sleepTimeMs = 0; + } + + public MockTaskRunProcessor(long sleepTime) { + this.sleepTimeMs = sleepTime; + } @Override public void processTaskRun(TaskRunContext context) throws Exception { + if (sleepTimeMs > 0) { + Thread.sleep(sleepTimeMs); + } LOG.info("running a task. currentTime:" + LocalDateTime.now()); } diff --git a/fe/fe-core/src/test/java/com/starrocks/scheduler/TaskManagerTest.java b/fe/fe-core/src/test/java/com/starrocks/scheduler/TaskManagerTest.java index ef2a8b0473a22..f3dd65d75622c 100644 --- a/fe/fe-core/src/test/java/com/starrocks/scheduler/TaskManagerTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/scheduler/TaskManagerTest.java @@ -33,6 +33,8 @@ import com.starrocks.utframe.StarRocksAssert; import com.starrocks.utframe.UtFrameUtils; import mockit.Expectations; +import mockit.Mock; +import mockit.MockUp; import org.apache.hadoop.util.ThreadUtil; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -703,4 +705,96 @@ public void testTaskEquality() { Assert.assertTrue(map1.get(task1.getId()).equals(map3.get(task1.getId()))); } } + + @Test + public void testSyncRefreshWithoutMergeable() { + Config.enable_mv_refresh_sync_refresh_mergeable = false; + TaskManager tm = new TaskManager(); + TaskRunScheduler taskRunScheduler = tm.getTaskRunScheduler(); + for (int i = 0; i < 10; i++) { + Task task = new Task("test"); + task.setDefinition("select 1"); + TaskRun taskRun = makeTaskRun(1, task, makeExecuteOption(true, true)); + taskRun.setProcessor(new MockTaskRunProcessor(5000)); + tm.getTaskRunManager().submitTaskRun(taskRun, taskRun.getExecuteOption()); + } + long pendingTaskRunsCount = taskRunScheduler.getPendingQueueCount(); + Assert.assertEquals(pendingTaskRunsCount, 10); + } + + @Test + public void testSyncRefreshWithMergeable1() { + TaskManager tm = new TaskManager(); + TaskRunScheduler taskRunScheduler = tm.getTaskRunScheduler(); + Config.enable_mv_refresh_sync_refresh_mergeable = true; + for (int i = 0; i < 10; i++) { + Task task = new Task("test"); + task.setDefinition("select 1"); + TaskRun taskRun = makeTaskRun(1, task, makeExecuteOption(true, true)); + taskRun.setProcessor(new MockTaskRunProcessor(5000)); + tm.getTaskRunManager().submitTaskRun(taskRun, taskRun.getExecuteOption()); + } + long pendingTaskRunsCount = taskRunScheduler.getPendingQueueCount(); + Assert.assertTrue(pendingTaskRunsCount == 1); + Config.enable_mv_refresh_sync_refresh_mergeable = false; + } + + @Test + public void testSyncRefreshWithMergeable2() { + TaskManager tm = new TaskManager(); + TaskRunScheduler taskRunScheduler = tm.getTaskRunScheduler(); + Config.enable_mv_refresh_sync_refresh_mergeable = true; + for (int i = 0; i < 10; i++) { + Task task = new Task("test"); + task.setDefinition("select 1"); + TaskRun taskRun = makeTaskRun(1, task, makeExecuteOption(true, true)); + taskRun.setProcessor(new MockTaskRunProcessor(5000)); + tm.getTaskRunManager().submitTaskRun(taskRun, taskRun.getExecuteOption()); + taskRunScheduler.scheduledPendingTaskRun(t -> { + try { + t.getProcessor().postTaskRun(null); + } catch (Exception e) { + Assert.fail("Process task run failed:" + e); + } + }); + } + long pendingTaskRunsCount = taskRunScheduler.getPendingQueueCount(); + Assert.assertTrue(pendingTaskRunsCount == 1); + Config.enable_mv_refresh_sync_refresh_mergeable = false; + } + + @Test + public void testKillTaskRun() { + TaskManager tm = new TaskManager(); + TaskRunScheduler taskRunScheduler = tm.getTaskRunScheduler(); + for (int i = 0; i < 10; i++) { + Task task = new Task("test"); + task.setDefinition("select 1"); + TaskRun taskRun = makeTaskRun(1, task, makeExecuteOption(true, false)); + taskRun.setProcessor(new MockTaskRunProcessor(5000)); + tm.getTaskRunManager().submitTaskRun(taskRun, taskRun.getExecuteOption()); + } + taskRunScheduler.scheduledPendingTaskRun(taskRun -> { + try { + taskRun.getProcessor().postTaskRun(null); + } catch (Exception e) { + Assert.fail("Process task run failed:" + e); + } + }); + long runningTaskRunsCount = taskRunScheduler.getRunningTaskCount(); + Assert.assertEquals(1, runningTaskRunsCount); + + new MockUp() { + @Mock + public ConnectContext getRunCtx() { + return null; + } + }; + // running task run will not be removed if force kill is false + TaskRunManager taskRunManager = tm.getTaskRunManager(); + taskRunManager.killTaskRun(1L, false); + Assert.assertEquals(1, taskRunScheduler.getRunningTaskCount()); + taskRunManager.killTaskRun(1L, true); + Assert.assertEquals(0, taskRunScheduler.getRunningTaskCount()); + } } \ No newline at end of file diff --git a/fe/fe-core/src/test/java/com/starrocks/scheduler/persist/MVTaskRunExtraMessageTest.java b/fe/fe-core/src/test/java/com/starrocks/scheduler/persist/MVTaskRunExtraMessageTest.java new file mode 100644 index 0000000000000..2c704687f8089 --- /dev/null +++ b/fe/fe-core/src/test/java/com/starrocks/scheduler/persist/MVTaskRunExtraMessageTest.java @@ -0,0 +1,113 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + + +package com.starrocks.scheduler.persist; + +import com.google.common.collect.Maps; +import com.starrocks.common.Config; +import org.assertj.core.util.Sets; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Map; +import java.util.Set; + +public class MVTaskRunExtraMessageTest { + @Test + public void testMessageWithNormalMVPartitionsToRefresh() { + MVTaskRunExtraMessage extraMessage = new MVTaskRunExtraMessage(); + Set mvPartitionsToRefresh = Sets.newHashSet(); + for (int i = 0; i < 10; i++) { + mvPartitionsToRefresh.add("partition" + i); + } + extraMessage.setMvPartitionsToRefresh(mvPartitionsToRefresh); + Assert.assertTrue(extraMessage.getMvPartitionsToRefresh().size() == + 10); + } + + @Test + public void testMessageWithTooLongMVPartitionsToRefresh() { + MVTaskRunExtraMessage extraMessage = new MVTaskRunExtraMessage(); + Set mvPartitionsToRefresh = Sets.newHashSet(); + for (int i = 0; i < 100; i++) { + mvPartitionsToRefresh.add("partition" + i); + } + extraMessage.setMvPartitionsToRefresh(mvPartitionsToRefresh); + Assert.assertTrue(extraMessage.getMvPartitionsToRefresh().size() == + Config.max_mv_task_run_meta_message_values_length); + } + + @Test + public void testMessageWithNormalRefBasePartitionsToRefreshMap() { + Map> refBasePartitionsToRefreshMap = Maps.newHashMap(); + for (int i = 0; i < 15; i++) { + Set partitions = Sets.newHashSet(); + for (int j = 0; j < 10; j++) { + partitions.add("partition" + j); + } + refBasePartitionsToRefreshMap.put("table" + i, partitions); + } + MVTaskRunExtraMessage message = new MVTaskRunExtraMessage(); + message.setRefBasePartitionsToRefreshMap(refBasePartitionsToRefreshMap); + Assert.assertTrue(message.getRefBasePartitionsToRefreshMap().size() == 15); + } + + @Test + public void testMessageWithTooLongRefBasePartitionsToRefreshMap() { + Map> refBasePartitionsToRefreshMap = Maps.newHashMap(); + for (int i = 0; i < 100; i++) { + Set partitions = Sets.newHashSet(); + for (int j = 0; j < 10; j++) { + partitions.add("partition" + j); + } + refBasePartitionsToRefreshMap.put("table" + i, partitions); + } + MVTaskRunExtraMessage message = new MVTaskRunExtraMessage(); + message.setRefBasePartitionsToRefreshMap(refBasePartitionsToRefreshMap); + Assert.assertTrue(message.getRefBasePartitionsToRefreshMap().size() == + Config.max_mv_task_run_meta_message_values_length); + } + + @Test + public void testMessageWithNormalBasePartitionsToRefreshMap() { + Map> basePartitionsToRefreshMap = Maps.newHashMap(); + for (int i = 0; i < 15; i++) { + Set partitions = Sets.newHashSet(); + for (int j = 0; j < 10; j++) { + partitions.add("partition" + j); + } + basePartitionsToRefreshMap.put("table" + i, partitions); + } + MVTaskRunExtraMessage message = new MVTaskRunExtraMessage(); + message.setBasePartitionsToRefreshMap(basePartitionsToRefreshMap); + Assert.assertTrue(message.getBasePartitionsToRefreshMap().size() == 15); + } + + @Test + public void testMessageWithTooLongBasePartitionsToRefreshMap() { + Map> basePartitionsToRefreshMap = Maps.newHashMap(); + for (int i = 0; i < 100; i++) { + Set partitions = Sets.newHashSet(); + for (int j = 0; j < 10; j++) { + partitions.add("partition" + j); + } + basePartitionsToRefreshMap.put("table" + i, partitions); + } + MVTaskRunExtraMessage message = new MVTaskRunExtraMessage(); + message.setBasePartitionsToRefreshMap(basePartitionsToRefreshMap); + Assert.assertTrue(message.getBasePartitionsToRefreshMap().size() == + Config.max_mv_task_run_meta_message_values_length); + } +}