
Flink - Fix incorrect / old row being written into delta files when using upsert mode #4364

Merged 8 commits into apache:master on Mar 28, 2022

Conversation

kbendick
Contributor

@kbendick kbendick commented Mar 18, 2022

Presently, we are writing incorrect data into equality delete files when using upsert mode with Flink.

The rows come in with the INSERT row kind, but we are currently calling writer.delete on the entire row, which is the new row.

This leads to incorrect data being written into the equality delete files.

Instead, we need to write the delete using only the equality field ID columns, so that the delete applies to the old row and not to the entire new row.
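The idea behind the fix can be sketched as follows. This is a minimal, hypothetical illustration, not the actual Iceberg BaseDeltaTaskWriter code; the class and method names here are assumptions made for the example:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Minimal sketch of the upsert fix: on INSERT in upsert mode, the equality
// delete must carry only the equality (key) fields, not the full new row.
public class UpsertSketch {
  // Positions of the equality (primary key) fields within a row, e.g. (data, dt).
  static final int[] EQUALITY_FIELD_POSITIONS = {0, 1};

  static final List<List<Object>> equalityDeletes = new ArrayList<>();
  static final List<List<Object>> dataRows = new ArrayList<>();

  // Project a full row down to its equality key fields.
  static List<Object> keyProjection(List<Object> row) {
    List<Object> key = new ArrayList<>();
    for (int pos : EQUALITY_FIELD_POSITIONS) {
      key.add(row.get(pos));
    }
    return key;
  }

  // Upsert path: emit an equality delete for the key, then write the new row.
  static void upsert(List<Object> newRow) {
    // The bug was equivalent to equalityDeletes.add(newRow): deleting with the
    // entire *new* row, which cannot match the old row when a non-key field
    // changed. The fix deletes by the key projection only.
    equalityDeletes.add(keyProjection(newRow));
    dataRows.add(newRow);
  }

  public static void main(String[] args) {
    upsert(Arrays.asList("aaa", "2022-03-01", 1));
    upsert(Arrays.asList("aaa", "2022-03-01", 2));
    // The delete carries only the key columns, so it applies to the old row.
    System.out.println(equalityDeletes.get(1)); // prints [aaa, 2022-03-01]
  }
}
```

Because the delete row contains only the key columns, it matches any previously written row with that key, regardless of what the non-key fields held.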

Co-authored-by: liliwei <hililiwei@gmail.com>

Opening this PR, as it addresses some of the feedback from #4316 and applies only to Flink 1.14. As @hililiwei and I worked on this together all week, I've marked us as co-authors. We can close that PR, as this one targets only Flink 1.14 (and carries the correct co-authorship, so it can be merged at anyone's discretion).

As some code in core has changed, some changes will also need to be applied to earlier Flink versions. To keep this PR to the minimum required, I'm leaving those out where possible; we can add them in afterward.

Thanks to @xloya for the original report in #4311.

@kbendick kbendick changed the title Flink - Fix incorrect row being written for delta files when using upsert mode Flink - Fix incorrect / old row being written into delta files when using upsert mode Mar 18, 2022
@hililiwei
Contributor

For more information about reproducing the issue, see #4316 (comment).

@kbendick kbendick marked this pull request as draft March 20, 2022 22:29
@kbendick kbendick marked this pull request as ready for review March 20, 2022 22:29
@kbendick kbendick changed the title Flink - Fix incorrect / old row being written into delta files when using upsert mode Flink - Fix incorrect / old row being written into delta files when using upsert mode - DO NOT MERGE Mar 20, 2022
@kbendick kbendick force-pushed the fix-flink-upsert-delta-file-writer branch 2 times, most recently from 4c947eb to 4ad7a6e Compare March 21, 2022 01:56
@kbendick kbendick changed the title Flink - Fix incorrect / old row being written into delta files when using upsert mode - DO NOT MERGE Flink - Fix incorrect / old row being written into delta files when using upsert mode Mar 21, 2022
@kbendick kbendick force-pushed the fix-flink-upsert-delta-file-writer branch from 1b82e76 to 65b493b Compare March 21, 2022 02:08
} else {
// TODO provide the ability to customize the equality-delete row schema.
Contributor Author
This comment seems worth removing, as not having the right equality-delete row schema can cause correctness issues.

@kbendick kbendick closed this Mar 21, 2022
@kbendick kbendick reopened this Mar 21, 2022
@kbendick
Contributor Author

It seems that one of the new tests might be flaky.

org.apache.iceberg.flink.TestFlinkUpsert > testCompoundPrimaryKey[catalogName=testhadoop_basenamespace, baseNamespace=l0.l1, format=ORC, isStreaming=true] FAILED
    java.lang.AssertionError: result should have the correct rows expected:<[+I[2, aaa, 2022-03-01], +I[3, bbb, 2022-03-01]]> but was:<[+I[1, aaa, 2022-03-01], +I[3, bbb, 2022-03-01]]>
        at org.junit.Assert.fail(Assert.java:89)
        at org.junit.Assert.failNotEquals(Assert.java:835)
        at org.junit.Assert.assertEquals(Assert.java:120)
        at org.apache.iceberg.flink.TestFlinkUpsert.testCompoundPrimaryKey(TestFlinkUpsert.java:218)

@kbendick kbendick force-pushed the fix-flink-upsert-delta-file-writer branch from f538167 to 089e7e0 Compare March 24, 2022 18:15
@kbendick kbendick left a comment
I pushed another test that also fails.

I do not think this new test, testMultipleUpsertsToOneRowWithNonPKFieldChanging, has much to do with the changes made in this PR.

However, given that we've been working on this for a while and also that I've been sick for the past few days, I wanted to get this in front of others to see. I do think it should be handled separately though.


TestHelpers.assertRows(
    sql("SELECT * FROM %s", tableName),
    Lists.newArrayList(Row.of("aaa", dt, 6, false), Row.of("bbb", dt, 3, false)));
Contributor Author

The record for PK ('aaa', '2022-03-01') should be what's asserted here, but instead it comes out with the boolean field as true.

Comment on lines 289 to 326
public void testMultipleUpsertsToOneRowWithNonPKFieldChanging() {
  String tableName = "test_multiple_upserts_on_one_row";
  LocalDate dt = LocalDate.of(2022, 3, 1);
  try {
    sql("CREATE TABLE %s(data STRING NOT NULL, dt DATE NOT NULL, id INT NOT NULL, bool BOOLEAN NOT NULL, " +
        "PRIMARY KEY(data,dt) NOT ENFORCED) " +
        "PARTITIONED BY (data) WITH %s",
        tableName, toWithClause(tableUpsertProps));

    sql("INSERT INTO %s VALUES " +
        "('aaa', TO_DATE('2022-03-01'), 1, false)," +
        "('aaa', TO_DATE('2022-03-01'), 2, false)," +
        "('bbb', TO_DATE('2022-03-01'), 3, false)",
        tableName);

    TestHelpers.assertRows(
        sql("SELECT * FROM %s", tableName),
        Lists.newArrayList(Row.of("aaa", dt, 2, false), Row.of("bbb", dt, 3, false)));

    // Process several duplicates of the same record with PK ('aaa', TO_DATE('2022-03-01')).
    // Depending on the number of times that records are inserted for that row, one of the
    // rows 2 back will be used instead.
    //
    // Indicating possibly an issue with insertedRowMap checking and/or the positional delete
    // writer.
    sql("INSERT INTO %s VALUES " +
        "('aaa', TO_DATE('2022-03-01'), 6, false)," +
        "('aaa', TO_DATE('2022-03-01'), 6, true)," +
        "('aaa', TO_DATE('2022-03-01'), 6, false)," +
        "('aaa', TO_DATE('2022-03-01'), 6, false)",
        tableName);

    TestHelpers.assertRows(
        sql("SELECT * FROM %s", tableName),
        Lists.newArrayList(Row.of("aaa", dt, 6, false), Row.of("bbb", dt, 3, false)));
  } finally {
    sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
  }
Contributor Author

This is the other test that I have found that fails.

It fails on master as well, and I don't think it's caused by this change. However, given that we've spent a while on this PR, that this data correctness bug seems somewhat urgent, and that the issue is in the same area, I wanted to get it out there for others to see.

This can ideally be handled in a follow up PR, as I believe it's related to the positional delete writer or the insertedRowMap. See the TODO in BaseTaskWriter#internalPosDelete, which seems to allude to this issue:

TODO attach the previous row if has a positional-delete row schema in appender factory

I would prefer to open a separate PR for this issue, but given that it's an issue involving the same class being fixed here, I wanted to get this out there for others to look at ASAP. cc @openinx
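To illustrate why duplicates within a single batch would need the positional-delete path at all, here is a minimal, hypothetical sketch of the insertedRowMap idea discussed above (not the actual BaseTaskWriter code; all names are assumptions). Equality deletes apply to previously written files, so a duplicate key written earlier into the current, in-progress data file has to be removed with a positional delete against that same file:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

// Sketch: duplicate keys within one in-progress data file are removed via
// positional deletes, tracked through a key -> position map.
public class PosDeleteSketch {
  static final List<List<Object>> written = new ArrayList<>();
  static final TreeSet<Integer> positionDeletes = new TreeSet<>();
  // Maps the key of each live row to its position in the current data file.
  static final Map<List<Object>, Integer> insertedRowMap = new HashMap<>();

  static void upsert(List<Object> row) {
    List<Object> key = Arrays.asList(row.get(0), row.get(1)); // (data, dt)
    Integer prevPos = insertedRowMap.get(key);
    if (prevPos != null) {
      // An earlier row with this key exists in the current file: it cannot be
      // removed by an equality delete, so record a positional delete instead.
      positionDeletes.add(prevPos);
    }
    written.add(row);
    insertedRowMap.put(key, written.size() - 1);
  }

  // Simulate reading the file back with its positional deletes applied.
  static List<List<Object>> readBack() {
    List<List<Object>> result = new ArrayList<>();
    for (int pos = 0; pos < written.size(); pos++) {
      if (!positionDeletes.contains(pos)) {
        result.add(written.get(pos));
      }
    }
    return result;
  }

  public static void main(String[] args) {
    upsert(Arrays.asList("aaa", "2022-03-01", 6, false));
    upsert(Arrays.asList("aaa", "2022-03-01", 6, true));
    upsert(Arrays.asList("aaa", "2022-03-01", 6, false));
    // Only the last duplicate should survive.
    System.out.println(readBack());
  }
}
```

If the position bookkeeping (or the deduplication that feeds it) is off by one entry, an earlier duplicate survives instead of the last one, which would produce exactly the kind of wrong-boolean result the failing test shows.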

Comment on lines 314 to 319
sql("INSERT INTO %s VALUES " +
    "('aaa', TO_DATE('2022-03-01'), 6, false)," +
    "('aaa', TO_DATE('2022-03-01'), 6, true)," +
    "('aaa', TO_DATE('2022-03-01'), 6, false)," +
    "('aaa', TO_DATE('2022-03-01'), 6, false)",
    tableName);
Contributor Author

A further update: when changing this to the following insertion statement, the test passes with the correct results:

      sql("INSERT INTO %s VALUES " +
          "('aaa', TO_DATE('2022-03-01'), 1, false)," +
          "('aaa', TO_DATE('2022-03-01'), 2, true)," +
          "('aaa', TO_DATE('2022-03-01'), 3, false)," +
          "('aaa', TO_DATE('2022-03-01'), 6, false)",
          tableName);

So I'm not sure what the issue is, but I'm now quite sure it's not related to this change at all.

@kbendick
Contributor Author

Given that the new test case testMultipleUpsertsToOneRowWithNonPKFieldChanging can be fixed by using new values for each row, as shown in #4364 (comment), I'm now sure that the issue is unrelated to what this patch fixes.

I would suggest that we handle it in another PR, as the issue exists in master and is not related to the changes this PR makes.

I've removed the test and will open another issue and a draft PR with the new test case in it, so that anybody who wants to can look into it further. 🙂

@rdblue rdblue added this to the Iceberg 0.13.2 Release milestone Mar 28, 2022
@rdblue
Contributor

rdblue commented Mar 28, 2022

Great, with the unnecessary test projection removed, I think this is ready. It also looks like the other discussion has been resolved, so I'll merge this.

As for the new test failure, I talked with Kyle offline and we'll resolve it in a separate PR, since it appears to be unrelated to this problem; position deletes are the issue there.

@rdblue rdblue merged commit 340a0c5 into apache:master Mar 28, 2022
@kbendick kbendick deleted the fix-flink-upsert-delta-file-writer branch April 13, 2022 16:34
nastra pushed a commit to nastra/iceberg that referenced this pull request May 16, 2022
Co-authored-by: liliwei <hililiwei@gmail.com>
nastra pushed a commit to nastra/iceberg that referenced this pull request May 17, 2022
Co-authored-by: liliwei <hililiwei@gmail.com>
nastra pushed a commit to nastra/iceberg that referenced this pull request May 18, 2022
Co-authored-by: liliwei <hililiwei@gmail.com>
nastra pushed a commit to nastra/iceberg that referenced this pull request May 18, 2022
Co-authored-by: liliwei <hililiwei@gmail.com>
hililiwei pushed a commit to hililiwei/iceberg that referenced this pull request Jun 29, 2022
Co-authored-by: liliwei <hililiwei@gmail.com>
(cherry picked from commit 340a0c5)
wobu added a commit to TIKI-Institut/kafka-connect-iceberg-sink that referenced this pull request Nov 24, 2022
- Removed dependency on debezium cdc fields, only depends on existing primary key
- Changed to `deleteKey()`. As in apache/iceberg#4364
- Added partitionField Ids to equality delete schema, as mentioned at https://iceberg.apache.org/docs/latest/flink/#upsert

7 participants