Skip to content

Conversation

@zk-drizzle
Copy link
Contributor

RIP-82 Implement Timer message, transaction message, and index based on RocksDB

Fixes #9786

Brief Description

Implement Timer message, transaction message, and index based on RocksDB

How Did You Test This Change?

zhouli11 and others added 16 commits October 23, 2025 11:06
Change-Id: Ica01fbd08a2d60e9a0f4ab54b41ae4768cc4fa5d
Change-Id: I764ce6cea3159b025bf1e6e65b60e20514cecb4f
Change-Id: I0cc735896e1266934293ec06cb4f07746a1894a1
Change-Id: I6538b6a60631c62e162bf511ae20185a07b72508
Change-Id: I03f83b9e8199f3595ca28471df4a2e94671fbaae
Change-Id: I5b19b671fa0b40cd2aba63f1d0eb654b92775cb4
Change-Id: I22788d112b7f214310719e2d3ebad839ef818e67
Change-Id: I7da89fe635b58ac5026095a8e484fa46f68521e7
Change-Id: I5eade6c31803b8cf05c1a4bb83c226d57f4980c7
Change-Id: Id67f600fad1a876926cd505dba047cc82011160c
Change-Id: Ia4b0e2a21aa5e12570a80713432fd48ddfb210e6
Change-Id: I65a43a612f09280cb61d2a23b3324d198c6a71de
Change-Id: Idf8080c7b17c25e14fe34ef6bad1e1150dab58d6
Change-Id: I904f0120a728b4eb87226159f9c0cde3d18ef26a
@RongtongJin RongtongJin changed the title RIP-82 Implement Timer message, transaction message, and index based on RocksDB [RIP-82] Implement Timer message, transaction message, and index based on RocksDB Oct 29, 2025
fuyou001
fuyou001 previously approved these changes Oct 29, 2025
@fuyou001 fuyou001 self-requested a review October 29, 2025 05:19
drizzle.zk added 2 commits October 29, 2025 13:54
Change-Id: I712b9cde9a18c730fd020ea76f05560e594a9edd
@codecov-commenter
Copy link

Codecov Report

❌ Patch coverage is 12.91667% with 2090 lines in your changes missing coverage. Please review.
✅ Project coverage is 47.40%. Comparing base (bea086f) to head (9fa994b).

Files with missing lines Patch % Lines
.../rocketmq/store/rocksdb/MessageRocksDBStorage.java 14.42% 356 Missing ⚠️
.../store/timer/rocksdb/TimerMessageRocksDBStore.java 0.00% 321 Missing ⚠️
.../apache/rocketmq/store/timer/rocksdb/Timeline.java 0.00% 253 Missing ⚠️
...mq/store/transaction/TransMessageRocksDBStore.java 0.00% 193 Missing ⚠️
...ocketmq/store/index/rocksdb/IndexRocksDBStore.java 8.98% 160 Missing and 2 partials ⚠️
...on/rocksdb/TransactionalMessageRocksDBService.java 0.00% 145 Missing ⚠️
...rocketmq/store/transaction/TransRocksDBRecord.java 0.00% 72 Missing ⚠️
...cketmq/store/timer/rocksdb/TimerRocksDBRecord.java 0.00% 64 Missing ⚠️
...org/apache/rocketmq/store/DefaultMessageStore.java 16.00% 58 Missing and 5 partials ⚠️
...ocketmq/broker/processor/AdminBrokerProcessor.java 0.00% 50 Missing ⚠️
... and 26 more
Additional details and impacted files
@@              Coverage Diff              @@
##             develop    #9787      +/-   ##
=============================================
- Coverage      48.40%   47.40%   -1.00%     
+ Complexity     12253    12252       -1     
=============================================
  Files           1314     1324      +10     
  Lines          93668    96023    +2355     
  Branches       12011    12371     +360     
=============================================
+ Hits           45340    45520     +180     
- Misses         42737    44853    +2116     
- Partials        5591     5650      +59     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.


