Flink sink writes duplicate data in upsert mode #10431
@zhongqishang: How is your sink/table created? What are the exact records you are sending to the sink? Your issue seems very similar to: #10076
@pvary Thanks for your reply. The following is the create table SQL:
This is not similar to #10061. The background of this problem is that a checkpoint and a savepoint ran concurrently, and the subsequent savepoint was aborted.
@zhongqishang: Do you see anything more in the logs? Exceptions/retries, or something like this? Also, I don't fully understand your statement here:
Could you please elaborate a bit?
I have not found any Exceptions/retries around the wrong snapshot time. To be clear, this is duplicate data that only exists from a certain snapshot onwards.
TM snapshot commit log:
The files and data submitted by this error snapshot are:
I mean the eq-delete file.
Here is how the different deletes work:
Based on the data you have shown, I would guess that there were 2 updates for the given record in the snapshot.
Based on the commit message you have shown, it has written 3 delete files; I expect that one of them is the positional delete. It should contain the delete record for the given row. Could you please check if this is the case?
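For context, the delete scoping rules discussed above can be sketched as a small model (hypothetical class and function names; assumes the Iceberg spec's sequence-number rules: equality deletes apply only to data files with a strictly smaller sequence number, while positional deletes also apply to files with an equal sequence number):

```python
from dataclasses import dataclass

@dataclass
class DataFile:
    path: str
    sequence_number: int

@dataclass
class EqualityDeleteFile:
    sequence_number: int

@dataclass
class PositionDeleteFile:
    sequence_number: int

def eq_delete_applies(delete: EqualityDeleteFile, data: DataFile) -> bool:
    # Equality deletes only hide rows in *older* data files.
    return data.sequence_number < delete.sequence_number

def pos_delete_applies(delete: PositionDeleteFile, data: DataFile) -> bool:
    # Position deletes can also hide rows written in the *same* commit.
    return data.sequence_number <= delete.sequence_number

# A data file and an equality delete committed in the same snapshot
# share a sequence number, so the eq-delete does not hide the row;
# only the positional delete can mask it.
data = DataFile("data-1.parquet", sequence_number=7)
assert not eq_delete_applies(EqualityDeleteFile(sequence_number=7), data)
assert pos_delete_applies(PositionDeleteFile(sequence_number=7), data)
```

This is why an update of a row written earlier in the same checkpoint cycle relies on the positional delete rather than the equality delete.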
@pvary Yes. Sorry, I have corrected the wrong file ID provided above.
@pvary I encountered the same problem on another table; this time it was caused by a checkpoint RPC timeout. JM log:
Seems like an issue with checkpoint retry.
@zhongqishang: Seems like an issue with checkpoint retries. Is there any chance to reproduce the issue with a newer version of Flink? The currently supported versions are 1.17, 1.18, and 1.19.
Apache Iceberg version
1.2.1
Query engine
Flink 1.14.4
Please describe the bug 🐞
I have a Flink upsert job with a checkpoint interval of 5 minutes; an external service triggers a savepoint periodically (every 30 minutes), and parallelism = 1.
5 files were generated in one checkpoint cycle: two data files, two eq-delete files, and one pos-delete file.
The 2 data files and 2 eq-delete files contained the same data. When I queried, duplicate data appeared.
I think it is because the subsequent eq-delete is not associated with the first data file.
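The hypothesis above can be illustrated with a small, hypothetical model of a reader (simplified to one partition with a single identity equality field; names and structure are illustrative, not Iceberg's actual API). An eq-delete committed in the same snapshot as a data file shares its sequence number and so cannot hide that file's rows; if the positional delete that should mask the old row in the first data file is missing, both row versions become visible:

```python
# Simplified reader: a row survives unless a pos-delete targets its
# (file, position) or an eq-delete from a *later* sequence number
# matches its key.
def visible_rows(data_files, pos_deletes, eq_deletes):
    out = []
    for path, seq, rows in data_files:
        for pos, (key, value) in enumerate(rows):
            if (path, pos) in pos_deletes:
                continue
            if any(dseq > seq and dkey == key for dseq, dkey in eq_deletes):
                continue
            out.append((key, value))
    return out

# One checkpoint cycle (sequence number 7): row k=1 is inserted as v1
# into data-1, then updated to v2 in data-2.  The eq-delete written in
# the same commit has seq 7, so it cannot hide v1; the pos-delete must.
data_files = [("data-1", 7, [(1, "v1")]),
              ("data-2", 7, [(1, "v2")])]
eq_deletes = [(7, 1)]

with_pos = visible_rows(data_files, {("data-1", 0)}, eq_deletes)
without_pos = visible_rows(data_files, set(), eq_deletes)
print(with_pos)     # [(1, 'v2')]            -- correct result
print(without_pos)  # [(1, 'v1'), (1, 'v2')] -- duplicate, as reported
```

If a checkpoint/savepoint retry causes the pos-delete association to be lost, the reader behaves like the `without_pos` case above.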
Flink TM log
JM log
Downloaded files and sizes: