From f1e01fe1dbb6349794775c7df211a3464a31644c Mon Sep 17 00:00:00 2001 From: "shuming.li" Date: Thu, 27 Jun 2024 13:51:25 +0800 Subject: [PATCH] fix task run bugs Signed-off-by: shuming.li --- .../com/starrocks/scheduler/Constants.java | 9 +++++++- .../com/starrocks/scheduler/TaskManager.java | 6 ++--- .../starrocks/scheduler/TaskRunFIFOQueue.java | 23 +++++++++++++------ .../starrocks/scheduler/TaskRunManager.java | 2 +- .../starrocks/scheduler/TaskRunScheduler.java | 4 ++-- .../com/starrocks/server/LocalMetastore.java | 6 ++--- 6 files changed, 32 insertions(+), 18 deletions(-) diff --git a/fe/fe-core/src/main/java/com/starrocks/scheduler/Constants.java b/fe/fe-core/src/main/java/com/starrocks/scheduler/Constants.java index 49c376cfa85f4c..82218d4a348ba7 100644 --- a/fe/fe-core/src/main/java/com/starrocks/scheduler/Constants.java +++ b/fe/fe-core/src/main/java/com/starrocks/scheduler/Constants.java @@ -65,7 +65,14 @@ public enum TaskRunState { RUNNING, // The task run is scheduled into running queue and is running FAILED, // The task run is failed SUCCESS, // The task run is finished successfully - MERGED, // The task run is merged + MERGED; // The task run is merged + + /** + * Whether the task run state is a success state + */ + public boolean isSuccessState() { + return this.equals(TaskRunState.SUCCESS) || this.equals(TaskRunState.MERGED); + } } public static boolean isFinishState(TaskRunState state) { diff --git a/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskManager.java b/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskManager.java index 06d05cad12361f..534b3342599e9e 100644 --- a/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskManager.java +++ b/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskManager.java @@ -188,7 +188,7 @@ private void clearUnfinishedTaskRun() { GlobalStateMgr.getCurrentState().getEditLog().logUpdateTaskRun(statusChange); // remove pending task run - taskRunScheduler.removePendingTaskRun(taskRun); + taskRunScheduler.removePendingTaskRun(taskRun, Constants.TaskRunState.FAILED); } // clear running task runs @@ -336,7 +336,7 @@ public SubmitResult executeTaskSync(Task task, ExecuteOption option) { try { taskRunScheduler.addSyncRunningTaskRun(taskRun); Constants.TaskRunState taskRunState = taskRun.getFuture().get(); - if (taskRunState != Constants.TaskRunState.SUCCESS) { + if (!taskRunState.isSuccessState()) { String msg = taskRun.getStatus().getErrorMessage(); throw new DmlException("execute task %s failed: %s", task.getName(), msg); } @@ -816,7 +816,7 @@ public void replayUpdateTaskRun(TaskRunStatusChange statusChange) { return; } // remove it from pending task queue - taskRunScheduler.removePendingTaskRun(pendingTaskRun); + taskRunScheduler.removePendingTaskRun(pendingTaskRun, toStatus); TaskRunStatus status = pendingTaskRun.getStatus(); if (toStatus == Constants.TaskRunState.RUNNING) { diff --git a/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskRunFIFOQueue.java b/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskRunFIFOQueue.java index 60ed6e5aa4f62b..261c047a4dc198 100644 --- a/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskRunFIFOQueue.java +++ b/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskRunFIFOQueue.java @@ -127,21 +127,30 @@ public boolean add(TaskRun taskRun) { /** * Remove a specific task run from the queue. * @param taskRun: task run to remove + * @param state: complete or cancel task run and set it with the state if the task run's future is not null */ - public boolean remove(TaskRun taskRun) { + public boolean remove(TaskRun taskRun, Constants.TaskRunState state) { if (taskRun == null) { return false; } wLock.lock(); try { - // make sure future is canceled. - CompletableFuture future = taskRun.getFuture(); - boolean isCancel = future.cancel(true); - if (!isCancel) { - LOG.warn("fail to cancel scheduler for task [{}]", taskRun); + CompletableFuture future = taskRun.getFuture(); + // make sure the future is canceled or completed + if (future != null) { + if (state != null && state.isSuccessState()) { + boolean isComplete = future.complete(state); + if (!isComplete) { + LOG.warn("fail to complete scheduler for task [{}]", taskRun); + } + } else { + boolean isCancel = future.cancel(true); + if (!isCancel) { + LOG.warn("fail to cancel scheduler for task [{}]", taskRun); + } + } } - // remove it from pending map. removeFromMapUnlock(taskRun); // remove it from pending queue. diff --git a/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskRunManager.java b/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskRunManager.java index adf98de9564fc4..0c96e154faa230 100644 --- a/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskRunManager.java +++ b/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskRunManager.java @@ -168,7 +168,7 @@ public boolean arrangeTaskRun(TaskRun taskRun, boolean isReplay) { GlobalStateMgr.getCurrentState().getEditLog().logUpdateTaskRun(statusChange); // update the state of the old TaskRun to MERGED in LEADER oldTaskRun.getStatus().setState(Constants.TaskRunState.MERGED); - taskRunScheduler.removePendingTaskRun(oldTaskRun); + taskRunScheduler.removePendingTaskRun(oldTaskRun, Constants.TaskRunState.MERGED); taskRunHistory.addHistory(oldTaskRun.getStatus()); } } diff --git a/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskRunScheduler.java b/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskRunScheduler.java index 6f55cc20227f64..54cdbd74cd97aa 100644 --- a/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskRunScheduler.java +++ b/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskRunScheduler.java @@ -82,12 +82,12 @@ public boolean addPendingTaskRun(TaskRun taskRun) { return pendingTaskRunQueue.add(taskRun); } - public void removePendingTaskRun(TaskRun taskRun) { + public void removePendingTaskRun(TaskRun taskRun, Constants.TaskRunState state) { if (taskRun == null) { return; } LOG.info("remove pending task run: {}", taskRun); - pendingTaskRunQueue.remove(taskRun); + pendingTaskRunQueue.remove(taskRun, state); } public void removePendingTask(Task task) { diff --git a/fe/fe-core/src/main/java/com/starrocks/server/LocalMetastore.java b/fe/fe-core/src/main/java/com/starrocks/server/LocalMetastore.java index 6a326938df82ef..c0206cfad0fdc4 100644 --- a/fe/fe-core/src/main/java/com/starrocks/server/LocalMetastore.java +++ b/fe/fe-core/src/main/java/com/starrocks/server/LocalMetastore.java @@ -3427,11 +3427,9 @@ public String refreshMaterializedView(RefreshMaterializedViewStatement refreshMa String dbName = refreshMaterializedViewStatement.getMvName().getDb(); String mvName = refreshMaterializedViewStatement.getMvName().getTbl(); boolean force = refreshMaterializedViewStatement.isForceRefresh(); - PartitionRangeDesc range = - refreshMaterializedViewStatement.getPartitionRangeDesc(); - + PartitionRangeDesc range = refreshMaterializedViewStatement.getPartitionRangeDesc(); return refreshMaterializedView(dbName, mvName, force, range, Constants.TaskRunPriority.HIGH.value(), - false, true, refreshMaterializedViewStatement.isSync()); + Config.enable_mv_refresh_sync_refresh_mergeable, true, refreshMaterializedViewStatement.isSync()); } @Override