Flink: Support write options in the in-line SQL comments (apache#5050)
hililiwei authored Aug 1, 2022
1 parent 14f4bc1 commit 00e0f7b
Showing 4 changed files with 63 additions and 4 deletions.
5 changes: 5 additions & 0 deletions docs/flink-getting-started.md
@@ -562,6 +562,11 @@ FlinkSink.Builder builder = FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SC
     .set("write-format", "orc")
     .set(FlinkWriteOptions.OVERWRITE_MODE, "true");
 ```
+For Flink SQL, write options can be passed in via SQL hints like this:
+```
+INSERT INTO tableName /*+ OPTIONS('upsert-enabled'='true') */
+...
+```
 
 | Flink option | Default | Description |
 |------------------------| -------------------------- |------------------------------------------------------------------------------------------------------------|
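As a concrete illustration of the hint (a sketch mirroring the test added below; the `sample` table and its columns are made up for this example, and note that upsert also requires a primary key and a format-version 2 table):

```
CREATE TABLE sample (
    id   INT,
    data STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH ('format-version'='2');

-- Plain append semantics would keep every inserted row...
INSERT INTO sample /*+ OPTIONS('upsert-enabled'='true') */ VALUES (1, 'a'), (2, 'b');

-- ...but with the hint, a later write to the same key replaces the old row,
-- without 'upsert-enabled' ever being declared in the table's WITH clause.
INSERT INTO sample /*+ OPTIONS('upsert-enabled'='true') */ VALUES (1, 'c');
```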
@@ -107,7 +107,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {
   public DynamicTableSink createDynamicTableSink(Context context) {
     ObjectPath objectPath = context.getObjectIdentifier().toObjectPath();
     CatalogTable catalogTable = context.getCatalogTable();
-    Map<String, String> tableProps = catalogTable.getOptions();
+    Map<String, String> writeProps = catalogTable.getOptions();
     TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(catalogTable.getSchema());
 
     TableLoader tableLoader;
@@ -116,10 +116,10 @@ public DynamicTableSink createDynamicTableSink(Context context) {
     } else {
       tableLoader =
           createTableLoader(
-              catalogTable, tableProps, objectPath.getDatabaseName(), objectPath.getObjectName());
+              catalogTable, writeProps, objectPath.getDatabaseName(), objectPath.getObjectName());
     }
 
-    return new IcebergTableSink(tableLoader, tableSchema, context.getConfiguration());
+    return new IcebergTableSink(tableLoader, tableSchema, context.getConfiguration(), writeProps);
   }
 
   @Override
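The rename above works because Flink's planner merges `/*+ OPTIONS(...) */` hints into the options of the `CatalogTable` handed to the factory, so `catalogTable.getOptions()` already reflects any per-statement overrides (in some Flink versions this requires enabling `table.dynamic-table-options.enabled`). A minimal sketch of that layering, illustrative only and not the planner's actual code:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch: per-statement hint options are layered over the
// table's own options before createDynamicTableSink(Context) is invoked.
public class HintMergeSketch {
  public static void main(String[] args) {
    Map<String, String> tableOptions = new HashMap<>();
    tableOptions.put("write-format", "parquet"); // from CREATE TABLE ... WITH (...)

    Map<String, String> hintOptions = new HashMap<>();
    hintOptions.put("upsert-enabled", "true"); // from /*+ OPTIONS(...) */

    Map<String, String> merged = new HashMap<>(tableOptions);
    merged.putAll(hintOptions); // the hint value wins for this statement only

    // This merged view is what the factory sees and now forwards to
    // IcebergTableSink as writeProps.
    System.out.println(merged); // {write-format=parquet, upsert-enabled=true}
  }
}
```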
@@ -37,6 +37,7 @@ public class IcebergTableSink implements DynamicTableSink, SupportsPartitioning,
   private final TableLoader tableLoader;
   private final TableSchema tableSchema;
   private final ReadableConfig readableConfig;
+  private final Map<String, String> writeProps;
 
   private boolean overwrite = false;
 
@@ -45,13 +46,18 @@ private IcebergTableSink(IcebergTableSink toCopy) {
     this.tableSchema = toCopy.tableSchema;
     this.overwrite = toCopy.overwrite;
     this.readableConfig = toCopy.readableConfig;
+    this.writeProps = toCopy.writeProps;
   }
 
   public IcebergTableSink(
-      TableLoader tableLoader, TableSchema tableSchema, ReadableConfig readableConfig) {
+      TableLoader tableLoader,
+      TableSchema tableSchema,
+      ReadableConfig readableConfig,
+      Map<String, String> writeProps) {
     this.tableLoader = tableLoader;
     this.tableSchema = tableSchema;
     this.readableConfig = readableConfig;
+    this.writeProps = writeProps;
   }
 
   @Override
@@ -70,6 +76,7 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
         .tableSchema(tableSchema)
         .equalityFieldColumns(equalityColumns)
         .overwrite(overwrite)
+        .setAll(writeProps)
         .flinkConf(readableConfig)
         .append();
   }
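`setAll` is the bulk counterpart of the builder's `set(key, value)` calls shown in the docs change above, so DataStream jobs can pass the same map-based options directly. A sketch under the same setup as the getting-started example (the `dataStream`, `table`, and `tableLoader` variables are assumed from that surrounding context; the option values are illustrative):

```
Map<String, String> writeOptions = Maps.newHashMap();
writeOptions.put("upsert-enabled", "true");
writeOptions.put("write-format", "orc");

FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
    .table(table)
    .tableLoader(tableLoader)
    .setAll(writeOptions) // applied on top of table properties, like a SQL hint
    .append();
```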
@@ -165,6 +165,53 @@ public void testUpsertAndQuery() {
     }
   }
 
+  @Test
+  public void testUpsertOptions() {
+    String tableName = "test_upsert_options";
+    LocalDate dt20220301 = LocalDate.of(2022, 3, 1);
+    LocalDate dt20220302 = LocalDate.of(2022, 3, 2);
+
+    Map<String, String> optionsUpsertProps = Maps.newHashMap(tableUpsertProps);
+    optionsUpsertProps.remove(TableProperties.UPSERT_ENABLED);
+    sql(
+        "CREATE TABLE %s(id INT NOT NULL, province STRING NOT NULL, dt DATE, PRIMARY KEY(id,province) NOT ENFORCED) "
+            + "PARTITIONED BY (province) WITH %s",
+        tableName, toWithClause(optionsUpsertProps));
+
+    try {
+      sql(
+          "INSERT INTO %s /*+ OPTIONS('upsert-enabled'='true')*/ VALUES "
+              + "(1, 'a', DATE '2022-03-01'),"
+              + "(2, 'b', DATE '2022-03-01'),"
+              + "(1, 'b', DATE '2022-03-01')",
+          tableName);
+
+      sql(
+          "INSERT INTO %s /*+ OPTIONS('upsert-enabled'='true')*/ VALUES "
+              + "(4, 'a', DATE '2022-03-02'),"
+              + "(5, 'b', DATE '2022-03-02'),"
+              + "(1, 'b', DATE '2022-03-02')",
+          tableName);
+
+      List<Row> rowsOn20220301 =
+          Lists.newArrayList(Row.of(2, "b", dt20220301), Row.of(1, "a", dt20220301));
+      TestHelpers.assertRows(
+          sql("SELECT * FROM %s WHERE dt < '2022-03-02'", tableName), rowsOn20220301);
+
+      List<Row> rowsOn20220302 =
+          Lists.newArrayList(
+              Row.of(1, "b", dt20220302), Row.of(4, "a", dt20220302), Row.of(5, "b", dt20220302));
+      TestHelpers.assertRows(
+          sql("SELECT * FROM %s WHERE dt = '2022-03-02'", tableName), rowsOn20220302);
+
+      TestHelpers.assertRows(
+          sql("SELECT * FROM %s", tableName),
+          Lists.newArrayList(Iterables.concat(rowsOn20220301, rowsOn20220302)));
+    } finally {
+      sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
+    }
+  }
+
   @Test
   public void testPrimaryKeyEqualToPartitionKey() {
     // This is an SQL based reproduction of TestFlinkIcebergSinkV2#testUpsertOnDataKey