<properties>
<revision>5.3.4-SNAPSHOT</revision>
<revision>5.3.6-rocksdb-SNAPSHOT</revision>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The version number should not be changed here.

db.flush(flushOptions, timerCFHandle);
log.info("MessageRocksDBStorage flush timer wal success");
} catch (Exception e) {
logError.error("MessageRocksDBStorage flush timer wal failed, error: {}", e.getMessage());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Directly printing the exception is preferable, as printing e.getMessage() may lose important stack trace information, hindering troubleshooting.

} else {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("switch timer engine error");
LOGGER.info("switchTimerEngine error");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hello, @zk-drizzle . I'm just curious why all the logs here are INFO level, regardless of whether the restart succeeds or fails.

Shouldn't WARN or ERROR level logs be used to provide some warning information to the user when the restart fails?

while (true) {
try {
List<TransRocksDBRecord> trs = messageRocksDBStorage.scanRecordsForTrans(TRANS_COLUMN_FAMILY, MAX_BATCH_SIZE_FROM_ROCKSDB, lastKey);
if (null == trs || CollectionUtils.isEmpty(trs)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The null == trs check is redundant.

Suggested change
if (null == trs || CollectionUtils.isEmpty(trs)) {
if (CollectionUtils.isEmpty(trs)) {

}

public void shutdown() {
if (this.state != RUNNING || this.state == SHUTDOWN) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hello, @zk-drizzle . Is the check this.state == SHUTDOWN redundant? When state equals SHUTDOWN, the condition this.state != RUNNING is already satisfied.

Copy link
Contributor

@majialoong majialoong left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for @zk-drizzle PR, this is an exciting feature. I've left some comments.

}

public void shutdown() {
if (this.state != RUNNING || this.state == SHUTDOWN) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto.

}

public long getCommitOffsetInRocksDB() {
if (null == messageRocksDBStorage || !storeConfig.isTransRocksDBEnable()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be isTimerRocksDBEnable?


private void checkTransRecordsStatus(List<TransRocksDBRecord> trs) {
if (CollectionUtils.isEmpty(trs)) {
log.error("TransactionalMessageRocksDBService checkTransRecordsStatus, trs is empty");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The checkTransRecordsStatus method is currently only called within the checkTransStatus method, and since the checkTransStatus method already checks for empty values, this method checks again for redundancy. Furthermore, the log level here is ERROR, while checkTransStatus uses INFO, creating inconsistencies.

this.storeConfig.setTimerStopEnqueue(true);
if (this.state == RUNNING && !this.storeConfig.isTimerRocksDBStopScan()) {
log.info("restart TimerMessageRocksDBStore has been running");
return true;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm curious, if the enqueue operation was stopped using setTimerStopEnqueue(true) above, will there be any issues if we simply return here?

long commitOffsetRocksDB = messageRocksDBStorage.getCheckpointForTimer(TIMER_COLUMN_FAMILY, MessageRocksDBStorage.SYS_TOPIC_SCAN_OFFSET_CHECK_POINT);
long maxCommitOffset = Math.max(commitOffsetFile, commitOffsetRocksDB);
this.readOffset.set(maxCommitOffset);
log.info("restart TimerMessageRocksDBStore has benn recover running, commitOffsetFile: {}, commitOffsetRocksDB: {}, readOffset: {}", commitOffsetFile, commitOffsetRocksDB, readOffset.get());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe is has been recovered to running

this.readOffset.set(maxCommitOffset);
log.info("restart TimerMessageRocksDBStore has benn recover running, commitOffsetFile: {}, commitOffsetRocksDB: {}, readOffset: {}", commitOffsetFile, commitOffsetRocksDB, readOffset.get());
} else {
this.load();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If load fails here, should we continue with the subsequent start?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Feature] RIP‐82 Implement Timer message, transaction message, and index based on RocksDB

6 participants