Skip to content

Commit

Permalink
Flink: add read options configuration for watermark settings
Browse files Browse the repository at this point in the history
  • Loading branch information
rodmeneses committed Jan 8, 2024
1 parent 8c7001e commit ba4204b
Show file tree
Hide file tree
Showing 7 changed files with 212 additions and 21 deletions.
4 changes: 2 additions & 2 deletions docs/flink-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ The following properties can be set if using the REST catalog:
| credential | | | A credential to exchange for a token in the OAuth2 client credentials flow. |
| token | | | A token which will be used to interact with the server. |


## Runtime configuration

### Read options
Expand Down Expand Up @@ -133,7 +132,8 @@ env.getConfig()
| max-planning-snapshot-count | connector.iceberg.max-planning-snapshot-count | N/A | Integer.MAX_VALUE | Max number of snapshots limited per split enumeration. Applicable only to streaming read. |
| limit | connector.iceberg.limit | N/A | -1 | Limited output number of rows. |
| max-allowed-planning-failures | connector.iceberg.max-allowed-planning-failures | N/A | 3 | Max allowed consecutive failures for scan planning before failing the job. Set to -1 for never failing the job for scan planing failure. |

| watermark-column | connector.iceberg.watermark-column | N/A | null | Specifies the watermark column to use for watermark generation. If this option is present, the `splitAssignerFactory` will be overridden with `OrderedSplitAssignerFactory`. |
| watermark-column-time-unit | connector.iceberg.watermark-column-time-unit | N/A | TimeUnit.MICROSECONDS | Specifies the watermark time unit to use for watermark generation. The possible values are DAYS, HOURS, MINUTES, SECONDS, MILLISECONDS, MICROSECONDS, NANOSECONDS. |

### Write options

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.time.Duration;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.util.TimeUtils;
import org.apache.iceberg.Table;
Expand Down Expand Up @@ -190,4 +191,22 @@ public int maxAllowedPlanningFailures() {
.defaultValue(FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES_OPTION.defaultValue())
.parse();
}

public String watermarkColumn() {
return confParser
.stringConf()
.option(FlinkReadOptions.WATERMARK_COLUMN)
.flinkConfig(FlinkReadOptions.WATERMARK_COLUMN_OPTION)
.defaultValue(FlinkReadOptions.WATERMARK_COLUMN_OPTION.defaultValue())
.parseOptional();
}

