Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flink: new sink base on the unified sink API #4904

Closed
wants to merge 2 commits into from

Conversation

hililiwei
Copy link
Contributor

@hililiwei hililiwei commented May 30, 2022

Co-authored-by: Kyle Bendickson kjbendickson@gmail.com

What is the purpose of the change

There is a preliminary proposal:
https://docs.google.com/document/d/1G4O6JidAoKgbIdy8Ts73OfG_KBEMpsW-LkXIb89I5k8/edit?usp=sharing

Motivation & Structure

In Flink 1.12 the community focused on implementing a unified Data Sink API (FLIP-143). The new abstraction introduces a write/commit protocol and a more modular interface where the individual components are transparently exposed to the framework.
A Sink implementor will have to provide the what and how: a SinkWriter that writes data and outputs what needs to be committed (i.e. committables); and a Committer and GlobalCommitter that encapsulate how to handle the committables. The framework is responsible for the when and where: at what time and on which machine or process to commit.

image

GlobalCommitter can complete the action of submitting snapshots in Iceberg.
There is a QA related to Iceberg in its design documentation.

Is the sink an operator or a topology?
From the discussion in the long run we should give the sink developer the ability of building “arbitrary” topologies. But for Flink-1.12 we should be more focused on only satisfying the S3/HDFS/Iceberg sink.

The current Writing process of Iceberg Flink Sink is as follows:

image

In the figure above, data is written by multiple write operators, and then snapshot is submitted by an operator with 1 parallelism. Finally, an empty Sink node is added to end the whole process.
Based on the FLIP 143, the above flow can be modified to:
image

The Sink node writes data, and the GlobalCommiter operator commits the snapshot.

There is an unsolved problem in the above structure. When streaming data is written, Sink will generate a large number of small files, which accumulate over a long period of time and eventually lead to problems such as excessive system pressure and decreased reading performance. This problem not only exists in Iceberg, but also in file storage.

Flink tried to solve this problem with FLIP-191.

Motivation
With FLIP-143 we introduced the unified Sink API to make it easier for connector developers to support batch and streaming scenarios. While developing the unified Sink API it was already noted that the unified Sink API might not be flexible enough to support all scenarios from the beginning.
With this document, we mainly focus on the small-file-compaction problem which is one of the not yet covered scenarios. The problem manifests when using the FileSink and you want to guarantee a certain file size and not end up with a lot of small files (i.e. one file per parallel subtask). It becomes very important for modern data lake use cases where usually the data is written in some kind of columnar format (Parquet or ORC) and larger files are recommended to amplify the read performance. A few examples are the Iceberg sink [1], Delta lake [2], or Hive. For Hive in particular we already have a sink that supports compaction but it is not generally applicable and only available in Flink’s Table API [3].

The goal of this document is to extend the unified Sink API to broaden the spectrum of supported scenarios and fix the small-file-compaction problem.
To overcome the limitations of Sink V1 Flink offer different hooks to insert custom topologies into the sink.

image

On the basis of FLIP-191, the structure of Iceberg Flink Sink will be similar to the above, except that it only uses one committer operator, that is, all the output of Sink should be written to one committer, and all the submission actions should be completed by it.

image

PostCommit gives us a portal to do things like asynchronous small file merges and index generation, where we can do some optimization without affecting stream data writing.

The Advantage

Provides interfaces for small file merging and index
The structure is clearer, and fault tolerance is guaranteed by Flink itself

Proposed Changes

The data writing action is converted from the transform operator to Sink
Establish a separate Commiter operator to commit the snapshot

Iceberg currently maintains three versions of Flink 1.13, 1.14 and 1.15, while Flip-191 was introduced in Flink 1.15, so we plan to complete the versions containing these two flips in Flink 1.15 first. The one containing only FLIP-141 will be migrated to 1.13 1.14

immovable

The logic for writing data to a file
This proposal only modifies the timing&operator of data writing, it does not change the logic of writing

committer logic
Same with above, we don't change the action of commit, just put it into another operator.

Compatibility, Deprecation, and Migration Plan

Java API
Deprecate old sink, and keep them at least until the next major version of Iceberg.
SQL API
Use new sink instead of the old one.

To do

Small file merge
Index file generation

close #5119

@github-actions github-actions bot added the flink label May 30, 2022
@hililiwei hililiwei force-pushed the new-sink branch 6 times, most recently from 62069cc to dcc635e Compare May 31, 2022 04:57
@hililiwei
Copy link
Contributor Author

hililiwei commented May 31, 2022

cc @openinx @chenjunjiedada @stevenzwu @kbendick , PTAL , thx.

@chenjunjiedada
Copy link
Collaborator

Thanks for ping me, I will take a look this week.

