Flink - Fix incorrect / old row being written into delta files when using upsert mode #4364
Conversation
For more information about the reproduction, see #4316 (comment).
} else {
  // TODO provide the ability to customize the equality-delete row schema.
This comment seems worth removing, as not having the right equality-delete row schema can cause correctness issues.
It also seems that one of the new tests might be flaky.
I pushed another test that also fails: testMultipleUpsertsToOneRowWithNonPKFieldChanging.
I don't think this new test has much to do with the changes made in this PR.
However, given that we've been working on this for a while, and that I've been sick for the past few days, I wanted to get it in front of others to see. I do think it should be handled separately, though.
TestHelpers.assertRows(
    sql("SELECT * FROM %s", tableName),
    Lists.newArrayList(Row.of("aaa", dt, 6, false), Row.of("bbb", dt, 3, false)));
The record for PK ('aaa', '2022-03-01') should be what's asserted here, but instead it comes out with the boolean field as true.
public void testMultipleUpsertsToOneRowWithNonPKFieldChanging() {
  String tableName = "test_multiple_upserts_on_one_row";
  LocalDate dt = LocalDate.of(2022, 3, 1);
  try {
    sql("CREATE TABLE %s(data STRING NOT NULL, dt DATE NOT NULL, id INT NOT NULL, bool BOOLEAN NOT NULL, " +
        "PRIMARY KEY(data,dt) NOT ENFORCED) " +
        "PARTITIONED BY (data) WITH %s",
        tableName, toWithClause(tableUpsertProps));

    sql("INSERT INTO %s VALUES " +
        "('aaa', TO_DATE('2022-03-01'), 1, false)," +
        "('aaa', TO_DATE('2022-03-01'), 2, false)," +
        "('bbb', TO_DATE('2022-03-01'), 3, false)",
        tableName);

    TestHelpers.assertRows(
        sql("SELECT * FROM %s", tableName),
        Lists.newArrayList(Row.of("aaa", dt, 2, false), Row.of("bbb", dt, 3, false)));

    // Process several duplicates of the same record with PK ('aaa', TO_DATE('2022-03-01')).
    // Depending on the number of times that records are inserted for that row, one of the
    // rows 2 back will be used instead.
    //
    // Indicating possibly an issue with insertedRowMap checking and/or the positional delete
    // writer.
    sql("INSERT INTO %s VALUES " +
        "('aaa', TO_DATE('2022-03-01'), 6, false)," +
        "('aaa', TO_DATE('2022-03-01'), 6, true)," +
        "('aaa', TO_DATE('2022-03-01'), 6, false)," +
        "('aaa', TO_DATE('2022-03-01'), 6, false)",
        tableName);

    TestHelpers.assertRows(
        sql("SELECT * FROM %s", tableName),
        Lists.newArrayList(Row.of("aaa", dt, 6, false), Row.of("bbb", dt, 3, false)));
  } finally {
    sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
  }
}
This is the other test that I have found that fails.
It fails on master as well, and I don't think it's caused by this change. But given that we've spent a while on this PR, that this data correctness bug seems somewhat urgent, and that this issue is in the same area, I wanted to get it out there for others to see.
This can ideally be handled in a follow-up PR, as I believe it's related to the positional delete writer or the insertedRowMap. See the TODO in BaseTaskWriter#internalPosDelete, which seems to allude to this issue:
TODO attach the previous row if has a positional-delete row schema in appender factory
I would prefer to open a separate PR for this issue, but given that it involves the same class being fixed here, I wanted to get it out there for others to look at ASAP. cc @openinx
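For anyone skimming the thread, the insertedRowMap / positional-delete interplay can be sketched with a toy model. This is a hedged illustration only, not the Iceberg implementation; every class, field, and method name below is made up for the sketch. The writer remembers the file position of the last row written for each key, and when a duplicate key arrives within the same batch it emits a positional delete for that earlier position. When the bookkeeping is correct, only the last duplicate survives; the failing test suggests the real writer's position tracking goes wrong somewhere on this path.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Toy model of a delta writer de-duplicating within one batch.
// None of these names correspond to actual Iceberg classes.
public class PosDeleteSketch {
  record Key(String data, String dt) {}

  static List<int[]> dataFile = new ArrayList<>();        // written rows: {id, boolAsInt}
  static Map<Key, Integer> insertedRowMap = new HashMap<>();
  static Set<Integer> posDeletes = new TreeSet<>();       // positions deleted in this batch

  static void write(String data, String dt, int id, boolean bool) {
    Key key = new Key(data, dt);
    Integer previous = insertedRowMap.get(key);
    if (previous != null) {
      // Same key seen earlier in this batch: positionally delete the old copy.
      posDeletes.add(previous);
    }
    insertedRowMap.put(key, dataFile.size());
    dataFile.add(new int[] {id, bool ? 1 : 0});
  }

  // Rows that survive the positional deletes.
  static List<int[]> liveRows() {
    List<int[]> live = new ArrayList<>();
    for (int pos = 0; pos < dataFile.size(); pos++) {
      if (!posDeletes.contains(pos)) {
        live.add(dataFile.get(pos));
      }
    }
    return live;
  }

  public static void main(String[] args) {
    // The four duplicates from the failing test, all with PK ('aaa', '2022-03-01').
    write("aaa", "2022-03-01", 6, false);
    write("aaa", "2022-03-01", 6, true);
    write("aaa", "2022-03-01", 6, false);
    write("aaa", "2022-03-01", 6, false);
    System.out.println(liveRows().size());  // 1: only the last duplicate survives
  }
}
```

With correct position tracking the final state is a single row (id=6, bool=false); a stale or mis-updated map entry would leave an earlier duplicate visible, which matches the symptom described above.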
sql("INSERT INTO %s VALUES " +
    "('aaa', TO_DATE('2022-03-01'), 6, false)," +
    "('aaa', TO_DATE('2022-03-01'), 6, true)," +
    "('aaa', TO_DATE('2022-03-01'), 6, false)," +
    "('aaa', TO_DATE('2022-03-01'), 6, false)",
    tableName);
Update: when changing this to the following insertion statement, the test passes with the correct results:
sql("INSERT INTO %s VALUES " +
"('aaa', TO_DATE('2022-03-01'), 1, false)," +
"('aaa', TO_DATE('2022-03-01'), 2, true)," +
"('aaa', TO_DATE('2022-03-01'), 3, false)," +
"('aaa', TO_DATE('2022-03-01'), 6, false)",
tableName);
So I'm not sure what the issue is, but I'm now fairly sure it's not related to this change at all. Since the issue exists in master and is not related to what this PR fixes, I suggest we handle it in another PR. I've removed the test and will open a separate issue and a draft PR with the new test case so that anyone who wants to can look into it further. 🙂
Great, with the unnecessary test projection removed, I think this is ready. It also looks like the other discussion has been resolved, so I'll merge this. For the new test failure, I talked with Kyle offline and we'll resolve it in a separate PR, since it appears to be unrelated to this problem: position deletes are the issue there.
Co-authored-by: liliwei <hililiwei@gmail.com>
Reference: (cherry picked from commit 340a0c5)
- Removed dependency on Debezium CDC fields; only depends on the existing primary key
- Changed to `deleteKey()`, as in apache/iceberg#4364
- Added partition field IDs to the equality delete schema, as mentioned at https://iceberg.apache.org/docs/latest/flink/#upsert
Presently, we are writing incorrect data into equality delete files when using upsert mode with Flink.
The rows come in with the INSERT row kind, but we are calling writer.delete on the entire row, which is the new row. This leads to incorrect data being written into the equality delete files.
Instead, we need to write the delete using only the equality field id columns, so that the delete applies to the old row rather than the entire new row.
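The difference between the two delete behaviors can be illustrated with a minimal, self-contained sketch. This is plain Java with a list standing in for the table and predicates standing in for equality deletes; it is not the Iceberg or Flink API, and all names in it are illustrative. An equality delete carrying the whole new row only matches rows identical to the new one and misses the old version of the key, while a delete keyed on only the equality (primary key) fields removes it.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of upsert-mode delta writing; not the Iceberg API.
public class UpsertSketch {
  // A row: (data, dt, id). Primary key = (data, dt).
  record Row(String data, String dt, int id) {}

  static List<Row> table = new ArrayList<>();

  // Buggy path: the equality delete carries the FULL new row, so it only
  // removes rows identical to the new one and misses the old version.
  static void upsertDeletingWholeRow(Row newRow) {
    table.removeIf(r -> r.equals(newRow));
    table.add(newRow);
  }

  // Fixed path: the equality delete carries only the key fields, so any
  // previous version of the same key is removed before the insert.
  static void upsertDeletingByKey(Row newRow) {
    table.removeIf(r -> r.data().equals(newRow.data()) && r.dt().equals(newRow.dt()));
    table.add(newRow);
  }

  public static void main(String[] args) {
    table.clear();
    table.add(new Row("aaa", "2022-03-01", 1));
    upsertDeletingWholeRow(new Row("aaa", "2022-03-01", 2));
    System.out.println("buggy: " + table.size() + " rows");  // 2 rows: old version survives

    table.clear();
    table.add(new Row("aaa", "2022-03-01", 1));
    upsertDeletingByKey(new Row("aaa", "2022-03-01", 2));
    System.out.println("fixed: " + table.size() + " rows");  // 1 row: old version deleted
  }
}
```

In the buggy path the key ('aaa', '2022-03-01') ends up with two visible rows, which is the duplicate/stale-row symptom this PR fixes by deleting on the equality field ids only.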
Co-authored-by: liliwei <hililiwei@gmail.com>
Opening this PR, as it addresses some of the feedback from #4316 and applies only to Flink 1.14. As @hililiwei and I worked on this together all week, I've marked us as co-authors. We can close that PR, as this one is oriented only towards Flink 1.14 (and has the correct co-authorship in it, so it can be merged at anyone's discretion).
As some code in core has changed, some changes need to be applied in earlier versions of Flink. To keep the PR to the minimum required, I'm leaving that out where possible; we can add it in after. The original report of this issue is thanks to @xloya in #4311.