Closed
I'm using Flink (built from the master branch) to write CDC data to Iceberg, and I ran into the following problem:
Caused by: org.apache.iceberg.exceptions.ValidationException: Cannot determine history between starting snapshot null and current 8203201752131271868
at org.apache.iceberg.exceptions.ValidationException.check(ValidationException.java:46)
at org.apache.iceberg.MergingSnapshotProducer.validateDataFilesExist(MergingSnapshotProducer.java:313)
at org.apache.iceberg.BaseRowDelta.validate(BaseRowDelta.java:95)
at org.apache.iceberg.SnapshotProducer.apply(SnapshotProducer.java:162)
at org.apache.iceberg.SnapshotProducer.lambda$commit$2(SnapshotProducer.java:283)
at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:404)
at org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:213)
at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:197)
at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:189)
at org.apache.iceberg.SnapshotProducer.commit(SnapshotProducer.java:282)
at org.apache.iceberg.flink.sink.IcebergFilesCommitter.commitOperation(IcebergFilesCommitter.java:298)
at org.apache.iceberg.flink.sink.IcebergFilesCommitter.commitDeltaTxn(IcebergFilesCommitter.java:285)
at org.apache.iceberg.flink.sink.IcebergFilesCommitter.commitUpToCheckpoint(IcebergFilesCommitter.java:210)
at org.apache.iceberg.flink.sink.IcebergFilesCommitter.initializeState(IcebergFilesCommitter.java:147)
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:106)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:285)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.lang.Thread.run(Thread.java:748)
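My understanding of this message is that the commit validation tries to walk the snapshot history back from the current snapshot (with a null starting snapshot it walks all the way back to the table's first snapshot), and the walk breaks as soon as it hits a snapshot that has been expired. A simplified, hypothetical model of that walk in plain Java (illustrative only, not Iceberg's actual implementation):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the history walk behind
// "Cannot determine history between starting snapshot null and current ...".
// Names and logic are illustrative; this is not Iceberg's actual code.
public class HistoryWalk {

    // snapshotId -> parentSnapshotId (null parent = first snapshot of the table).
    // A snapshot that was expired is simply absent from this map.
    static boolean canDetermineHistory(Map<Long, Long> snapshots, Long startingId, long currentId) {
        Long cursor = currentId;
        while (true) {
            if (!snapshots.containsKey(cursor)) {
                return false; // this ancestor was expired: history is broken
            }
            if (cursor.equals(startingId)) {
                return true; // reached the starting snapshot
            }
            Long parent = snapshots.get(cursor);
            if (parent == null) {
                // reached the table's first snapshot
                return startingId == null;
            }
            cursor = parent;
        }
    }

    public static void main(String[] args) {
        Map<Long, Long> snapshots = new HashMap<>();
        snapshots.put(1L, null);
        snapshots.put(2L, 1L);
        snapshots.put(3L, 2L);
        System.out.println(canDetermineHistory(snapshots, null, 3L)); // full chain intact

        snapshots.remove(1L); // simulate expireSnapshots removing snapshot 1
        System.out.println(canDetermineHistory(snapshots, null, 3L)); // chain broken
    }
}
```

In this model, once expiration deletes a snapshot in the ancestry chain, a validation that starts from null can no longer reach the beginning of the history, which matches the timing of my Spark maintenance job below.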
While the Flink job is running, I start a Spark job to rewrite data files. This is my Spark code:
// 1) Compact small data files toward TARGET_SIZE
RewriteDataFilesActionResult rewriteDataFilesActionResult = Actions.forTable(sparkSession, table)
        .rewriteDataFiles()
        .targetSizeInBytes(TARGET_SIZE)
        .splitLookback(SPLIT_LOOK_BACK)
        .execute();

// 2) Delete files older than expireOrphanTime that no snapshot references
List<String> orphanFiles = Actions.forTable(sparkSession, table)
        .removeOrphanFiles()
        .olderThan(expireOrphanTime)
        .execute();

// 3) Expire snapshots older than expireSnapshotTime, keeping the last 30
BaseExpireSnapshotsSparkAction expireSnapshotsSparkAction = new BaseExpireSnapshotsSparkAction(sparkSession, table);
ExpireSnapshots.Result result = expireSnapshotsSparkAction
        .expireOlderThan(expireSnapshotTime)
        .retainLast(30)
        .execute();
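In case it helps with diagnosis, my mental model of how expireOlderThan and retainLast combine (a hypothetical sketch, not Iceberg's code) is that a snapshot is removed only when it is older than the cutoff AND not among the most recent retainLast snapshots, so old snapshots can still disappear while the Flink job holds a reference to one of them:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of expireOlderThan(cutoff) + retainLast(n) semantics.
// Each snapshot is {id, timestampMillis}, ordered oldest first.
public class ExpirePolicy {

    static List<Long> expired(List<long[]> snapshots, long olderThanMillis, int retainLast) {
        List<Long> result = new ArrayList<>();
        // the last `retainLast` snapshots are always kept, regardless of age
        int keepFrom = Math.max(0, snapshots.size() - retainLast);
        for (int i = 0; i < keepFrom; i++) {
            long[] s = snapshots.get(i);
            if (s[1] < olderThanMillis) {
                result.add(s[0]); // old enough and outside the retained tail: expired
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<long[]> snaps = List.of(
                new long[] {1, 100}, new long[] {2, 200}, new long[] {3, 300});
        System.out.println(expired(snaps, 250, 1)); // snapshots 1 and 2 go away
        System.out.println(expired(snaps, 250, 3)); // retainLast protects everything
    }
}
```

If this model is right, retainLast(30) only protects the newest 30 snapshots, so a long-running Flink job restoring from an older checkpoint could still point at an expired snapshot.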
Can someone help me with this problem? Thanks.
Does the rewrite action rewrite some data files that Flink has not yet committed?