Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 20 additions & 10 deletions fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.doris.cloud.master.CloudReportHandler;
import org.apache.doris.common.Config;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.Status;
import org.apache.doris.load.DeleteJob;
import org.apache.doris.load.loadv2.IngestionLoadJob;
import org.apache.doris.system.Backend;
Expand Down Expand Up @@ -319,18 +320,31 @@ private void finishRealtimePush(AgentTask task, TFinishTaskRequest request) thro
long backendId = pushTask.getBackendId();
long signature = task.getSignature();
long transactionId = ((PushTask) task).getTransactionId();
long tableId = pushTask.getTableId();
long partitionId = pushTask.getPartitionId();
long pushIndexId = pushTask.getIndexId();
long pushTabletId = pushTask.getTabletId();

if (request.getTaskStatus().getStatusCode() != TStatusCode.OK) {
if (pushTask.getPushType() == TPushType.DELETE) {
// we don't need to retry if the returned status code is DELETE_INVALID_CONDITION
// or DELETE_INVALID_PARAMETERS
// note that they will be converted to TStatusCode.INVALID_ARGUMENT when being sent from be to fe
if (request.getTaskStatus().getStatusCode() == TStatusCode.INVALID_ARGUMENT) {
pushTask.countDownToZero(request.getTaskStatus().getStatusCode(),
task.getBackendId() + ": " + request.getTaskStatus().getErrorMsgs().toString());
AgentTaskQueue.removeTask(backendId, TTaskType.REALTIME_PUSH, signature);
LOG.warn("finish push replica error: {}", request.getTaskStatus().getErrorMsgs().toString());
TStatus taskStatus = request.getTaskStatus();
String msg = task.getBackendId() + ": " + taskStatus.getErrorMsgs().toString();
LOG.warn("finish push replica, signature={}, error: {}",
signature, taskStatus.getErrorMsgs().toString());
if (taskStatus.getStatusCode() == TStatusCode.OBTAIN_LOCK_FAILED) {
// retry if obtain lock failed
return;
}
if (taskStatus.getStatusCode() == TStatusCode.INVALID_ARGUMENT) {
pushTask.countDownToZero(taskStatus.getStatusCode(), msg);
} else {
pushTask.countDownLatchWithStatus(backendId, pushTabletId,
new Status(taskStatus.getStatusCode(), msg));
}
AgentTaskQueue.removeTask(backendId, TTaskType.REALTIME_PUSH, signature);
}
return;
}
Expand All @@ -344,10 +358,6 @@ private void finishRealtimePush(AgentTask task, TFinishTaskRequest request) thro
return;
}

long tableId = pushTask.getTableId();
long partitionId = pushTask.getPartitionId();
long pushIndexId = pushTask.getIndexId();
long pushTabletId = pushTask.getTabletId();
// push finish type:
// numOfFinishTabletInfos tabletId schemaHash
// Normal: 1 / /
Expand Down Expand Up @@ -445,7 +455,7 @@ private void finishRealtimePush(AgentTask task, TFinishTaskRequest request) thro
AgentTaskQueue.removeTask(backendId, TTaskType.REALTIME_PUSH, signature);
LOG.warn("finish push replica error", e);
if (pushTask.getPushType() == TPushType.DELETE) {
pushTask.countDownLatch(backendId, pushTabletId);
pushTask.countDownLatchWithStatus(backendId, pushTabletId, Status.CANCELLED);
}
} finally {
olapTable.writeUnlock();
Expand Down
12 changes: 12 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,18 @@ public void countDownLatch(long backendId, long tabletId) {
}
}

public void countDownLatchWithStatus(long backendId, long tabletId, Status st) {
if (this.latch == null) {
return;
}
if (latch.markedCountDownWithStatus(backendId, tabletId, st)) {
if (LOG.isDebugEnabled()) {
LOG.debug("pushTask current latch count with status: {}. backend: {}, tablet:{}, st::{}",
latch.getCount(), backendId, tabletId, st);
}
}
}

// call this always means one of tasks is failed. count down to zero to finish entire task
public void countDownToZero(TStatusCode code, String errMsg) {
if (this.latch != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,16 @@ suite("test_delete_from_timeout","nonConcurrent") {

GetDebugPoint().clearDebugPointsForAllBEs()

GetDebugPoint().enableDebugPointForAllBEs("DeleteHandler::generate_delete_predicate.inject_failure",
[error_code: -235 /* TOO MANY VERSIONS */, error_msg: "too many versions"])

test {
sql """delete from ${tableName} where col1 = "false" and col2 = "-9999782574499444.2" and col3 = "-25"; """
exception "too many versions"
}

GetDebugPoint().clearDebugPointsForAllBEs()

GetDebugPoint().enableDebugPointForAllBEs("PushHandler::_do_streaming_ingestion.try_lock_fail")

def t1 = Thread.start {
Expand Down
Loading