Skip to content

Commit 406088b

Browse files
[fix](delete) Delete should count down latch and clear an agent task when failed
1 parent e29025e commit 406088b

File tree

4 files changed

+45
-14
lines changed

4 files changed

+45
-14
lines changed

be/src/olap/tablet.cpp

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2905,14 +2905,13 @@ Status Tablet::prepare_txn(TPartitionId partition_id, TTransactionId transaction
29052905

29062906
// TODO: rename debugpoint.
29072907
DBUG_EXECUTE_IF("PushHandler::_do_streaming_ingestion.try_lock_fail", {
2908-
return Status::Error<TRY_LOCK_FAILED>(
2909-
"PushHandler::_do_streaming_ingestion get lock failed");
2908+
return Status::ObtainLockFailed("PushHandler::_do_streaming_ingestion get lock failed");
29102909
})
29112910

29122911
if (!base_migration_lock.try_lock_for(
29132912
std::chrono::milliseconds(config::migration_lock_timeout_ms))) {
2914-
return Status::Error<TRY_LOCK_FAILED>("try_lock migration lock failed after {}ms",
2915-
config::migration_lock_timeout_ms);
2913+
return Status::ObtainLockFailed("try_lock migration lock failed after {}ms",
2914+
config::migration_lock_timeout_ms);
29162915
}
29172916

29182917
std::lock_guard<std::mutex> push_lock(get_push_lock());

fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.doris.cloud.master.CloudReportHandler;
3434
import org.apache.doris.common.Config;
3535
import org.apache.doris.common.MetaNotFoundException;
36+
import org.apache.doris.common.Status;
3637
import org.apache.doris.load.DeleteJob;
3738
import org.apache.doris.load.loadv2.IngestionLoadJob;
3839
import org.apache.doris.system.Backend;
@@ -342,18 +343,31 @@ private void finishRealtimePush(AgentTask task, TFinishTaskRequest request) thro
342343
long backendId = pushTask.getBackendId();
343344
long signature = task.getSignature();
344345
long transactionId = ((PushTask) task).getTransactionId();
346+
long tableId = pushTask.getTableId();
347+
long partitionId = pushTask.getPartitionId();
348+
long pushIndexId = pushTask.getIndexId();
349+
long pushTabletId = pushTask.getTabletId();
345350

346351
if (request.getTaskStatus().getStatusCode() != TStatusCode.OK) {
347352
if (pushTask.getPushType() == TPushType.DELETE) {
348353
// we don't need to retry if the returned status code is DELETE_INVALID_CONDITION
349354
// or DELETE_INVALID_PARAMETERS
350355
// note that they will be converted to TStatusCode.INVALID_ARGUMENT when sent from BE to FE
351-
if (request.getTaskStatus().getStatusCode() == TStatusCode.INVALID_ARGUMENT) {
352-
pushTask.countDownToZero(request.getTaskStatus().getStatusCode(),
353-
task.getBackendId() + ": " + request.getTaskStatus().getErrorMsgs().toString());
354-
AgentTaskQueue.removeTask(backendId, TTaskType.REALTIME_PUSH, signature);
355-
LOG.warn("finish push replica error: {}", request.getTaskStatus().getErrorMsgs().toString());
356+
TStatus taskStatus = request.getTaskStatus();
357+
String msg = task.getBackendId() + ": " + taskStatus.getErrorMsgs().toString();
358+
LOG.warn("finish push replica, signature={}, error: {}",
359+
signature, taskStatus.getErrorMsgs().toString());
360+
if (taskStatus.getStatusCode() == TStatusCode.OBTAIN_LOCK_FAILED) {
361+
// obtaining the lock failed: return without removing the task so that it can be retried
362+
return;
363+
}
364+
if (taskStatus.getStatusCode() == TStatusCode.INVALID_ARGUMENT) {
365+
pushTask.countDownToZero(taskStatus.getStatusCode(), msg);
366+
} else {
367+
pushTask.countDownLatchWithStatus(backendId, pushTabletId,
368+
new Status(taskStatus.getStatusCode(), msg));
356369
}
370+
AgentTaskQueue.removeTask(backendId, TTaskType.REALTIME_PUSH, signature);
357371
}
358372
return;
359373
}
@@ -367,10 +381,6 @@ private void finishRealtimePush(AgentTask task, TFinishTaskRequest request) thro
367381
return;
368382
}
369383

370-
long tableId = pushTask.getTableId();
371-
long partitionId = pushTask.getPartitionId();
372-
long pushIndexId = pushTask.getIndexId();
373-
long pushTabletId = pushTask.getTabletId();
374384
// push finish type:
375385
// numOfFinishTabletInfos tabletId schemaHash
376386
// Normal: 1 / /
@@ -468,7 +478,7 @@ private void finishRealtimePush(AgentTask task, TFinishTaskRequest request) thro
468478
AgentTaskQueue.removeTask(backendId, TTaskType.REALTIME_PUSH, signature);
469479
LOG.warn("finish push replica error", e);
470480
if (pushTask.getPushType() == TPushType.DELETE) {
471-
pushTask.countDownLatch(backendId, pushTabletId);
481+
pushTask.countDownLatchWithStatus(backendId, pushTabletId, Status.CANCELLED);
472482
}
473483
} finally {
474484
olapTable.writeUnlock();

fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,18 @@ public void countDownLatch(long backendId, long tabletId) {
212212
}
213213
}
214214

215+
public void countDownLatchWithStatus(long backendId, long tabletId, Status st) {
216+
if (this.latch == null) {
217+
return;
218+
}
219+
if (latch.markedCountDownWithStatus(backendId, tabletId, st)) {
220+
if (LOG.isDebugEnabled()) {
221+
LOG.debug("pushTask current latch count with status: {}. backend: {}, tablet:{}, st::{}",
222+
latch.getCount(), backendId, tabletId, st);
223+
}
224+
}
225+
}
226+
215227
// calling this always means one of the tasks has failed; count down to zero to finish the entire task
216228
public void countDownToZero(TStatusCode code, String errMsg) {
217229
if (this.latch != null) {

regression-test/suites/fault_injection_p0/test_delete_from_timeout.groovy

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,16 @@ suite("test_delete_from_timeout","nonConcurrent") {
4646

4747
GetDebugPoint().clearDebugPointsForAllBEs()
4848

49+
GetDebugPoint().enableDebugPointForAllBEs("DeleteHandler::generate_delete_predicate.inject_failure",
50+
[error_code: -235 /* TOO MANY VERSIONS */, error_msg: "too many versions"])
51+
52+
test {
53+
sql """delete from ${tableName} where col1 = "false" and col2 = "-9999782574499444.2" and col3 = "-25"; """
54+
exception "invalid parameters for store_cond. condition_size=1"
55+
}
56+
57+
GetDebugPoint().clearDebugPointsForAllBEs()
58+
4959
GetDebugPoint().enableDebugPointForAllBEs("PushHandler::_do_streaming_ingestion.try_lock_fail")
5060

5161
def t1 = Thread.start {

0 commit comments

Comments
 (0)