Flink: Add the iceberg files committer to collect data files and commit to iceberg table. #1185
Conversation
Thanks @openinx for the contribution.
I think we should consider a situation: multiple jobs writing the same table at the same time, or killing the old job and starting a new streaming job to write the table (this is more common; the checkpointId will restart from 1).
So in this situation, we must do something like:
iceberg/spark3/src/main/java/org/apache/iceberg/spark/source/SparkStreamingWrite.java
Line 115 in 51c930e
String snapshotQueryId = summary.get(QUERY_ID_PROPERTY);
Before we obtain the max checkpoint id from a snapshot, we should make sure the snapshot was written by this job.
operator.addSink(filesCommitter)
    .name(ICEBERG_FILES_COMMITTER)
    .uid(UUID.randomUUID().toString())
Setting the uid to a randomly generated value is not a good practice. Operator uids should be stable, so that between intentional redeployments, or simply on restarts due to lost task managers, Flink knows which operator each piece of state belongs to.
If the uid changes between restarts, state could be lost.
You can find more information about the use of operator uids with Flink's savepoints (intentional restarts), and why stateful operators should be assigned a stable uid, here: https://ci.apache.org/projects/flink/flink-docs-stable/ops/upgrading.html#matching-operator-state
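For example, a minimal sketch of a stable uid (the operator names come from this PR; the literal uid string is only an illustration):

operator.addSink(filesCommitter)
    .name(ICEBERG_FILES_COMMITTER)
    .uid("iceberg-files-committer");  // stable across restarts and redeployments, unlike UUID.randomUUID()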
Most of the time, I simply assign the name as the uid. In Scala, I use an implicit class to assign the name and uid at the same time with the same value. This assumes that the name of an operator is unique (which might not always be true, and is something that should be considered as a library author), but so far I've not run into issues with the handful of jobs that I oversee.
Given that #1180 and #1175 have been merged, would it be possible for you to merge those commits into this branch to make it easier to review? Or does the dependency on #1145 make that somewhat burdensome? Not a major issue, but it would help me in being able to review just the code that's being proposed for addition and not the code that has already been merged. If not, no worries. I appreciate all of the work it seems you've put into getting Flink going with Iceberg @openinx. I can see you've submitted quite a number of PRs on the matter, so I'm especially looking to learn from you by reading and reviewing these PRs 👍
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
  table = TableUtil.findTable(path, conf.get());
  maxCommittedCheckpointId = parseMaxCommittedCheckpointId(table.currentSnapshot());
I think this method parseMaxCommittedCheckpointId should only be invoked when this job is restored.
You are right. Here we should only initialize the max-checkpoint-id to -1, and not read the checkpoint id from the iceberg table when this is not a restored job. We may have a table with 3 checkpoints whose max-committed-checkpoint-id is 3; if we stop the current flink job and start another flink job to continue writing this table, its checkpoint id will start from 1 again. If we read the max-committed-checkpoint-id here, we would miss the first three checkpoints' data files in the current flink job.
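A small sketch of that initialization (INITIAL_CHECKPOINT_ID and the parse helper are names quoted elsewhere in this thread; treat the exact shape as illustrative):

// inside initializeState(...)
maxCommittedCheckpointId = INITIAL_CHECKPOINT_ID;  // -1: nothing committed by this job yet
if (context.isRestored()) {
  // Only a restored job may trust the committed checkpoint id recorded in the table.
  maxCommittedCheckpointId = parseMaxCommittedCheckpointId(table.currentSnapshot());
}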
Added a unit test to address this thing. https://github.com/apache/iceberg/pull/1185/files#diff-67c60f8d1a96e4583f5b53248df15bedR257
Do you have any thoughts about the case where the snapshots were removed by another process, and that process committed a new snapshot to the iceberg table? Then the flink committer will not get the correct maxCommittedCheckpointId.
That means someone has expired the latest committed snapshot of a running flink job. Usually the flink checkpoint interval will be several minutes, while the snapshot expiration interval will be several days or weeks, so it's unlikely that we will expire the latest committed snapshot unless someone sets unreasonable intervals.
On the other hand, even if someone removed the latest committed snapshot, the flink job will still write data to the iceberg table correctly, unless we restore the flink job right after the snapshot was removed.
// State for all checkpoints;
private static final ListStateDescriptor<byte[]> STATE_DESCRIPTOR =
    new ListStateDescriptor<>("checkpoints-state", BytePrimitiveArraySerializer.INSTANCE);
Can we use Flink MapTypeInfo, and addAll into a sorted map?
That sounds good, I've implemented it in this patch: 15dd8e0
@Override
public void notifyCheckpointComplete(long checkpointId) {
  NavigableMap<Long, List<DataFile>> pendingFileMap = dataFilesPerCheckpoint.tailMap(maxCommittedCheckpointId, false);
Why exclude maxCommittedCheckpointId and checkpointId? Just dataFilesPerCheckpoint.tailMap(checkpointId, true)? I don't understand why we need to store the previous checkpoint id.
I've added a few comments about those members here: 6b43ec5.
Why exclude maxCommittedCheckpointId and checkpointId?
The maxCommittedCheckpointId's data files have already been committed to iceberg, so there is no need to commit them again, otherwise the table would contain redundant data. For the current checkpointId, we've flushed dataFilesOfCurrentCheckpoint into dataFilesPerCheckpoint in the snapshotState method, so we did not exclude the files for the current checkpointId.
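A rough sketch of that snapshotState flush, assuming the member names used in this thread (state persistence reduced to a comment):

@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
  long checkpointId = context.getCheckpointId();

  // Move the in-progress files under the current checkpoint id, so that
  // notifyCheckpointComplete(checkpointId) also sees them as pending.
  dataFilesPerCheckpoint.put(checkpointId, new ArrayList<>(dataFilesOfCurrentCheckpoint));
  dataFilesOfCurrentCheckpoint.clear();

  // ... then persist dataFilesPerCheckpoint into the operator state.
}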
According to the current code, the data of the current checkpoint needs to be committed in the next cycle.
That is not ideal, since it can lead to excessive delay.
if (maxCommittedCheckpointId >= checkpointId) {
  return;
}
Map<Long, List<DataFile>> pendingFileMap = dataFilesPerCheckpoint.headMap(checkpointId, true);
List<DataFile> pendingDataFiles = Lists.newArrayList();
for (List<DataFile> dataFiles : pendingFileMap.values()) {
  pendingDataFiles.addAll(dataFiles);
}
// Do AppendFiles committing
maxCommittedCheckpointId = checkpointId;
pendingFileMap.clear();
This is the process I imagined.
the data of the current checkpoint needs to be committed in the next cycle.
That's not correct. Since we've merged dataFilesOfCurrentCheckpoint into dataFilesPerCheckpoint, the dataFilesPerCheckpoint.tailMap(maxCommittedCheckpointId, false) will contain all the uncommitted data files, including the files from the current checkpoint.
About your described process:
Map<Long, List<DataFile>> pendingFileMap = dataFilesPerCheckpoint.tailMap(checkpointId, true);
This doesn't seem to commit all the previous files whose commits failed. What about the data files between (maxCommittedCheckpointId, checkpointId)?
In my view, the current process should be correct. If there are places I've misunderstood, please correct me, thanks.
Sorry, wrong reading. It should be headMap instead of tailMap. You can think again about my above example.
I got your point about tailMap. But it is incorrect too. Consider the case:
- 1.snapshot(cpId)
- 2.snapshot(cpId + 1)
- 3.cpId success, notifyComplete(cpId)
- 4.cpId + 1 success, notifyComplete(cpId + 1)
If there is a failure between 3 and 4, the data in the table will be incorrect.
That's a great case (which I did not consider) that proves your point. Yes, we need headMap(checkpointId, true) to avoid future data files being committed to the iceberg table; I've fixed this point in 598f235.
Let me address the above comments and fix the failed unit test.
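For reference, a sketch of the commit path that reflects the outcome of this exchange (field and constant names are taken from this thread, not the exact patch): commit everything in the range (maxCommittedCheckpointId, checkpointId].

NavigableMap<Long, List<DataFile>> pendingFileMap = dataFilesPerCheckpoint
    .tailMap(maxCommittedCheckpointId, false)  // skip checkpoints already committed to iceberg
    .headMap(checkpointId, true);              // skip checkpoints completed after this notification

List<DataFile> pendingDataFiles = Lists.newArrayList();
for (List<DataFile> dataFiles : pendingFileMap.values()) {
  pendingDataFiles.addAll(dataFiles);
}

AppendFiles appendFiles = table.newAppend();
pendingDataFiles.forEach(appendFiles::appendFile);
appendFiles.set(MAX_COMMITTED_CHECKPOINT_ID, Long.toString(checkpointId));
appendFiles.commit();

maxCommittedCheckpointId = checkpointId;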
For this pull request, there are several issues I need to explain here:
The global id of the job will also help to resolve restart issue 2, because we will know that the newly started job is starting from checkpoint=1.
I've addressed these two issues in the new patch, and also attached unit tests.
Thanks for the update, left some comments.
@@ -113,7 +114,7 @@ protected PartitionKey partition(RowData row) {
     }
   }

-  private static class FlinkFileAppenderFactory implements FileAppenderFactory<RowData> {
+  private static class FlinkFileAppenderFactory implements FileAppenderFactory<RowData>, Serializable {
Please add parquet support, and also add tests.
We need to wait until the parquet writer is available: #1272. For now, only the parquet readers have been merged.
@@ -113,7 +114,7 @@ protected PartitionKey partition(RowData row) {
     }
   }

-  private static class FlinkFileAppenderFactory implements FileAppenderFactory<RowData> {
+  private static class FlinkFileAppenderFactory implements FileAppenderFactory<RowData>, Serializable {
Maybe you can let FileAppenderFactory extend Serializable.
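i.e. something like this sketch (the method signature is abbreviated and only illustrative):

public interface FileAppenderFactory<T> extends Serializable {
  FileAppender<T> newAppender(OutputFile outputFile, FileFormat fileFormat);
}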
@@ -27,7 +28,7 @@
 import org.apache.iceberg.encryption.EncryptedOutputFile;
 import org.apache.iceberg.encryption.EncryptionManager;

-public class OutputFileFactory {
+public class OutputFileFactory implements Serializable {
Why?
Because the IcebergStreamWriter needs the RowDataTaskWriterFactory to create the TaskWriter, and the factory is passed to the writer when it is constructed, all members inside RowDataTaskWriterFactory should be serializable (including the factory itself); unfortunately, OutputFileFactory is one of its members.
outputFileFactory should be transient in RowDataTaskWriterFactory; it is initialized in initialize().
OK, that makes sense.
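A condensed sketch of that arrangement (the createOutputFileFactory helper is hypothetical and stands in for the real construction details):

class RowDataTaskWriterFactory implements Serializable {
  // transient: not shipped with the serialized factory, recreated on the task side
  private transient OutputFileFactory outputFileFactory;

  void initialize(int taskId, int attemptId) {
    // Rebuild the non-serializable member after deserialization.
    this.outputFileFactory = createOutputFileFactory(taskId, attemptId);
  }

  private OutputFileFactory createOutputFileFactory(int taskId, int attemptId) {
    // Hypothetical helper: the real code builds this from the table's spec, format,
    // location provider, io and encryption manager.
    throw new UnsupportedOperationException("sketch only");
  }
}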
private IcebergSinkUtil() {
}

@SuppressWarnings("unchecked")
static DataStreamSink<RowData> write(DataStream<RowData> inputStream,
If this is a user-facing API, I think a class named FlinkSink (instead of IcebergSink?) is more suitable.
And do you think we can make it a builder-style class?
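Something along these lines is what later revisions of this PR converged on; the exact builder methods here (especially build()) are illustrative:

FlinkSink.forRowData(rowDataStream)
    .table(table)
    .tableLoader(tableLoader)
    .build();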
Configuration conf,
String fullTableName,
Table table,
TableSchema requestedSchema) {
Actually, I think it is just the flinkSchema.
// checkpoint id should be positive. If it's not positive, that means someone might have removed or expired the
// iceberg snapshot, in that case we should throw an exception in case of committing duplicated data files into
// the iceberg table.
Preconditions.checkArgument(maxCommittedCheckpointId > 0,
Maybe this should be checkState.
private static final Logger LOG = LoggerFactory.getLogger(IcebergFilesCommitter.class);
private static final String GLOBAL_FILES_COMMITTER_UID = "flink.files-committer.uid";
private static final String MAX_COMMITTED_CHECKPOINT_ID = "flink.max-committed.checkpoint.id";
checkpoint-id?
private static final long INITIAL_CHECKPOINT_ID = -1L;

private static final Logger LOG = LoggerFactory.getLogger(IcebergFilesCommitter.class);
private static final String GLOBAL_FILES_COMMITTER_UID = "flink.files-committer.uid";
uid -> job-id?
    .setParallelism(1)
    .setMaxParallelism(1);

return returnStream.addSink(new DiscardingSink()).setParallelism(1);
Please name the sink.
String fullTableName,
Table table,
TableSchema requestedSchema) {
  IcebergStreamWriter<RowData> streamWriter = createStreamWriter(table, requestedSchema);
I just noticed that IcebergStreamWriter does not set a chaining strategy. We should add setChainingStrategy(ChainingStrategy.ALWAYS); in the constructor of IcebergStreamWriter.
I think this one is ready to review and commit, right?
That's right. (After this DataStream sink, the next patch is #1348.)
Rebased onto master and did these things btw:
Hi @openinx, for the API level, I think we can do more. Just like:
Looks good to me overall, left some minor comments.
public class FlinkSink {

  private static final TypeInformation<DataFile> DATA_FILE_TYPE_INFO = TypeInformation.of(DataFile.class);
NIT: I think these fields don't have to be static.
  return new ListStateDescriptor<>("iceberg-files-committer-state", sortedMapTypeInfo);
}

static Long getMaxCommittedCheckpointId(Table table, String flinkJobId) {
Actually, the default value of maxCommittedCheckpointId could be null instead of INITIAL_CHECKPOINT_ID.
For me, initializing maxCommittedCheckpointId to -1 seems simpler to handle, because when calling SortedMap#tailMap or SortedMap#headMap we don't need to do a null check. The comparison in notifyCheckpointComplete is a similar case.
IcebergFilesCommitter(TableLoader tableLoader, Configuration hadoopConf) {
  this.tableLoader = tableLoader;
  this.hadoopConf = new SerializableConfiguration(hadoopConf);
  setChainingStrategy(ChainingStrategy.ALWAYS);
We can let the committer be a single node.
flinkJobId = getContainingTask().getEnvironment().getJobID().toString();

// Open the table loader and load the table.
tableLoader.open(hadoopConf.get());
Better to close this tableLoader in dispose().
Ping @rdblue, we (JingsongLi and I) have reached a basic agreement on this patch. Would you mind taking a look when you have time? Thanks.
Since both the parquet reader and writer have been merged, let me update this patch to add parquet to the unit tests.
"Should initialize input DataStream first with DataStream<Row> or DataStream<RowData>"); | ||
Preconditions.checkArgument(rowInput == null || rowDataInput == null, | ||
"Could only initialize input DataStream with either DataStream<Row> or DataStream<RowData>"); | ||
Preconditions.checkNotNull(table, "Table shouldn't be null"); |
Since we already have a TableLoader, would it be easier for users if we loaded the table here?
The table is used to initialize the IcebergStreamWriter on the client side, while the tableLoader is used to load the table on the TaskManager side. Using the tableLoader, which is meant for the TaskManager, to load the table on the client side looks strange to me.
// <2, <file3>>. Snapshot for checkpoint#1 interrupted because of network/disk failure etc, while we don't expect
// any data loss in iceberg table. So we keep the finished files <1, <file0, file1>> in memory and retry to commit
// iceberg table when the next checkpoint happen.
private final NavigableMap<Long, List<DataFile>> dataFilesPerCheckpoint = Maps.newTreeMap();
I think in the Netflix version, we track old checkpoint state by writing a manifest file and appending that manifest to the table. The advantage is that state is really small so the job can be blocked from committing for a long time (days) and can recover when connectivity to commit has been restored.
We use this to handle resilience when AWS regions can't talk to one another and our metastore is in a single region.
+1 for this approach. There may be thousands of data files in a checkpoint for a partitioned table.
- In snapshotState, serializing files to Flink state vs. serializing files to a manifest file should cost about the same.
- In notifyCheckpointComplete, committing just a manifest file has better performance than committing the individual data files.
- It reduces the size of the state, which is actually kept in the job manager's memory.
Although it is rare for a checkpoint to fail for a long time, this can improve the robustness of the system.
But this approach does lead to more code complexity.
Yeah, I saw that it archives all the finished data files into a manifest file and maintains only the manifest files in the state backend (https://github.com/apache/iceberg/pull/856/files#diff-0245903e25b2cace005c52b2c7b130bbR372). To my understanding, that's a great way to handle the case where we frequently fail to commit to the iceberg table (because we've buffered all the data files in manifest files rather than in the flink state backend), but I'm not sure whether it is a common case for most users. The commit failure should be easy to notice if they separated the data file regions from the metastore region. Do others also use this way to maintain their data and metadata, or are their states enough to hold those uncommitted data files?
If commit failure is a common case, I'd be happy to contribute to buffering those data files in a manifest file. For now, I think we can go with the simple state-based way first. Does that make sense?
I completely agree that we should not block adding the sink until this is done. I just wanted to note that this is a good place to put some extra effort. I think that multi-region deployments are common and this can really help. We don't keep Kafka data for a long time, so it is important to us that Flink continues to consume Kafka data to eventually commit to the table, even if it can't commit the manifests right away.
@stevenzwu ^^
Just to provide more context on why we need it: our data warehouses (metastore) only live in the us-east-1 region, while Flink streaming jobs can run in 3 regions (us-east-1, us-west-2, and eu-west-1). Transient commit failures happen from time to time, which aren't a big concern. We are more concerned about extended outages (like a day) for whatever reason (us-east-1 outage, cross-region network issue, metastore service outage). For high-parallelism or event-time-partitioned tables, there could be thousands or tens of thousands of files per checkpoint interval. The manifest approach allows the Flink jobs to handle those extended outages better.
Flink operator list state can't handle large state well. I vaguely remember the limit is 1 or 2 GB, and it can get pretty slow when the list is large.
The manifest approach sounds like a good improvement to me; it's a todo item for the follow-up issue #1403.
flinkJobId = getContainingTask().getEnvironment().getJobID().toString();

// Open the table loader and load the table.
tableLoader.open(hadoopConf.get());
Does this need to be closed?
https://github.com/apache/iceberg/pull/1346/files#r474605890
Never mind, looks like this is done in dispose() -- here's a good example of where prefixing with this. would be more clear.
// Open the table loader and load the table.
tableLoader.open(hadoopConf.get());
table = tableLoader.loadTable();
maxCommittedCheckpointId = INITIAL_CHECKPOINT_ID;
Minor: I prefer to prefix field names with this. when setting instance fields, so it is obvious that it is not a local variable.
Overall, I think this is ready to commit. I'd prefer to remove
FlinkSink.forRowData(dataStream)
    .table(table)
    .tableLoader(tableLoader)
Minor: it seems odd to provide the loader and the table. Couldn't the loader be called in the sink builder?
Merged. Thanks @openinx, it is great to have this in. And thanks for helping review, @JingsongLi and @kbendick!
What are the next steps for Flink writes? I think we can probably start writing some documentation. The tests for the sink look like this is mostly complete. Maybe docs are an area that @kbendick could help with?
IcebergFilesCommitter filesCommitter = new IcebergFilesCommitter(tableLoader, hadoopConf);

DataStream<Void> returnStream = rowDataInput
    .transform(ICEBERG_STREAM_WRITER_NAME, TypeInformation.of(DataFile.class), streamWriter)
There can be multiple Iceberg sinks in the same job, so we probably should add the table identifier string as a suffix to make the operator name and id unique. We have a unique sinkName within a job, and we add the sinkName suffix to the operator name.
That sounds good to me; it's necessary to support multiple iceberg sinks in the same job. I will open an issue and provide a patch with unit tests to address this.
DataStream<Void> returnStream = rowDataInput
    .transform(ICEBERG_STREAM_WRITER_NAME, TypeInformation.of(DataFile.class), streamWriter)
    .setParallelism(rowDataInput.getParallelism())
    .transform(ICEBERG_FILES_COMMITTER_NAME, Types.VOID, filesCommitter)
The committer is a stateful operator; we should probably explicitly set its uid.
Never mind, I saw the earlier comment that this is a todo item.
    .setParallelism(1)
    .setMaxParallelism(1);

return returnStream.addSink(new DiscardingSink())
Curious about the reason why we don't make the committer a sink function, instead of adding a dummy DiscardingSink.
Conceptually, this writer-committer combo is the reverse/mirror of the split enumerator-reader in the FLIP-27 source interface. It would be nice to run the committer on the jobmanager (similar to the enumerator). That way, the Iceberg sink wouldn't change the nature of the embarrassingly-parallel DAG.
I think this would answer your question about why we used an operator rather than a sink function.
@openinx thx, that does answer my question.
Still, adding a DiscardingSink may confuse users. It seems that we really need a unified/improved sink interface (similar to FLIP-27) to support bounded-input jobs in a sink function.
What about other Flink sinks used by bounded streaming/batch jobs? Do they all have to go through this model?
There are three cases: 1) an unbounded streaming job; 2) a bounded streaming job; 3) a batch job. If users only need the unbounded streaming ability, then they only need to implement the SinkFunction. If they need both unbounded streaming and bounded streaming ability, then we need to extend/implement AbstractStreamOperator & BoundedOneInput and add the DiscardingSink to the tail. If they want the batch ability, then they need to provide an OutputFormat implementation. In future flink versions, case#2 and case#3 will be unified in one sink interface, but for now we have to implement the bounded streaming and batch cases separately.
The Flink hive connector is a good example, which supports case#1, case#2 and case#3. It also uses a similar approach to the current iceberg sink connector.
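A compact sketch of that bounded-streaming pattern, simplified from the committer in this PR (buffering and commit logic reduced to comments):

class IcebergFilesCommitter extends AbstractStreamOperator<Void>
    implements OneInputStreamOperator<DataFile, Void>, BoundedOneInput {

  @Override
  public void processElement(StreamRecord<DataFile> element) {
    // unbounded streaming path: buffer completed data files until the next checkpoint
  }

  @Override
  public void endInput() {
    // bounded streaming path: the input is finished, commit everything still pending
  }
}

// Wiring inside the sink builder: the committer runs with parallelism 1 and a
// DiscardingSink just terminates the DAG.
dataFileStream
    .transform("IcebergFilesCommitter", Types.VOID, new IcebergFilesCommitter())
    .setParallelism(1)
    .setMaxParallelism(1)
    .addSink(new DiscardingSink<>())
    .setParallelism(1);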
Fair enough. We can't solve this problem until Flink improves the sink interface.
DataStream<Void> returnStream = rowDataInput
    .transform(ICEBERG_STREAM_WRITER_NAME, TypeInformation.of(DataFile.class), streamWriter)
    .setParallelism(rowDataInput.getParallelism())
This is a good default value for the writer parallelism. We have users who want to explicitly control the writer parallelism to control the number of written files. In the future, we may want to allow users to set the parallelism in the builder.
Makes sense to me; this could be a follow-up issue.
  return new ListStateDescriptor<>("iceberg-files-committer-state", sortedMapTypeInfo);
}

static long getMaxCommittedCheckpointId(Table table, String flinkJobId) {
Nice, this is the reverse iteration I was looking for. We were just walking through the table.snapshots() iterable.
}

@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
I am wondering whether Flink guarantees serialized execution of notifyCheckpointComplete?
According to the current implementation, it should be, because task checkpoints are ordered and RPC messages are ordered. But it's better not to rely on this.
Thx. Should we raise a jira to track the task?
@JingsongLi you are saying that notifyCheckpointComplete and snapshotState are serialized by the same lock/mutex, right? Otherwise, I can see a problem with the concurrent checkpoint handling.
Yes. You don't need to care about thread safety.
this.checkpointsState = context.getOperatorStateStore().getListState(STATE_DESCRIPTOR);
if (context.isRestored()) {
  this.maxCommittedCheckpointId = getMaxCommittedCheckpointId(table, flinkJobId);
We might have a problem when redeploying the Flink job from an external checkpoint. It is a new flinkJobId in this case, so maxCommittedCheckpointId will be -1. As a result, we could commit those already-committed files again later.
The way we do de-dup is to generate a hash for the manifest file path and store the hash in the snapshot summary. During restore, we use the hash to determine whether the manifest file was committed or not.
List<String> hashes = new ArrayList<>(flinkManifestFiles.size());
AppendFiles appendFiles = transaction.newAppend();
for (FlinkManifestFile flinkManifestFile : flinkManifestFiles) {
  appendFiles.appendManifest(flinkManifestFile);
  hashes.add(flinkManifestFile.hash());
}
appendFiles.set(
    COMMIT_MANIFEST_HASHES_KEY, FlinkManifestFileUtil.hashesListToString(hashes));
It's a great point that we should handle. I think we can attach both the flinkJobId and maxCommittedCheckpointId to the checkpoint state. In the restore path, we read the flinkJobId and maxCommittedCheckpointId from state and compare the current flink job id with the flinkJobId from state. If the job ids match, then it is surely the case of restoring without redeploying (case#1); otherwise it's the case you described (case#2).
For case#1, the current code should be correct.
For case#2, we should use the old flinkJobId from state to parse the maxCommittedCheckpointId from the iceberg table, and use that checkpoint id to filter out all the committed data files.
Does that make sense?
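A sketch of that restore logic (jobIdState here is a hypothetical piece of operator state that holds the writing job's id):

if (context.isRestored()) {
  String restoredFlinkJobId = jobIdState.get().iterator().next();
  if (flinkJobId.equals(restoredFlinkJobId)) {
    // case#1: restored without redeploying, same job id -- the current code path is fine.
    this.maxCommittedCheckpointId = getMaxCommittedCheckpointId(table, flinkJobId);
  } else {
    // case#2: redeployed from an external checkpoint under a new job id -- look up the
    // checkpoint id committed by the *old* job and use it to filter committed files.
    this.maxCommittedCheckpointId = getMaxCommittedCheckpointId(table, restoredFlinkJobId);
  }
}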
I like the proposed solution. it should work.
Nit on class names: IcebergStreamWriter -> IcebergWriter, because it also implements the BoundedOneInput interface, which seems like a batch job. IcebergFilesCommitter -> IcebergCommitter; Iceberg only commits files, so including Files seems redundant.
SortedMap<Long, List<DataFile>> restoredDataFiles = checkpointsState.get().iterator().next();
// Only keep the uncommitted data files in the cache.
this.dataFilesPerCheckpoint.putAll(restoredDataFiles.tailMap(maxCommittedCheckpointId + 1));
In our implementation, we immediately commit any uncommitted files upon restore to avoid waiting for another checkpoint cycle.
Committing those uncommitted data files immediately here sounds good to me; it shouldn't impact the correctness.
Considering this issue again: if we redeploy the flink job from an external checkpoint, then all data files from restoredDataFiles are from the old flink job, which means their checkpoint ids could range from 1~N. If we put all those files into dataFilesPerCheckpoint, that introduces a problem: they do not align with the checkpoint ids of the new flink job. Say notifyCheckpointComplete(1) fires for the new flink job; it won't commit the old data files with checkpointId > 1 to the iceberg table. That's incorrect.
So it should commit those remaining uncommitted data files to iceberg immediately.
@openinx if the new job is redeployed from checkpoint N taken by the old job, the checkpointId will start from N+1 for the new job. That is my observation of Flink's behavior.
OK, I checked the flink savepoint code; it was designed as you said: https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L1371
Thanks for the confirmation. Then I think committing those files in the next checkpoint cycle should also be OK, but there's no harm in committing them immediately.
  }
}

private void commitUpToCheckpoint(long checkpointId) {
Just want to point out that this will commit if there is zero pending files. For us, we actually still want to commit in this case mainly to update the region watermark info.
That should be similar to the point from this comment; we are actually doing the logic you described and provided a unit test to cover it.
@Override
public void processElement(StreamRecord<DataFile> element) {
  this.dataFilesOfCurrentCheckpoint.add(element.getValue());
This probably gets complicated when we allow concurrent checkpoints. The committer can receive files from both checkpoint N and N+1. We probably need to add the checkpointId to the DataFile. It might make sense to provide a FlinkDataFile wrapper so that we can add additional Flink metadata.
We have FlinkDataFile in our implementation for transmitting low and high timestamps. Now thinking about this issue, maybe we can include the checkpointId too, so that the committer can distinguish data files from different checkpoints.
public class FlinkDataFile implements Serializable {
  private final long lowWatermark;
  private final long highWatermark;
  private final DataFile dataFile;
  // ...
}
This does impose an additional requirement on the writer: it needs to know the last/next checkpointId.
- For a job started without a checkpoint, the last checkpointId is 0.
- For a job started from a checkpoint, IcebergWriter now needs to know the last checkpointId restored.
I couldn't find the checkpointId from the restored context in the initializeState(context) method for either AbstractStreamOperator or RichSinkFunction. It would be nice if it could be exposed.
Alternatively, we can store the nextCheckpointId in the operator state. However, both options have some problems:
- Operator list state can't deal with rescale, as new subtasks won't get state.
- Operator union list state is not scalable; the Kafka source is suffering from the scalability issue with union state.
Note that we may flush files before the checkpoint barrier comes. We have two use cases that need rollover by time and file size:
- As I mentioned in another comment, we have Flink streaming jobs running in 3 regions in AWS and data warehouses live only in us-east-1. There is a backend service that monitors files in the two other remote regions and lifts/copies them back to the us-east-1 home region. S3 cross-region file copy has a 5 GB limit. For that reason, we flush files if the size reaches 4 GB.
- When we implement rough ordering for the Iceberg source, we need event time alignment. In that case, we use the Kafka broker time as the event time, since our Iceberg source tries to emulate the rough ordering of the Kafka source. One possible solution is to roll over the file when the min and max timestamps reach a certain threshold (e.g. 5 minutes).
As a user, how can I use flink to write my DataStream to iceberg?
Hi @tangchenyang, the unit test TestFlinkIcebergSink provides a complete demo showing how to write records into an iceberg table with flink. I'm writing a document to show users how to write data into iceberg with both the table SQL and DataStream APIs, but it's not finished yet.
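Until that document lands, here is a rough end-to-end sketch; the table location, row contents and builder details (especially build()) are illustrative rather than the final API:

Configuration hadoopConf = new Configuration();  // org.apache.hadoop.conf.Configuration
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000L);  // data files are committed to iceberg when a checkpoint completes

DataStream<RowData> rows = env.fromElements(
    (RowData) GenericRowData.of(1, StringData.fromString("hello")),
    GenericRowData.of(2, StringData.fromString("world")));

TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/db/t");  // illustrative location
tableLoader.open(hadoopConf);
Table table = tableLoader.loadTable();

FlinkSink.forRowData(rows)
    .table(table)
    .tableLoader(tableLoader)
    .build();

env.execute("Write DataStream into an iceberg table");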
Great! Many thanks, @openinx.
As we discussed in pull request #856, the flink sink connector is actually composed of two stateful flink operators:
- FlinkStreamWriter: receives the records and writes them to DataFile(s); when a checkpoint happens, it emits the completed DataFile(s) to the downstream operator.
- FlinkFilesCommitter: collects the DataFile(s) from FlinkStreamWriter and commits the files to iceberg in a transaction. It's a single-parallelism operator because we don't expect transaction conflicts to happen when committing.
This patch mainly implements the FlinkFilesCommitter and provides several end-to-end integration tests to verify correctness. The dependencies of this one are: #1180, #1145, #1175.