Skip to content

Commit

Permalink
[BugFix] Support force cancel refresh materialized view & optimize so…
Browse files Browse the repository at this point in the history
…me task run strategies (backport #46131) (#47586)

Signed-off-by: shuming.li <ming.moriarty@gmail.com>
Co-authored-by: shuming.li <ming.moriarty@gmail.com>
  • Loading branch information
mergify[bot] and LiShuMing authored Jun 27, 2024
1 parent 4f64114 commit 1d8967c
Show file tree
Hide file tree
Showing 25 changed files with 401 additions and 69 deletions.
7 changes: 7 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -2836,6 +2836,13 @@ public class Config extends ConfigBase {
@ConfField(mutable = true, comment = "The default try lock timeout for mv refresh to try base table/mv dbs' lock")
public static int mv_refresh_try_lock_timeout_ms = 30 * 1000;

@ConfField(mutable = true, comment = "Whether enable to refresh materialized view in sync mode mergeable or not")
public static boolean enable_mv_refresh_sync_refresh_mergeable = false;

@ConfField(mutable = true, comment = "The max length for mv task run extra message's values(set/map) to avoid " +
"occupying too much meta memory")
public static int max_mv_task_run_meta_message_values_length = 16;

/**
* The refresh partition number when refreshing materialized view at once by default.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import com.starrocks.sql.ast.AlterTableCommentClause;
import com.starrocks.sql.ast.AlterTableStmt;
import com.starrocks.sql.ast.AlterViewStmt;
import com.starrocks.sql.ast.CancelRefreshMaterializedViewStmt;
import com.starrocks.sql.ast.CreateMaterializedViewStatement;
import com.starrocks.sql.ast.CreateMaterializedViewStmt;
import com.starrocks.sql.ast.CreateTableLikeStmt;
Expand Down Expand Up @@ -304,8 +305,9 @@ public String refreshMaterializedView(RefreshMaterializedViewStatement refreshMa
}

@Override
public void cancelRefreshMaterializedView(String dbName, String mvName) throws DdlException, MetaNotFoundException {
normal.cancelRefreshMaterializedView(dbName, mvName);
public void cancelRefreshMaterializedView(
CancelRefreshMaterializedViewStmt stmt) throws DdlException, MetaNotFoundException {
normal.cancelRefreshMaterializedView(stmt);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.starrocks.sql.ast.AlterTableCommentClause;
import com.starrocks.sql.ast.AlterTableStmt;
import com.starrocks.sql.ast.AlterViewStmt;
import com.starrocks.sql.ast.CancelRefreshMaterializedViewStmt;
import com.starrocks.sql.ast.CreateMaterializedViewStatement;
import com.starrocks.sql.ast.CreateMaterializedViewStmt;
import com.starrocks.sql.ast.CreateTableLikeStmt;
Expand Down Expand Up @@ -313,7 +314,7 @@ default String refreshMaterializedView(RefreshMaterializedViewStatement refreshM
return null;
}

default void cancelRefreshMaterializedView(String dbName, String mvName)
default void cancelRefreshMaterializedView(CancelRefreshMaterializedViewStmt stmt)
throws DdlException, MetaNotFoundException {
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,10 +383,7 @@ public ShowResultSet visitRefreshMaterializedViewStatement(RefreshMaterializedVi
public ShowResultSet visitCancelRefreshMaterializedViewStatement(CancelRefreshMaterializedViewStmt stmt,
ConnectContext context) {
ErrorReport.wrapWithRuntimeException(() -> {
context.getGlobalStateMgr().getLocalMetastore()
.cancelRefreshMaterializedView(
stmt.getMvName().getDb(),
stmt.getMvName().getTbl());
context.getGlobalStateMgr().getLocalMetastore().cancelRefreshMaterializedView(stmt);
});
return null;
}
Expand Down
9 changes: 6 additions & 3 deletions fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
import com.google.gson.Gson;
import com.starrocks.analysis.Expr;
import com.starrocks.analysis.HintNode;
Expand Down Expand Up @@ -2009,7 +2010,8 @@ public void handleDMLStmt(ExecPlan execPlan, DmlStmt stmt) throws Exception {
long createTime = System.currentTimeMillis();

long loadedRows = 0;
int filteredRows = 0;
// filteredRows is stored in int64_t in the backend, so use long here.
long filteredRows = 0;
long loadedBytes = 0;
long jobId = -1;
long estimateScanRows = -1;
Expand Down Expand Up @@ -2131,7 +2133,7 @@ public void handleDMLStmt(ExecPlan execPlan, DmlStmt stmt) throws Exception {
loadedRows = Long.parseLong(coord.getLoadCounters().get(LoadEtlTask.DPP_NORMAL_ALL));
}
if (coord.getLoadCounters().get(LoadEtlTask.DPP_ABNORMAL_ALL) != null) {
filteredRows = Integer.parseInt(coord.getLoadCounters().get(LoadEtlTask.DPP_ABNORMAL_ALL));
filteredRows = Long.parseLong(coord.getLoadCounters().get(LoadEtlTask.DPP_ABNORMAL_ALL));
}

if (coord.getLoadCounters().get(LoadJob.LOADED_BYTES) != null) {
Expand Down Expand Up @@ -2373,7 +2375,8 @@ public void handleDMLStmt(ExecPlan execPlan, DmlStmt stmt) throws Exception {
sb.append("}");
}

context.getState().setOk(loadedRows, filteredRows, sb.toString());
// filteredRows may overflow when converted to int; use `saturatedCast` to clamp it instead
context.getState().setOk(loadedRows, Ints.saturatedCast(filteredRows), sb.toString());
}

public String getOriginStmtInString() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,14 @@ public enum TaskRunState {
RUNNING, // The task run is scheduled into running queue and is running
FAILED, // The task run is failed
SUCCESS, // The task run is finished successfully
MERGED, // The task run is merged
MERGED; // The task run is merged

/**
 * Tells whether this state counts as a successful outcome of a task run.
 *
 * @return {@code true} for {@code SUCCESS} and {@code MERGED}, {@code false} for any other state
 */
public boolean isSuccessState() {
    switch (this) {
        case SUCCESS:
        case MERGED:
            return true;
        default:
            return false;
    }
}
}

public static boolean isFinishState(TaskRunState state) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import com.starrocks.common.Config;
import com.starrocks.persist.gson.GsonUtils;

import java.util.Map;
Expand Down Expand Up @@ -67,7 +68,11 @@ public void setPriority(int priority) {
/**
 * Returns whether this task run may be merged with a redundant one.
 *
 * @return the {@code isMergeRedundant} flag as-is when
 *         {@code Config.enable_mv_refresh_sync_refresh_mergeable} is on; otherwise the flag
 *         is only honored for non-sync task runs
 */
public boolean isMergeRedundant() {
    // When sync-mode merging is explicitly enabled, the sync flag no longer blocks merging.
    if (Config.enable_mv_refresh_sync_refresh_mergeable) {
        return isMergeRedundant;
    }
    // If old task run is a sync-mode task, skip to merge it to avoid sync-mode task
    // hanging after removing it.
    return !isSync && isMergeRedundant;
}

public Map<String, String> getTaskRunProperties() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,20 @@ private void changeDefaultConnectContextIfNeeded(ConnectContext mvConnectCtx) {
if (!mvProperty.getProperties().containsKey(MV_SESSION_TIMEOUT)) {
mvSessionVariable.setQueryTimeoutS(MV_DEFAULT_QUERY_TIMEOUT);
}

// disable enable_insert_strict by default, unless the mv's properties set it explicitly
if (!isMVPropertyContains(SessionVariable.ENABLE_INSERT_STRICT)) {
mvSessionVariable.setEnableInsertStrict(false);
}
// enable profile by default for mv refresh task
if (!isMVPropertyContains(SessionVariable.ENABLE_PROFILE)) {
mvSessionVariable.setEnableProfile(true);
}
}

/**
 * Checks whether the materialized view's table properties contain an entry for the given
 * session variable, i.e. a key made of the materialized-view session prefix followed by
 * {@code key}.
 *
 * @param key session variable name, without the prefix
 * @return {@code true} if the prefixed key is present in the table properties
 */
private boolean isMVPropertyContains(String key) {
    return materializedView.getTableProperty()
            .getProperties()
            .containsKey(PropertyAnalyzer.PROPERTIES_MATERIALIZED_VIEW_SESSION_PREFIX + key);
}

private void postProcess() {
Expand Down Expand Up @@ -1198,7 +1212,6 @@ public void refreshMaterializedView(MvTaskRunContext mvContext, ExecPlan execPla
parentStmtExecutor.registerSubStmtExecutor(executor);
}
ctx.setStmtId(STMT_ID_GENERATOR.incrementAndGet());
ctx.getSessionVariable().setEnableInsertStrict(false);
LOG.info("[QueryId:{}] start to refresh materialized view {}", ctx.getQueryId(), materializedView.getName());
try {
executor.handleDMLStmtWithProfile(execPlan, insertStmt);
Expand Down
32 changes: 15 additions & 17 deletions fe/fe-core/src/main/java/com/starrocks/scheduler/TaskManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ private void clearUnfinishedTaskRun() {
GlobalStateMgr.getCurrentState().getEditLog().logUpdateTaskRun(statusChange);

// remove pending task run
taskRunScheduler.removePendingTaskRun(taskRun);
taskRunScheduler.removePendingTaskRun(taskRun, Constants.TaskRunState.FAILED);
}

// clear running task runs
Expand Down Expand Up @@ -270,24 +270,22 @@ private boolean stopScheduler(String taskName) {
return isCancel;
}

public boolean killTask(String taskName, boolean clearPending) {
public boolean killTask(String taskName, boolean force) {
Task task = nameToTaskMap.get(taskName);
if (task == null) {
return false;
}
if (clearPending) {
if (!taskRunManager.tryTaskRunLock()) {
return false;
}
try {
taskRunScheduler.removePendingTask(task);
} catch (Exception ex) {
LOG.warn("failed to kill task.", ex);
} finally {
taskRunManager.taskRunUnlock();
}
if (!taskRunManager.tryTaskRunLock()) {
return false;
}
try {
taskRunScheduler.removePendingTask(task);
} catch (Exception ex) {
LOG.warn("failed to kill task.", ex);
} finally {
taskRunManager.taskRunUnlock();
}
return taskRunManager.killTaskRun(task.getId());
return taskRunManager.killTaskRun(task.getId(), force);
}

public SubmitResult executeTask(String taskName) {
Expand Down Expand Up @@ -337,7 +335,7 @@ public SubmitResult executeTaskSync(Task task, ExecuteOption option) {
try {
taskRunScheduler.addSyncRunningTaskRun(taskRun);
Constants.TaskRunState taskRunState = taskRun.getFuture().get();
if (taskRunState != Constants.TaskRunState.SUCCESS) {
if (!taskRunState.isSuccessState()) {
String msg = taskRun.getStatus().getErrorMessage();
throw new DmlException("execute task %s failed: %s", task.getName(), msg);
}
Expand Down Expand Up @@ -377,7 +375,7 @@ public void dropTasks(List<Long> taskIdList, boolean isReplay) {
}
periodFutureMap.remove(task.getId());
}
if (!killTask(task.getName(), true)) {
if (!killTask(task.getName(), false)) {
LOG.error("kill task failed: " + task.getName());
}
idToTaskMap.remove(task.getId());
Expand Down Expand Up @@ -821,7 +819,7 @@ public void replayUpdateTaskRun(TaskRunStatusChange statusChange) {
return;
}
// remove it from pending task queue
taskRunScheduler.removePendingTaskRun(pendingTaskRun);
taskRunScheduler.removePendingTaskRun(pendingTaskRun, toStatus);

TaskRunStatus status = pendingTaskRun.getStatus();
if (toStatus == Constants.TaskRunState.RUNNING) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,21 +127,30 @@ public boolean add(TaskRun taskRun) {
/**
* Remove a specific task run from the queue.
* @param taskRun: task run to remove
* @param state: complete or cancel task run and set it with the state if the task run's future is not null
*/
public boolean remove(TaskRun taskRun) {
public boolean remove(TaskRun taskRun, Constants.TaskRunState state) {
if (taskRun == null) {
return false;
}

wLock.lock();
try {
// make sure future is canceled.
CompletableFuture<?> future = taskRun.getFuture();
boolean isCancel = future.cancel(true);
if (!isCancel) {
LOG.warn("fail to cancel scheduler for task [{}]", taskRun);
CompletableFuture<Constants.TaskRunState> future = taskRun.getFuture();
// make sure the future is canceled or completed
if (future != null) {
if (state != null && state.isSuccessState()) {
boolean isComplete = future.complete(state);
if (!isComplete) {
LOG.warn("fail to complete scheduler for task [{}]", taskRun);
}
} else {
boolean isCancel = future.cancel(true);
if (!isCancel) {
LOG.warn("fail to cancel scheduler for task [{}]", taskRun);
}
}
}

// remove it from pending map.
removeFromMapUnlock(taskRun);
// remove it from pending queue.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,16 +77,23 @@ public SubmitResult submitTaskRun(TaskRun taskRun, ExecuteOption option) {
return new SubmitResult(queryId, SubmitResult.SubmitStatus.SUBMITTED, taskRun.getFuture());
}

public boolean killTaskRun(Long taskId) {
public boolean killTaskRun(Long taskId, boolean force) {
TaskRun taskRun = taskRunScheduler.getRunningTaskRun(taskId);
if (taskRun == null) {
return false;
}
taskRun.kill();
ConnectContext runCtx = taskRun.getRunCtx();
if (runCtx != null) {
runCtx.kill(false, "kill TaskRun");
return true;
try {
taskRun.kill();
ConnectContext runCtx = taskRun.getRunCtx();
if (runCtx != null) {
runCtx.kill(false, "kill TaskRun");
return true;
}
} finally {
// when force is set, remove it from the running TaskRun map whether or not the kill succeeded
if (force) {
taskRunScheduler.removeRunningTask(taskRun.getTaskId());
}
}
return false;
}
Expand Down Expand Up @@ -160,7 +167,7 @@ public boolean arrangeTaskRun(TaskRun taskRun, boolean isReplay) {
GlobalStateMgr.getCurrentState().getEditLog().logUpdateTaskRun(statusChange);
// update the state of the old TaskRun to MERGED in LEADER
oldTaskRun.getStatus().setState(Constants.TaskRunState.MERGED);
taskRunScheduler.removePendingTaskRun(oldTaskRun);
taskRunScheduler.removePendingTaskRun(oldTaskRun, Constants.TaskRunState.MERGED);
taskRunHistory.addHistory(oldTaskRun.getStatus());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,12 @@ public boolean addPendingTaskRun(TaskRun taskRun) {
return pendingTaskRunQueue.add(taskRun);
}

public void removePendingTaskRun(TaskRun taskRun) {
public void removePendingTaskRun(TaskRun taskRun, Constants.TaskRunState state) {
if (taskRun == null) {
return;
}
LOG.info("remove pending task run: {}", taskRun);
pendingTaskRunQueue.remove(taskRun);
pendingTaskRunQueue.remove(taskRun, state);
}

public void removePendingTask(Task task) {
Expand Down
Loading

0 comments on commit 1d8967c

Please sign in to comment.