-
Notifications
You must be signed in to change notification settings - Fork 12k
[RIP-82] Implement Timer message, transaction message, and index based on RocksDB #9787
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Conversation
Change-Id: I7b5782e8f1a9548851069c8c239c75868ae64400
Change-Id: I22788d112b7f214310719e2d3ebad839ef818e67
Change-Id: I5eade6c31803b8cf05c1a4bb83c226d57f4980c7
Change-Id: I65a43a612f09280cb61d2a23b3324d198c6a71de
Change-Id: Idf8080c7b17c25e14fe34ef6bad1e1150dab58d6
Change-Id: I21653d6bcad905daa8f10d12298d1d27535af9d9
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## develop #9787 +/- ##
=============================================
- Coverage 48.40% 47.40% -1.00%
+ Complexity 12253 12252 -1
=============================================
Files 1314 1324 +10
Lines 93668 96023 +2355
Branches 12011 12371 +360
=============================================
+ Hits 45340 45520 +180
- Misses 42737 44853 +2116
- Partials 5591 5650 +59 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
|
|
||
| <properties> | ||
| <revision>5.3.4-SNAPSHOT</revision> | ||
| <revision>5.3.6-rocksdb-SNAPSHOT</revision> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The version number should not be changed here.
| db.flush(flushOptions, timerCFHandle); | ||
| log.info("MessageRocksDBStorage flush timer wal success"); | ||
| } catch (Exception e) { | ||
| logError.error("MessageRocksDBStorage flush timer wal failed, error: {}", e.getMessage()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Directly printing the exception is preferable, as printing e.getMessage() may lose important stack trace information, hindering troubleshooting.
| } else { | ||
| response.setCode(ResponseCode.SYSTEM_ERROR); | ||
| response.setRemark("switch timer engine error"); | ||
| LOGGER.info("switchTimerEngine error"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hello, @zk-drizzle . I'm just curious why all the logs here are INFO level, regardless of whether the restart succeeds or fails.
Shouldn't WARN or ERROR level logs be used to provide some warning information to the user when the restart fails?
| while (true) { | ||
| try { | ||
| List<TransRocksDBRecord> trs = messageRocksDBStorage.scanRecordsForTrans(TRANS_COLUMN_FAMILY, MAX_BATCH_SIZE_FROM_ROCKSDB, lastKey); | ||
| if (null == trs || CollectionUtils.isEmpty(trs)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The null == trs check is redundant.
| if (null == trs || CollectionUtils.isEmpty(trs)) { | |
| if (CollectionUtils.isEmpty(trs)) { |
| } | ||
|
|
||
| public void shutdown() { | ||
| if (this.state != RUNNING || this.state == SHUTDOWN) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hello, @zk-drizzle . Is the check this.state == SHUTDOWN redundant? When state equals SHUTDOWN, the condition this.state != RUNNING is already satisfied.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for @zk-drizzle PR, this is an exciting feature. I've left some comments.
| } | ||
|
|
||
| public void shutdown() { | ||
| if (this.state != RUNNING || this.state == SHUTDOWN) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto.
| } | ||
|
|
||
| public long getCommitOffsetInRocksDB() { | ||
| if (null == messageRocksDBStorage || !storeConfig.isTransRocksDBEnable()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be isTimerRocksDBEnable?
|
|
||
| private void checkTransRecordsStatus(List<TransRocksDBRecord> trs) { | ||
| if (CollectionUtils.isEmpty(trs)) { | ||
| log.error("TransactionalMessageRocksDBService checkTransRecordsStatus, trs is empty"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The checkTransRecordsStatus method is currently only called within the checkTransStatus method, and since the checkTransStatus method already checks for empty values, this method checks again for redundancy. Furthermore, the log level here is ERROR, while checkTransStatus uses INFO, creating inconsistencies.
| this.storeConfig.setTimerStopEnqueue(true); | ||
| if (this.state == RUNNING && !this.storeConfig.isTimerRocksDBStopScan()) { | ||
| log.info("restart TimerMessageRocksDBStore has been running"); | ||
| return true; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm curious, if the enqueue operation was stopped using setTimerStopEnqueue(true) above, will there be any issues if we simply return here?
| long commitOffsetRocksDB = messageRocksDBStorage.getCheckpointForTimer(TIMER_COLUMN_FAMILY, MessageRocksDBStorage.SYS_TOPIC_SCAN_OFFSET_CHECK_POINT); | ||
| long maxCommitOffset = Math.max(commitOffsetFile, commitOffsetRocksDB); | ||
| this.readOffset.set(maxCommitOffset); | ||
| log.info("restart TimerMessageRocksDBStore has benn recover running, commitOffsetFile: {}, commitOffsetRocksDB: {}, readOffset: {}", commitOffsetFile, commitOffsetRocksDB, readOffset.get()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe is has been recovered to running
| this.readOffset.set(maxCommitOffset); | ||
| log.info("restart TimerMessageRocksDBStore has benn recover running, commitOffsetFile: {}, commitOffsetRocksDB: {}, readOffset: {}", commitOffsetFile, commitOffsetRocksDB, readOffset.get()); | ||
| } else { | ||
| this.load(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If load fails here, should we continue with the subsequent start?
RIP-82 Implement Timer message, transaction message, and index based on RocksDB
Fixes #9786
Brief Description
Implement Timer message, transaction message, and index based on RocksDB
How Did You Test This Change?