Skip to content

Commit cf0dfd8

Browse files
TangSiyang2001Your Name
authored andcommitted
[fix](delete) Delete should count down latch and clear an agent task when failed (#57428)
1 parent a17454c commit cf0dfd8

File tree

4 files changed

+45
-14
lines changed

4 files changed

+45
-14
lines changed

be/src/olap/tablet.cpp

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2905,14 +2905,13 @@ Status Tablet::prepare_txn(TPartitionId partition_id, TTransactionId transaction
29052905

29062906
// TODO: rename debugpoint.
29072907
DBUG_EXECUTE_IF("PushHandler::_do_streaming_ingestion.try_lock_fail", {
2908-
return Status::Error<TRY_LOCK_FAILED>(
2909-
"PushHandler::_do_streaming_ingestion get lock failed");
2908+
return Status::ObtainLockFailed("PushHandler::_do_streaming_ingestion get lock failed");
29102909
})
29112910

29122911
if (!base_migration_lock.try_lock_for(
29132912
std::chrono::milliseconds(config::migration_lock_timeout_ms))) {
2914-
return Status::Error<TRY_LOCK_FAILED>("try_lock migration lock failed after {}ms",
2915-
config::migration_lock_timeout_ms);
2913+
return Status::ObtainLockFailed("try_lock migration lock failed after {}ms",
2914+
config::migration_lock_timeout_ms);
29162915
}
29172916

29182917
std::lock_guard<std::mutex> push_lock(get_push_lock());

fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.doris.cloud.master.CloudReportHandler;
3434
import org.apache.doris.common.Config;
3535
import org.apache.doris.common.MetaNotFoundException;
36+
import org.apache.doris.common.Status;
3637
import org.apache.doris.load.DeleteJob;
3738
import org.apache.doris.system.Backend;
3839
import org.apache.doris.task.AgentTask;
@@ -341,18 +342,31 @@ private void finishRealtimePush(AgentTask task, TFinishTaskRequest request) thro
341342
long backendId = pushTask.getBackendId();
342343
long signature = task.getSignature();
343344
long transactionId = ((PushTask) task).getTransactionId();
345+
long tableId = pushTask.getTableId();
346+
long partitionId = pushTask.getPartitionId();
347+
long pushIndexId = pushTask.getIndexId();
348+
long pushTabletId = pushTask.getTabletId();
344349

345350
if (request.getTaskStatus().getStatusCode() != TStatusCode.OK) {
346351
if (pushTask.getPushType() == TPushType.DELETE) {
347352
// we don't need to retry if the returned status code is DELETE_INVALID_CONDITION
348353
// or DELETE_INVALID_PARAMETERS
349354
// note that they will be converted to TStatusCode.INVALID_ARGUMENT when being sent from be to fe
350-
if (request.getTaskStatus().getStatusCode() == TStatusCode.INVALID_ARGUMENT) {
351-
pushTask.countDownToZero(request.getTaskStatus().getStatusCode(),
352-
task.getBackendId() + ": " + request.getTaskStatus().getErrorMsgs().toString());
353-
AgentTaskQueue.removeTask(backendId, TTaskType.REALTIME_PUSH, signature);
354-
LOG.warn("finish push replica error: {}", request.getTaskStatus().getErrorMsgs().toString());
355+
TStatus taskStatus = request.getTaskStatus();
356+
String msg = task.getBackendId() + ": " + taskStatus.getErrorMsgs().toString();
357+
LOG.warn("finish push replica, signature={}, error: {}",
358+
signature, taskStatus.getErrorMsgs().toString());
359+
if (taskStatus.getStatusCode() == TStatusCode.OBTAIN_LOCK_FAILED) {
360+
// retry if obtain lock failed
361+
return;
362+
}
363+
if (taskStatus.getStatusCode() == TStatusCode.INVALID_ARGUMENT) {
364+
pushTask.countDownToZero(taskStatus.getStatusCode(), msg);
365+
} else {
366+
pushTask.countDownLatchWithStatus(backendId, pushTabletId,
367+
new Status(taskStatus.getStatusCode(), msg));
355368
}
369+
AgentTaskQueue.removeTask(backendId, TTaskType.REALTIME_PUSH, signature);
356370
}
357371
return;
358372
}
@@ -366,10 +380,6 @@ private void finishRealtimePush(AgentTask task, TFinishTaskRequest request) thro
366380
return;
367381
}
368382

369-
long tableId = pushTask.getTableId();
370-
long partitionId = pushTask.getPartitionId();
371-
long pushIndexId = pushTask.getIndexId();
372-
long pushTabletId = pushTask.getTabletId();
373383
// push finish type:
374384
// numOfFinishTabletInfos tabletId schemaHash
375385
// Normal: 1 / /
@@ -458,7 +468,7 @@ private void finishRealtimePush(AgentTask task, TFinishTaskRequest request) thro
458468
AgentTaskQueue.removeTask(backendId, TTaskType.REALTIME_PUSH, signature);
459469
LOG.warn("finish push replica error", e);
460470
if (pushTask.getPushType() == TPushType.DELETE) {
461-
pushTask.countDownLatch(backendId, pushTabletId);
471+
pushTask.countDownLatchWithStatus(backendId, pushTabletId, Status.CANCELLED);
462472
}
463473
} finally {
464474
olapTable.writeUnlock();

fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,18 @@ public void countDownLatch(long backendId, long tabletId) {
212212
}
213213
}
214214

215+
public void countDownLatchWithStatus(long backendId, long tabletId, Status st) {
216+
if (this.latch == null) {
217+
return;
218+
}
219+
if (latch.markedCountDownWithStatus(backendId, tabletId, st)) {
220+
if (LOG.isDebugEnabled()) {
221+
LOG.debug("pushTask current latch count with status: {}. backend: {}, tablet:{}, st::{}",
222+
latch.getCount(), backendId, tabletId, st);
223+
}
224+
}
225+
}
226+
215227
// call this always means one of tasks is failed. count down to zero to finish entire task
216228
public void countDownToZero(TStatusCode code, String errMsg) {
217229
if (this.latch != null) {

regression-test/suites/fault_injection_p0/test_delete_from_timeout.groovy

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,16 @@ suite("test_delete_from_timeout","nonConcurrent") {
4646

4747
GetDebugPoint().clearDebugPointsForAllBEs()
4848

49+
GetDebugPoint().enableDebugPointForAllBEs("DeleteHandler::generate_delete_predicate.inject_failure",
50+
[error_code: -235 /* TOO MANY VERSIONS */, error_msg: "too many versions"])
51+
52+
test {
53+
sql """delete from ${tableName} where col1 = "false" and col2 = "-9999782574499444.2" and col3 = "-25"; """
54+
exception "too many versions"
55+
}
56+
57+
GetDebugPoint().clearDebugPointsForAllBEs()
58+
4959
GetDebugPoint().enableDebugPointForAllBEs("PushHandler::_do_streaming_ingestion.try_lock_fail")
5060

5161
def t1 = Thread.start {

0 commit comments

Comments
 (0)