Skip to content

Commit

Permalink
fix task run bugs
Browse files Browse the repository at this point in the history
Signed-off-by: shuming.li <ming.moriarty@gmail.com>
  • Loading branch information
LiShuMing committed Jun 27, 2024
1 parent 165fab3 commit f1e01fe
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,14 @@ public enum TaskRunState {
RUNNING, // The task run is scheduled into running queue and is running
FAILED, // The task run is failed
SUCCESS, // The task run is finished successfully
MERGED, // The task run is merged
MERGED; // The task run is merged

/**
* Whether the task run state is a success state
*/
public boolean isSuccessState() {
return this.equals(TaskRunState.SUCCESS) || this.equals(TaskRunState.MERGED);
}
}

public static boolean isFinishState(TaskRunState state) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ private void clearUnfinishedTaskRun() {
GlobalStateMgr.getCurrentState().getEditLog().logUpdateTaskRun(statusChange);

// remove pending task run
taskRunScheduler.removePendingTaskRun(taskRun);
taskRunScheduler.removePendingTaskRun(taskRun, Constants.TaskRunState.FAILED);
}

// clear running task runs
Expand Down Expand Up @@ -336,7 +336,7 @@ public SubmitResult executeTaskSync(Task task, ExecuteOption option) {
try {
taskRunScheduler.addSyncRunningTaskRun(taskRun);
Constants.TaskRunState taskRunState = taskRun.getFuture().get();
if (taskRunState != Constants.TaskRunState.SUCCESS) {
if (!taskRunState.isSuccessState()) {
String msg = taskRun.getStatus().getErrorMessage();
throw new DmlException("execute task %s failed: %s", task.getName(), msg);
}
Expand Down Expand Up @@ -816,7 +816,7 @@ public void replayUpdateTaskRun(TaskRunStatusChange statusChange) {
return;
}
// remove it from pending task queue
taskRunScheduler.removePendingTaskRun(pendingTaskRun);
taskRunScheduler.removePendingTaskRun(pendingTaskRun, toStatus);

TaskRunStatus status = pendingTaskRun.getStatus();
if (toStatus == Constants.TaskRunState.RUNNING) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,21 +127,30 @@ public boolean add(TaskRun taskRun) {
/**
* Remove a specific task run from the queue.
* @param taskRun: task run to remove
* @param state: complete or cancel task run and set it with the state if the task run's future is not null
*/
public boolean remove(TaskRun taskRun) {
public boolean remove(TaskRun taskRun, Constants.TaskRunState state) {
if (taskRun == null) {
return false;
}

wLock.lock();
try {
// make sure future is canceled.
CompletableFuture<?> future = taskRun.getFuture();
boolean isCancel = future.cancel(true);
if (!isCancel) {
LOG.warn("fail to cancel scheduler for task [{}]", taskRun);
CompletableFuture<Constants.TaskRunState> future = taskRun.getFuture();
// make sure the future is canceled or completed
if (future != null) {
if (state != null && state.isSuccessState()) {
boolean isComplete = future.complete(state);
if (!isComplete) {
LOG.warn("fail to complete scheduler for task [{}]", taskRun);
}
} else {
boolean isCancel = future.cancel(true);
if (!isCancel) {
LOG.warn("fail to cancel scheduler for task [{}]", taskRun);
}
}
}

// remove it from pending map.
removeFromMapUnlock(taskRun);
// remove it from pending queue.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ public boolean arrangeTaskRun(TaskRun taskRun, boolean isReplay) {
GlobalStateMgr.getCurrentState().getEditLog().logUpdateTaskRun(statusChange);
// update the state of the old TaskRun to MERGED in LEADER
oldTaskRun.getStatus().setState(Constants.TaskRunState.MERGED);
taskRunScheduler.removePendingTaskRun(oldTaskRun);
taskRunScheduler.removePendingTaskRun(oldTaskRun, Constants.TaskRunState.MERGED);
taskRunHistory.addHistory(oldTaskRun.getStatus());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,12 @@ public boolean addPendingTaskRun(TaskRun taskRun) {
return pendingTaskRunQueue.add(taskRun);
}

public void removePendingTaskRun(TaskRun taskRun) {
public void removePendingTaskRun(TaskRun taskRun, Constants.TaskRunState state) {
if (taskRun == null) {
return;
}
LOG.info("remove pending task run: {}", taskRun);
pendingTaskRunQueue.remove(taskRun);
pendingTaskRunQueue.remove(taskRun, state);
}

public void removePendingTask(Task task) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3427,11 +3427,9 @@ public String refreshMaterializedView(RefreshMaterializedViewStatement refreshMa
String dbName = refreshMaterializedViewStatement.getMvName().getDb();
String mvName = refreshMaterializedViewStatement.getMvName().getTbl();
boolean force = refreshMaterializedViewStatement.isForceRefresh();
PartitionRangeDesc range =
refreshMaterializedViewStatement.getPartitionRangeDesc();

PartitionRangeDesc range = refreshMaterializedViewStatement.getPartitionRangeDesc();
return refreshMaterializedView(dbName, mvName, force, range, Constants.TaskRunPriority.HIGH.value(),
false, true, refreshMaterializedViewStatement.isSync());
Config.enable_mv_refresh_sync_refresh_mergeable, true, refreshMaterializedViewStatement.isSync());
}

@Override
Expand Down

0 comments on commit f1e01fe

Please sign in to comment.