Flink: new sink based on the unified sink API #4904
Conversation
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java (resolved review thread, outdated)
Force-pushed 62069cc to dcc635e
cc @openinx @chenjunjiedada @stevenzwu @kbendick, PTAL, thanks.
Force-pushed e893aa3 to 10e49ca
Thanks for pinging me, I will take a look this week.
.../flink/src/main/java/org/apache/iceberg/flink/sink/v2/IcebergFlinkCommittableSerializer.java (resolved review thread, outdated)
flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/v2/TestFlinkIcebergSink.java (resolved review thread, outdated)
flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/v2/TestFlinkIcebergSinkV2.java (resolved review thread, outdated)
    throw new UncheckedIOException("Failed to load iceberg table from table loader: " + tableLoader, e);
  }
} else {
  this.table = newTable;
If the table loader cannot be null, why do we need to pass in the table rather than just call loadTable?
Providing a table would avoid loading the table repeatedly from each separate task.
What do you mean by each separate task? We usually have only one sink in a Flink job. Or do you mean the table was loaded previously and loading it here can be skipped?
we'll have multiple sinks with multiple writes and globally one committer.
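The loader-vs-table trade-off discussed above can be sketched as a lazy-loading holder: ship only the serializable loader and load the table at most once per task. This is a minimal sketch, not code from this PR — `TableLoader`, `Table`, and `LazyTableHolder` below are simplified stand-ins for the real Iceberg classes.

```java
import java.io.Serializable;

public class LazyTableHolder {
    // Stand-in marker for an Iceberg table; the real Table is not serializable-friendly.
    public interface Table {}

    // Stand-in for Iceberg's TableLoader: small, serializable, loads on demand.
    public interface TableLoader extends Serializable {
        Table loadTable();
    }

    private final TableLoader loader;
    private transient Table table; // loaded lazily, at most once per task

    public LazyTableHolder(TableLoader loader) {
        this.loader = loader;
    }

    public Table table() {
        if (table == null) {
            // one catalog round-trip per task, instead of shipping a loaded Table
            table = loader.loadTable();
        }
        return table;
    }

    public static void main(String[] args) {
        Table shared = new Table() {};
        LazyTableHolder holder = new LazyTableHolder(() -> shared);
        System.out.println(holder.table() == holder.table()); // prints true: loader ran once
    }
}
```

With this pattern each parallel subtask loads the table exactly once on first use, which addresses both concerns in the thread: no repeated loading per record, and no need to serialize a pre-loaded table into the operator.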
Force-pushed ece1b24 to 3595ba6
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/v2/FlinkSink.java (3 resolved review threads, outdated)
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/v2/IcebergFilesCommitter.java (4 resolved review threads, outdated)
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/v2/IcebergStreamWriter.java (resolved review thread, outdated)
Force-pushed 905774a to 8258d07
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java (2 resolved review threads, outdated)
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/writer/StreamWriter.java (resolved review thread, outdated)
this.taskWriterFactory = taskWriterFactory;
// Initialize the task writer factory.
taskWriterFactory.initialize(numberOfParallelSubtasks, subTaskId);
This is not correct. The interface is:
void initialize(int taskId, int attemptId);
Nice catch. Honestly, this is one of the things that puzzled me, so let's talk about it.
There is only one implementation of initialize:

@Override
public void initialize(int taskId, int attemptId) {
  this.outputFileFactory =
      OutputFileFactory.builderFor(table, taskId, attemptId).format(format).build();
}

And the signature of OutputFileFactory.builderFor goes like this:

public static Builder builderFor(Table table, int partitionId, long taskId) {
  return new Builder(table, partitionId, taskId);
}

So, in the initialize method, the pass-through relationship is as follows:

initialize:taskId -> builderFor.partitionId
initialize:attemptId -> builderFor.taskId

This confused me a little. I'm trying to re-establish the relationship here as:

numberOfParallelSubtasks -> builderFor.partitionId
subTaskId -> builderFor.taskId
Looks like Ryan answered it here: #5586 (comment)
Copy that. Let me fix it.
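For illustration, a minimal sketch of the argument order being discussed: per the interface `void initialize(int taskId, int attemptId)`, the subtask index should map to taskId and the attempt number to attemptId. The `TaskWriterFactory` below is a stand-in that just records what it received; `InitOrder` and all names here are hypothetical, not code from this PR.

```java
public class InitOrder {
    // Stand-in for Iceberg's TaskWriterFactory, keeping only the contract under discussion.
    public interface TaskWriterFactory {
        void initialize(int taskId, int attemptId);
    }

    public static void main(String[] args) {
        int subTaskId = 3;      // index of this subtask within the operator
        int attemptNumber = 0;  // execution attempt of this subtask
        int[] seen = new int[2];
        TaskWriterFactory factory = (taskId, attemptId) -> {
            seen[0] = taskId;
            seen[1] = attemptId;
        };
        // correct order per the interface: subtask index -> taskId, attempt -> attemptId
        factory.initialize(subTaskId, attemptNumber);
        System.out.println(seen[0] + "," + seen[1]); // prints 3,0
    }
}
```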
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java (resolved review thread, outdated)
  }
})
.uid(uidPrefix + "-pre-commit-topology")
.global();
I am not a fan of using the global partitioner to achieve effectively-global committer behavior with parallel downstream committer operators. If we only need a single-parallelism committer, why don't we make it explicitly so? Having parallel committer tasks and only routing committables to task 0 is weird. It can also lead to more questions from users about why the other committer subtasks have zero input. The sink V1 interface from Flink has a GlobalCommitter interface, which would work well for the Iceberg use case. It seems that sink V2 has no equivalent replacement. The Javadoc mentions WithPostCommitTopology.
Are we supposed to do something like this for a global committer?

private class GlobalCommittingSinkAdapter extends TwoPhaseCommittingSinkAdapter
    implements WithPostCommitTopology<InputT, CommT> {
  @Override
  public void addPostCommitTopology(DataStream<CommittableMessage<CommT>> committables) {
    StandardSinkTopologies.addGlobalCommitter(
        committables,
        GlobalCommitterAdapter::new,
        () -> sink.getCommittableSerializer().get());
  }
}

But then what is the purpose of the regular committer from TwoPhaseCommittingSink#createCommitter?
+1 to avoiding having subtasks that don’t process any input. Even if it’s correct, it will look strange when monitoring. Being more explicit by using the code @stevenzwu mentioned or somehow repartitioning to 1 would make the user experience better as seeing many tasks where only one gets input would be the first thing I’d start investigating upon too much backpressure etc.
@hililiwei @pvary @kbendick FYI, I started a discussion thread on dev@flink; the subject is "Sink V2 interface replacement for GlobalCommitter".
The lack of a GlobalCommitter in the v2 sink interface makes it not a good fit for the Iceberg sink. I added a few more comments in the writer class: https://github.com/apache/iceberg/pull/4904/files#r951000248.
I think we need to address this high-level design question with the Flink community first, before we can really adopt the v2 sink interface. So far, we haven't heard anything back on the email thread. We will have to figure out whom we can work with from the Flink community.
Yeah, agreed that the lack of a global committer presents a problem for us. Even the post-commit topology sounds like possibly not the right place (based on the name alone). Plus we lose access to using it for small file merging, etc.
The Flink community has a Slack now. Maybe we can go fishing for support / people to discuss with over there? I'm gonna look for the invite link, but if anybody needs an invite now, I think I can do it if you send me your email.
> Yeah agreed that the lack of global committer presents a problem for us. Even the post-commit topology sounds like possibly not the right place (based on name alone even). Plus we lose access to using that for doing small file merging etc

The post-commit topology is one cycle behind the Committer, so it is currently fine to use it for small file merges and index generation; we use it internally as well.
> The lack of GlobalCommitter in the v2 sink interface makes it not a good fit for Iceberg sink. I think we need to address this high-level design question first with the Flink community before we can really adopt the v2 sink interface.

As discussed in the mail thread, the ability to customize committer parallelism is what we need. This capability does not seem to be available until at least Flink 1.17, and for now there are no obvious blocking issues that require us to use the new sink right away. So it looks like we can wait until Flink provides this capability before going back and optimizing.
But one thing we may need to consider is that auto-compaction of small files is a good thing for the user. Maybe we can keep using the current approach (the global partitioner), because it doesn't have a functional problem, just committer subtasks that won't do anything. Adaptation can come after Flink provides the ability to customize committer parallelism.
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/writer/StreamWriter.java (resolved review thread)
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java (resolved review thread, outdated)
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java (resolved review thread)
@Override
public List<StreamWriterState> snapshotState(long checkpointId) {
  List<StreamWriterState> state = Lists.newArrayList();
  state.add(new StreamWriterState(Lists.newArrayList(writeResultsState)));
Checkpointing individual data files as writer state can be expensive. E.g., if one checkpoint cycle produces a lot of files with many columns, the amount of metadata can be very significant.
I also think we shouldn't checkpoint the flushed files as writer state. How does it help with fault tolerance? The current FlinkSink writer only flushes the data files and emits them to the downstream committer operator.
In the current FlinkSink implementation, IcebergFilesCommitter checkpoints a staging ManifestFile object per checkpoint cycle. Here we only need to checkpoint a single manifest file per cycle, which is very small in size. Upon a successful commit, the committer clears the committed manifest files. Upon a commit failure, manifest files accumulate and are attempted for commit in the next checkpoint cycle.
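The accumulate-and-retry behavior described above can be sketched without any Flink or Iceberg dependencies. `StagedCommits` is a hypothetical stand-in for the committer: it stages one manifest path per checkpoint and clears the staged list only on a successful commit, so failed cycles carry their manifests forward.

```java
import java.util.ArrayList;
import java.util.List;

public class StagedCommits {
    // one staged manifest path per completed checkpoint
    private final List<String> stagedManifests = new ArrayList<>();

    public void onCheckpoint(String manifestFile) {
        stagedManifests.add(manifestFile);
    }

    // Returns the number of manifests committed; the staged list is cleared
    // only on success, so failed cycles keep accumulating for the next attempt.
    public int tryCommit(boolean commitSucceeds) {
        if (!commitSucceeds) {
            return 0; // retry everything on the next checkpoint cycle
        }
        int committed = stagedManifests.size();
        stagedManifests.clear();
        return committed;
    }

    public static void main(String[] args) {
        StagedCommits committer = new StagedCommits();
        committer.onCheckpoint("manifest-1.avro");
        committer.tryCommit(false);                    // commit fails; manifest retained
        committer.onCheckpoint("manifest-2.avro");
        System.out.println(committer.tryCommit(true)); // prints 2: both committed together
    }
}
```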
writeResultsState contains only the metadata of the data files; we don't really put data into the state.
If we remove the state here, this part of the data may be lost if the task fails after the file is closed but before it is sent to the committer.
The more important reason is that in the v2 sink, CommT is transferred between the Writer and the Committer, and Flink automatically handles its state. If we used a Manifest instead of WriteResult as CommT, we would need to generate it in every writer operator so that it could be passed to the committer, which I don't think is reasonable. If we generate it in the Committer, as IcebergFilesCommitter did, then because CommT is still WriteResult and the new committer doesn't put it in the state, there is no practical point in generating it. Given the differences between the v1 and v2 versions, I currently feel it would be better to use WriteResult directly.
I am not saying we should use Manifest as CommT. Again, this is where I feel the v2 sink interface is not a good fit for Iceberg. In the v1 sink interface, we have GlobalCommT for GlobalCommitter. That would be the ideal interface for the Iceberg sink: WriteResult would be CommT and Manifest would be GlobalCommT.

public interface GlobalCommitter<CommT, GlobalCommT>

Here is how the Iceberg sink works today and how I hope it should continue to work with the v2 sink interface. Writers flush the data files and send WriteResult to the global committer. The global committer checkpoints the Manifest file for fault tolerance/exactly-once.
I understand that WriteResult only contains metadata. It can still be substantial in certain use cases. E.g.,
- writer parallelism is 1,000
- each writer writes to 100 files (e.g. bucketing or hourly partitioning by event time)
- the table has many columns with column-level stats (min, max, count, null, etc.), which adds significant metadata bytes per data file. Say there are 200 columns and each column's stats take up 100 bytes; that is 20 KB per file.
- each checkpoint would then have a size of 20 KB x 100 x 1,000, i.e. 2 GB.
- if the checkpoint interval is 5 minutes and commits can't succeed for 10 hours, we are talking about 120 checkpoints. The Iceberg sink would then accumulate 240 GB of state.
I know this is the worst-case scenario, but it is the motivation behind the current IcebergSink design where the global committer checkpoints only one ManifestFile per checkpoint cycle.
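The worst-case arithmetic above can be checked with a small back-of-envelope program; all the numbers are the assumptions from the comment, not measured values.

```java
public class StateSizeEstimate {
    public static void main(String[] args) {
        long statsBytesPerFile = 200L * 100L; // 200 columns x ~100 bytes of stats = 20 KB
        long filesPerWriter = 100L;           // e.g. bucketing or hourly event-time partitions
        long writerParallelism = 1_000L;
        long bytesPerCheckpoint = statsBytesPerFile * filesPerWriter * writerParallelism;
        System.out.println(bytesPerCheckpoint);               // 2000000000 bytes, ~2 GB
        long failedCheckpoints = 120L;        // 10 h of failed commits at 5-minute intervals
        System.out.println(bytesPerCheckpoint * failedCheckpoints); // 240000000000 bytes, ~240 GB
    }
}
```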
To adapt to the new architecture, I've made a compromise here: serialize the WriteResult sent by the StreamWriter into a manifest file as CommT, and deserialize the file in the Committer.
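The compromise described here, a committable that is just a serialized blob produced by the writer and decoded by the committer, can be sketched as a simple round-trip. `CommittableRoundTrip` and its byte format are hypothetical stand-ins, not the real manifest serialization.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;

public class CommittableRoundTrip {
    // Stand-in for the manifest blob used as CommT between writer and committer:
    // the writer serializes its result once, per checkpoint cycle.
    static byte[] toCommittable(List<String> dataFilePaths) {
        return String.join("\n", dataFilePaths).getBytes(StandardCharsets.UTF_8);
    }

    // The committer decodes the blob back into the file list before committing.
    static List<String> fromCommittable(byte[] committable) {
        return List.of(new String(committable, StandardCharsets.UTF_8).split("\n"));
    }

    public static void main(String[] args) {
        byte[] commT = toCommittable(List.of("data/file-0.parquet", "data/file-1.parquet"));
        System.out.println(fromCommittable(commT)); // round-trips the original file list
    }
}
```

The benefit of this shape is that Flink only ever sees one small opaque payload per writer per checkpoint, rather than per-file metadata in operator state.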
Force-pushed b33aeb9 to a649e82
Force-pushed ec16a9e to b1e694a
Force-pushed cbbc841 to 2e56954
This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that's incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@iceberg.apache.org list. Thank you for your contributions.
This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.
Co-authored-by: Kyle Bendickson kjbendickson@gmail.com
What is the purpose of the change
There is a preliminary proposal:
https://docs.google.com/document/d/1G4O6JidAoKgbIdy8Ts73OfG_KBEMpsW-LkXIb89I5k8/edit?usp=sharing
Motivation & Structure
In Flink 1.12 the community focused on implementing a unified Data Sink API (FLIP-143). The new abstraction introduces a write/commit protocol and a more modular interface where the individual components are transparently exposed to the framework.
A Sink implementor will have to provide the what and how: a SinkWriter that writes data and outputs what needs to be committed (i.e. committables); and a Committer and GlobalCommitter that encapsulate how to handle the committables. The framework is responsible for the when and where: at what time and on which machine or process to commit.
GlobalCommitter can perform the snapshot commit in Iceberg.
There is a Q&A related to Iceberg in its design document.
The current writing process of the Iceberg Flink sink is as follows:
In the figure above, data is written by multiple write operators, then the snapshot is committed by an operator with parallelism 1. Finally, an empty sink node is added to end the whole topology.

Based on FLIP-143, the above flow can be modified so that the Sink node writes data and the GlobalCommitter operator commits the snapshot.
There is an unsolved problem in the above structure. When streaming data is written, the sink generates a large number of small files, which accumulate over a long period of time and eventually lead to problems such as excessive system pressure and decreased read performance. This problem exists not only in Iceberg but in any file-based storage.
Flink tried to solve this problem with FLIP-191.
The goal of that FLIP is to extend the unified Sink API to broaden the spectrum of supported scenarios and fix the small-file-compaction problem.
To overcome the limitations of Sink V1, Flink offers different hooks to insert custom topologies into the sink.
On the basis of FLIP-191, the structure of the Iceberg Flink sink will be similar to the above, except that it uses only one committer operator; that is, all output of the sink is routed to one committer, which performs all commit actions.
PostCommit gives us an entry point for things like asynchronous small-file merging and index generation, where we can do some optimization without affecting the streaming data write path.
The Advantages
Provides interfaces for small-file merging and index generation
The structure is clearer, and fault tolerance is guaranteed by Flink itself
Proposed Changes
The data-writing action is moved from the transform operator into the Sink
Establish a separate Committer operator to commit the snapshot
Iceberg currently maintains three Flink versions: 1.13, 1.14 and 1.15. FLIP-191 was introduced in Flink 1.15, so we plan to first complete the version containing both FLIPs for Flink 1.15. The version containing only FLIP-143 will be migrated to 1.13 and 1.14.
What stays unchanged
The logic for writing data to a file: this proposal only changes the timing and the operator in which data is written; it does not change the write logic itself.
Committer logic: same as above, we don't change the commit action, we just move it into another operator.
Compatibility, Deprecation, and Migration Plan
Java API
Deprecate the old sink and keep it at least until the next major version of Iceberg.
SQL API
Use the new sink instead of the old one.
To do
Small file merge
Index file generation
Closes #5119