throw new UncheckedIOException("Failed to load iceberg table from table loader: " + tableLoader, e);
}
} else {
this.table = newTable;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if the table loader can not be null, why do we need to pass in the table and not just call loadTable?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Providing a table would avoid so many table loading from each separate task.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you mean by each separated task? We usually have only one sink in the Flink job. Or you mean the table is loaded previously and here can be skipped?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we'll have multiple sinks with multiple writes and globally one committer.

@hililiwei hililiwei force-pushed the new-sink branch 2 times, most recently from ece1b24 to 3595ba6 Compare June 29, 2022 01:48
@hililiwei hililiwei force-pushed the new-sink branch 3 times, most recently from 905774a to 8258d07 Compare August 15, 2022 10:56

this.taskWriterFactory = taskWriterFactory;
// Initialize the task writer factory.
taskWriterFactory.initialize(numberOfParallelSubtasks, subTaskId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is not correct.

  void initialize(int taskId, int attemptId);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch. Honestly, this is one of the things that puzzled me. So let's talk about it.
There is only one implementation of initialize:

  @Override
  public void initialize(int taskId, int attemptId) {
    this.outputFileFactory =
        OutputFileFactory.builderFor(table, taskId, attemptId).format(format).build();
  }

And the signature of OutputFileFactory.builderFor goes like this:

  public static Builder builderFor(Table table, int partitionId, long taskId) {
    return new Builder(table, partitionId, taskId);
  }

so, in the initialize method, the pass-through relationship is as follows:

initialize:taskId -> builderFor.partitionId
initialize:attemptId -> builderFor.taskId

It makes me feel a little confused.

I'm trying to reestablish this relationship here:

numberOfParallelSubtasks -> builderFor.partitionId
subTaskId -> builderFor.taskId

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like Ryan answered it here: #5586 (comment)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy that. Let me fix it.

}
})
.uid(uidPrefix + "-pre-commit-topology")
.global();
Copy link
Contributor

@stevenzwu stevenzwu Aug 16, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not a fan of using global partitioner to achieve effectively global committer behavior with parallel downstream committer operators. if we only need a single-parallelism committer, why don't we make it explicitly so. having parallel committers tasks and only route committables to task 0 is weird. It can also lead to more questions from users why other committer subtasks have zero input. Sink V1 interface from Flink has GlobalCommitter interface, which would work well for Iceberg use case. It seems that in sink V2 has no equivalent replacement. Javadoc mentioned WithPostCommitTopology.

Are we supposed to do sth like this for global committer?

    private class GlobalCommittingSinkAdapter extends TwoPhaseCommittingSinkAdapter
            implements WithPostCommitTopology<InputT, CommT> {

        @Override
        public void addPostCommitTopology(DataStream<CommittableMessage<CommT>> committables) {

            StandardSinkTopologies.addGlobalCommitter(
                    committables,
                    GlobalCommitterAdapter::new,
                    () -> sink.getCommittableSerializer().get());
        }
    }

But they what is the purpose of the regular committer from TwoPhaseCommittingSink#createCommitter?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to avoiding having subtasks that don’t process any input. Even if it’s correct, it will look strange when monitoring. Being more explicit by using the code @stevenzwu mentioned or somehow repartitioning to 1 would make the user experience better as seeing many tasks where only one gets input would be the first thing I’d start investigating upon too much backpressure etc.

Copy link
Contributor

@stevenzwu stevenzwu Aug 16, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hililiwei @pvary @kbendick FYI, I started a discussion thread in dev@flink. subject is Sink V2 interface replacement for GlobalCommitter

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The lack of GlobalCommitter in the v2 sink interface makes it not a good fit for Iceberg sink. I added a few more comment in the writer class: https://github.com/apache/iceberg/pull/4904/files#r951000248.

I think we need to address this high-level design question first with the Flink community before we can really adopt the v2 sink interface. So far, we haven't heard back anything on the email thread. We will have to figure out who can we work with from the Flink community.

cc @hililiwei @pvary @kbendick @rdblue

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah agreed that the lack of global committer presents a problem for us. Even the post-commit topology sounds like possibly not the right place (based on name alone even). Plus we lose access to using that for doing small file merging etc

The Flink community has a Slack now. Maybe we can go fishing for support / people to discuss with over there? I’m gonna look for the invite link but if anybody needs an invite now, I think I can do it if you send me your email.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah agreed that the lack of global committer presents a problem for us. Even the post-commit topology sounds like possibly not the right place (based on name alone even). Plus we lose access to using that for doing small file merging etc

post-commit topology is one cycle behind Commiter, so it is currently fine to use it for small file merges and index generation, and we use it internally as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The lack of GlobalCommitter in the v2 sink interface makes it not a good fit for Iceberg sink. I added a few more comment in the writer class: https://github.com/apache/iceberg/pull/4904/files#r951000248.

