|
33 | 33 | import org.apache.doris.cloud.master.CloudReportHandler; |
34 | 34 | import org.apache.doris.common.Config; |
35 | 35 | import org.apache.doris.common.MetaNotFoundException; |
| 36 | +import org.apache.doris.common.Status; |
36 | 37 | import org.apache.doris.load.DeleteJob; |
37 | 38 | import org.apache.doris.load.loadv2.IngestionLoadJob; |
38 | 39 | import org.apache.doris.system.Backend; |
@@ -352,15 +353,17 @@ private void finishRealtimePush(AgentTask task, TFinishTaskRequest request) thro |
352 | 353 | // we don't need to retry if the returned status code is DELETE_INVALID_CONDITION |
353 | 354 | // or DELETE_INVALID_PARAMETERS |
354 | 355 | // note that they will be converted to TStatusCode.INVALID_ARGUMENT when being sent from be to fe |
355 | | - if (request.getTaskStatus().getStatusCode() == TStatusCode.INVALID_ARGUMENT) { |
356 | | - pushTask.countDownToZero(request.getTaskStatus().getStatusCode(), |
357 | | - task.getBackendId() + ": " + request.getTaskStatus().getErrorMsgs().toString()); |
| 356 | + TStatus taskStatus = request.getTaskStatus(); |
| 357 | + String msg = task.getBackendId() + ": " + taskStatus.getErrorMsgs().toString(); |
| 358 | + if (taskStatus.getStatusCode() == TStatusCode.INVALID_ARGUMENT) { |
| 359 | + pushTask.countDownToZero(taskStatus.getStatusCode(), msg); |
358 | 360 | } else { |
359 | | - pushTask.countDownLatch(backendId, pushTabletId); |
| 361 | + pushTask.countDownLatchWithStatus(backendId, pushTabletId, |
| 362 | + new Status(taskStatus.getStatusCode(), msg)); |
360 | 363 | } |
361 | 364 | AgentTaskQueue.removeTask(backendId, TTaskType.REALTIME_PUSH, signature); |
362 | 365 | LOG.warn("finish push replica, signature={}, error: {}", |
363 | | - signature, request.getTaskStatus().getErrorMsgs().toString()); |
| 366 | + signature, taskStatus.getErrorMsgs().toString()); |
364 | 367 | } |
365 | 368 | return; |
366 | 369 | } |
@@ -471,7 +474,7 @@ private void finishRealtimePush(AgentTask task, TFinishTaskRequest request) thro |
471 | 474 | AgentTaskQueue.removeTask(backendId, TTaskType.REALTIME_PUSH, signature); |
472 | 475 | LOG.warn("finish push replica error", e); |
473 | 476 | if (pushTask.getPushType() == TPushType.DELETE) { |
474 | | - pushTask.countDownLatch(backendId, pushTabletId); |
| 477 | + pushTask.countDownLatchWithStatus(backendId, pushTabletId, Status.CANCELLED); |
475 | 478 | } |
476 | 479 | } finally { |
477 | 480 | olapTable.writeUnlock(); |
|
0 commit comments