Core: Remove unnecessary row filtering in deleted manifest file #4316
Conversation
Thank you so much for the patch @hililiwei!
And thank you for expanding on the issue in the other PR from @xloya. Can you please link that PR and restate some of the key facts from over there, so reviewers have more of the context in one place? I'll DM you so we don't add too much unneeded detail to the PR.
Given the situation doesn’t occur with Avro files, I’d like to get input from others on the best way to resolve this.
This seems like the solution to go with, but I’d like to get input from others on whether there is possibly a better way to compose this with existing Predicate and related interfaces.
But overall this is really great work. And thank you to @xloya for opening the original PR to bring attention to this issue.
cc @szehon-ho @stevenzwu (when you get a chance)
We seem to have run into the same issue as @xloya. Here are some comments from #4311:
Can you help me understand what's happening? Are you saying that the lower/upper bounds in the delete file for other columns are not accurate for the deleted rows? That's what it sounds like when you say the bounds are "new" -- are these values for the upserted row columns rather than the deleted row columns?
I'm going to try to explain what happened, which may be a bit long. 😄

Prerequisite, create the table:

  CREATE TABLE test_upsert_query (
    id INT NOT NULL,
    province STRING NOT NULL,
    dt DATE,
    PRIMARY KEY(id, province) NOT ENFORCED
  ) PARTITIONED BY (province)
  WITH (
    'write.format.default'='PARQUET',
    'write.upsert.enabled'='true',
    'format-version'='2'
  );

Then insert the initial data:

  INSERT INTO test_upsert_query VALUES
    (1, 'a', TO_DATE('2022-03-01')),
    (2, 'b', TO_DATE('2022-03-01')),
    (1, 'b', TO_DATE('2022-03-01'));

Two manifest files are generated:
c3fd1626-d26f-4067-b4b0-a245d59a0615-m1.avro is the delete manifest file. Viewing it, notice the lower_bounds/upper_bounds it records for the dt column: they describe the rows written in this commit.

Now execute an upsert:

  INSERT INTO test_upsert_query VALUES
    (4, 'a', TO_DATE('2022-03-02')),
    (5, 'b', TO_DATE('2022-03-02')),
    (1, 'b', TO_DATE('2022-03-02'));

The dt of key (1, 'b') is updated to '2022-03-02' (stored internally as 19053). Checking the metadata again, the following two manifest files are displayed:
This time, we're still just looking at c3fd1626-d26f-4067-b4b0-a245d59a0615-m1.avro. Now the value of dt (key=3, the field id of dt) in its lower_bounds/upper_bounds reflects the newly written data, 2022-03-02, even though the equality deletes in this manifest are meant to remove the old rows.

If we query data at this time:

  SELECT * FROM test_upsert_query WHERE dt < '2022-03-02'

During the query, the delete manifest file is filtered based on the values of these metrics. Because the recorded dt bounds only cover 2022-03-02, the delete file is considered irrelevant to the query and is skipped, so the equality delete is never applied. As a result, the query still returns the old row (1, 'b', 2022-03-01), even though it has been upserted.

In this PR, I tried to trim the filter predicate so that only the fields in the identifierFieldIds are kept when filtering the delete manifest file. If the process and result of our analysis are wrong, please do not hesitate to tell me.
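To make the pruning step concrete, here is a small self-contained illustration of the bounds check described above (it mimics the effect, not Iceberg's actual evaluator code; the bound value 19053 comes from the repro):

```java
import java.time.LocalDate;

// Illustration only: the inclusive bounds check that decides whether the delete file
// can contain rows matching "dt < DATE '2022-03-02'". After the upsert commit, the dt
// bounds of the equality delete file describe the newly written rows, not the deleted ones.
public class DeleteFilePruningExample {
  public static void main(String[] args) {
    LocalDate dtLowerBound = LocalDate.ofEpochDay(19053); // 2022-03-02, from the manifest metrics
    LocalDate literal = LocalDate.parse("2022-03-02");    // query predicate: dt < '2022-03-02'

    // rows in the file can only satisfy "dt < literal" if the lower bound is below the literal
    boolean mayMatch = dtLowerBound.isBefore(literal);

    // prints "false": the delete file is skipped, so the stale (1, 'b', 2022-03-01) row survives
    System.out.println("scan delete file: " + mayMatch);
  }
}
```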
Thanks, @hililiwei! Great repro case. I just went through this with @kbendick and we have a slightly different fix to prevent this with future tables. He's also going to open a PR with a fix for existing tables that we'll ping you to review.
It's the least I could do. Looking forward to your PR. Please feel free to let me know if there's anything I can do to help. 😄
…ert mode by updating equalityDeleteSchema Co-authored-by: hililiwei <hililiwei@gmail.com>
@kbendick and I are trying to solve this, ref: kbendick#71
Hi, @kbendick, based on our previous discussion, I've raised a preliminary solution. It deletes data based on the key. Please take a look at it. Thx.
@@ -74,7 +82,8 @@ public void write(RowData row) throws IOException {
       case INSERT:
       case UPDATE_AFTER:
         if (upsert) {
-          writer.delete(row);
+          RowData wrap = RowDataProjection.create(schema, deleteSchema).wrap(row);
This shouldn't create a new projection every time. Instead, it should create one and reuse it by calling wrap every time.
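A minimal sketch of that suggestion (the class and field names are illustrative, and the RowDataProjection package path may differ by Iceberg version; the create/wrap calls are the ones used in the diff above):

```java
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.Schema;
import org.apache.iceberg.flink.data.RowDataProjection; // assumed package; adjust to the module's actual import

// Sketch: build the key projection once per writer and reuse it for every upserted row,
// instead of calling RowDataProjection.create(...) inside write() for each record.
class UpsertKeyProjector {
  private final RowDataProjection keyProjection;

  UpsertKeyProjector(Schema schema, Schema deleteSchema) {
    this.keyProjection = RowDataProjection.create(schema, deleteSchema); // created once
  }

  RowData keyOf(RowData row) {
    return keyProjection.wrap(row); // wrap() just re-points the projection at the new row
  }
}
```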
          writer.deleteKey(wrap);
        } else {
          writer.delete(row);
        }
Why does this need to change? I think you just want to fix the upsert case.
Additionally, in the upsert case, data doesn't come through as UPDATE_BEFORE. Though this might be needed to keep CDC data in check.
We've been working on the PR in my fork, but I'll run some tests.
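For reference, a rough sketch of the row-kind handling being discussed here (this reflects the intent of the comments above, not the exact code in either PR):

```java
import org.apache.flink.types.RowKind;

// Upsert input only carries INSERT/UPDATE_AFTER, so the key-only delete is emitted by the
// writer itself before each write; CDC/retract input also carries UPDATE_BEFORE/DELETE and
// should keep deleting with the full row.
final class RowKindRouting {
  static String describe(RowKind kind, boolean upsert) {
    switch (kind) {
      case INSERT:
      case UPDATE_AFTER:
        return upsert ? "delete by key, then write the row" : "write the row";
      case UPDATE_BEFORE:
        return upsert ? "not expected in upsert input" : "delete the full row";
      case DELETE:
        return "delete the full row";
      default:
        throw new IllegalArgumentException("Unknown row kind: " + kind);
    }
  }
}
```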
-    return fieldGetter[index].getFieldOrNull(struct);
+    if (struct.isNullAt(index)) {
+      return null;
+    }
Why did this need to change? NPE in a test?
There is an NPE in some test cases, yes.
However, I'm going to investigate a bit further, as I do think it might be indicative of a bug.
I think if we use the correct deletion schema in all cases, the NPEs will go away. I am testing now.
Or actually, I don't think this change is needed. If there's no fieldGetter for a given index, that's likely indicative of a bug.
Yeah, this didn't need to be changed if we use the full schema as the deletion schema outside of the upsert case, like here: https://github.com/apache/iceberg/pull/4364/files#diff-bbdfbcbd83d2e6f53d402e804dcd0a6fd28ab39d1cc3f74a88641ae8763fde3bR75-R87
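A small sketch of the schema choice described in that comment (assuming this shape; the linked PR may structure it differently): the equality-key projection is used as the delete schema only in upsert mode, while CDC deletes keep the full row schema.

```java
import java.util.List;
import org.apache.iceberg.Schema;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;

// Sketch only: pick the schema used for equality delete files depending on mode.
final class DeleteSchemas {
  static Schema deleteSchemaFor(Schema schema, List<Integer> equalityFieldIds, boolean upsert) {
    if (upsert) {
      // upsert deletes are key-only, so project the table schema down to the key fields
      return TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds));
    }
    // CDC/retract deletes receive the full deleted row, so keep the full schema
    return schema;
  }
}
```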
@@ -57,6 +60,7 @@
     this.schema = schema;
     this.deleteSchema = TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds));
     this.wrapper = new RowDataWrapper(flinkSchema, schema.asStruct());
+    this.wrapperDelete = new RowDataWrapper(FlinkSchemaUtil.convert(deleteSchema), deleteSchema.asStruct());
How about keyWrapper instead?
@@ -66,6 +70,10 @@ RowDataWrapper wrapper() {
     return wrapper;
   }

+  RowDataWrapper wrapperDelete() {
Why is there an accessor for this?
     this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec,
-        ArrayUtil.toIntArray(equalityFieldIds), schema, null);
+        ArrayUtil.toIntArray(equalityFieldIds), TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds)), null);
This is the correct schema for upsert, but should not be used for delete when the row passed to the delete file is the deleted row.
@@ -66,6 +70,10 @@ RowDataWrapper wrapper() {
     return wrapper;
   }

+  RowDataWrapper wrapperDelete() {
+    return wrapperDelete;
+  }
Can you revert the changes in all modules other than 1.14? That makes reviewing and updating for review comments much easier. Once this goes in we can backport to 1.12 and 1.13.
Yeah agreed. I left a similar comment on another file because the changes were a bit much.
It keeps the discussion of a specific change all in one place.
In this case, I believe that @hililiwei made the changes as existing tests might not pass in earlier versions of Flink.
But for something important, we should still keep the changes in one PR while reviewing. Otherwise it's difficult for others to review.
Overall this looks good, but we don't want to change how Flink handles CDC deletes or updates. This should be a narrower change that only applies to upserts.
    if (struct.isNullAt(index)) {
      return null;
    }
    return this.fieldGetter[index].getFieldOrNull(struct);
Let's keep the work to just Flink 1.14 for now, so comments don't get duplicated on many things.
After we update data via upsert, querying on a field that is not in the identifierFieldIds may return inaccurate results. The metric values (such as upper_bounds/lower_bounds) of non-identifier fields recorded in the delete manifest file describe the newly written data, and the row filter is evaluated against those new values rather than the values of the deleted rows. This can cause the delete files to be filtered out, so the update to the old data does not take effect: even though the data has been upserted, the old rows are still fetched and put into the result set.

In this PR, I try to come up with a solution: when filtering the delete manifest file, if identifierFieldIds is not empty, only the fields in the identifierFieldIds are retained in the row filter, and the filter predicate on non-identifier fields is always kept as true.
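A minimal sketch of that trimming idea using Iceberg's expression visitor API; the class name, its exact shape, and where it is applied are illustrative assumptions, not the code this PR actually adds:

```java
import java.util.Set;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.ExpressionVisitors;
import org.apache.iceberg.expressions.UnboundPredicate;

// Sketch: relax the row filter before pruning delete manifests/files so that only
// predicates on identifier (equality-delete key) columns remain; predicates on any
// other column become alwaysTrue(), which can never wrongly prune a delete file.
class TrimToIdentifierFields extends ExpressionVisitors.ExpressionVisitor<Expression> {
  private final Set<String> identifierFieldNames;

  TrimToIdentifierFields(Set<String> identifierFieldNames) {
    this.identifierFieldNames = identifierFieldNames;
  }

  static Expression trim(Expression rowFilter, Set<String> identifierFieldNames) {
    return ExpressionVisitors.visit(rowFilter, new TrimToIdentifierFields(identifierFieldNames));
  }

  @Override
  public Expression alwaysTrue() {
    return Expressions.alwaysTrue();
  }

  @Override
  public Expression alwaysFalse() {
    return Expressions.alwaysFalse();
  }

  @Override
  public Expression not(Expression result) {
    // the child may already have been relaxed to alwaysTrue(); negating it could
    // wrongly prune, so stay conservative and keep the delete file
    return Expressions.alwaysTrue();
  }

  @Override
  public Expression and(Expression left, Expression right) {
    return Expressions.and(left, right);
  }

  @Override
  public Expression or(Expression left, Expression right) {
    return Expressions.or(left, right);
  }

  @Override
  public <T> Expression predicate(UnboundPredicate<T> pred) {
    // keep predicates on key columns; bounds for non-key columns in a delete file
    // describe the newly written rows, so drop those predicates entirely
    return identifierFieldNames.contains(pred.ref().name()) ? pred : Expressions.alwaysTrue();
  }
}
```

A caller would run the scan's row filter through trim(...) with the table's identifier field names before using it to prune delete manifest entries; relaxing non-key predicates to alwaysTrue() can only keep extra delete files, never drop ones that are needed.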