I think we need to address this high-level design question first with the Flink community before we can really adopt the v2 sink interface. So far, we haven't heard back anything on the email thread. We will have to figure out who can we work with from the Flink community.

cc @hililiwei @pvary @kbendick @rdblue

As discussed in the mail thread, the ability to customize Commiter parallelism is what we need. This capability does not seem to be available until at least >flink 1.17, and for now, there are no obvious blocking issues that require us to use the new sink right away. So it looks like we might be able to wait until flink provides this capability before going back and optimizing it.

But maybe one thing we need to consider is that the auto-compacting of small files is a good thing for the user. Maybe we can consider using it the current way (using global), because it doesn't have a problem with functionality, just commiters that won't do anything. Adaptation comes after flink provides the ability to customize Commiter parallelism.

@Override
public List<StreamWriterState> snapshotState(long checkpointId) {
List<StreamWriterState> state = Lists.newArrayList();
state.add(new StreamWriterState(Lists.newArrayList(writeResultsState)));
Copy link
Contributor

@stevenzwu stevenzwu Aug 16, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Checkpoint individual data files as writer state can be expensive. E.g., if one checkpoint cycle produces a lot of files with many columns, the amount of metadata can be very significant.

I am also think we shouldn't checkpoint the flushed files as writer state. How does it help with fault tolerance? The current FlinkSink writer only flush the data files and emit them to the downstream committer operator.

In the current FlinkSink implementation, IcebergFilesCommitter checkpoint a staging ManifestFile object per checkpoint sync. Here we only need to checkpoint a single manifest file per cycle, which is very small in size. Upon successful commit, committer clears the committed manifest files. Upon commit failure, manifest files are accumulate and attempted for commit in the next checkpoint cycle.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

writeResultsState contains only the metadata of the data file. We don't really put data into the state.

If we remove the state here, this part of the data may be lost if the task fails after the file is closed and before it is sent to the committer.

The more important reason is, in the v2 Sink , CommT is transferred between the Writer and Commiter, flink automatically processes its state. If we're going to use Manifest instread of WriteResult as CommT, it means need to generate it in the every writer operator, so that it can be passed to the commiter, i think that's not reasonable. If we generate it in Commiter, as the IcebergFilesCommitter did, because the CommT still is WriteResult, the new version of Commiter doesn't put it in the state, so there is no practical point in generating it. Given the differences between the v1 and v2 versions, I currently feel it would be better to use WriterResult directly.

Copy link
Contributor

@stevenzwu stevenzwu Aug 22, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not saying with should use Manifest as CommT. Again, this is where I feel the v2 sink interface is not a good fit for Iceberg. In the v1 sink interface, we have GlobalCommT for GlobalCommitter. That will be the ideal interface for Iceberg sink. WriteResult would be CommT and Manifest will be GlobalCommT.

public interface GlobalCommitter<CommT, GlobalCommT>

Here is the how that Iceberg sink works today and how I hope that it should continue to work with v2 sink interface. Writers flushed the data files and send WriteResult to the global committer. The global committer checkpoints the Manifest file for fault tolerance/exactly-once.

I understand that WriteResult only contains the metadata. It can still be substantial in certain use cases. E.g.,

  • writer parallelism is 1,000
  • Each writer writes to 100 files (e.g. bucketing or hourly partition by event time)
  • table has many columns. with column-level stats (min, max, count, null etc.). it would be significant metadata bytes per data file. say there are 200 columns, and each columns stats take up 100 bytes. that is 20 KB per file.
  • each checkpoint would have size of 20 KB x 100 x 1,000. that is 2 GB.
  • if checkpoint interval is 5 minutes and commit can't succeed for 10 hours, we are talking about 120 checkpoints. Then Iceberg sink would accumulate 240 GB of state.

I know this is the worst-case scenario, but it is the motivation behind the current IcebergSink design where the global committer only checkpoint one ManifestFile per checkpoint cycle.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To adapt to the new architecture, I've made a compromise here. Serialize the WriteResult sent by the StreamWriter into a Manifest file as CommT and deserialize the file in the Committer.

@hililiwei hililiwei force-pushed the new-sink branch 3 times, most recently from b33aeb9 to a649e82 Compare August 18, 2022 01:37
@hililiwei hililiwei force-pushed the new-sink branch 2 times, most recently from ec16a9e to b1e694a Compare October 10, 2022 02:27
@hililiwei hililiwei force-pushed the new-sink branch 2 times, most recently from cbbc841 to 2e56954 Compare November 3, 2022 01:17
Copy link

This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@iceberg.apache.org list. Thank you for your contributions.

@github-actions github-actions bot added the stale label Aug 11, 2024
Copy link

This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.

@github-actions github-actions bot closed this Aug 19, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Flink: FLIP-143 & FLIP-191 based Iceberg sink
5 participants