Flink: Fix CDC validation errors #3258
Conversation
Force-pushed from 08505a6 to 9173be2.
@openinx, it looks like there is a test for a case that I don't think can happen. Should we fix the test, or should we look into an alternative? Here's the alternative that uses the last committed snapshot:

Long lastCommittedSnapshotId = null;
for (Snapshot snap : SnapshotUtil.currentAncestors(table)) {
  String snapshotCheckpointId = snap.summary().get(MAX_COMMITTED_CHECKPOINT_ID);
  if (snapshotCheckpointId != null && Long.parseLong(snapshotCheckpointId) == maxCommittedCheckpointId) {
    lastCommittedSnapshotId = snap.snapshotId();
  }
}

RowDelta rowDelta = table.newRowDelta()
    .validateDataFilesExist(ImmutableList.copyOf(result.referencedDataFiles()))
    .validateDeletedFiles();

// only limit the validation window when a previously committed snapshot was found
if (lastCommittedSnapshotId != null) {
  rowDelta.validateFromSnapshot(lastCommittedSnapshotId);
}
@@ -282,9 +281,14 @@ private void commitDeltaTxn(NavigableMap<Long, WriteResult> pendingResults, Stri
// txn2, the equality-delete files of txn2 are required to be applied to data files from txn1. Committing the
// merged one will lead to the incorrect delete semantic.
Given the new comment that retries may push deletes further in the future, should we possibly reword this comment here? It states that for the sequential transactions txn1 and txn2, the equality-delete files of txn2 are required to be applied to data files from txn1. If equality-deletes could get pushed out further due to retries and other scenarios, would it be more appropriate for the above comment to read something like "equality-delete files of transaction N are required to be applied to data files from transactions strictly prior to N"? There's probably a better way to phrase it, but the above comment seems to still imply that equality-delete files need to be applied strictly in the next transaction.
I think it's okay either way. This comment is saying that the inserts from one checkpoint are affected by delete files in the next checkpoint, so we have to commit them separately rather than as a single transaction. The txn numbers were already generic, since the map may have more than 2 entries.
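To make the per-checkpoint ordering concrete, here is a minimal, hypothetical sketch of committing each checkpoint's WriteResult as its own RowDelta (assuming the usual Iceberg imports such as java.util.Map, org.apache.iceberg.RowDelta, and org.apache.iceberg.io.WriteResult; this is not the PR's exact code):

for (Map.Entry<Long, WriteResult> entry : pendingResults.entrySet()) {
  WriteResult result = entry.getValue();
  RowDelta rowDelta = table.newRowDelta();
  for (DataFile dataFile : result.dataFiles()) {
    rowDelta.addRows(dataFile);       // inserts from this checkpoint
  }
  for (DeleteFile deleteFile : result.deleteFiles()) {
    rowDelta.addDeletes(deleteFile);  // deletes from this checkpoint
  }
  // one commit per checkpoint keeps the deletes of checkpoint N ordered after
  // the data files of checkpoint N-1 instead of merging them into one snapshot
  rowDelta.commit();
}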
If user run
I have another PR (#3103) to check snapshot history from the last committed snapshot id, but this PR is trying to speed up the
@rdblue I think the proper way to resolve #2482 is
@Reo-LEI I didn't completely get your point. The reasoning/logic that Ryan described in the comment seems correct to me. Hence, I'd like to double-check with you in case we missed something here.
@Reo-LEI, the approach in #3103 is similar to what I suggested in this comment: #3258 (comment). That approach works and is safe, but it still runs checks that I think are unnecessary for the CDC use case. I think it would be better not to run the validation at all if I'm right that it is unnecessary.
Deleting orphan files does not affect correctness because the files are not referenced. Removing referenced data files (physically or logically) through any process other than … Both of those cases aren't relevant to the problem here. The problem in #2482 is that the validation is incorrectly configured and very likely not required at all.
This is partially correct. The validation should stop trying to use snapshots that have been expired. But doing that by ignoring expired snapshots is not correct. Instead, the validation should be configured to not require the old snapshots. Another way of thinking about this is that the validation is requesting all versions of the table back to the beginning of table history. You're right that it doesn't need all of those versions. But the right way to fix this is to stop requesting them rather than breaking the check by ignoring when requested versions aren't available. Does that make sense?
@stevenzwu, can you confirm that we can remove the tests that are failing? I don't think that there is a case where Flink CDC will produce a positional delete file that references files other than those that are being added. We could also add a check to ensure that there are no such files by validating that the set of referenced data files from the positional deletes is a subset of the data files to add.
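As a rough illustration of that suggested check, a minimal sketch (hypothetical, not code from this PR) could compare the paths referenced by the position deletes with the paths of the data files being added in the same WriteResult:

Set<String> addedPaths = new HashSet<>();
for (DataFile dataFile : result.dataFiles()) {
  addedPaths.add(dataFile.path().toString());
}
for (CharSequence referenced : result.referencedDataFiles()) {
  if (!addedPaths.contains(referenced.toString())) {
    throw new IllegalStateException(
        "Position delete references a data file that is not being added: " + referenced);
  }
}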
@rdblue with the new logic of removing validation in this PR, I agree that we don't need the failed unit test of testValidateDataFileExist.
That sounds like a good idea.
@rdblue @stevenzwu I'm just thinking that the original intention of this validation was to prevent committing invalid manifests, which contain file locations that do not exist, to the Iceberg table. I think the purpose of this validation needs to be confirmed with @openinx.
Thanks all for pinging me! Let me provide more of the original background for this validation: it was added to validate the existence of data files that are referenced from the pos-delete files introduced in the current TXN, which is planned to commit but has not committed yet. After I read the implementation of the …

About the unit test testValidateDataFileExist broken by this PR, I think it is addressing a case that will never happen in the real CDC use case, because its steps are the following:
The key point is: in the real Flink CDC use case, we won't produce any pos-deletes in a new txn to delete a row coming from an older txn; we only produce pos-deletes to delete a row that comes from the same txn as the pos-delete (as @rdblue said, …). Thanks all for the patient work!
.validateDataFilesExist(ImmutableList.copyOf(result.referencedDataFiles()))
.validateDeletedFiles();
// row delta validations are not needed for streaming changes that write equality deletes. Equality deletes |
Nit: row delta -> Row delta.
Thank you for the review, @openinx! I'll remove the unit test and fix up this PR so tests are passing.
Force-pushed from 9173be2 to ccfb795.
Rebased and removed the test. I'll merge this when CI is passing. I also marked this for inclusion in a patch release, 0.12.1.
The broken unit test is the flaky one; let's retry the CI.

org.apache.iceberg.flink.TestFlinkTableSink > testHashDistributeMode[catalogName=testhadoop_basenamespace, baseNamespace=l0.l1, format=ORC, isStreaming=true] FAILED
    java.lang.AssertionError: There should be 1 data file in partition 'aaa' expected:<1> but was:<2>
        at org.junit.Assert.fail(Assert.java:89)
        at org.junit.Assert.failNotEquals(Assert.java:835)
        at org.junit.Assert.assertEquals(Assert.java:647)
        at org.apache.iceberg.flink.TestFlinkTableSink.testHashDistributeMode(TestFlinkTableSink.java:281)
@openinx looks like @szehon-ho's attempt in #2989 didn't fix the flakiness, so it is probably a different root cause.
@stevenzwu, okay, let me take a look at the root cause. Anyway, it does not block this PR. Let me get this merged first.
Thanks for merging, @openinx!
Removing the test left two unused imports.
This fixes CDC validation problems in Flink, #2482. The problem was that validation did not configure a starting snapshot ID by calling validateFromSnapshot. As a result, the entire table history was used to check that there were no deletes for the data files referenced by position deletes in the commit. One fix would be to correctly set the validation snapshot, rather than letting it default to the start of table history. But this PR fixes the problem by removing the validation entirely because none is required.

The only position deletes created by the CDC writer are against the data files that are being added by the commit. Equality deletes are used to delete data in existing files and those require no validation because they do not reference specific files and apply to all older data files (retries do not affect correctness). Because position deletes will only reference data files being added to the table, there is no possibility that those files are concurrently deleted.
Closes #2482.
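For readers skimming the thread, here is a hedged sketch of what the change boils down to; the exact diff is in the PR, and this snippet only illustrates the before/after shape of the commit call, assuming the committer's existing table and result variables:

// before: file-existence validation with no starting snapshot, so it scanned all table history
table.newRowDelta()
    .validateDataFilesExist(ImmutableList.copyOf(result.referencedDataFiles()))
    .validateDeletedFiles()
    // no .validateFromSnapshot(...), which is what #2482 tripped over
    .commit();

// after: no row-delta validation at all, because the position deletes only reference
// data files added in the same commit and equality deletes need no file-level checks
table.newRowDelta()
    // addRows(...) / addDeletes(...) for the checkpoint's data and delete files
    .commit();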