Flink: Add IcebergSinkBuilder interface allowing unification of most operations on FlinkSink and IcebergSink Builders
Arek Burdach committed Oct 11, 2024
1 parent 67dc9e5 commit e9df431
Showing 3 changed files with 108 additions and 6 deletions.
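This commit makes both FlinkSink.Builder and IcebergSink.Builder implement a new common interface, IcebergSinkBuilder, which declares the configuration methods the two builders already share; most of the diff is the resulting @Override annotations. The new interface (the third file below) also provides static forRow/forRowData factories that choose between the two implementations with a useV2Sink flag; a usage sketch follows the diff.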
FlinkSink.java
@@ -31,7 +31,6 @@
 import java.time.Duration;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.function.Function;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -67,11 +66,8 @@
 import org.apache.iceberg.flink.sink.shuffle.StatisticsOrRecord;
 import org.apache.iceberg.flink.sink.shuffle.StatisticsType;
 import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
-import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.util.SerializableSupplier;
 import org.slf4j.Logger;
@@ -134,7 +130,7 @@ public static Builder forRowData(DataStream<RowData> input) {
     return new Builder().forRowData(input);
   }

-  public static class Builder {
+  public static class Builder implements IcebergSinkBuilder<Builder> {
     private Function<String, DataStream<RowData>> inputCreator = null;
     private TableLoader tableLoader;
     private Table table;
@@ -179,6 +175,7 @@ private <T> Builder forMapperOutputType(
      * @param newTable the loaded iceberg table instance.
      * @return {@link Builder} to connect the iceberg table.
      */
+    @Override
     public Builder table(Table newTable) {
       this.table = newTable;
       return this;
@@ -192,6 +189,7 @@ public Builder table(Table newTable) {
      * @param newTableLoader to load iceberg table inside tasks.
      * @return {@link Builder} to connect the iceberg table.
      */
+    @Override
     public Builder tableLoader(TableLoader newTableLoader) {
       this.tableLoader = newTableLoader;
       return this;
@@ -210,21 +208,25 @@ public Builder set(String property, String value) {
      * Set the write properties for Flink sink. View the supported properties in {@link
      * FlinkWriteOptions}
      */
+    @Override
     public Builder setAll(Map<String, String> properties) {
       writeOptions.putAll(properties);
       return this;
     }

+    @Override
     public Builder tableSchema(TableSchema newTableSchema) {
       this.tableSchema = newTableSchema;
       return this;
     }

+    @Override
     public Builder overwrite(boolean newOverwrite) {
       writeOptions.put(FlinkWriteOptions.OVERWRITE_MODE.key(), Boolean.toString(newOverwrite));
       return this;
     }

+    @Override
     public Builder flinkConf(ReadableConfig config) {
       this.readableConfig = config;
       return this;
@@ -237,6 +239,7 @@ public Builder flinkConf(ReadableConfig config) {
      * @param mode to specify the write distribution mode.
      * @return {@link Builder} to connect the iceberg table.
      */
+    @Override
     public Builder distributionMode(DistributionMode mode) {
       if (mode != null) {
         writeOptions.put(FlinkWriteOptions.DISTRIBUTION_MODE.key(), mode.modeName());
@@ -306,6 +309,7 @@ public Builder rangeDistributionSortKeyBaseWeight(double weight) {
      * @param newWriteParallelism the number of parallel iceberg stream writer.
      * @return {@link Builder} to connect the iceberg table.
      */
+    @Override
     public Builder writeParallelism(int newWriteParallelism) {
       writeOptions.put(
           FlinkWriteOptions.WRITE_PARALLELISM.key(), Integer.toString(newWriteParallelism));
@@ -321,6 +325,7 @@ public Builder writeParallelism(int newWriteParallelism) {
      * @param enabled indicate whether it should transform all INSERT/UPDATE_AFTER events to UPSERT.
      * @return {@link Builder} to connect the iceberg table.
      */
+    @Override
     public Builder upsert(boolean enabled) {
       writeOptions.put(FlinkWriteOptions.WRITE_UPSERT_ENABLED.key(), Boolean.toString(enabled));
       return this;
@@ -332,6 +337,7 @@ public Builder upsert(boolean enabled) {
      * @param columns defines the iceberg table's key.
      * @return {@link Builder} to connect the iceberg table.
      */
+    @Override
     public Builder equalityFieldColumns(List<String> columns) {
       this.equalityFieldColumns = columns;
       return this;
@@ -376,6 +382,7 @@ public Builder setSnapshotProperty(String property, String value) {
       return this;
     }

+    @Override
     public Builder toBranch(String branch) {
       writeOptions.put(FlinkWriteOptions.BRANCH.key(), branch);
       return this;
@@ -436,6 +443,7 @@ private <T> DataStreamSink<T> chainIcebergOperators() {
      *
      * @return {@link DataStreamSink} for sink.
      */
+    @Override
     public DataStreamSink<Void> append() {
       return chainIcebergOperators();
     }
IcebergSink.java
@@ -255,7 +255,7 @@ public SimpleVersionedSerializer<WriteResult> getWriteResultSerializer() {
     return new WriteResultSerializer();
   }

-  public static class Builder {
+  public static class Builder implements IcebergSinkBuilder<Builder> {
     private TableLoader tableLoader;
     private String uidSuffix = "";
     private Function<String, DataStream<RowData>> inputCreator = null;
@@ -311,6 +311,7 @@ private <T> Builder forMapperOutputType(
      * @param newTable the loaded iceberg table instance.
      * @return {@link IcebergSink.Builder} to connect the iceberg table.
      */
+    @Override
     public Builder table(Table newTable) {
       this.table = (SerializableTable) SerializableTable.copyOf(newTable);
       return this;
@@ -325,6 +326,7 @@ public Builder table(Table newTable) {
      * @param newTableLoader to load iceberg table inside tasks.
      * @return {@link Builder} to connect the iceberg table.
      */
+    @Override
     public Builder tableLoader(TableLoader newTableLoader) {
       this.tableLoader = newTableLoader;
       return this;
@@ -347,21 +349,25 @@ public Builder set(String property, String value) {
      * Set the write properties for IcebergSink. View the supported properties in {@link
      * FlinkWriteOptions}
      */
+    @Override
     public Builder setAll(Map<String, String> properties) {
       writeOptions.putAll(properties);
       return this;
     }

+    @Override
     public Builder tableSchema(TableSchema newTableSchema) {
       this.tableSchema = newTableSchema;
       return this;
     }

+    @Override
     public Builder overwrite(boolean newOverwrite) {
       writeOptions.put(FlinkWriteOptions.OVERWRITE_MODE.key(), Boolean.toString(newOverwrite));
       return this;
     }

+    @Override
     public Builder flinkConf(ReadableConfig config) {
       this.readableConfig = config;
       return this;
@@ -374,6 +380,7 @@ public Builder flinkConf(ReadableConfig config) {
      * @param mode to specify the write distribution mode.
      * @return {@link IcebergSink.Builder} to connect the iceberg table.
      */
+    @Override
     public Builder distributionMode(DistributionMode mode) {
       Preconditions.checkArgument(
           !DistributionMode.RANGE.equals(mode),
@@ -390,6 +397,7 @@ public Builder distributionMode(DistributionMode mode) {
      * @param newWriteParallelism the number of parallel iceberg stream writer.
      * @return {@link IcebergSink.Builder} to connect the iceberg table.
      */
+    @Override
     public Builder writeParallelism(int newWriteParallelism) {
       writeOptions.put(
           FlinkWriteOptions.WRITE_PARALLELISM.key(), Integer.toString(newWriteParallelism));
@@ -405,6 +413,7 @@ public Builder writeParallelism(int newWriteParallelism) {
      * @param enabled indicate whether it should transform all INSERT/UPDATE_AFTER events to UPSERT.
      * @return {@link IcebergSink.Builder} to connect the iceberg table.
      */
+    @Override
     public Builder upsert(boolean enabled) {
       writeOptions.put(FlinkWriteOptions.WRITE_UPSERT_ENABLED.key(), Boolean.toString(enabled));
       return this;
@@ -416,6 +425,7 @@ public Builder upsert(boolean enabled) {
      * @param columns defines the iceberg table's key.
      * @return {@link Builder} to connect the iceberg table.
      */
+    @Override
     public Builder equalityFieldColumns(List<String> columns) {
       this.equalityFieldColumns = columns;
       return this;
@@ -458,6 +468,7 @@ public Builder setSnapshotProperty(String property, String value) {
       return this;
     }

+    @Override
     public Builder toBranch(String branch) {
       writeOptions.put(FlinkWriteOptions.BRANCH.key(), branch);
       return this;
@@ -527,6 +538,7 @@ IcebergSink build() {
      *
      * @return {@link DataStreamSink} for sink.
      */
+    @Override
     public DataStreamSink<RowData> append() {
       IcebergSink sink = build();
       String suffix = defaultSuffix(uidSuffix, table.name());
IcebergSinkBuilder.java (new file)
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.DistributionMode;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.TableLoader;
+
+@Internal
+/*
+ * This interface eases the transition from the previous Flink sink implementation (FlinkSink)
+ * to the new implementation based on the Flink v2 sink API (IcebergSink). Once the previous
+ * implementation is removed, usages of this interface can be replaced by direct IcebergSink usage.
+ */
+public interface IcebergSinkBuilder<T extends IcebergSinkBuilder<?>> {
+
+  T tableSchema(TableSchema newTableSchema);
+
+  T tableLoader(TableLoader newTableLoader);
+
+  T equalityFieldColumns(List<String> columns);
+
+  T overwrite(boolean newOverwrite);
+
+  T setAll(Map<String, String> properties);
+
+  T flinkConf(ReadableConfig config);
+
+  T table(Table newTable);
+
+  T writeParallelism(int newWriteParallelism);
+
+  T distributionMode(DistributionMode mode);
+
+  T toBranch(String branch);
+
+  T upsert(boolean enabled);
+
+  DataStreamSink<?> append();
+
+  static IcebergSinkBuilder<?> forRow(
+      DataStream<Row> input, TableSchema tableSchema, boolean useV2Sink) {
+    if (useV2Sink) {
+      return IcebergSink.forRow(input, tableSchema);
+    } else {
+      return FlinkSink.forRow(input, tableSchema);
+    }
+  }
+
+  static IcebergSinkBuilder<?> forRowData(DataStream<RowData> input, boolean useV2Sink) {
+    if (useV2Sink) {
+      return IcebergSink.forRowData(input);
+    } else {
+      return FlinkSink.forRowData(input);
+    }
+  }
+}
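For illustration, here is a usage sketch (not part of the commit) showing what the unification buys the caller: the same configuration chain works against either sink, and the useV2Sink flag picks the implementation. The helper name, the option values, and the assumption that `rows` and `tableLoader` are provided by the surrounding job are all hypothetical.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.IcebergSinkBuilder;

public class IcebergSinkExample {

  // Configures whichever builder the flag selects; every call below is declared
  // on the common IcebergSinkBuilder interface, so no cast or branching is needed.
  static DataStreamSink<?> appendIcebergSink(
      DataStream<RowData> rows, TableLoader tableLoader, boolean useV2Sink) {
    return IcebergSinkBuilder.forRowData(rows, useV2Sink)
        .tableLoader(tableLoader)
        .overwrite(false)
        .upsert(false)
        .writeParallelism(2)
        // HASH is accepted by both builders; note the Preconditions check above:
        // the v2 IcebergSink builder rejects DistributionMode.RANGE.
        .distributionMode(DistributionMode.HASH)
        .append();
  }
}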
