
Flink: Fix CDC validation errors #3258

Merged — 3 commits, Oct 19, 2021

Conversation

@rdblue (Contributor) commented Oct 8, 2021

This fixes CDC validation problems in Flink, #2482. The problem was that validation did not configure a starting snapshot ID by calling validateFromSnapshot. As a result, the entire table history was used to check that there were no deletes for the data files referenced by position deletes in the commit. One fix would be to correctly set the validation snapshot, rather than letting it default to the start of table history. But this PR fixes the problem by removing the validation entirely because none is required.

The only position deletes created by the CDC writer are against the data files that are being added by the commit. Equality deletes are used to delete data in existing files and those require no validation because they do not reference specific files and apply to all older data files (retries do not affect correctness). Because position deletes will only reference data files being added to the table, there is no possibility that those files are concurrently deleted.

Closes #2482.
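
To illustrate the point above (this is a minimal sketch, not the exact PR diff; the helper name commitCdcResult is hypothetical), the CDC commit path can simply append the checkpoint's data and delete files without the per-file existence validation:

    import java.util.Arrays;
    import org.apache.iceberg.RowDelta;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.io.WriteResult;

    // Hypothetical helper: commit one checkpoint's WriteResult without calling
    // validateDataFilesExist/validateDeletedFiles, because the position deletes
    // in `result` only reference data files added by this same commit.
    static void commitCdcResult(Table table, WriteResult result) {
      RowDelta rowDelta = table.newRowDelta();
      Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows);
      Arrays.stream(result.deleteFiles()).forEach(rowDelta::addDeletes);
      rowDelta.commit();
    }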

@rdblue rdblue requested a review from openinx October 8, 2021 19:29
@github-actions github-actions bot added the flink label Oct 8, 2021
@rdblue rdblue force-pushed the fix-flink-cdc-validation branch from 08505a6 to 9173be2 Compare October 8, 2021 20:05
@rdblue (Contributor, Author) commented Oct 8, 2021

@openinx, it looks like there is a test for a case that I don't think can happen. Should we fix the test or should we look into calling validateFromSnapshot with the last committed snapshot ID?

Here's the alternative that uses the last committed snapshot:

        // Walk the current snapshot's ancestors to find the snapshot that recorded
        // the last committed Flink checkpoint ID in its summary.
        Long lastCommittedSnapshotId = null;
        for (Snapshot snap : SnapshotUtil.currentAncestors(table)) {
          String snapshotCheckpointId = snap.summary().get(MAX_COMMITTED_CHECKPOINT_ID);
          if (snapshotCheckpointId != null && Long.parseLong(snapshotCheckpointId) == maxCommittedCheckpointId) {
            lastCommittedSnapshotId = snap.snapshotId();
          }
        }

        RowDelta rowDelta = table.newRowDelta()
            .validateDataFilesExist(ImmutableList.copyOf(result.referencedDataFiles()))
            .validateDeletedFiles();

        // Validate only the changes since the last committed snapshot instead of
        // defaulting to the entire table history.
        if (lastCommittedSnapshotId != null) {
          rowDelta.validateFromSnapshot(lastCommittedSnapshotId);
        }

@@ -282,9 +281,14 @@ private void commitDeltaTxn(NavigableMap<Long, WriteResult> pendingResults, Stri
// txn2, the equality-delete files of txn2 are required to be applied to data files from txn1. Committing the
// merged one will lead to the incorrect delete semantic.
@kbendick (Contributor) commented Oct 9, 2021

Given the new comment that retries may push deletes further in the future, should we possibly reword this comment here?

It states that for the sequential transaction txn1 and txn2, the equality-delete files of txn2 are required to be applied to data files from txn1.

If equality-deletes could get pushed out further due to retries and other scenarios, would it be more appropriate for the above comment to read something like equality-delete files of transaction N are required to be applied to data files from transactions strictly prior to N? There's probably a better way to phrase it, but the above comment seems to still imply that equality delete files need to be applied strictly in the next transaction.

@rdblue (Contributor, Author) replied:

I think it's okay either way. This comment is saying that the inserts from one checkpoint are affected by delete files in the next checkpoint, so we have to commit them separately rather than as a single transaction. The txn numbers were already generic, since the map may have more than 2 entries.

@Reo-LEI (Contributor) commented Oct 11, 2021

Because position deletes will only reference data files being added to the table, there is no possibility that those files are concurrently deleted.

If a user runs the deleteOrphanFiles action, or deletes referenced data files manually or through some user-implemented automation, before the Flink commit, I think this validation can prevent committing files that no longer exist.

should we look into calling validateFromSnapshot with the last committed snapshot ID?

I have another PR (#3103) that checks the snapshot history from the last committed snapshot ID, but that PR is trying to speed up the IcebergFilesCommitter, not to resolve #2482. Because, as I mentioned on https://github.com/apache/iceberg/pull/3103/files#r718347222, we cannot guarantee that the lastCommittedSnapshot we store in the snapshot summary (or somewhere else) will always exist when we restore the Flink job.

@rdblue I think the proper way to resolve #2482 is for MergingSnapshotProducer.validationHistory to stop traversing the snapshots that no longer exist because they were deleted by the expireSnapshots action, or your comment on https://github.com/apache/iceberg/pull/2603/files#r652149167:

while (currentSnapshotId != null && !currentSnapshotId.equals(startingSnapshotId)) {

@stevenzwu (Contributor) commented:

@Reo-LEI I didn't completely get your point. The reasoning that Ryan described in the comment seems correct to me, so I'd like to double-check with you in case we missed something here.

@rdblue (Contributor, Author) commented Oct 11, 2021

@Reo-LEI, the approach in #3103 is similar to what I suggested in this comment: #3258 (comment). That approach works and is safe, but it still runs checks that I think are unnecessary for the CDC use case. I think it would be better not to run the validation at all if I'm right that it is unnecessary.

If a user runs the deleteOrphanFiles action, or deletes referenced data files manually or through some user-implemented automation, before the Flink commit, I think this validation can prevent committing files that no longer exist.

Deleting orphan files does not affect correctness because the files are not referenced.

Removing referenced data files (physically or logically) through any process other than expireSnapshots is not supported. If you make changes to files underneath a table, Iceberg makes no correctness guarantees.

Both of those cases aren't relevant to the problem here. The problem in #2482 is that the validation is incorrectly configured and very likely not required at all.

I think the proper way to resolve #2482 is for MergingSnapshotProducer.validationHistory to stop traversing the snapshots that no longer exist because they were deleted by the expireSnapshots action

This is partially correct. The validation should stop trying to use snapshots that have been expired. But doing that by ignoring expired snapshots is not correct. Instead, the validation should be configured to not require the old snapshots.

Another way of thinking about this is that the validation is requesting all versions of the table back to the beginning of table history. You're right that it doesn't need all of those versions. But the right way to fix this is to stop requesting them, rather than breaking the check by ignoring cases where requested versions aren't available. Does that make sense?

@rdblue (Contributor, Author) commented Oct 11, 2021

@stevenzwu, can you confirm that we can remove the tests that are failing? I don't think that there is a case where Flink CDC will produce a positional delete file that references files other than those that are being added.

We could also add a check to ensure that there are no such files by validating that the set of referenced data files from the positional deletes is a subset of the data files to add.
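
A sketch of what that check could look like (assumptions: result is the checkpoint's WriteResult, as in the committer code above; the method name and the exception type are arbitrary illustrative choices):

    import java.util.Arrays;
    import java.util.Set;
    import java.util.stream.Collectors;
    import org.apache.iceberg.io.WriteResult;

    // Sketch: the data files referenced by this checkpoint's position deletes
    // must be a subset of the data files the same checkpoint is adding.
    static void validateReferencedFilesAreAdded(WriteResult result) {
      Set<String> addedPaths = Arrays.stream(result.dataFiles())
          .map(file -> file.path().toString())
          .collect(Collectors.toSet());
      for (CharSequence referenced : result.referencedDataFiles()) {
        if (!addedPaths.contains(referenced.toString())) {
          throw new IllegalStateException(
              "Position delete references a data file not added by this commit: " + referenced);
        }
      }
    }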

@stevenzwu (Contributor) commented:

@rdblue with the new logic of removing the validation in this PR, I agree that we don't need the failing unit test testValidateDataFileExist.

We could also add a check to ensure that there are no such files by validating that the set of referenced data files from the positional deletes is a subset of the data files to add.

That sounds like a good idea.

@Reo-LEI (Contributor) commented Oct 12, 2021

@rdblue @stevenzwu I'm just thinking that the original intention of this validation was to prevent committing invalid manifests, which contain locations of files that no longer exist, to the Iceberg table. I think the purpose of this validation needs to be confirmed with @openinx.

@openinx (Member) commented Oct 12, 2021

Thanks all for pinging me! Let me provide more of the original background for this validation: it was added to validate the existence of the data files referenced by the pos-delete files introduced in the current TXN, which is planned to commit but has not committed yet.

After reading the implementation of the validateDataFilesExist method, I think I misunderstood its meaning before, because I thought it validated the existence of the data files that the current TXN was planning to commit. The real meaning is: validate the existence of the data files by going through the committed txn history and ensuring that NO other OVERWRITE/REPLACE TXN has deleted the required data files.

About the unit test testValidateDataFileExist broken by this PR, I think it addresses a case that will never happen in the real CDC use case, because its steps are the following:

  • Step.1: Start txn.1 to insert a row <1, 'aaa'>, which produces data-file-1;
  • Step.2: Start txn.2 to replace the whole Iceberg table with a newly produced row <2, 'bbb'>; it removes the old data file data-file-1 and adds a new data file data-file-2;
  • Step.3: Start txn.3 to commit a row delta, which includes a pos-delete to delete the old row <1, 'aaa'>.

The key point is: in the real Flink CDC use case, we won't produce any pos-deletes in a new txn to delete a row coming from an older txn; we only produce pos-deletes to delete a row that comes from the same txn as the pos-delete (as @rdblue said, because position deletes will only reference data files being added to the table, there is no possibility that those files are concurrently deleted). In summary, the unit test addresses a case that will never happen in a real use case, so I would like to remove this unit test in this PR. A rough sketch of the three transactions is shown below.
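
For clarity, a rough sketch of the scenario the removed test exercised (illustrative only, not the test's actual code; dataFile1, dataFile2, and posDeleteAgainstDataFile1 are assumed, pre-built file instances, and ImmutableList is an assumed Guava import):

    // txn.1: append data-file-1
    table.newAppend().appendFile(dataFile1).commit();
    // txn.2: replace the table contents, deleting data-file-1 and adding data-file-2
    table.newOverwrite().deleteFile(dataFile1).addFile(dataFile2).commit();
    // txn.3: a row delta whose pos-delete still references data-file-1; the
    // existence validation fails because txn.2 already deleted that file
    table.newRowDelta()
        .addDeletes(posDeleteAgainstDataFile1)
        .validateDataFilesExist(ImmutableList.of(dataFile1.path()))
        .validateDeletedFiles()
        .commit();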

Thanks all for the patient work !

.validateDataFilesExist(ImmutableList.copyOf(result.referencedDataFiles()))
.validateDeletedFiles();

// row delta validations are not needed for streaming changes that write equality deletes. Equality deletes
A reviewer (Member) commented:

Nit: row delta -> Row delta.

@rdblue (Contributor, Author) commented Oct 12, 2021

Thank you for the review, @openinx! I'll remove the unit test and fix up this PR so tests are passing.

@rdblue rdblue added this to the Java 0.12.1 Release milestone Oct 18, 2021
@rdblue rdblue force-pushed the fix-flink-cdc-validation branch from 9173be2 to ccfb795 Compare October 19, 2021 00:09
@rdblue (Contributor, Author) commented Oct 19, 2021

Rebased and removed the test. I'll merge this when CI is passing. I also marked this for inclusion in a patch release, 0.12.1.

@openinx (Member) commented Oct 19, 2021

The broken unit test is the flaky one, let's retry the CI.

org.apache.iceberg.flink.TestFlinkTableSink > testHashDistributeMode[catalogName=testhadoop_basenamespace, baseNamespace=l0.l1, format=ORC, isStreaming=true] FAILED
    java.lang.AssertionError: There should be 1 data file in partition 'aaa' expected:<1> but was:<2>
        at org.junit.Assert.fail(Assert.java:89)
        at org.junit.Assert.failNotEquals(Assert.java:835)
        at org.junit.Assert.assertEquals(Assert.java:647)
        at org.apache.iceberg.flink.TestFlinkTableSink.testHashDistributeMode(TestFlinkTableSink.java:281)

@openinx openinx closed this Oct 19, 2021
@openinx openinx reopened this Oct 19, 2021
@stevenzwu (Contributor) commented:

The broken unit test is the flaky one, let's retry the CI.

org.apache.iceberg.flink.TestFlinkTableSink > testHashDistributeMode[catalogName=testhadoop_basenamespace, baseNamespace=l0.l1, format=ORC, isStreaming=true] FAILED
    java.lang.AssertionError: There should be 1 data file in partition 'aaa' expected:<1> but was:<2>
        at org.junit.Assert.fail(Assert.java:89)
        at org.junit.Assert.failNotEquals(Assert.java:835)
        at org.junit.Assert.assertEquals(Assert.java:647)
        at org.apache.iceberg.flink.TestFlinkTableSink.testHashDistributeMode(TestFlinkTableSink.java:281)

@openinx looks like @szehon-ho's attempt in #2989 didn't fix the flakiness, so it is probably a different root cause.

@openinx (Member) commented Oct 19, 2021

@stevenzwu, okay, let me take a look at the root cause. Anyway, it does not block this PR; let me get this merged first.

@rdblue (Contributor, Author) commented Oct 19, 2021

Thanks for merging, @openinx!

wypoon added a commit to wypoon/iceberg that referenced this pull request Oct 19, 2021
Removed test left two unused imports.
wypoon added a commit to wypoon/iceberg that referenced this pull request Oct 19, 2021
Removed test left two unused imports.
kbendick pushed a commit to kbendick/iceberg that referenced this pull request Oct 27, 2021
kbendick pushed a commit to kbendick/iceberg that referenced this pull request Oct 27, 2021
kbendick pushed a commit to kbendick/iceberg that referenced this pull request Oct 28, 2021
rdblue added a commit that referenced this pull request Oct 29, 2021
izchen pushed a commit to izchen/iceberg that referenced this pull request Dec 7, 2021
Initial-neko pushed a commit to Initial-neko/iceberg that referenced this pull request Dec 15, 2021
Initial-neko pushed a commit to Initial-neko/iceberg that referenced this pull request Dec 17, 2021
Successfully merging this pull request may close these issues: Error in writing CDC data with Flink.