
Flink sink writes duplicate data in upsert mode #10431

Closed · zhongqishang opened this issue Jun 3, 2024 · 9 comments · Fixed by #10526

@zhongqishang
Contributor

zhongqishang commented Jun 3, 2024

Apache Iceberg version

1.2.1

Query engine

Flink 1.14.4

Please describe the bug 🐞

I have a Flink upsert job with a checkpoint interval of 5 minutes, and an external service periodically (every 30 minutes) triggers a savepoint. Parallelism = 1.

Five files were generated in one checkpoint cycle: two data files, two eq-delete files, and one pos-delete file.
The two data files and the two eq-delete files contained the same data, and duplicate rows appeared when I queried the table.
I think it is because the subsequent eq-delete is not associated with the first data file.

Flink TM log

2024-05-31 16:10:57.457 org.apache.hadoop.io.compress.CodecPool                      [] - Got brand-new compressor [.zstd]
2024-05-31 16:10:57.459 org.apache.hadoop.io.compress.CodecPool                      [] - Got brand-new compressor [.zstd]
2024-05-31 16:10:57.462 org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl [] - Checkpoint 5765 has been notified as aborted, would not trigger any checkpoint.
2024-05-31 16:13:58.455 org.apache.hadoop.io.compress.CodecPool                      [] - Got brand-new compressor [.zstd]
2024-05-31 16:13:58.505 org.apache.hadoop.io.compress.CodecPool                      [] - Got brand-new compressor [.zstd]
2024-05-31 16:13:58.507 org.apache.hadoop.io.compress.CodecPool                      [] - Got brand-new compressor [.zstd]

JM log

2024-05-31 16:08:12.840 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 5764 (type=CHECKPOINT) @ 1717142891998 for job fc721024df3d70e3a1f3a46a63e9635a.
2024-05-31 16:08:16.239 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Triggering savepoint for job fc721024df3d70e3a1f3a46a63e9635a.
2024-05-31 16:08:16.242 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 5765 (type=SAVEPOINT) @ 1717142896239 for job fc721024df3d70e3a1f3a46a63e9635a.
2024-05-31 16:09:41.531 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Completed checkpoint 5764 for job fc721024df3d70e3a1f3a46a63e9635a (7170 bytes, checkpointDuration=89495 ms, finalizationTime=38 ms).
2024-05-31 16:09:41.532 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking checkpoint 5764 as completed for source Source: TableSourceScan(table=[[default_catalog, default_database, cdc_xxx]], fields=[id, data_status, ...]).
2024-05-31 16:10:46.242 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Checkpoint 5765 of job fc721024df3d70e3a1f3a46a63e9635a expired before completing.

Downloaded files and sizes:

-rw-r--r--@ 1 q  staff   30528 May 31 19:36 00000-1-64548f8b-a8a9-474f-95f8-a59a30dcce2f-01926.parquet
-rw-r--r--@ 1 q  staff     701 May 31 19:36 00000-1-64548f8b-a8a9-474f-95f8-a59a30dcce2f-01927.parquet
-rw-r--r--@ 1 q  staff  741706 May 31 19:36 00000-1-64548f8b-a8a9-474f-95f8-a59a30dcce2f-01928.parquet
-rw-r--r--@ 1 q  staff   17592 May 31 19:37 00000-1-64548f8b-a8a9-474f-95f8-a59a30dcce2f-01929.parquet
-rw-r--r--@ 1 q  staff    1978 May 31 19:37 00000-1-64548f8b-a8a9-474f-95f8-a59a30dcce2f-01930.parquet
@zhongqishang zhongqishang added the bug Something isn't working label Jun 3, 2024
@Fokko Fokko added the flink label Jun 3, 2024
@pvary
Contributor

pvary commented Jun 3, 2024

@zhongqishang: How is your sink/table created? What are the exact records you are sending to the sink? Your issue seems very similar to: #10076

@zhongqishang
Contributor Author

@zhongqishang: How is your sink/table created? What are the exact records you are sending to the sink? Your issue seems very similar to: #10076

@pvary Thanks for your reply.

The following is the CREATE TABLE SQL:

CREATE TABLE `iceberg_table` ( 
  `id` String, 
  ... 
  PRIMARY KEY (`id`) NOT ENFORCED  
) WITH ( 
    'connector'='iceberg',
    'catalog-name'='iceberg_catalog',
    'catalog-database'='database',
    'catalog-table'='table',
    'catalog-type'='hive',
    'uri'='xx',
    'hive-conf-dir'='xx',
    'write.format.default'='parquet',
    'format-version'='2',
    'write.upsert.enabled'='true',
    'write.metadata.metrics.default'='full',
    'write.target-file-size-bytes'='268435456',
    'write.parquet.compression-codec'='zstd',
    ...
);

This is not similar to #10061. The background of this problem is that a checkpoint and a savepoint run concurrently, and the savepoint is then aborted.

@pvary
Contributor

pvary commented Jun 4, 2024

@zhongqishang: Do you see anything more in the logs? Exceptions/retries, or something like this?

Also, I don't fully understand your statement here:

I think it is because the subsequent eq delete is not associated with the first data file.

Could you please elaborate a bit?

@zhongqishang
Contributor Author

zhongqishang commented Jun 5, 2024

@zhongqishang: Do you see anything more in the logs? Exceptions/retries, or something like this?

I have not found any exceptions/retries around the time of the problematic snapshot. To be clear, the duplicate data only exists from a certain snapshot onwards.

I think it is because the subsequent eq delete is not associated with the first data file.

Could you please elaborate a bit?

TM snapshot commit log:

2024-05-31 16:13:58.916 INFO org.apache.iceberg.flink.sink.IcebergFilesCommitter          [] - Committing rowDelta for checkpoint 5766 to table iceberg_catalog.ods_iceberg_xxx.xxx branch main with summary: CommitSummary{dataFilesCount=2, dataFilesRecordCount=1016, dataFilesByteCount=772234, deleteFilesCount=3, deleteFilesRecordCount=1016, deleteFilesByteCount=20271}
2024-05-31 16:13:59.254 INFO org.apache.iceberg.metrics.LoggingMetricsReporter            [] - Received metrics report: CommitReport{tableName=iceberg_catalog.ods_iceberg_xxx.xxx, snapshotId=6176224982712390258, sequenceNumber=7987, operation=overwrite, commitMetrics=CommitMetricsResult{totalDuration=TimerResult{timeUnit=NANOSECONDS, totalDuration=PT0.326962754S, count=1}, attempts=CounterResult{unit=COUNT, value=1}, addedDataFiles=CounterResult{unit=COUNT, value=2}, removedDataFiles=null, totalDataFiles=CounterResult{unit=COUNT, value=77}, addedDeleteFiles=CounterResult{unit=COUNT, value=3}, addedEqualityDeleteFiles=CounterResult{unit=COUNT, value=2}, addedPositionalDeleteFiles=CounterResult{unit=COUNT, value=1}, removedDeleteFiles=null, removedEqualityDeleteFiles=null, removedPositionalDeleteFiles=null, totalDeleteFiles=CounterResult{unit=COUNT, value=84}, addedRecords=CounterResult{unit=COUNT, value=1016}, removedRecords=null, totalRecords=CounterResult{unit=COUNT, value=43962360}, addedFilesSizeInBytes=CounterResult{unit=BYTES, value=792505}, removedFilesSizeInBytes=null, totalFilesSizeInBytes=CounterResult{unit=BYTES, value=16066186371}, addedPositionalDeletes=CounterResult{unit=COUNT, value=50}, removedPositionalDeletes=null, totalPositionalDeletes=CounterResult{unit=COUNT, value=2796437}, addedEqualityDeletes=CounterResult{unit=COUNT, value=966}, removedEqualityDeletes=null, totalEqualityDeletes=CounterResult{unit=COUNT, value=8596}}, metadata={engine-version=1.14.4, engine-name=flink, iceberg-version=Apache Iceberg 1.2.1 (commit 4e2cdccd7453603af42a090fc5530f2bd20cf1be)}}

The files and data committed in this problematic snapshot are:

-rw-r--r--@ 1 q  staff   30528 May 31 19:36 00000-1-64548f8b-a8a9-474f-95f8-a59a30dcce2f-01926.parquet
-rw-r--r--@ 1 q  staff     701 May 31 19:36 00000-1-64548f8b-a8a9-474f-95f8-a59a30dcce2f-01927.parquet
-rw-r--r--@ 1 q  staff  741706 May 31 19:36 00000-1-64548f8b-a8a9-474f-95f8-a59a30dcce2f-01928.parquet
-rw-r--r--@ 1 q  staff   17592 May 31 19:37 00000-1-64548f8b-a8a9-474f-95f8-a59a30dcce2f-01929.parquet
-rw-r--r--@ 1 q  staff    1978 May 31 19:37 00000-1-64548f8b-a8a9-474f-95f8-a59a30dcce2f-01930.parquet

DATA file : 00000-1-64548f8b-a8a9-474f-95f8-a59a30dcce2f-01926.parquet

{"id": "d6957bf663d220cac89a4e780ab8f54d", ...}

EQ-DELETE file : 00000-1-64548f8b-a8a9-474f-95f8-a59a30dcce2f-01927.parquet

{"id": "d6957bf663d220cac89a4e780ab8f54d"}

DATA file : 00000-1-64548f8b-a8a9-474f-95f8-a59a30dcce2f-01928.parquet

{"id": "d6957bf663d220cac89a4e780ab8f54d", ...}

EQ-DELETE file : 00000-1-64548f8b-a8a9-474f-95f8-a59a30dcce2f-01929.parquet

{"id": "d6957bf663d220cac89a4e780ab8f54d"}

I mean eq-delete file 01929 is not associated with data file 01926.

@pvary
Contributor

pvary commented Jun 10, 2024

Here is how the different deletes work:

  • EQ-DELETE - removes all occurrences of the record with the given id BEFORE the snapshot
  • POS-DELETE - removes a given row from the given data file BEFORE OR IN the given snapshot.

Based on the data you have shown, I would guess that there were 2 updates for the given record in the snapshot.
Flink does the following:

  • In upsert mode, it always writes an EQ-DELETE for the id and then writes the new record. It also stores the filename and the id of every written record in memory.
  • If a new update for an id arrives within the same snapshot, it also writes a positional delete for the previously written row (sketched below).
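
Roughly, this write path looks like the following simplified, self-contained Java sketch (not the actual Iceberg writer classes; the names UpsertWriterSketch and PathOffset are made up for illustration, and it assumes a single open data file per checkpoint):

import java.util.HashMap;
import java.util.Map;

public class UpsertWriterSketch {

  // Remembers where a key was last written within the current checkpoint.
  private static final class PathOffset {
    final String dataFilePath;
    final long rowPosition;

    PathOffset(String dataFilePath, long rowPosition) {
      this.dataFilePath = dataFilePath;
      this.rowPosition = rowPosition;
    }
  }

  private final Map<String, PathOffset> insertedRowMap = new HashMap<>();
  private final String currentDataFile = "data-00001.parquet";
  private long nextPosition = 0;

  void upsert(String key, String row) {
    PathOffset previous = insertedRowMap.get(key);
    if (previous != null) {
      // The same key was already written during this checkpoint:
      // remove that concrete row with a positional delete.
      writePosDelete(previous.dataFilePath, previous.rowPosition);
    }
    // Always drop older copies of the key from earlier snapshots.
    writeEqDelete(key);
    // Write the new row and remember its location for later updates of the same key.
    writeRow(row);
    insertedRowMap.put(key, new PathOffset(currentDataFile, nextPosition));
    nextPosition++;
  }

  // The real writers append to Parquet data/delete files; this sketch only logs the intent.
  private void writeEqDelete(String key) {
    System.out.println("EQ-DELETE  key=" + key);
  }

  private void writePosDelete(String file, long pos) {
    System.out.println("POS-DELETE file=" + file + " pos=" + pos);
  }

  private void writeRow(String row) {
    System.out.println("DATA       " + currentDataFile + ":" + nextPosition + " -> " + row);
  }

  public static void main(String[] args) {
    UpsertWriterSketch writer = new UpsertWriterSketch();
    writer.upsert("d6957bf663d220cac89a4e780ab8f54d", "{id=..., version=1}");
    writer.upsert("d6957bf663d220cac89a4e780ab8f54d", "{id=..., version=2}");
  }
}

With two updates for the same id, the second upsert emits a POS-DELETE pointing at the row written by the first one.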

Based on the commit summary you have shown, 3 delete files were written - I expect that one of them is the positional delete, and it should contain the delete record for the given row.

Could you please check if this is the case?
Thanks,
Peter

@zhongqishang
Contributor Author

zhongqishang commented Jun 10, 2024

@pvary Yes, the 01930 file is a pos-delete file, but its file_path only references data file 01928.
It should also contain an entry for the first update, which was written to data file 01926, but it does not.
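
For illustration (a reconstructed example, not the actual file contents), each positional delete row pairs a data file path with a row position, so 01930 only contains rows of this shape:

{"file_path": ".../00000-1-64548f8b-a8a9-474f-95f8-a59a30dcce2f-01928.parquet", "pos": ...}

and no row pointing at the 01926 data file.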

Sorry, I have corrected the wrong file ID provided above.

@zhongqishang
Contributor Author

@pvary I encountered the same problem on another table; this time it was caused by a checkpoint RPC timeout.

JM log

2024-06-07 15:50:10.472 [Checkpoint Timer] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 86447 (type=CHECKPOINT) @ 1717746610457 for job dd6dc4caf9d34a7d91767420862f4826.
2024-06-07 15:50:20.496 [flink-scheduler-1] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering Checkpoint 86447 for job dd6dc4caf9d34a7d91767420862f4826 failed due to java.util.concurrent.TimeoutException: Invocation of [RemoteRpcInvocation(null.triggerCheckpoint(ExecutionAttemptID, long, long, CheckpointOptions))] at recipient [akka.tcp://flink@hd-097.ld-hadoop.com:36663/user/rpc/taskmanager_0] timed out.
2024-06-07 15:50:32.594 [jobmanager-io-thread-71] WARN  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Received late message for now expired checkpoint attempt 86447 from task a2e42c20a8234511b70c26b835ec5552 of job dd6dc4caf9d34a7d91767420862f4826 at container_e53_1714278166039_374784_01_000004 @ hd-097.ld-hadoop.com (dataPort=33266).
2024-06-07 15:55:10.480 [Checkpoint Timer] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 86448 (type=CHECKPOINT) @ 1717746910457 for job dd6dc4caf9d34a7d91767420862f4826.
2024-06-07 15:55:13.450 [jobmanager-io-thread-76] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Completed checkpoint 86448 for job dd6dc4caf9d34a7d91767420862f4826 (7769 bytes, checkpointDuration=2936 ms, finalizationTime=57 ms).

@pvary
Contributor

pvary commented Jun 11, 2024

Seems like an issue with checkpoint retries.
I will be out of office for a bit, but this needs to be investigated.

@pvary
Contributor

pvary commented Jun 17, 2024

@zhongqishang: Seems like an issue with checkpoint retries. Is there any chance to reproduce the issue with a newer version of Flink? The currently supported versions are 1.17, 1.18, and 1.19.
