
Flink: Add the iceberg files committer to collect data files and commit to iceberg table. #1185

Merged (30 commits) on Aug 28, 2020
Conversation

@openinx (Member) commented Jul 9, 2020

As we discussed in pull request #856, the Flink sink connector is actually composed of two stateful Flink operators:

  1. FlinkStreamWriter receives the records and writes them to DataFile(s); when a checkpoint happens, it emits the completed DataFiles to the downstream operator.
  2. FlinkFilesCommitter collects the DataFiles from FlinkStreamWriter and commits the files to iceberg in a transaction. It is a single-parallelism operator because we don't expect transaction conflicts to happen while committing.

This patch mainly implements the FlinkFilesCommitter and provides several end-to-end integration tests to verify correctness. The dependencies of this one are: #1180, #1145, #1175

@JingsongLi (Contributor) left a comment

Thanks @openinx for the contribution.

I think we should consider a situation: multiple jobs writing a table at the same time, or killing the old job and restarting a new streaming job to write the table (this is more common; the checkpointId will restart from 1).

So in this situation, we must do something like:

String snapshotQueryId = summary.get(QUERY_ID_PROPERTY);

Before we obtain the max checkpoint id from a snapshot, we should make sure the snapshot was written by this job.


operator.addSink(filesCommitter)
.name(ICEBERG_FILES_COMMITTER)
.uid(UUID.randomUUID().toString())
@kbendick (Contributor) commented Jul 25, 2020

Setting the uid to a randomly generated value is not good practice. Operator uids should be stable, so that between intentional redeployments, or simply on restarts due to lost task managers, Flink knows which operator each piece of state belongs to.

If the uid is changed between restarts, state could be lost.

You can find more information about the use of operator uids with Flink's savepoints (intentional restarts), and why stateful operators should be assigned a stable uid, here: https://ci.apache.org/projects/flink/flink-docs-stable/ops/upgrading.html#matching-operator-state

@kbendick (Contributor) commented Jul 25, 2020

Most of the time, I simply assign the name as the uid. In Scala, I use an implicit class to assign the name and uid at the same time with the same value. This assumes that the name of an operator is unique (which might not always be true, and is something that should be considered as a library author), but so far I haven't run into issues with the handful of jobs that I oversee.
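
As a rough illustration of the point above (a sketch only, not code from this PR; operator, filesCommitter, and ICEBERG_FILES_COMMITTER are taken from the snippet under review):

// Sketch only: reuse the stable operator name as the uid instead of UUID.randomUUID(),
// so Flink can match the committer's state across restarts and savepoint restores.
operator.addSink(filesCommitter)
    .name(ICEBERG_FILES_COMMITTER)
    .uid(ICEBERG_FILES_COMMITTER);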

@kbendick (Contributor)

Given that #1180 and #1175 have been merged, would it be possible for you to merge those commits into this branch to make it easier to review? Or does the dependency on #1145 make that somewhat burdensome?

Not a major issue, but it would help me to be able to review just the code that's being proposed for addition and not the code that has already been merged in.

If not, no worries. I appreciate all of the work it seems you've put into getting Flink going with Iceberg @openinx. I can see you've submitted quite a number of PRs on the matter, so I'm especially looking to learn from you by reading and reviewing these PRs 👍

@rdblue (Contributor) commented Aug 7, 2020

@openinx, can you rebase this now that #1145 has been merged?

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
table = TableUtil.findTable(path, conf.get());
maxCommittedCheckpointId = parseMaxCommittedCheckpointId(table.currentSnapshot());


I think this method parseMaxCommittedCheckpointId should only be invoked when this job is restored.

@openinx (Member, Author)

You are right. Here we could only initialize the max-checkpoint-id to -1, and could not read the checkpoint id from the iceberg table if we are not in a restored job. We may have a table with 3 checkpoints and a max-committed-checkpoint-id of 3; if we then stop the current flink job and start another flink job to continue writing this table, its checkpoint id will start from 1. If we read the max-committed-checkpoint-id here, we would miss the first three checkpoints' data files in the current flink job.


Do you have any idea about the case where the snapshots were removed by another process and that process commits a new snapshot to the iceberg table? The flink committer will not get the correct maxCommittedCheckpointId then.

@openinx (Member, Author)

That would mean someone has expired the latest committed snapshot of a running flink job. Usually the flink checkpoint interval is several minutes, while the snapshot expire interval is several days or weeks. It's unlikely that we would expire the latest committed snapshot unless someone sets unreasonable intervals.

On the other hand, even if someone removed the latest committed snapshot, the flink job will still write data to the iceberg table correctly unless we restore the flink job after the snapshot was removed.

@JingsongLi (Contributor) commented Aug 13, 2020

@openinx and @rdblue I think we can consider #1332 first, what do you think?


// State for all checkpoints;
private static final ListStateDescriptor<byte[]> STATE_DESCRIPTOR =
new ListStateDescriptor<>("checkpoints-state", BytePrimitiveArraySerializer.INSTANCE);
Contributor

Can we use Flink's MapTypeInfo, and addAll into a sorted map?

@openinx (Member, Author)

That sounds good, I've implemented it in this patch: 15dd8e0


@Override
public void notifyCheckpointComplete(long checkpointId) {
NavigableMap<Long, List<DataFile>> pendingFileMap = dataFilesPerCheckpoint.tailMap(maxCommittedCheckpointId, false);
Contributor

Why exclude maxCommittedCheckpointId and checkpointId? Why not just dataFilesPerCheckpoint.tailMap(checkpointId, true)? I don't understand why we need to store the previous checkpoint id.

@openinx (Member, Author)

I've added a few comments about those members here: 6b43ec5.

Why exclude maxCommittedCheckpointId and checkpointId?

The maxCommittedCheckpointId's data files have been committed to iceberg, so there is no need to commit them again, otherwise the table would have redundant data. For the current checkpointId, we've flushed the dataFilesOfCurrentCheckpoint into dataFilesPerCheckpoint in the snapshotState method, so we do not exclude the files of the current checkpointId.

@JingsongLi (Contributor) commented Aug 17, 2020

According to the current code, the data of the current checkpoint needs to be committed in the next cycle.
That is not ideal and can lead to excessive delay.

if (maxCommittedCheckpointId >= checkpointId) {
  return;
}
Map<Long, List<DataFile>> pendingFileMap = dataFilesPerCheckpoint.headMap(checkpointId, true);
List<DataFile> pendingDataFiles = Lists.newArrayList();
for (List<DataFile> dataFiles : pendingFileMap.values()) {
  pendingDataFiles.addAll(dataFiles);
}
// do the AppendFiles commit
maxCommittedCheckpointId = checkpointId;
pendingFileMap.clear();

This is the process I imagined.

@openinx (Member, Author)

the data of the current checkpoint needs to be committed in the next cycle.

That's not correct. Since we've merged the dataFilesOfCurrentCheckpoint into dataFilesPerCheckpoint, the dataFilesPerCheckpoint.tailMap(maxCommittedCheckpointId, false) will contain all the uncommitted data files, including the files from the current checkpoint.

About your described process:

Map<Long, List<DataFile>> pendingFileMap = dataFilesPerCheckpoint.tailMap(checkpointId, true);

This seems like it won't commit all the previous files whose commit failed. What about the data files between (maxCommittedCheckpointId, checkpointId)?

In my opinion, the current process should be correct. If there are places I misunderstood, please correct me, thanks.

Contributor

Sorry, wrong reading. It should be headMap instead of tailMap. You can think again about my example above.

I got your point about tailMap. But it is incorrect too. Consider this case:

  • 1. snapshot(cpId)
  • 2. snapshot(cpId + 1)
  • 3. cpId succeeds, notifyComplete(cpId)
  • 4. cpId + 1 succeeds, notifyComplete(cpId + 1)

If there is a failure between 3 and 4, the data in the table will be incorrect.

@openinx (Member, Author)

That's a great case (which I did not consider) to prove your point. Yes, we need headMap(checkpointId, true) to avoid future data files being committed to the iceberg table. I've fixed this in 598f235.
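
For readers following the thread, a minimal sketch of the agreed-upon commit logic, assuming the field names used in this discussion (the actual implementation lives in the linked commits):

// Sketch: commit every pending checkpoint in (maxCommittedCheckpointId, checkpointId],
// i.e. skip what is already committed and never commit files of "future" checkpoints.
@Override
public void notifyCheckpointComplete(long checkpointId) {
  if (checkpointId <= maxCommittedCheckpointId) {
    return; // already committed, nothing to do
  }

  NavigableMap<Long, List<DataFile>> pending = dataFilesPerCheckpoint
      .headMap(checkpointId, true)
      .tailMap(maxCommittedCheckpointId, false);

  List<DataFile> pendingDataFiles = Lists.newArrayList();
  for (List<DataFile> dataFiles : pending.values()) {
    pendingDataFiles.addAll(dataFiles);
  }

  AppendFiles appendFiles = table.newAppend();
  pendingDataFiles.forEach(appendFiles::appendFile);
  appendFiles.commit();

  this.maxCommittedCheckpointId = checkpointId;
}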

@openinx (Member, Author) commented Aug 14, 2020

Let me address above comments and fix the failed unit test.

@openinx openinx marked this pull request as draft August 14, 2020 03:49
@openinx openinx marked this pull request as ready for review August 17, 2020 01:55
@openinx (Member, Author) commented Aug 19, 2020

For this pull request, there are several issues I need to explain here:

  1. @JingsongLi suggested considering two cases: 1. multiple flink jobs writing the same table; 2. restarting with a new job and continuing to write the same table. For the former case, we need a global id, such as the job id or application id, to identify the max checkpoint id we've committed for a given job (see the sketch below). For example, we have two flink jobs, job1 and job2:
    a. job1 commits to the iceberg table with maxCheckpointId=1;
    b. job2 commits to the iceberg table with maxCheckpointId=5;
    c. job1 commits to the iceberg table with maxCheckpointId=2;
    d. job2 starts to commit again; it needs to find the maxCheckpointId corresponding to job2, which is 5 now. Then it needs to rewrite its maxCheckpointId = 6 and commit the txn.

The global job id will also help to resolve the restart issue (2), because we will know that the newly started job is starting from checkpoint=1.

  2. For the operator uid issue (from @kbendick), I read the Flink documentation. Indeed, it's used for state recovery and needs to be stable and unique.

I've addressed these two issues in the new patch, and also attached unit tests.
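
A hedged sketch of what the per-job lookup could look like; the property keys follow the constants that appear later in this review ("flink.files-committer.uid" and "flink.max-committed.checkpoint.id"), while the helper itself is illustrative and not necessarily the merged code:

// Sketch: walk the snapshot history backwards and return the max committed checkpoint id
// written by the given flink job, or -1 (the INITIAL_CHECKPOINT_ID used in this patch)
// if this job never committed to the table.
static long getMaxCommittedCheckpointId(Table table, String flinkJobId) {
  Snapshot snapshot = table.currentSnapshot();
  while (snapshot != null) {
    Map<String, String> summary = snapshot.summary();
    if (flinkJobId.equals(summary.get("flink.files-committer.uid"))) {
      String committedId = summary.get("flink.max-committed.checkpoint.id");
      if (committedId != null) {
        return Long.parseLong(committedId);
      }
    }
    Long parentId = snapshot.parentId();
    snapshot = parentId == null ? null : table.snapshot(parentId);
  }
  return -1L;
}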

@JingsongLi (Contributor) left a comment

Thanks for the update, left some comments.

@@ -113,7 +114,7 @@ protected PartitionKey partition(RowData row) {
}
}

private static class FlinkFileAppenderFactory implements FileAppenderFactory<RowData> {
private static class FlinkFileAppenderFactory implements FileAppenderFactory<RowData>, Serializable {
Contributor

Please add parquet support, and also add tests.

@openinx (Member, Author)

We need to wait until the parquet writer is available (#1272). For now only the parquet readers have been merged.

@@ -113,7 +114,7 @@ protected PartitionKey partition(RowData row) {
}
}

private static class FlinkFileAppenderFactory implements FileAppenderFactory<RowData> {
private static class FlinkFileAppenderFactory implements FileAppenderFactory<RowData>, Serializable {
Contributor

Maybe you can let FileAppenderFactory extend Serializable.

@@ -27,7 +28,7 @@
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.encryption.EncryptionManager;

public class OutputFileFactory {
public class OutputFileFactory implements Serializable {
Contributor

Why?

@openinx (Member, Author)

Because the IcebergStreamWriter needs the RowDataTaskWriterFactory to create a TaskWriter, and it is passed in when constructing the operator, all members inside RowDataTaskWriterFactory should be serializable (including itself); unfortunately OutputFileFactory is one of its members.

Contributor

outputFileFactory should be transient in RowDataTaskWriterFactory; it is initialized in initialize().

@openinx (Member, Author)

OK, that makes sense.
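
A minimal sketch of the transient-field pattern being agreed on here (the initialize signature and the createOutputFileFactory helper are illustrative assumptions, not the exact code):

// Sketch: keep the non-serializable OutputFileFactory out of the serialized factory by
// marking it transient and re-creating it on the task side in initialize().
class RowDataTaskWriterFactory implements Serializable {
  private transient OutputFileFactory outputFileFactory;

  void initialize(int taskId, int attemptId) {
    // re-created after deserialization on the TaskManager, so it never has to be serialized
    this.outputFileFactory = createOutputFileFactory(taskId, attemptId); // hypothetical helper
  }
}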

private IcebergSinkUtil() {
}

@SuppressWarnings("unchecked")
static DataStreamSink<RowData> write(DataStream<RowData> inputStream,
Contributor

If this is a user-facing API, I think a class named FlinkSink (instead of IcebergSink?) is more suitable.

And do you think we could make it a builder-style class?

Configuration conf,
String fullTableName,
Table table,
TableSchema requestedSchema) {
Contributor

Actually, I think it is just flinkSchema.

// checkpoint id should be positive. If it's not positive, that means someone might have removed or expired the
// iceberg snapshot, in that case we should throw an exception in case of committing duplicated data files into
// the iceberg table.
Preconditions.checkArgument(maxCommittedCheckpointId > 0,
Contributor

Maybe this should be checkState.


private static final Logger LOG = LoggerFactory.getLogger(IcebergFilesCommitter.class);
private static final String GLOBAL_FILES_COMMITTER_UID = "flink.files-committer.uid";
private static final String MAX_COMMITTED_CHECKPOINT_ID = "flink.max-committed.checkpoint.id";
Contributor

checkpoint-id?

private static final long INITIAL_CHECKPOINT_ID = -1L;

private static final Logger LOG = LoggerFactory.getLogger(IcebergFilesCommitter.class);
private static final String GLOBAL_FILES_COMMITTER_UID = "flink.files-committer.uid";
Contributor

uid -> job-id?

.setParallelism(1)
.setMaxParallelism(1);

return returnStream.addSink(new DiscardingSink()).setParallelism(1);
Contributor

Please name the sink.

String fullTableName,
Table table,
TableSchema requestedSchema) {
IcebergStreamWriter<RowData> streamWriter = createStreamWriter(table, requestedSchema);
Contributor

I just noticed that IcebergStreamWriter does not set a chaining strategy. We should add setChainingStrategy(ChainingStrategy.ALWAYS); in the constructor of IcebergStreamWriter.

@rdblue (Contributor) commented Aug 20, 2020

I think this one is ready to review and commit, right?

@openinx (Member, Author) commented Aug 21, 2020

I think this one is ready to review and commit, right?

That's right. ( After this DataStream sink, the next patch is #1348).

@openinx (Member, Author) commented Aug 21, 2020

Rebased onto master and also did the following:

  1. Added ORC as a case in the unit tests;
  2. Since we've introduced the CatalogLoader and TableLoader, we can just use them inside IcebergFilesCommitter to load the iceberg table lazily, which looks more reasonable.

@JingsongLi (Contributor)

Hi @openinx, for the API level, I think we can do more:

  • We can provide two APIs, one for RowData and one for Row. This can cover not only most DataStream users, but also the requirements of the SQL layer.
  • Users can construct a sink from just a DataStream and a TableLoader; other information can be inferred.

Just like:

public class FlinkSink {
  private FlinkSink() {
  }

  public static Builder<Row> forRow(DataStream<Row> input) {
    return new Builder<>(input);
  }

  public static Builder<RowData> forRowData(DataStream<RowData> input) {
    return new Builder<>(input);
  }

  private static class Builder<T> {
    private final DataStream<T> input;
    private TableLoader loader;
    private Configuration hadoopConf;
    private Table table;
    private TableSchema tableSchema;

    // ---------- Required options ------------

    private Builder(DataStream<T> input) {
      this.input = input;
    }

    public Builder tableLoader(TableLoader loader) {
      this.loader = loader;
      return this;
    }

    // ---------- Optional options ------------

    public Builder table(Table newTable) {
      this.table = newTable;
      return this;
    }

    public Builder hadoopConf(Configuration newConf) {
      this.hadoopConf = newConf;
      return this;
    }

    public Builder tableSchema(TableSchema newSchema) {
      this.tableSchema = newSchema;
      return this;
    }

    @SuppressWarnings("unchecked")
    public DataStreamSink<RowData> build() {
      Preconditions.checkNotNull(input, "Input data stream shouldn't be null");
      Preconditions.checkNotNull(loader, "Table loader shouldn't be null");

      if (hadoopConf == null) {
        // load cluster conf
      }

      if (table == null) {
        // load table from table loader
      }

      // tableSchema can be optional

      DataStream<RowData> inputStream;
      Class<T> inputClass = input.getType().getTypeClass();
      if (inputClass == Row.class) {
        DataType type;
        if (tableSchema != null) {
          type = tableSchema.toRowDataType();
        } else {
          type = TypeConversions.fromLogicalToDataType(FlinkSchemaUtil.convert(table.schema()));
        }
        DataStructureConverter converter = DataStructureConverters.getConverter(type);
        inputStream = input.map((MapFunction) converter::toInternal);
      } else if (inputClass == RowData.class) {
        inputStream = (DataStream<RowData>) input;
      } else {
        throw new IllegalArgumentException("Should be Row or RowData");
      }

      // create writer form inputStream.
      // create committer.
      // return DataStreamSink.
      return null;
    }
  }
}
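
If the builder sketched above were adopted, a job could wire the sink roughly like this (assuming a DataStream<RowData> named dataStream and a TableLoader named tableLoader already exist in the caller):

// Hypothetical usage of the proposed builder; only the tableLoader is required,
// while the table, hadoopConf and tableSchema can be inferred or loaded lazily.
DataStreamSink<RowData> sink = FlinkSink.forRowData(dataStream)
    .tableLoader(tableLoader)
    .build();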

@JingsongLi (Contributor) left a comment

Looks good to me overall, left some minor comments.


public class FlinkSink {

private static final TypeInformation<DataFile> DATA_FILE_TYPE_INFO = TypeInformation.of(DataFile.class);
Contributor

NIT: I think these fields don't have to be static.

return new ListStateDescriptor<>("iceberg-files-committer-state", sortedMapTypeInfo);
}

static Long getMaxCommittedCheckpointId(Table table, String flinkJobId) {
Contributor

Actually, the default value of maxCommittedCheckpointId can be null instead of INITIAL_CHECKPOINT_ID.

@openinx (Member, Author)

For me, initializing maxCommittedCheckpointId to -1 seems simpler to handle, because when calling SortedMap#tailMap or SortedMap#headMap we don't need a null check. The comparison in notifyCheckpointComplete is a similar point.

IcebergFilesCommitter(TableLoader tableLoader, Configuration hadoopConf) {
this.tableLoader = tableLoader;
this.hadoopConf = new SerializableConfiguration(hadoopConf);
setChainingStrategy(ChainingStrategy.ALWAYS);
Contributor

We can let the committer be a single node.

flinkJobId = getContainingTask().getEnvironment().getJobID().toString();

// Open the table loader and load the table.
tableLoader.open(hadoopConf.get());
Contributor

Better to close this tableLoader in dispose().

@openinx (Member, Author) commented Aug 24, 2020

Ping @rdblue, we (JingsongLi and I) have reached basic agreement about this patch. Would you mind taking a look when you have time? Thanks.

@openinx (Member, Author) commented Aug 25, 2020

Since both the parquet reader and writer have been merged, let me update this patch to add parquet to the unit tests.

"Should initialize input DataStream first with DataStream<Row> or DataStream<RowData>");
Preconditions.checkArgument(rowInput == null || rowDataInput == null,
"Could only initialize input DataStream with either DataStream<Row> or DataStream<RowData>");
Preconditions.checkNotNull(table, "Table shouldn't be null");

Since we already have the TableLoader, would it be easier for the user if we load the table here?

@openinx (Member, Author)

The table is used to initialize the IcebergStreamWriter on the client side, while the tableLoader is used to load the table on the TaskManager side; using the TaskManager's tableLoader to load the table on the client side looks strange to me.

// <2, <file3>>. Snapshot for checkpoint#1 interrupted because of network/disk failure etc, while we don't expect
// any data loss in iceberg table. So we keep the finished files <1, <file0, file1>> in memory and retry to commit
// iceberg table when the next checkpoint happen.
private final NavigableMap<Long, List<DataFile>> dataFilesPerCheckpoint = Maps.newTreeMap();
Contributor

I think in the Netflix version, we track old checkpoint state by writing a manifest file and appending that manifest to the table. The advantage is that state is really small so the job can be blocked from committing for a long time (days) and can recover when connectivity to commit has been restored.

We use this to handle resilience when AWS regions can't talk to one another and our metastore is in a single region.

Contributor

+1 for this approach. There may be thousands of data files in a checkpoint for a partitioned table.

  • In snapshotState, serializing files to Flink state vs. serializing files to a manifest file should cost about the same.
  • In notifyCheckpointComplete, committing just a manifest file has better performance than committing individual data files.
  • It can reduce the size of the state, which is actually put into the memory of the job manager.

Although it is rare for checkpoints to fail for a long time, it would improve the robustness of the system.

But, this approach does lead to more code complexity.

@openinx (Member, Author)

Yeah, I saw that it archives all the finished data files into a manifest file and maintains only the manifest files in the state backend (https://github.com/apache/iceberg/pull/856/files#diff-0245903e25b2cace005c52b2c7b130bbR372). To my understanding, that's a great way to handle the case where commits to the iceberg table fail frequently (because all the data files are buffered in manifest files rather than in the flink state backend), but I'm not sure whether it is a common case for most users. Commit failures would be easy to hit if the data file regions are separated from the metastore region; do others also maintain their data and metadata this way, or is their state enough to hold those uncommitted data files?

If commit failures are a common case, I'd be happy to contribute buffering those data files in a manifest file. For now, I think we can go with the simple state approach first. Does that make sense?

Contributor

I completely agree that we should not block adding the sink until this is done. I just wanted to note that this is a good place to put some extra effort. I think that multi-region deployments are common and this can really help. We don't keep Kafka data for a long time, so it is important to us that Flink continues to consume Kafka data to eventually commit to the table, even if it can't commit the manifests right away.


@stevenzwu (Contributor) commented Aug 28, 2020

Just to provide more context on why we need it. Our data warehouse (metastore) only lives in the us-east-1 region, while Flink streaming jobs can run in 3 regions (us-east-1, us-west-2, and eu-west-1). Transient commit failures happen from time to time, which aren't big concerns. We are more concerned about extended outages (like a day) for whatever reason (us-east-1 outage, cross-region network issue, metastore service outage). For high-parallelism or event-time-partitioned tables, there could be thousands or tens of thousands of files per checkpoint interval. The manifest approach allows the Flink jobs to handle those extended outages better.

Flink operator list state can't handle large state well. I vaguely remember the limit is 1 or 2 GB. And it can get pretty slow when the list is large.

@openinx (Member, Author) commented Aug 31, 2020

The manifest approach sounds like a good improvement to me; it's a todo item for the next improvement, #1403.
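
For readers following along, the manifest-buffering idea (deferred to #1403) is roughly the following; writeManifestFile and manifestsPerCheckpoint are hypothetical names, and the eventual design may differ:

// Sketch: instead of keeping every DataFile in Flink state, write the checkpoint's files
// into one manifest and keep only that small ManifestFile reference in state, then append
// the pending manifests when the checkpoint completes.
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
  ManifestFile manifest = writeManifestFile(dataFilesOfCurrentCheckpoint); // hypothetical helper
  manifestsPerCheckpoint.put(context.getCheckpointId(), manifest);         // hypothetical NavigableMap
  dataFilesOfCurrentCheckpoint.clear();
  // ... persist manifestsPerCheckpoint into the operator state, as the DataFile map is today
}

@Override
public void notifyCheckpointComplete(long checkpointId) {
  AppendFiles appendFiles = table.newAppend();
  manifestsPerCheckpoint.headMap(checkpointId, true).values().forEach(appendFiles::appendManifest);
  appendFiles.commit();
}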

flinkJobId = getContainingTask().getEnvironment().getJobID().toString();

// Open the table loader and load the table.
tableLoader.open(hadoopConf.get());

Contributor

Never mind, it looks like this is done in dispose() -- here's a good example of where prefixing with this. would be clearer.

// Open the table loader and load the table.
tableLoader.open(hadoopConf.get());
table = tableLoader.loadTable();
maxCommittedCheckpointId = INITIAL_CHECKPOINT_ID;
Contributor

Minor: I prefer to prefix field names with this. when setting instance fields, so it is obvious that it is not a local variable.

@rdblue (Contributor) commented Aug 28, 2020

Overall, I think this is ready to commit. I'd prefer to remove Serializable from the appender factory interface first, though. And it would be nice to get some of the other comments fixed or clarified (some are questions since I'm learning about how Flink works). Thanks @openinx!


FlinkSink.forRowData(dataStream)
.table(table)
.tableLoader(tableLoader)
Contributor

Minor: it seems odd to provide the loader and the table. Couldn't the loader be called in the sink builder?

@rdblue rdblue merged commit f950a3e into apache:master Aug 28, 2020
@rdblue (Contributor) commented Aug 28, 2020

Merged. Thanks @openinx, it is great to have this in. And thanks for helping review, @JingsongLi and @kbendick!

@rdblue (Contributor) commented Aug 28, 2020

What are the next steps for Flink writes? I think we can probably start writing some documentation. The tests for the sink look like this is mostly complete. Maybe docs are an area that @kbendick could help with?

IcebergFilesCommitter filesCommitter = new IcebergFilesCommitter(tableLoader, hadoopConf);

DataStream<Void> returnStream = rowDataInput
.transform(ICEBERG_STREAM_WRITER_NAME, TypeInformation.of(DataFile.class), streamWriter)
@stevenzwu (Contributor) commented Aug 28, 2020

There can be multiple Iceberg sinks in the same job. We should probably add the table identifier string as a suffix to make the operator name and id unique. We use a unique sinkName within a job and add the sinkName suffix to the operator name.

@openinx (Member, Author)

That sounds good to me; it's necessary to support multiple iceberg sinks in the same job. I will open an issue and provide a patch with unit tests to address this.

DataStream<Void> returnStream = rowDataInput
.transform(ICEBERG_STREAM_WRITER_NAME, TypeInformation.of(DataFile.class), streamWriter)
.setParallelism(rowDataInput.getParallelism())
.transform(ICEBERG_FILES_COMMITTER_NAME, Types.VOID, filesCommitter)
Contributor

The committer is a stateful operator; we should probably set its uid explicitly.

Contributor

Never mind, I saw the earlier comment that this is a todo item.

.setParallelism(1)
.setMaxParallelism(1);

return returnStream.addSink(new DiscardingSink())
@stevenzwu (Contributor) commented Aug 28, 2020

Curious why we don't make the committer a sink function instead of adding a dummy DiscardingSink.

Conceptually, this writer-committer combo is the reverse/mirror of the split enumerator-reader in the FLIP-27 source interface. It would be nice to run the committer on the jobmanager (similar to the enumerator). This way, the Iceberg sink wouldn't change the nature of the embarrassingly-parallel DAG.

@openinx (Member, Author)

I think this would answer your question about why we used an operator rather than a sink function.

@stevenzwu (Contributor) commented Aug 31, 2020

@openinx thx. that does answer my question.

Still, adding a DiscardingSink may confuse users. It seems that we really need a unified/improved sink interface (similar to FLIP-27) to support bounded-input jobs in a sink function.

What about other Flink sinks used by bounded streaming/batch jobs? Do they all have to go through this model?

@openinx (Member, Author)

There are three cases: 1) unbounded streaming job; 2) bounded streaming job; 3) batch job. If users only need the unbounded streaming ability, they only need to implement SinkFunction; if they need both unbounded and bounded streaming, we need to extend/implement AbstractStreamOperator & BoundedOneInput and add the DiscardingSink to the tail. If they want batch, they need to provide an OutputFormat implementation. In a future flink version, case#2 and case#3 will be unified in one sink interface, but for now we have to implement the bounded streaming and batch cases separately.

The Flink hive connector is a good example, which supports case#1, case#2, and case#3. It also uses a similar approach to the current iceberg sink connector.
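
As a side note for the bounded streaming case described above, a minimal sketch of what the BoundedOneInput path can look like in the committer (names follow this discussion; this is illustrative, not the exact merged code):

// Sketch: when the bounded input ends, flush whatever has been received and commit it,
// reusing the same commit path that notifyCheckpointComplete uses.
@Override
public void endInput() {
  // treat end-of-input as a final "checkpoint" so commitUpToCheckpoint can be reused
  dataFilesPerCheckpoint.put(Long.MAX_VALUE, Lists.newArrayList(dataFilesOfCurrentCheckpoint));
  dataFilesOfCurrentCheckpoint.clear();
  commitUpToCheckpoint(Long.MAX_VALUE);
}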

Contributor

Fair enough, we can't solve this problem until Flink improves the sink interface.


DataStream<Void> returnStream = rowDataInput
.transform(ICEBERG_STREAM_WRITER_NAME, TypeInformation.of(DataFile.class), streamWriter)
.setParallelism(rowDataInput.getParallelism())
Contributor

This is a good default value for writer parallelism. We have users who want to explicitly control the writer parallelism in order to control the number of written files. In the future, we may want to allow the user to set the parallelism in the builder.

@openinx (Member, Author)

Makes sense to me, it could be a follow-up issue.

return new ListStateDescriptor<>("iceberg-files-committer-state", sortedMapTypeInfo);
}

static long getMaxCommittedCheckpointId(Table table, String flinkJobId) {
Contributor

Nice, this is the reverse iteration I was looking for. We were just walking through the table.snapshots() iterable.

}

@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
Contributor

I am wondering whether Flink guarantees serialized execution for notifyCheckpointComplete?

Contributor

According to the current implementation, it should be, because task checkpoints are ordered and RPC messages are ordered. But it's better not to rely on this.

Contributor

Thanks. Should we raise a jira to track this?

@JingsongLi you are saying that notifyCheckpointComplete and snapshotState are serialized by the same lock/mutex, right? Otherwise, I can see a problem with concurrent checkpoint handling.

Contributor

Yes. You don't need to care about thread safety.


this.checkpointsState = context.getOperatorStateStore().getListState(STATE_DESCRIPTOR);
if (context.isRestored()) {
this.maxCommittedCheckpointId = getMaxCommittedCheckpointId(table, flinkJobId);
@stevenzwu (Contributor) commented Aug 28, 2020

We might have a problem when redeploying the Flink job from an external checkpoint. It is a new flinkJobId in this case, so maxCommittedCheckpointId will be -1. As a result, we could commit the already-committed files again later.

The way we do de-dup is to generate a hash for the manifest file path and store the hash in the snapshot summary. During restore, we use the hash to check whether the manifest file was already committed.

        List<String> hashes = new ArrayList<>(flinkManifestFiles.size());
        AppendFiles appendFiles = transaction.newAppend();
        for (FlinkManifestFile flinkManifestFile : flinkManifestFiles) {
          appendFiles.appendManifest(flinkManifestFile);
          hashes.add(flinkManifestFile.hash());
        }
        appendFiles.set(
            COMMIT_MANIFEST_HASHES_KEY, FlinkManifestFileUtil.hashesListToString(hashes));

@openinx (Member, Author)

It's a great point that we should handle. I think we can attach both the flinkJobId and maxCommittedCheckpointId to the checkpoint state. In the restore path, we read the flinkJobId and maxCommittedCheckpointId from the state and compare the current flink job id with the flinkJobId from the state. If the job ids match, then it is surely the case of restoring without redeploying (case#1); otherwise it's the case you described (case#2).

For case#1, the current code should be correct.
For case#2, we should use the old flinkJobId from the state to parse the maxCommittedCheckpointId from the iceberg table, and use that checkpoint id to filter out all the committed data files.

Does that make sense?
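
A hedged sketch of the restore path proposed above (the state layout and the jobIdState handle are assumptions for illustration; getMaxCommittedCheckpointId is the helper shown elsewhere in this review):

// Sketch: compare the restored job id with the current one to tell case#1 from case#2.
if (context.isRestored()) {
  String restoredFlinkJobId = jobIdState.get().iterator().next(); // job id kept in state (assumed)
  if (flinkJobId.equals(restoredFlinkJobId)) {
    // case#1: restored within the same job, read the committed id under the current job id
    this.maxCommittedCheckpointId = getMaxCommittedCheckpointId(table, flinkJobId);
  } else {
    // case#2: redeployed from an old job's external checkpoint, look it up under the OLD job id
    // so that the data files already committed by that job are filtered out
    this.maxCommittedCheckpointId = getMaxCommittedCheckpointId(table, restoredFlinkJobId);
  }
}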

Contributor

I like the proposed solution. It should work.

@stevenzwu (Contributor)

Nit on class names:

IcebergStreamWriter -> IcebergWriter, because it also implements the BoundedOneInput interface, which seems like a batch job.

IcebergFilesCommitter -> IcebergCommitter. Iceberg only commits files, so including Files seems redundant.


SortedMap<Long, List<DataFile>> restoredDataFiles = checkpointsState.get().iterator().next();
// Only keep the uncommitted data files in the cache.
this.dataFilesPerCheckpoint.putAll(restoredDataFiles.tailMap(maxCommittedCheckpointId + 1));
Contributor

In our implementation, we immediately commit any uncommitted files upon restore to avoid waiting for another checkpoint cycle.

@openinx (Member, Author)

Committing those uncommitted data files here immediately sounds good to me; it shouldn't impact correctness.

@openinx (Member, Author)

Considering this issue again: if we redeploy a flink job from an external checkpoint, then all data files from restoredDataFiles are from the old flink job, which means their checkpoint ids could range from 1~N. If we put all those files into dataFilesPerCheckpoint, that introduces a problem: the checkpoint ids don't align with the new flink job, so when notifyCheckpointComplete(1) fires for the new flink job, it won't commit the old data files with checkpointId > 1 to the iceberg table. That's incorrect.

So it should commit those remaining uncommitted data files to iceberg immediately.

Contributor

@openinx if the new job is redeployed from checkpoint N taken by the old job, the checkpointId will start from N+1 for the new job. That is my observation of Flink's behavior.

@openinx (Member, Author)

OK, I checked the flink savepoint code here; it was designed as you said: https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L1371

Thanks for the confirmation. Then I think committing those files in the next checkpoint cycle should also be OK, but there's no harm in committing them immediately.

}
}

private void commitUpToCheckpoint(long checkpointId) {
Contributor

Just want to point out that this will commit even if there are zero pending files. For us, we actually still want to commit in this case, mainly to update the region watermark info.

@openinx (Member, Author)

That should be similar to the point from this comment; we are actually doing the logic you described and have provided a unit test to cover it.


@Override
public void processElement(StreamRecord<DataFile> element) {
this.dataFilesOfCurrentCheckpoint.add(element.getValue());
@stevenzwu (Contributor) commented Aug 29, 2020

This probably gets complicated when we allow concurrent checkpoints. The committer can receive files from both checkpoint N and N+1. We probably need to add the checkpointId to the DataFile. It might make sense to provide a FlinkDataFile wrapper so that we can add additional Flink metadata.

We have FlinkDataFile in our implementation for transmitting low and high timestamps. Now, thinking about this issue, maybe we can include the checkpointId too so that the committer can distinguish data files from different checkpoints.

public class FlinkDataFile implements Serializable {
  private final long lowWatermark;
  private final long highWatermark;
  private final DataFile dataFile;

This does impose an additional requirement on the writer: it needs to know the last/next checkpointId.

  • for a job started without a checkpoint, the last checkpointId is 0
  • for a job started with a checkpoint, IcebergWriter now needs to know the last checkpointId restored.

I couldn't find the checkpointId in the restored context in the initializeState(context) method for either AbstractStreamOperator or RichSinkFunction. It would be nice if it could be exposed.

Alternatively, we can store the nextCheckpointId in the operator state. However, that also has some problems.

  • operator list state can't deal with rescaling, as new subtasks won't get any state
  • operator union list state is not scalable. The Kafka source is suffering from a scalability issue with union state.

Note that we may flush files before the checkpoint barrier comes. We have two use cases that need rollover by time and file size:

  • As I mentioned in another comment, we have Flink streaming jobs running in 3 regions in AWS and data warehouses that live only in us-east-1. There is a backend service that monitors files in the two other remote regions and lifts/copies them back to the us-east-1 home region. S3 cross-region file copy has a 5 GB limit. For that reason, we flush files once the size reaches 4 GB.
  • When we implement rough ordering for the Iceberg source, we need event-time alignment. In these cases, we use Kafka broker time as the event time, since our Iceberg source tries to emulate the rough ordering of the Kafka source. One possible solution is to roll over the file when the min and max timestamps reach a certain threshold (e.g. 5 minutes).

@openinx openinx deleted the flink-committer branch September 2, 2020 03:35
@tangchenyang

As a user, how can I use flink to write my DataStream to iceberg?
Is there an example, API document, or user document for this feature?

@openinx (Member, Author) commented Sep 8, 2020

Hi @tangchenyang, the unit test TestFlinkIcebergSink provides a complete demo showing how to write records into an iceberg table with flink. I'm writing a document to show users how to write data into iceberg with both the Table SQL and the DataStream API, but it is not finished yet: #1423

@tangchenyang

Great! Many thanks, @openinx.
