Skip to content

Commit

Permalink
[BugFix] Fix stream load load_finish_time updated unexpected after tr…
Browse files Browse the repository at this point in the history
…ansaction committed (#51174)

Signed-off-by: meegoo <meegoo.sr@gmail.com>
  • Loading branch information
meegoo authored Sep 23, 2024
1 parent a4ac78e commit 49ef20f
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ public enum Type {
private long startPreparingTimeMs;
@SerializedName(value = "finishPreparingTimeMs")
private long finishPreparingTimeMs;
@SerializedName(value = "commitTimeMs")
private long commitTimeMs;
@SerializedName(value = "endTimeMs")
private long endTimeMs;
@SerializedName(value = "txnId")
Expand Down Expand Up @@ -616,7 +618,7 @@ public void waitCoordFinishAndPrepareTxn(TransactionResult resp) {
boolean exception = false;
writeLock();
try {
if (isFinalState()) {
if (isUnreversibleState()) {
if (state == State.CANCELLED) {
resp.setOKMsg("txn could not be prepared because task state is: " + state
+ ", error_msg: " + errorMsg);
Expand Down Expand Up @@ -688,7 +690,7 @@ public void commitTxn(TransactionResult resp) throws UserException {
boolean exception = false;
readLock();
try {
if (isFinalState()) {
if (isUnreversibleState()) {
if (state == State.CANCELLED) {
resp.setOKMsg("txn could not be committed because task state is: " + state
+ ", error_msg: " + errorMsg);
Expand Down Expand Up @@ -868,7 +870,7 @@ public String cancelTask(String msg) {
}
readLock();
try {
if (isFinalState()) {
if (isUnreversibleState()) {
if (state == State.CANCELLED) {
return "cur task state is: " + state
+ ", error_msg: " + errorMsg;
Expand Down Expand Up @@ -956,7 +958,7 @@ public boolean checkNeedPrepareTxn() {
public void beforePrepared(TransactionState txnState) throws TransactionException {
writeLock();
try {
if (isFinalState()) {
if (isUnreversibleState()) {
throw new TransactionException("txn could not be prepared because task state is: " + state);
}
} finally {
Expand Down Expand Up @@ -1000,7 +1002,7 @@ public void replayOnPrepared(TransactionState txnState) {
public void beforeCommitted(TransactionState txnState) throws TransactionException {
writeLock();
try {
if (isFinalState()) {
if (isUnreversibleState()) {
throw new TransactionException("txn could not be commited because task state is: " + state);
}
isCommitting = true;
Expand All @@ -1026,8 +1028,8 @@ public void afterCommitted(TransactionState txnState, boolean txnOperated) throw
this.channels.set(i, State.COMMITED);
}
this.state = State.COMMITED;
commitTimeMs = System.currentTimeMillis();
isCommitting = false;
endTimeMs = System.currentTimeMillis();
} finally {
writeUnlock();
// sync stream load related query info should unregister here
Expand Down Expand Up @@ -1118,8 +1120,8 @@ public void replayOnCommitted(TransactionState txnState) {
this.channels.set(i, State.COMMITED);
}
this.state = State.COMMITED;
commitTimeMs = txnState.getCommitTime();
this.preparedChannelNum = this.channelNum;
this.endTimeMs = txnState.getCommitTime();
} finally {
writeUnlock();
}
Expand All @@ -1138,7 +1140,7 @@ public void afterAborted(TransactionState txnState, boolean txnOperated, String

writeLock();
try {
if (isFinalState()) {
if (isUnreversibleState()) {
return;
}
if (coord != null && !isSyncStreamLoad) {
Expand Down Expand Up @@ -1279,11 +1281,19 @@ public long createTimeMs() {
return createTimeMs;
}

public long commitTimeMs() {
return commitTimeMs;
}

public long endTimeMs() {
return endTimeMs;
}

public boolean isFinalState() {
return state == State.CANCELLED || state == State.FINISHED;
}

public boolean isUnreversibleState() {
return state == State.CANCELLED || state == State.COMMITED || state == State.FINISHED;
}

Expand Down Expand Up @@ -1528,7 +1538,7 @@ public TLoadInfo toThrift() {

info.setCreate_time(TimeUtils.longToTimeString(createTimeMs));
info.setLoad_start_time(TimeUtils.longToTimeString(startLoadingTimeMs));
info.setLoad_commit_time(TimeUtils.longToTimeString(finishPreparingTimeMs));
info.setLoad_commit_time(TimeUtils.longToTimeString(commitTimeMs));
info.setLoad_finish_time(TimeUtils.longToTimeString(endTimeMs));

info.setType(getStringByType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,14 +300,13 @@ public void testStreamLoadTaskAfterCommit() throws UserException {

TransactionState state = new TransactionState();
task.afterCommitted(state, true);
Assert.assertNotEquals(-1, task.endTimeMs());
Assert.assertNotEquals(-1, task.commitTimeMs());

state.setCommitTime(task.endTimeMs());
task.replayOnCommitted(state);
Assert.assertEquals(task.endTimeMs(), state.getCommitTime());
Assert.assertTrue(task.isUnreversibleState());
Assert.assertFalse(task.isFinalState());

streamLoadManager.cleanSyncStreamLoadTasks();
Assert.assertEquals(0, streamLoadManager.getStreamLoadTaskCount());
Assert.assertEquals(1, streamLoadManager.getStreamLoadTaskCount());
}

}

0 comments on commit 49ef20f

Please sign in to comment.