Skip to content

Commit

Permalink
[BugFix] Add submit retry and redefine loading task queue size
Browse files Browse the repository at this point in the history
  • Loading branch information
meegoo committed Sep 28, 2022
1 parent 945f911 commit d25924c
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ brokerFileGroups, getDeadlineMs(), loadMemLimit,

// Submit task outside the database lock, cause it may take a while if task queue is full.
for (LoadTask loadTask : newLoadingTasks) {
GlobalStateMgr.getCurrentState().getLoadingLoadTaskScheduler().submit(loadTask);
submitTask(GlobalStateMgr.getCurrentState().getLoadingLoadTaskScheduler(), loadTask);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ public void onTaskFailed(long taskId, FailMsg failMsg) {
if (loadTask.getTaskType() == LoadTask.TaskType.PENDING) {
submitTask(GlobalStateMgr.getCurrentState().getPendingLoadTaskScheduler(), loadTask);
} else if (loadTask.getTaskType() == LoadTask.TaskType.LOADING) {
GlobalStateMgr.getCurrentState().getLoadingLoadTaskScheduler().submit(loadTask);
submitTask(GlobalStateMgr.getCurrentState().getLoadingLoadTaskScheduler(), loadTask);
} else {
throw new LoadException(String.format("Unknown load task type: %s. task id: %d, job id, %d",
loadTask.getTaskType(), loadTask.getSignature(), id));
Expand Down
18 changes: 18 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/load/loadv2/LoadJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.task.LeaderTaskExecutor;
import com.starrocks.task.PriorityLeaderTask;
import com.starrocks.task.PriorityLeaderTaskExecutor;
import com.starrocks.thrift.TEtlState;
import com.starrocks.thrift.TUniqueId;
import com.starrocks.transaction.AbstractTxnStateChangeCallback;
Expand Down Expand Up @@ -383,6 +384,23 @@ protected void submitTask(LeaderTaskExecutor executor, LoadTask task) throws Loa
}
}

protected void submitTask(PriorityLeaderTaskExecutor executor, LoadTask task) throws LoadException {
int retryNum = 0;
while (!executor.submit(task)) {
LOG.warn("submit load task failed. try to resubmit. job id: {}, task id: {}, retry: {}",
id, task.getSignature(), retryNum);
if (++retryNum > TASK_SUBMIT_RETRY_NUM) {
throw new LoadException("submit load task failed");
}

try {
Thread.sleep(1000);
} catch (InterruptedException e) {
LOG.warn(e);
}
}
}

public void processTimeout() {
// this is only for jobs which transaction is not started.
// if transaction is started, global transaction manager will handle the timeout.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -560,10 +560,10 @@ private GlobalStateMgr(boolean isCheckpointCatalog) {
new LeaderTaskExecutor("pending_load_task_scheduler", Config.async_load_task_pool_size,
Config.desired_max_waiting_jobs, !isCheckpointCatalog);
// One load job will be split into multiple loading tasks, the queue size is not
// determined, so set async_load_task_pool_size * 10
// determined, so set desired_max_waiting_jobs * 10
this.loadingLoadTaskScheduler = new PriorityLeaderTaskExecutor("loading_load_task_scheduler",
Config.async_load_task_pool_size,
Config.async_load_task_pool_size * 10, !isCheckpointCatalog);
Config.desired_max_waiting_jobs * 10, !isCheckpointCatalog);
this.loadJobScheduler = new LoadJobScheduler();
this.loadManager = new LoadManager(loadJobScheduler);
this.loadTimeoutChecker = new LoadTimeoutChecker(loadManager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
import com.starrocks.sql.ast.DataDescription;
import com.starrocks.task.LeaderTask;
import com.starrocks.task.LeaderTaskExecutor;
import com.starrocks.task.PriorityLeaderTask;
import com.starrocks.task.PriorityLeaderTaskExecutor;
import com.starrocks.transaction.TransactionState;
import mockit.Expectations;
import mockit.Injectable;
Expand Down Expand Up @@ -341,6 +343,7 @@ public void testPendingTaskOnFinished(@Injectable BrokerPendingTaskAttachment at
@Injectable BrokerFileGroup brokerFileGroup2,
@Injectable BrokerFileGroup brokerFileGroup3,
@Mocked LeaderTaskExecutor leaderTaskExecutor,
@Mocked PriorityLeaderTaskExecutor priorityLeaderTaskExecutor,
@Injectable OlapTable olapTable,
@Mocked LoadingTaskPlanner loadingTaskPlanner) {
BrokerLoadJob brokerLoadJob = new BrokerLoadJob();
Expand Down Expand Up @@ -386,6 +389,9 @@ public void testPendingTaskOnFinished(@Injectable BrokerPendingTaskAttachment at
leaderTaskExecutor.submit((LeaderTask) any);
minTimes = 0;
result = true;
priorityLeaderTaskExecutor.submit((PriorityLeaderTask) any);
minTimes = 0;
result = true;
}
};

Expand Down

0 comments on commit d25924c

Please sign in to comment.