public TimeUnit watermarkTimeUnit() {
return confParser
.enumConfParser(TimeUnit.class)
.option(FlinkReadOptions.WATERMARK_COLUMN_TIME_UNIT)
.flinkConfig(FlinkReadOptions.WATERMARK_COLUMN_TIME_UNIT_OPTION)
.defaultValue(FlinkReadOptions.WATERMARK_COLUMN_TIME_UNIT_OPTION.defaultValue())
.parse();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.iceberg.flink;

import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.iceberg.TableProperties;
Expand Down Expand Up @@ -109,4 +110,14 @@ private FlinkReadOptions() {}
public static final String MAX_ALLOWED_PLANNING_FAILURES = "max-allowed-planning-failures";
public static final ConfigOption<Integer> MAX_ALLOWED_PLANNING_FAILURES_OPTION =
ConfigOptions.key(PREFIX + MAX_ALLOWED_PLANNING_FAILURES).intType().defaultValue(3);

public static final String WATERMARK_COLUMN = "watermark-column";
public static final ConfigOption<String> WATERMARK_COLUMN_OPTION =
ConfigOptions.key(PREFIX + WATERMARK_COLUMN).stringType().noDefaultValue();

public static final String WATERMARK_COLUMN_TIME_UNIT = "watermark-column-time-unit";
public static final ConfigOption<TimeUnit> WATERMARK_COLUMN_TIME_UNIT_OPTION =
ConfigOptions.key(PREFIX + WATERMARK_COLUMN_TIME_UNIT)
.enumType(TimeUnit.class)
.defaultValue(TimeUnit.MICROSECONDS);
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.flink.FlinkConfigOptions;
import org.apache.iceberg.flink.FlinkReadConf;
import org.apache.iceberg.flink.FlinkReadOptions;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.TableLoader;
Expand Down Expand Up @@ -219,8 +220,6 @@ public static class Builder<T> {
private Table table;
private SplitAssignerFactory splitAssignerFactory;
private SerializableComparator<IcebergSourceSplit> splitComparator;
private String watermarkColumn;
private TimeUnit watermarkTimeUnit = TimeUnit.MICROSECONDS;
private ReaderFunction<T> readerFunction;
private ReadableConfig flinkConfig = new Configuration();
private final ScanContext.Builder contextBuilder = ScanContext.builder();
Expand All @@ -242,9 +241,6 @@ public Builder<T> table(Table newTable) {
}

public Builder<T> assignerFactory(SplitAssignerFactory assignerFactory) {
Preconditions.checkArgument(
watermarkColumn == null,
"Watermark column and SplitAssigner should not be set in the same source");
this.splitAssignerFactory = assignerFactory;
return this;
}
Expand Down Expand Up @@ -441,7 +437,7 @@ public Builder<T> setAll(Map<String, String> properties) {
* Emits watermarks once per split based on the min value of column statistics from files
* metadata in the given split. The generated watermarks are also used for ordering the splits
* for read. Accepted column types are timestamp/timestamptz/long. For long columns consider
* setting {@link #watermarkTimeUnit(TimeUnit)}.
* setting {@link #watermarkColumnTimeUnit(TimeUnit)}.
*
* <p>Consider setting `read.split.open-file-cost` to prevent combining small files to a single
* split when the watermark is used for watermark alignment.
Expand All @@ -450,7 +446,7 @@ public Builder<T> watermarkColumn(String columnName) {
Preconditions.checkArgument(
splitAssignerFactory == null,
"Watermark column and SplitAssigner should not be set in the same source");
this.watermarkColumn = columnName;
readOptions.put(FlinkReadOptions.WATERMARK_COLUMN, columnName);
return this;
}

Expand All @@ -459,8 +455,8 @@ public Builder<T> watermarkColumn(String columnName) {
* org.apache.iceberg.types.Types.LongType}, then sets the {@link TimeUnit} to convert the
* value. The default value is {@link TimeUnit#MICROSECONDS}.
*/
public Builder<T> watermarkTimeUnit(TimeUnit timeUnit) {
this.watermarkTimeUnit = timeUnit;
public Builder<T> watermarkColumnTimeUnit(TimeUnit timeUnit) {
readOptions.put(FlinkReadOptions.WATERMARK_COLUMN_TIME_UNIT, timeUnit.name());
return this;
}

Expand All @@ -482,13 +478,16 @@ public IcebergSource<T> build() {
}

contextBuilder.resolveConfig(table, readOptions, flinkConfig);

Schema icebergSchema = table.schema();
if (projectedFlinkSchema != null) {
contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema, projectedFlinkSchema));
}

SerializableRecordEmitter<T> emitter = SerializableRecordEmitter.defaultEmitter();
FlinkReadConf flinkReadConf = new FlinkReadConf(table, readOptions, flinkConfig);
String watermarkColumn = flinkReadConf.watermarkColumn();
TimeUnit watermarkTimeUnit = flinkReadConf.watermarkTimeUnit();

if (watermarkColumn != null) {
// Column statistics is needed for watermark generation
contextBuilder.includeColumnStats(Sets.newHashSet(watermarkColumn));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.util.Preconditions;
Expand Down Expand Up @@ -67,6 +68,8 @@ public class ScanContext implements Serializable {
private final Integer planParallelism;
private final int maxPlanningSnapshotCount;
private final int maxAllowedPlanningFailures;
private final String watermarkColumn;
private final TimeUnit watermarkTimeUnit;

private ScanContext(
boolean caseSensitive,
Expand All @@ -91,6 +94,8 @@ private ScanContext(
Integer planParallelism,
int maxPlanningSnapshotCount,
int maxAllowedPlanningFailures,
String watermarkColumn,
TimeUnit watermarkTimeUnit,
String branch,
String tag,
String startTag,
Expand Down Expand Up @@ -122,6 +127,8 @@ private ScanContext(
this.planParallelism = planParallelism;
this.maxPlanningSnapshotCount = maxPlanningSnapshotCount;
this.maxAllowedPlanningFailures = maxAllowedPlanningFailures;
this.watermarkColumn = watermarkColumn;
this.watermarkTimeUnit = watermarkTimeUnit;

validate();
}
Expand Down Expand Up @@ -272,6 +279,14 @@ public int maxAllowedPlanningFailures() {
return maxAllowedPlanningFailures;
}

public String watermarkColumn() {
return watermarkColumn;
}

public TimeUnit watermarkTimeUnit() {
return watermarkTimeUnit;
}

public ScanContext copyWithAppendsBetween(Long newStartSnapshotId, long newEndSnapshotId) {
return ScanContext.builder()
.caseSensitive(caseSensitive)
Expand All @@ -298,6 +313,8 @@ public ScanContext copyWithAppendsBetween(Long newStartSnapshotId, long newEndSn
.planParallelism(planParallelism)
.maxPlanningSnapshotCount(maxPlanningSnapshotCount)
.maxAllowedPlanningFailures(maxAllowedPlanningFailures)
.watermarkColumn(watermarkColumn)
.watermarkTimeUnit(watermarkTimeUnit)
.build();
}

Expand Down Expand Up @@ -327,6 +344,8 @@ public ScanContext copyWithSnapshotId(long newSnapshotId) {
.planParallelism(planParallelism)
.maxPlanningSnapshotCount(maxPlanningSnapshotCount)
.maxAllowedPlanningFailures(maxAllowedPlanningFailures)
.watermarkColumn(watermarkColumn)
.watermarkTimeUnit(watermarkTimeUnit)
.build();
}

Expand Down Expand Up @@ -367,6 +386,9 @@ public static class Builder {
FlinkReadOptions.MAX_PLANNING_SNAPSHOT_COUNT_OPTION.defaultValue();
private int maxAllowedPlanningFailures =
FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES_OPTION.defaultValue();
private String watermarkColumn = FlinkReadOptions.WATERMARK_COLUMN_OPTION.defaultValue();
private TimeUnit watermarkTimeUnit =
FlinkReadOptions.WATERMARK_COLUMN_TIME_UNIT_OPTION.defaultValue();

private Builder() {}

Expand Down Expand Up @@ -500,6 +522,16 @@ public Builder maxAllowedPlanningFailures(int newMaxAllowedPlanningFailures) {
return this;
}

public Builder watermarkColumn(String newWatermarkColumn) {
this.watermarkColumn = newWatermarkColumn;
return this;
}

public Builder watermarkTimeUnit(TimeUnit newWatermarkTimeUnit) {
this.watermarkTimeUnit = newWatermarkTimeUnit;
return this;
}

public Builder resolveConfig(
Table table, Map<String, String> readOptions, ReadableConfig readableConfig) {
FlinkReadConf flinkReadConf = new FlinkReadConf(table, readOptions, readableConfig);
Expand All @@ -525,7 +557,9 @@ public Builder resolveConfig(
.planParallelism(flinkReadConf.workerPoolSize())
.includeColumnStats(flinkReadConf.includeColumnStats())
.maxPlanningSnapshotCount(flinkReadConf.maxPlanningSnapshotCount())
.maxAllowedPlanningFailures(maxAllowedPlanningFailures);
.maxAllowedPlanningFailures(maxAllowedPlanningFailures)
.watermarkColumn(flinkReadConf.watermarkColumn())
.watermarkTimeUnit(flinkReadConf.watermarkTimeUnit());
}

public ScanContext build() {
Expand All @@ -552,6 +586,8 @@ public ScanContext build() {
planParallelism,
maxPlanningSnapshotCount,
maxAllowedPlanningFailures,
watermarkColumn,
watermarkTimeUnit,
branch,
tag,
startTag,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public static List<Row> convertRowDataToRow(List<RowData> rowDataList, RowType r
.collect(Collectors.toList());
}

public static void assertRecords(List<Row> results, List<Record> expectedRecords, Schema schema) {
private static List<Row> convertRecordToRow(List<Record> expectedRecords, Schema schema) {
List<Row> expected = Lists.newArrayList();
@SuppressWarnings("unchecked")
DataStructureConverter<RowData, Row> converter =
Expand All @@ -135,6 +135,17 @@ public static void assertRecords(List<Row> results, List<Record> expectedRecords
TypeConversions.fromLogicalToDataType(FlinkSchemaUtil.convert(schema)));
expectedRecords.forEach(
r -> expected.add(converter.toExternal(RowDataConverter.convert(schema, r))));
return expected;
}

public static void assertRecordsWithOrder(
List<Row> results, List<Record> expectedRecords, Schema schema) {
List<Row> expected = convertRecordToRow(expectedRecords, schema);
assertRowsWithOrder(results, expected);
}

public static void assertRecords(List<Row> results, List<Record> expectedRecords, Schema schema) {
List<Row> expected = convertRecordToRow(expectedRecords, schema);
assertRows(results, expected);
}

Expand All @@ -146,6 +157,10 @@ public static void assertRows(List<Row> results, List<Row> expected) {
Assertions.assertThat(results).containsExactlyInAnyOrderElementsOf(expected);
}

public static void assertRowsWithOrder(List<Row> results, List<Row> expected) {
Assertions.assertThat(results).containsExactlyElementsOf(expected);
}

public static void assertRowData(Schema schema, StructLike expected, RowData actual) {
assertRowData(schema.asStruct(), FlinkSchemaUtil.convert(schema), expected, actual);
}
Expand Down
Loading

0 comments on commit ba4204b

Please sign in to comment.