Skip to content

Commit

Permalink
[BugFix] Fix stream load load_finish_time updated unexpected after tr…
Browse files Browse the repository at this point in the history
…ansaction committed (#51174)

Signed-off-by: meegoo <meegoo.sr@gmail.com>
(cherry picked from commit 49ef20f)

# Conflicts:
#	fe/fe-core/src/main/java/com/starrocks/load/streamload/StreamLoadTask.java
#	fe/fe-core/src/test/java/com/starrocks/load/streamload/StreamLoadManagerTest.java
  • Loading branch information
meegoo authored and mergify[bot] committed Sep 23, 2024
1 parent 92c6c1f commit d2b5fa7
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ public enum Type {
private long startPreparingTimeMs;
@SerializedName(value = "finishPreparingTimeMs")
private long finishPreparingTimeMs;
@SerializedName(value = "commitTimeMs")
private long commitTimeMs;
@SerializedName(value = "endTimeMs")
private long endTimeMs;
@SerializedName(value = "txnId")
Expand Down Expand Up @@ -591,7 +593,7 @@ public void waitCoordFinishAndPrepareTxn(TransactionResult resp) {
boolean exception = false;
writeLock();
try {
if (isFinalState()) {
if (isUnreversibleState()) {
if (state == State.CANCELLED) {
resp.setOKMsg("txn could not be prepared because task state is: " + state
+ ", error_msg: " + errorMsg);
Expand Down Expand Up @@ -663,7 +665,7 @@ public void commitTxn(TransactionResult resp) throws UserException {
boolean exception = false;
readLock();
try {
if (isFinalState()) {
if (isUnreversibleState()) {
if (state == State.CANCELLED) {
resp.setOKMsg("txn could not be committed because task state is: " + state
+ ", error_msg: " + errorMsg);
Expand Down Expand Up @@ -848,7 +850,7 @@ public String cancelTask(String msg) {
}
readLock();
try {
if (isFinalState()) {
if (isUnreversibleState()) {
if (state == State.CANCELLED) {
return "cur task state is: " + state
+ ", error_msg: " + errorMsg;
Expand Down Expand Up @@ -936,7 +938,7 @@ public boolean checkNeedPrepareTxn() {
public void beforePrepared(TransactionState txnState) throws TransactionException {
writeLock();
try {
if (isFinalState()) {
if (isUnreversibleState()) {
throw new TransactionException("txn could not be prepared because task state is: " + state);
}
} finally {
Expand Down Expand Up @@ -980,7 +982,7 @@ public void replayOnPrepared(TransactionState txnState) {
public void beforeCommitted(TransactionState txnState) throws TransactionException {
writeLock();
try {
if (isFinalState()) {
if (isUnreversibleState()) {
throw new TransactionException("txn could not be commited because task state is: " + state);
}
isCommitting = true;
Expand All @@ -1006,8 +1008,8 @@ public void afterCommitted(TransactionState txnState, boolean txnOperated) throw
this.channels.set(i, State.COMMITED);
}
this.state = State.COMMITED;
commitTimeMs = System.currentTimeMillis();
isCommitting = false;
endTimeMs = System.currentTimeMillis();
} finally {
writeUnlock();
// sync stream load related query info should unregister here
Expand Down Expand Up @@ -1090,8 +1092,8 @@ public void replayOnCommitted(TransactionState txnState) {
this.channels.set(i, State.COMMITED);
}
this.state = State.COMMITED;
commitTimeMs = txnState.getCommitTime();
this.preparedChannelNum = this.channelNum;
this.endTimeMs = txnState.getCommitTime();
} finally {
writeUnlock();
}
Expand All @@ -1106,7 +1108,7 @@ public void afterAborted(TransactionState txnState, boolean txnOperated, String

writeLock();
try {
if (isFinalState()) {
if (isUnreversibleState()) {
return;
}
if (coord != null && !isSyncStreamLoad) {
Expand Down Expand Up @@ -1246,11 +1248,19 @@ public long createTimeMs() {
return createTimeMs;
}

public long commitTimeMs() {
return commitTimeMs;
}

public long endTimeMs() {
return endTimeMs;
}

public boolean isFinalState() {
return state == State.CANCELLED || state == State.FINISHED;
}

public boolean isUnreversibleState() {
return state == State.CANCELLED || state == State.COMMITED || state == State.FINISHED;
}

Expand Down Expand Up @@ -1429,7 +1439,80 @@ public void gsonPreProcess() throws IOException {
loadIdLo = loadId.getLo();
}

<<<<<<< HEAD
public TStreamLoadInfo toThrift() {
=======
public String toRuntimeDetails() {
TreeMap<String, Object> runtimeDetails = Maps.newTreeMap();
if (!clientIp.equals("")) {
runtimeDetails.put(LoadConstants.RUNTIME_DETAILS_CLIENT_IP, clientIp);
}
runtimeDetails.put(LoadConstants.RUNTIME_DETAILS_LOAD_ID, DebugUtil.printId(loadId));
runtimeDetails.put(LoadConstants.RUNTIME_DETAILS_TXN_ID, txnId);
runtimeDetails.put(LoadConstants.RUNTIME_DETAILS_BEGIN_TXN_TIME_MS, beginTxnTimeMs);
runtimeDetails.put(LoadConstants.RUNTIME_DETAILS_RECEIVE_DATA_TIME_MS, receiveDataTimeMs);
runtimeDetails.put(LoadConstants.RUNTIME_DETAILS_PLAN_TIME_MS, planTimeMs);
Gson gson = new Gson();
return gson.toJson(runtimeDetails);
}

public String toProperties() {
TreeMap<String, Object> properties = Maps.newTreeMap();
properties.put(LoadConstants.PROPERTIES_TIMEOUT, timeoutMs / 1000);
Gson gson = new Gson();
return gson.toJson(properties);
}

public TLoadInfo toThrift() {
readLock();
try {
TLoadInfo info = new TLoadInfo();
info.setJob_id(id);
info.setLabel(label);
info.setLoad_id(DebugUtil.printId(loadId));
info.setTxn_id(txnId);
info.setDb(dbName);
info.setTable(tableName);
info.setUser(user);
info.setState(state.name());
info.setError_msg(errorMsg);
info.setRuntime_details(toRuntimeDetails());
info.setProperties(toProperties());
if (state == State.FINISHED) {
info.setProgress("100%");
} else {
info.setProgress("0%");
}
if (ProfileManager.getInstance().hasProfile(DebugUtil.printId(loadId))) {
info.setProfile_id(DebugUtil.printId(loadId));
}
// tracking url
if (trackingUrl != null) {
info.setUrl(trackingUrl);
info.setTracking_sql("select tracking_log from information_schema.load_tracking_logs where job_id=" + id);
}
info.setPriority(LoadPriority.NORMAL);

info.setNum_sink_rows(numRowsNormal);
info.setNum_filtered_rows(numRowsAbnormal);
info.setNum_unselected_rows(numRowsUnselected);
info.setNum_scan_bytes(numLoadBytesTotal);

info.setCreate_time(TimeUtils.longToTimeString(createTimeMs));
info.setLoad_start_time(TimeUtils.longToTimeString(startLoadingTimeMs));
info.setLoad_commit_time(TimeUtils.longToTimeString(commitTimeMs));
info.setLoad_finish_time(TimeUtils.longToTimeString(endTimeMs));

info.setType(getStringByType());
return info;
} finally {
readUnlock();
}

}

public TStreamLoadInfo toStreamLoadThrift() {
>>>>>>> 49ef20f904 ([BugFix] Fix stream load load_finish_time updated unexpected after transaction committed (#51174))
readLock();
try {
TStreamLoadInfo info = new TStreamLoadInfo();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,11 +260,19 @@ public void testStreamLoadTaskAfterCommit() throws UserException {

TransactionState state = new TransactionState();
task.afterCommitted(state, true);
Assert.assertNotEquals(-1, task.endTimeMs());
Assert.assertNotEquals(-1, task.commitTimeMs());

<<<<<<< HEAD
state.setCommitTime(task.endTimeMs());
task.replayOnCommitted(state);
Assert.assertEquals(task.endTimeMs(), state.getCommitTime());
=======
Assert.assertTrue(task.isUnreversibleState());
Assert.assertFalse(task.isFinalState());

streamLoadManager.cleanSyncStreamLoadTasks();
Assert.assertEquals(1, streamLoadManager.getStreamLoadTaskCount());
>>>>>>> 49ef20f904 ([BugFix] Fix stream load load_finish_time updated unexpected after transaction committed (#51174))
}

}

0 comments on commit d2b5fa7

Please sign in to comment.