From 27f64c549e32aca6df333f43fa7d993a0a1d6d6d Mon Sep 17 00:00:00 2001 From: HHoflittlefish777 <77738092+HHoflittlefish777@users.noreply.github.com> Date: Mon, 22 Jan 2024 13:15:46 +0800 Subject: [PATCH] [fix](routine-load) optimize error msg when meet out of range (#30118) --- .../doris/load/routineload/RoutineLoadJob.java | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index 0a947701ef506b..889d240ce291d6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -1142,11 +1142,24 @@ public void afterAborted(TransactionState txnState, boolean txnOperated, String if (txnStatusChangeReasonString != null) { txnStatusChangeReason = TransactionState.TxnStatusChangeReason.fromString(txnStatusChangeReasonString); + String msg; if (txnStatusChangeReason != null) { switch (txnStatusChangeReason) { case OFFSET_OUT_OF_RANGE: + msg = "be " + taskBeId + " abort task," + + " task id: " + routineLoadTaskInfo.getId() + + " job id: " + routineLoadTaskInfo.getJobId() + + " with reason: " + txnStatusChangeReasonString + + " the offset used by job does not exist in kafka," + + " please check the offset," + + " using the Alter ROUTINE LOAD command to modify it," + + " and resume the job"; + updateState(JobState.PAUSED, + new ErrorReason(InternalErrorCode.TASKS_ABORT_ERR, msg), + false /* not replay */); + return; case PAUSE: - String msg = "be " + taskBeId + " abort task " + msg = "be " + taskBeId + " abort task " + "with reason: " + txnStatusChangeReasonString; updateState(JobState.PAUSED, new ErrorReason(InternalErrorCode.TASKS_ABORT_ERR, msg),