From dd842e8a9e019bd3029270352ce19776eb565eb1 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Fri, 27 Sep 2024 16:35:15 +0800 Subject: [PATCH] [BugFix] Fix stream load load_finish_time updated unexpected after transaction committed (backport #51174) (#51265) Signed-off-by: meegoo Co-authored-by: meegoo --- .../load/streamload/StreamLoadTask.java | 26 +++++++++++++------ .../streamload/StreamLoadManagerTest.java | 10 ++++--- 2 files changed, 24 insertions(+), 12 deletions(-) diff --git a/fe/fe-core/src/main/java/com/starrocks/load/streamload/StreamLoadTask.java b/fe/fe-core/src/main/java/com/starrocks/load/streamload/StreamLoadTask.java index 519001cbcf0bc..d25b5b2650653 100644 --- a/fe/fe-core/src/main/java/com/starrocks/load/streamload/StreamLoadTask.java +++ b/fe/fe-core/src/main/java/com/starrocks/load/streamload/StreamLoadTask.java @@ -132,6 +132,8 @@ public enum Type { private long startPreparingTimeMs; @SerializedName(value = "finishPreparingTimeMs") private long finishPreparingTimeMs; + @SerializedName(value = "commitTimeMs") + private long commitTimeMs; @SerializedName(value = "endTimeMs") private long endTimeMs; @SerializedName(value = "txnId") @@ -591,7 +593,7 @@ public void waitCoordFinishAndPrepareTxn(TransactionResult resp) { boolean exception = false; writeLock(); try { - if (isFinalState()) { + if (isUnreversibleState()) { if (state == State.CANCELLED) { resp.setOKMsg("txn could not be prepared because task state is: " + state + ", error_msg: " + errorMsg); @@ -663,7 +665,7 @@ public void commitTxn(TransactionResult resp) throws UserException { boolean exception = false; readLock(); try { - if (isFinalState()) { + if (isUnreversibleState()) { if (state == State.CANCELLED) { resp.setOKMsg("txn could not be committed because task state is: " + state + ", error_msg: " + errorMsg); @@ -848,7 +850,7 @@ public String cancelTask(String msg) { } readLock(); try { - if (isFinalState()) { + if (isUnreversibleState()) { if (state == State.CANCELLED) { return "cur task state is: " + state + ", error_msg: " + errorMsg; @@ -936,7 +938,7 @@ public boolean checkNeedPrepareTxn() { public void beforePrepared(TransactionState txnState) throws TransactionException { writeLock(); try { - if (isFinalState()) { + if (isUnreversibleState()) { throw new TransactionException("txn could not be prepared because task state is: " + state); } } finally { @@ -980,7 +982,7 @@ public void replayOnPrepared(TransactionState txnState) { public void beforeCommitted(TransactionState txnState) throws TransactionException { writeLock(); try { - if (isFinalState()) { + if (isUnreversibleState()) { throw new TransactionException("txn could not be commited because task state is: " + state); } isCommitting = true; @@ -1006,8 +1008,8 @@ public void afterCommitted(TransactionState txnState, boolean txnOperated) throw this.channels.set(i, State.COMMITED); } this.state = State.COMMITED; + commitTimeMs = System.currentTimeMillis(); isCommitting = false; - endTimeMs = System.currentTimeMillis(); } finally { writeUnlock(); // sync stream load related query info should unregister here @@ -1090,8 +1092,8 @@ public void replayOnCommitted(TransactionState txnState) { this.channels.set(i, State.COMMITED); } this.state = State.COMMITED; + commitTimeMs = txnState.getCommitTime(); this.preparedChannelNum = this.channelNum; - this.endTimeMs = txnState.getCommitTime(); } finally { writeUnlock(); } @@ -1106,7 +1108,7 @@ public void afterAborted(TransactionState txnState, boolean txnOperated, String writeLock(); try { - if (isFinalState()) { + if (isUnreversibleState()) { return; } if (coord != null && !isSyncStreamLoad) { @@ -1246,11 +1248,19 @@ public long createTimeMs() { return createTimeMs; } + public long commitTimeMs() { + return commitTimeMs; + } + public long endTimeMs() { return endTimeMs; } public boolean isFinalState() { + return state == State.CANCELLED || state == State.FINISHED; + } + + public boolean isUnreversibleState() { return state == State.CANCELLED || state == State.COMMITED || state == State.FINISHED; } diff --git a/fe/fe-core/src/test/java/com/starrocks/load/streamload/StreamLoadManagerTest.java b/fe/fe-core/src/test/java/com/starrocks/load/streamload/StreamLoadManagerTest.java index 9057a010aea0b..a4dad06d732f0 100644 --- a/fe/fe-core/src/test/java/com/starrocks/load/streamload/StreamLoadManagerTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/load/streamload/StreamLoadManagerTest.java @@ -260,11 +260,13 @@ public void testStreamLoadTaskAfterCommit() throws UserException { TransactionState state = new TransactionState(); task.afterCommitted(state, true); - Assert.assertNotEquals(-1, task.endTimeMs()); + Assert.assertNotEquals(-1, task.commitTimeMs()); - state.setCommitTime(task.endTimeMs()); - task.replayOnCommitted(state); - Assert.assertEquals(task.endTimeMs(), state.getCommitTime()); + Assert.assertTrue(task.isUnreversibleState()); + Assert.assertFalse(task.isFinalState()); + + streamLoadManager.cleanSyncStreamLoadTasks(); + Assert.assertEquals(1, streamLoadManager.getStreamLoadTaskCount()); } }