
Add undo recovery support for aggregation tables #2545

Merged
wuchong merged 5 commits into apache:main from platinumhamburg:undoinfra on Feb 9, 2026

Conversation

@platinumhamburg
Contributor

This PR introduces the undo recovery mechanism for the Flink sink writer to handle failure recovery scenarios with aggregation merge engine tables.

Key components:

  • ByteArrayWrapper: Utility class for using byte arrays as map keys (a minimal sketch follows this list)
  • UndoComputer: Computes undo operations by comparing checkpoint state with current log records, supporting both full row and partial update modes
  • UndoRecoveryExecutor: Executes undo operations using UpsertWriter
  • UndoRecoveryCoordinator: Coordinates the recovery process across buckets, managing log scanning, undo computation, and execution
  • BucketRecoveryContext: Holds per-bucket recovery state
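
A minimal sketch of the ByteArrayWrapper idea, for illustration only (the class in this PR may differ in detail): byte[] uses identity-based equals/hashCode in Java, so keys must be wrapped with value-based equality before a HashMap can deduplicate them.

import java.util.Arrays;

// Illustrative sketch: wraps a byte[] so it can serve as a map key with
// value-based equality; not necessarily identical to the class in this PR.
public final class ByteArrayWrapper {
    private final byte[] bytes;

    public ByteArrayWrapper(byte[] bytes) {
        this.bytes = bytes;
    }

    public byte[] get() {
        return bytes;
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof ByteArrayWrapper
                && Arrays.equals(bytes, ((ByteArrayWrapper) o).bytes);
    }

    @Override
    public int hashCode() {
        return Arrays.hashCode(bytes);
    }
}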

The undo recovery works by:

  1. Scanning log records from checkpoint offset to current end offset
  2. Computing inverse operations for uncommitted records (sketched below)
  3. Executing undo operations to restore table state
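
As a rough model of step 2 (all type names below are simplified placeholders, not the actual UndoComputer API): the inverse of an INSERT is a delete of the same key, and an UPDATE or DELETE is undone by writing back the old image carried in the full changelog.

import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of computing inverse operations; the real
// UndoComputer also handles partial-update mode and checkpoint state.
class UndoComputationSketch {
    enum ChangeType { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // row holds the new image for INSERT/UPDATE_AFTER and the old image for
    // UPDATE_BEFORE/DELETE, mirroring a full changelog.
    record LogRecord(ChangeType type, byte[] key, byte[] row) {}

    record UndoOp(boolean isDelete, byte[] key, byte[] row) {}

    static List<UndoOp> computeUndo(List<LogRecord> uncommitted) {
        List<UndoOp> undos = new ArrayList<>();
        // Undo the most recent change first so the oldest state is restored last.
        for (int i = uncommitted.size() - 1; i >= 0; i--) {
            LogRecord r = uncommitted.get(i);
            switch (r.type()) {
                case INSERT -> undos.add(new UndoOp(true, r.key(), null));
                case UPDATE_BEFORE, DELETE -> undos.add(new UndoOp(false, r.key(), r.row()));
                case UPDATE_AFTER -> { /* superseded by the paired UPDATE_BEFORE */ }
            }
        }
        return undos;
    }
}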

For partial update mode, INSERT records require full row deletion (not partial column deletion), which is handled by using a separate delete writer with null target columns.
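
To make the partial-update caveat concrete, here is an illustrative two-writer sketch (the interface and field names are placeholders, not the actual UpsertWriter API):

// Placeholder writer interface for illustration; the real UpsertWriter differs.
interface SketchWriter {
    void upsert(Object[] row);
    void delete(Object[] row);
}

// Sketch of the approach described above: undoing an INSERT goes through a
// writer created with null target columns so the whole row is removed, not
// just the configured partial-update columns.
class PartialUpdateUndoSketch {
    private final SketchWriter partialWriter;  // created with target columns
    private final SketchWriter fullRowDeleter; // created with null target columns

    PartialUpdateUndoSketch(SketchWriter partialWriter, SketchWriter fullRowDeleter) {
        this.partialWriter = partialWriter;
        this.fullRowDeleter = fullRowDeleter;
    }

    void undoInsert(Object[] insertedRow) {
        // Deleting via partialWriter would clear only the target columns and
        // leave the rest of the row behind.
        fullRowDeleter.delete(insertedRow);
    }
}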

Purpose

Linked issue: close #2544

Brief change log

Tests

API and Format

Documentation

@platinumhamburg force-pushed the undoinfra branch 3 times, most recently from edd4525 to a75f04f on February 2, 2026 05:02

Member

@wuchong wuchong left a comment

Thanks @platinumhamburg , I left some comments.

@wuchong
Member

wuchong commented Feb 8, 2026

Btw, it seems the UNDO recovery for aggregate merge engine tables relies on the full changelog of the table. However, table.changelog.image=WAL will break this and lead to incorrect results. I think we need to forbid such a case: a validation check can be done during table creation, throwing an exception for tables with the agg merge engine and WAL mode. We can do that in another issue. What do you think @platinumhamburg ?
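
For illustration, a validation of the kind proposed here might look like the sketch below (the option keys and values are assumptions, not the actual Fluss configuration):

import java.util.Map;

// Hypothetical sketch of the proposed create-table check: reject tables that
// combine the aggregation merge engine with table.changelog.image=WAL.
// Option keys and values are assumptions, not the actual Fluss options.
final class AggWalValidationSketch {
    static void validate(Map<String, String> tableOptions) {
        String mergeEngine = tableOptions.getOrDefault("table.merge-engine", "");
        String changelogImage = tableOptions.getOrDefault("table.changelog.image", "FULL");
        if (mergeEngine.equalsIgnoreCase("aggregation") && changelogImage.equalsIgnoreCase("WAL")) {
            throw new IllegalArgumentException(
                    "Aggregation merge engine relies on a full changelog; "
                            + "table.changelog.image=WAL would make undo recovery incorrect.");
        }
    }
}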

@platinumhamburg
Contributor Author

@wuchong Thank you for your detailed review. I’ve addressed all the comments above—please take a look when you have a moment.

@platinumhamburg
Contributor Author

platinumhamburg commented Feb 9, 2026

Btw, it seems the UNDO recovery for aggregate merge engine tables relies on the full changelog of the table. However, table.changelog.image=WAL will break this and lead to incorrect results. I think we need to forbid such a case: a validation check can be done during table creation, throwing an exception for tables with the agg merge engine and WAL mode. We can do that in another issue. What do you think @platinumhamburg ?

Yes, this restriction will be addressed in a separate issue, #2608.

Member

@wuchong wuchong left a comment

@platinumhamburg , I only have one remaining comment. I will merge this PR first. Please open follow-up pull requests to fix this.

Comment on lines 383 to 394
for (int id : zeroOffsetBucketKeysAfterCheckpoint) {
    final int keyId = id;
    verifyFutures.add(
            lookuper.lookup(row(keyId))
                    .thenAccept(
                            r ->
                                    assertThat(r.getSingletonRow())
                                            .as(
                                                    "Zero-offset bucket key %d should be deleted",
                                                    keyId)
                                            .isNull()));
}
Member

I think we still need to check the rows related to zeroOffsetBucketKeysAfterCheckpoint after restoring. Could you add this test back?

wuchong merged commit 2df2d6b into apache:main on Feb 9, 2026
6 checks passed