Add undo recovery support for aggregation tables#2545
Conversation
Force-pushed edd4525 to a75f04f.
This PR introduces the undo recovery mechanism for the Flink sink writer to handle failure recovery scenarios with aggregation merge engine tables.

Key components:
- ByteArrayWrapper: utility class for using byte arrays as map keys
- UndoComputer: computes undo operations by comparing checkpoint state with current log records, supporting both full-row and partial-update modes
- UndoRecoveryExecutor: executes undo operations using UpsertWriter
- UndoRecoveryCoordinator: coordinates the recovery process across buckets, managing log scanning, undo computation, and execution
- BucketRecoveryContext: holds per-bucket recovery state

The undo recovery works by:
1. Scanning log records from the checkpoint offset to the current end offset
2. Computing inverse operations for uncommitted records
3. Executing undo operations to restore table state

For partial update mode, INSERT records require full-row deletion (not partial column deletion), which is handled by using a separate delete writer with null target columns.
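The inverse-operation step of the recovery flow can be sketched as follows. This is an illustrative sketch only: the `RowKind` enum, `LogRecord` record, and `computeUndo` method below are names assumed for the example, not the actual UndoComputer API in Fluss.

```java
import java.util.ArrayList;
import java.util.List;

public class UndoSketch {
    // Changelog kinds as commonly used in changelog-based systems (assumption).
    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    record LogRecord(RowKind kind, String row) {}

    /** Computes inverse operations for uncommitted records, newest first. */
    static List<LogRecord> computeUndo(List<LogRecord> uncommitted) {
        List<LogRecord> undo = new ArrayList<>();
        // Walk backwards so the undo operations apply in reverse order.
        for (int i = uncommitted.size() - 1; i >= 0; i--) {
            LogRecord r = uncommitted.get(i);
            switch (r.kind()) {
                // An uncommitted INSERT is undone by deleting the row.
                case INSERT -> undo.add(new LogRecord(RowKind.DELETE, r.row()));
                // An uncommitted DELETE is undone by re-inserting the old row.
                case DELETE -> undo.add(new LogRecord(RowKind.INSERT, r.row()));
                // An update is undone by re-upserting its before image.
                case UPDATE_BEFORE -> undo.add(new LogRecord(RowKind.UPDATE_AFTER, r.row()));
                // The after image needs no separate action; its UPDATE_BEFORE covers it.
                case UPDATE_AFTER -> { }
            }
        }
        return undo;
    }

    public static void main(String[] args) {
        List<LogRecord> log = List.of(
                new LogRecord(RowKind.INSERT, "k1=v1"),
                new LogRecord(RowKind.UPDATE_BEFORE, "k1=v1"),
                new LogRecord(RowKind.UPDATE_AFTER, "k1=v2"));
        // Prints the undo operations in the order they should be replayed.
        computeUndo(log).forEach(r -> System.out.println(r.kind() + " " + r.row()));
    }
}
```

Note that, as discussed later in this thread, undoing a DELETE this way presumes the full before image is available in the changelog.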
Force-pushed a75f04f to ba699f0.
wuchong left a comment:
Thanks @platinumhamburg , I left some comments.
Resolved review comments on:
- ...link-common/src/main/java/org/apache/fluss/flink/sink/writer/undo/BucketRecoveryContext.java
- ...k/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/writer/undo/UndoComputer.java
- ...flink-common/src/main/java/org/apache/fluss/flink/sink/writer/undo/UndoRecoveryExecutor.java
Btw, it seems the UNDO recovery for the aggregate merge engine table relies on the full changelog of the table. However,
Force-pushed 987dcce to 308d9e9.
@wuchong Thank you for your detailed review. I've addressed all the comments above; please take a look when you have a moment.
Yes, this restriction will be addressed in a separate issue: https://github.com//issues/2608.
wuchong left a comment:
@platinumhamburg , I only have one remaining comment. I will merge this PR first. Please open follow-up pull requests to fix this.
```java
for (int id : zeroOffsetBucketKeysAfterCheckpoint) {
    final int keyId = id;
    verifyFutures.add(
            lookuper.lookup(row(keyId))
                    .thenAccept(
                            r ->
                                    assertThat(r.getSingletonRow())
                                            .as(
                                                    "Zero-offset bucket key %d should be deleted",
                                                    keyId)
                                            .isNull()));
}
```
I think we still need to check the rows related to zeroOffsetBucketKeysAfterCheckpoint after restoring. Could you add this test back?
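To see why partial-update mode needs a separate delete writer for undoing INSERTs, here is a toy sketch. The `delete` method and the convention that null target columns mean "the full row" are hypothetical simplifications modeled on the description above, not the actual Fluss UpsertWriter API.

```java
import java.util.HashMap;
import java.util.Map;

public class PartialUpdateDeleteSketch {
    /** Toy table state: primary key -> full row of column values (hypothetical). */
    static final Map<Integer, String[]> TABLE = new HashMap<>();

    /** Emulates a delete issued by a writer configured with the given target columns. */
    static void delete(int key, int[] targetColumns) {
        if (targetColumns == null) {
            // Null target columns: the whole row is removed.
            TABLE.remove(key);
        } else {
            // Partial-update writer: only the targeted columns are cleared,
            // so the row itself survives, which is not enough to undo an INSERT.
            String[] row = TABLE.get(key);
            for (int c : targetColumns) {
                row[c] = null;
            }
        }
    }

    public static void main(String[] args) {
        TABLE.put(1, new String[] {"k1", "a", "b"});
        delete(1, new int[] {1});                 // partial delete: row survives
        System.out.println(TABLE.containsKey(1)); // true
        delete(1, null);                          // full-row delete via the undo writer
        System.out.println(TABLE.containsKey(1)); // false
    }
}
```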
Purpose
Linked issue: close #2544
Brief change log
Tests
API and Format
Documentation