From e9df431f02497ab16eea6108db37706bd2fbf3f2 Mon Sep 17 00:00:00 2001
From: Arek Burdach <>
Date: Fri, 11 Oct 2024 22:00:19 +0200
Subject: [PATCH] Flink: Add IcebergSinkBuilder interface allowing unification
 of most operations on FlinkSink and IcebergSink builders

---
 .../apache/iceberg/flink/sink/FlinkSink.java  | 18 ++--
 .../iceberg/flink/sink/IcebergSink.java       | 14 +++-
 .../flink/sink/IcebergSinkBuilder.java        | 82 +++++++++++++++++++
 3 files changed, 108 insertions(+), 6 deletions(-)
 create mode 100644 flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSinkBuilder.java

diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
index c53431490984..116d52d1be7f 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
@@ -31,7 +31,6 @@
 import java.time.Duration;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.function.Function;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -67,11 +66,8 @@
 import org.apache.iceberg.flink.sink.shuffle.StatisticsOrRecord;
 import org.apache.iceberg.flink.sink.shuffle.StatisticsType;
 import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
-import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.util.SerializableSupplier;
 import org.slf4j.Logger;
@@ -134,7 +130,7 @@ public static Builder forRowData(DataStream<RowData> input) {
     return new Builder().forRowData(input);
   }
 
-  public static class Builder {
+  public static class Builder implements IcebergSinkBuilder<Builder> {
     private Function<String, DataStream<RowData>> inputCreator = null;
     private TableLoader tableLoader;
     private Table table;
@@ -179,6 +175,7 @@ private Builder forMapperOutputType(
      * @param newTable the loaded iceberg table instance.
      * @return {@link Builder} to connect the iceberg table.
      */
+    @Override
     public Builder table(Table newTable) {
       this.table = newTable;
       return this;
@@ -192,6 +189,7 @@ public Builder table(Table newTable) {
      * @param newTableLoader to load iceberg table inside tasks.
      * @return {@link Builder} to connect the iceberg table.
      */
+    @Override
     public Builder tableLoader(TableLoader newTableLoader) {
       this.tableLoader = newTableLoader;
       return this;
@@ -210,21 +208,25 @@ public Builder set(String property, String value) {
      * Set the write properties for Flink sink. View the supported properties in {@link
      * FlinkWriteOptions}
      */
+    @Override
     public Builder setAll(Map<String, String> properties) {
       writeOptions.putAll(properties);
       return this;
     }
 
+    @Override
     public Builder tableSchema(TableSchema newTableSchema) {
       this.tableSchema = newTableSchema;
       return this;
     }
 
+    @Override
     public Builder overwrite(boolean newOverwrite) {
       writeOptions.put(FlinkWriteOptions.OVERWRITE_MODE.key(), Boolean.toString(newOverwrite));
       return this;
     }
 
+    @Override
     public Builder flinkConf(ReadableConfig config) {
       this.readableConfig = config;
       return this;
@@ -237,6 +239,7 @@ public Builder flinkConf(ReadableConfig config) {
      * @param mode to specify the write distribution mode.
      * @return {@link Builder} to connect the iceberg table.
      */
+    @Override
     public Builder distributionMode(DistributionMode mode) {
       if (mode != null) {
         writeOptions.put(FlinkWriteOptions.DISTRIBUTION_MODE.key(), mode.modeName());
@@ -306,6 +309,7 @@ public Builder rangeDistributionSortKeyBaseWeight(double weight) {
      * @param newWriteParallelism the number of parallel iceberg stream writer.
      * @return {@link Builder} to connect the iceberg table.
      */
+    @Override
     public Builder writeParallelism(int newWriteParallelism) {
       writeOptions.put(
           FlinkWriteOptions.WRITE_PARALLELISM.key(), Integer.toString(newWriteParallelism));
@@ -321,6 +325,7 @@ public Builder writeParallelism(int newWriteParallelism) {
      * @param enabled indicate whether it should transform all INSERT/UPDATE_AFTER events to UPSERT.
      * @return {@link Builder} to connect the iceberg table.
      */
+    @Override
     public Builder upsert(boolean enabled) {
       writeOptions.put(FlinkWriteOptions.WRITE_UPSERT_ENABLED.key(), Boolean.toString(enabled));
       return this;
@@ -332,6 +337,7 @@ public Builder upsert(boolean enabled) {
      * @param columns defines the iceberg table's key.
      * @return {@link Builder} to connect the iceberg table.
      */
+    @Override
     public Builder equalityFieldColumns(List<String> columns) {
       this.equalityFieldColumns = columns;
       return this;
@@ -376,6 +382,7 @@ public Builder setSnapshotProperty(String property, String value) {
       return this;
     }
 
+    @Override
     public Builder toBranch(String branch) {
       writeOptions.put(FlinkWriteOptions.BRANCH.key(), branch);
       return this;
@@ -436,6 +443,7 @@ private <T> DataStreamSink<T> chainIcebergOperators() {
      *
      * @return {@link DataStreamSink} for sink.
      */
+    @Override
     public DataStreamSink<Void> append() {
       return chainIcebergOperators();
     }
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
index d080169544cd..01be4a2eef71 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
@@ -255,7 +255,7 @@ public SimpleVersionedSerializer<WriteResult> getWriteResultSerializer() {
     return new WriteResultSerializer();
   }
 
-  public static class Builder {
+  public static class Builder implements IcebergSinkBuilder<Builder> {
     private TableLoader tableLoader;
     private String uidSuffix = "";
     private Function<String, DataStream<RowData>> inputCreator = null;
@@ -311,6 +311,7 @@ private Builder forMapperOutputType(
      * @param newTable the loaded iceberg table instance.
      * @return {@link IcebergSink.Builder} to connect the iceberg table.
      */
+    @Override
     public Builder table(Table newTable) {
       this.table = (SerializableTable) SerializableTable.copyOf(newTable);
       return this;
@@ -325,6 +326,7 @@ public Builder table(Table newTable) {
      * @param newTableLoader to load iceberg table inside tasks.
      * @return {@link Builder} to connect the iceberg table.
      */
+    @Override
     public Builder tableLoader(TableLoader newTableLoader) {
       this.tableLoader = newTableLoader;
       return this;
@@ -347,21 +349,25 @@ public Builder set(String property, String value) {
      * Set the write properties for IcebergSink. View the supported properties in {@link
      * FlinkWriteOptions}
      */
+    @Override
     public Builder setAll(Map<String, String> properties) {
       writeOptions.putAll(properties);
       return this;
     }
 
+    @Override
     public Builder tableSchema(TableSchema newTableSchema) {
       this.tableSchema = newTableSchema;
       return this;
     }
 
+    @Override
     public Builder overwrite(boolean newOverwrite) {
       writeOptions.put(FlinkWriteOptions.OVERWRITE_MODE.key(), Boolean.toString(newOverwrite));
       return this;
     }
 
+    @Override
     public Builder flinkConf(ReadableConfig config) {
       this.readableConfig = config;
       return this;
@@ -374,6 +380,7 @@ public Builder flinkConf(ReadableConfig config) {
      * @param mode to specify the write distribution mode.
      * @return {@link IcebergSink.Builder} to connect the iceberg table.
      */
+    @Override
     public Builder distributionMode(DistributionMode mode) {
       Preconditions.checkArgument(
           !DistributionMode.RANGE.equals(mode),
@@ -390,6 +397,7 @@ public Builder distributionMode(DistributionMode mode) {
      * @param newWriteParallelism the number of parallel iceberg stream writer.
      * @return {@link IcebergSink.Builder} to connect the iceberg table.
      */
+    @Override
     public Builder writeParallelism(int newWriteParallelism) {
       writeOptions.put(
           FlinkWriteOptions.WRITE_PARALLELISM.key(), Integer.toString(newWriteParallelism));
@@ -405,6 +413,7 @@ public Builder writeParallelism(int newWriteParallelism) {
      * @param enabled indicate whether it should transform all INSERT/UPDATE_AFTER events to UPSERT.
      * @return {@link IcebergSink.Builder} to connect the iceberg table.
      */
+    @Override
     public Builder upsert(boolean enabled) {
       writeOptions.put(FlinkWriteOptions.WRITE_UPSERT_ENABLED.key(), Boolean.toString(enabled));
       return this;
@@ -416,6 +425,7 @@ public Builder upsert(boolean enabled) {
      * @param columns defines the iceberg table's key.
      * @return {@link Builder} to connect the iceberg table.
      */
+    @Override
     public Builder equalityFieldColumns(List<String> columns) {
       this.equalityFieldColumns = columns;
       return this;
@@ -458,6 +468,7 @@ public Builder setSnapshotProperty(String property, String value) {
       return this;
     }
 
+    @Override
     public Builder toBranch(String branch) {
       writeOptions.put(FlinkWriteOptions.BRANCH.key(), branch);
       return this;
@@ -527,6 +538,7 @@ IcebergSink build() {
      *
      * @return {@link DataStreamSink} for sink.
      */
+    @Override
     public DataStreamSink<RowData> append() {
       IcebergSink sink = build();
       String suffix = defaultSuffix(uidSuffix, table.name());
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSinkBuilder.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSinkBuilder.java
new file mode 100644
index 000000000000..711193563d82
--- /dev/null
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSinkBuilder.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.DistributionMode;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.TableLoader;
+
+@Internal
+/*
+ This interface serves the internal purpose of transitioning between the previous Flink sink
+ implementation (FlinkSink) and the new implementation based on Flink's v2 sink API (IcebergSink).
+ Once the previous implementation is removed, all its occurrences will be replaced by direct IcebergSink usage.
+*/
+public interface IcebergSinkBuilder<T extends IcebergSinkBuilder<?>> {
+
+  T tableSchema(TableSchema newTableSchema);
+
+  T tableLoader(TableLoader newTableLoader);
+
+  T equalityFieldColumns(List<String> columns);
+
+  T overwrite(boolean newOverwrite);
+
+  T setAll(Map<String, String> properties);
+
+  T flinkConf(ReadableConfig config);
+
+  T table(Table newTable);
+
+  T writeParallelism(int newWriteParallelism);
+
+  T distributionMode(DistributionMode mode);
+
+  T toBranch(String branch);
+
+  T upsert(boolean enabled);
+
+  DataStreamSink<?> append();
+
+  static IcebergSinkBuilder<?> forRow(
+      DataStream<Row> input, TableSchema tableSchema, boolean useV2Sink) {
+    if (useV2Sink) {
+      return IcebergSink.forRow(input, tableSchema);
+    } else {
+      return FlinkSink.forRow(input, tableSchema);
+    }
+  }
+
+  static IcebergSinkBuilder<?> forRowData(DataStream<RowData> input, boolean useV2Sink) {
+    if (useV2Sink) {
+      return IcebergSink.forRowData(input);
+    } else {
+      return FlinkSink.forRowData(input);
+    }
+  }
+}
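
Usage sketch (illustrative, not part of the diff): the shared interface lets the same wiring code
target either sink implementation, which is mainly useful for tests that need to cover both sinks
during the transition. IcebergSinkBuilder is @Internal, so this is not end-user API; the stream,
table loader, and parallelism value below are hypothetical stand-ins:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.table.data.RowData;
    import org.apache.iceberg.flink.TableLoader;
    import org.apache.iceberg.flink.sink.IcebergSinkBuilder;

    class SinkWiringExample {
      // Attaches either FlinkSink (useV2Sink=false) or IcebergSink (useV2Sink=true)
      // to the given stream through the shared builder interface.
      static void appendSink(
          DataStream<RowData> rowDataStream, TableLoader tableLoader, boolean useV2Sink) {
        IcebergSinkBuilder.forRowData(rowDataStream, useV2Sink)
            .tableLoader(tableLoader) // how each task loads the Iceberg table
            .overwrite(false) // append to the table instead of replacing its contents
            .writeParallelism(2) // hypothetical writer parallelism
            .append(); // wires the writer/committer operators into the job graph
      }
    }

The self-referential type parameter (T extends IcebergSinkBuilder<?>) lets each concrete Builder
keep returning its own type from the fluent setters, while callers that hold only the interface can
still chain calls, since every setter's result is itself an IcebergSinkBuilder<?>.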