Skip to content

Commit

Permalink
[fix](cloud) abortTransaction does not handle response code (apache#4…
Browse files Browse the repository at this point in the history
  • Loading branch information
mymeiyi authored Sep 26, 2024
1 parent 62a64d1 commit f4c4b27
Showing 1 changed file with 28 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1097,6 +1097,33 @@ public void abortTransaction(Long dbId, Long transactionId, String reason,
}
throw new UserException("abortTxn failed, errMsg:" + e.getMessage());
}
afterAbortTxnResp(abortTxnResponse, String.valueOf(transactionId), txnCommitAttachment);
}

private void afterAbortTxnResp(AbortTxnResponse abortTxnResponse, String txnIdOrLabel,
TxnCommitAttachment txnCommitAttachment) throws UserException {
if (abortTxnResponse.getStatus().getCode() != MetaServiceCode.OK) {
LOG.warn("abortTxn failed, transaction:{}, response:{}", txnIdOrLabel, abortTxnResponse);
// For routine load, it is necessary to release the write lock when abort transaction fails,
// otherwise it will cause the lock added in beforeAborted to not be released.
if (txnCommitAttachment != null && txnCommitAttachment instanceof RLTaskTxnCommitAttachment) {
RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment = (RLTaskTxnCommitAttachment) txnCommitAttachment;
Env.getCurrentEnv().getRoutineLoadManager().getJob(rlTaskTxnCommitAttachment.getJobId()).writeUnlock();
}
switch (abortTxnResponse.getStatus().getCode()) {
case TXN_ID_NOT_FOUND:
case TXN_LABEL_NOT_FOUND:
case TXN_INVALID_STATUS:
throw new TransactionNotFoundException("transaction [" + txnIdOrLabel + "] not found");
case TXN_ALREADY_ABORTED:
throw new TransactionNotFoundException("transaction [" + txnIdOrLabel + "] is already aborted");
case TXN_ALREADY_VISIBLE:
throw new UserException(
"transaction [" + txnIdOrLabel + "] is already visible, " + ", could not abort");
default:
throw new UserException(abortTxnResponse.getStatus().getMsg());
}
}

TransactionState txnState = TxnUtil.transactionStateFromPb(abortTxnResponse.getTxnInfo());
TxnStateChangeCallback cb = callbackFactory.getCallback(txnState.getCallbackId());
Expand Down Expand Up @@ -1149,20 +1176,7 @@ public void abortTransaction(Long dbId, String label, String reason) throws User
LOG.warn("abortTxn failed, label:{}, exception:", label, e);
throw new UserException("abortTxn failed, errMsg:" + e.getMessage());
}

TransactionState txnState = TxnUtil.transactionStateFromPb(abortTxnResponse.getTxnInfo());
TxnStateChangeCallback cb = callbackFactory.getCallback(txnState.getCallbackId());
if (cb == null) {
LOG.info("no callback to run for this txn, txnId:{} callbackId:{}", txnState.getTransactionId(),
txnState.getCallbackId());
return;
}

LOG.info("run txn callback, txnId:{} callbackId:{}", txnState.getTransactionId(), txnState.getCallbackId());
cb.afterAborted(txnState, true, txnState.getReason());
if (MetricRepo.isInit) {
MetricRepo.COUNTER_TXN_FAILED.increase(1L);
}
afterAbortTxnResp(abortTxnResponse, label, null);
}

@Override
Expand Down

0 comments on commit f4c4b27

Please sign in to comment.