Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions be/src/olap/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2905,14 +2905,13 @@ Status Tablet::prepare_txn(TPartitionId partition_id, TTransactionId transaction

// TODO: rename debugpoint.
DBUG_EXECUTE_IF("PushHandler::_do_streaming_ingestion.try_lock_fail", {
return Status::Error<TRY_LOCK_FAILED>(
"PushHandler::_do_streaming_ingestion get lock failed");
return Status::ObtainLockFailed("PushHandler::_do_streaming_ingestion get lock failed");
})

if (!base_migration_lock.try_lock_for(
std::chrono::milliseconds(config::migration_lock_timeout_ms))) {
return Status::Error<TRY_LOCK_FAILED>("try_lock migration lock failed after {}ms",
config::migration_lock_timeout_ms);
return Status::ObtainLockFailed("try_lock migration lock failed after {}ms",
config::migration_lock_timeout_ms);
}

std::lock_guard<std::mutex> push_lock(get_push_lock());
Expand Down
30 changes: 20 additions & 10 deletions fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.doris.cloud.master.CloudReportHandler;
import org.apache.doris.common.Config;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.Status;
import org.apache.doris.load.DeleteJob;
import org.apache.doris.system.Backend;
import org.apache.doris.task.AgentTask;
Expand Down Expand Up @@ -341,18 +342,31 @@ private void finishRealtimePush(AgentTask task, TFinishTaskRequest request) thro
long backendId = pushTask.getBackendId();
long signature = task.getSignature();
long transactionId = ((PushTask) task).getTransactionId();
long tableId = pushTask.getTableId();
long partitionId = pushTask.getPartitionId();
long pushIndexId = pushTask.getIndexId();
long pushTabletId = pushTask.getTabletId();

if (request.getTaskStatus().getStatusCode() != TStatusCode.OK) {
if (pushTask.getPushType() == TPushType.DELETE) {
// we don't need to retry if the returned status code is DELETE_INVALID_CONDITION
// or DELETE_INVALID_PARAMETERS
// note that they will be converted to TStatusCode.INVALID_ARGUMENT when being sent from be to fe
if (request.getTaskStatus().getStatusCode() == TStatusCode.INVALID_ARGUMENT) {
pushTask.countDownToZero(request.getTaskStatus().getStatusCode(),
task.getBackendId() + ": " + request.getTaskStatus().getErrorMsgs().toString());
AgentTaskQueue.removeTask(backendId, TTaskType.REALTIME_PUSH, signature);
LOG.warn("finish push replica error: {}", request.getTaskStatus().getErrorMsgs().toString());
TStatus taskStatus = request.getTaskStatus();
String msg = task.getBackendId() + ": " + taskStatus.getErrorMsgs().toString();
LOG.warn("finish push replica, signature={}, error: {}",
signature, taskStatus.getErrorMsgs().toString());
if (taskStatus.getStatusCode() == TStatusCode.OBTAIN_LOCK_FAILED) {
// retry if obtain lock failed
return;
}
if (taskStatus.getStatusCode() == TStatusCode.INVALID_ARGUMENT) {
pushTask.countDownToZero(taskStatus.getStatusCode(), msg);
} else {
pushTask.countDownLatchWithStatus(backendId, pushTabletId,
new Status(taskStatus.getStatusCode(), msg));
}
AgentTaskQueue.removeTask(backendId, TTaskType.REALTIME_PUSH, signature);
}
return;
}
Expand All @@ -366,10 +380,6 @@ private void finishRealtimePush(AgentTask task, TFinishTaskRequest request) thro
return;
}

long tableId = pushTask.getTableId();
long partitionId = pushTask.getPartitionId();
long pushIndexId = pushTask.getIndexId();
long pushTabletId = pushTask.getTabletId();
// push finish type:
// numOfFinishTabletInfos tabletId schemaHash
// Normal: 1 / /
Expand Down Expand Up @@ -458,7 +468,7 @@ private void finishRealtimePush(AgentTask task, TFinishTaskRequest request) thro
AgentTaskQueue.removeTask(backendId, TTaskType.REALTIME_PUSH, signature);
LOG.warn("finish push replica error", e);
if (pushTask.getPushType() == TPushType.DELETE) {
pushTask.countDownLatch(backendId, pushTabletId);
pushTask.countDownLatchWithStatus(backendId, pushTabletId, Status.CANCELLED);
}
} finally {
olapTable.writeUnlock();
Expand Down
12 changes: 12 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,18 @@ public void countDownLatch(long backendId, long tabletId) {
}
}

// Marks the (backendId, tabletId) pair on this task's latch together with the given status.
// Does nothing when no latch is attached to the task. When the latch reports that the mark
// was applied, the current latch count is logged at debug level.
public void countDownLatchWithStatus(long backendId, long tabletId, Status st) {
    if (this.latch != null) {
        // The latch call must run regardless of the log level; only the logging is conditional.
        boolean marked = latch.markedCountDownWithStatus(backendId, tabletId, st);
        if (marked && LOG.isDebugEnabled()) {
            LOG.debug("pushTask current latch count with status: {}. backend: {}, tablet:{}, st::{}",
                    latch.getCount(), backendId, tabletId, st);
        }
    }
}

// call this always means one of tasks is failed. count down to zero to finish entire task
public void countDownToZero(TStatusCode code, String errMsg) {
if (this.latch != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,16 @@ suite("test_delete_from_timeout","nonConcurrent") {

GetDebugPoint().clearDebugPointsForAllBEs()

GetDebugPoint().enableDebugPointForAllBEs("DeleteHandler::generate_delete_predicate.inject_failure",
[error_code: -235 /* TOO MANY VERSIONS */, error_msg: "too many versions"])

test {
sql """delete from ${tableName} where col1 = "false" and col2 = "-9999782574499444.2" and col3 = "-25"; """
exception "too many versions"
}

GetDebugPoint().clearDebugPointsForAllBEs()

GetDebugPoint().enableDebugPointForAllBEs("PushHandler::_do_streaming_ingestion.try_lock_fail")

def t1 = Thread.start {
Expand Down
Loading