Skip to content

Commit c0cde94

Browse files
committed
fixup! [FLINK-37856] Ensure sink option hints are present in compiled plan
Move merging to planner
1 parent 00d38e8 commit c0cde94

File tree

7 files changed

+49
-71
lines changed

7 files changed

+49
-71
lines changed

flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/SinkModifyOperation.java

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,6 @@
2929
import java.util.LinkedHashMap;
3030
import java.util.Map;
3131

32-
import static org.apache.flink.table.operations.utils.HintUtils.mergeTableOptions;
33-
3432
/**
3533
* DML operation that tells to write to a sink.
3634
*
@@ -100,7 +98,7 @@ public SinkModifyOperation(
10098
boolean overwrite,
10199
Map<String, String> dynamicOptions,
102100
ModifyType modifyType) {
103-
this.contextResolvedTable = withDynamicOptions(contextResolvedTable, dynamicOptions);
101+
this.contextResolvedTable = contextResolvedTable;
104102
this.child = child;
105103
this.staticPartitions = staticPartitions;
106104
this.targetColumns = targetColumns;
@@ -109,16 +107,6 @@ public SinkModifyOperation(
109107
this.modifyType = modifyType;
110108
}
111109

112-
private ContextResolvedTable withDynamicOptions(
113-
ContextResolvedTable contextResolvedTable, Map<String, String> dynamicOptions) {
114-
if (dynamicOptions.isEmpty()) {
115-
return contextResolvedTable;
116-
}
117-
return contextResolvedTable.copy(
118-
mergeTableOptions(
119-
dynamicOptions, contextResolvedTable.getResolvedTable().getOptions()));
120-
}
121-
122110
public ContextResolvedTable getContextResolvedTable() {
123111
return contextResolvedTable;
124112
}

flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/HintUtils.java

Lines changed: 0 additions & 51 deletions
This file was deleted.

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@
107107
import java.util.stream.IntStream;
108108
import java.util.stream.Stream;
109109

110+
import static org.apache.flink.table.planner.hint.FlinkHints.mergeTableOptions;
110111
import static org.apache.flink.table.planner.utils.ShortcutUtils.unwrapContext;
111112
import static org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTypeFactory;
112113
import static org.apache.flink.table.types.logical.utils.LogicalTypeCasts.supportsAvoidingCast;
@@ -258,6 +259,14 @@ private static RelNode convertSinkToRel(
258259
int[][] targetColumns,
259260
boolean isOverwrite,
260261
DynamicTableSink sink) {
262+
if (!dynamicOptions.isEmpty()) {
263+
contextResolvedTable =
264+
contextResolvedTable.copy(
265+
mergeTableOptions(
266+
dynamicOptions,
267+
contextResolvedTable.getResolvedTable().getOptions()));
268+
}
269+
261270
final DataTypeFactory dataTypeFactory =
262271
unwrapContext(relBuilder).getCatalogManager().getDataTypeFactory();
263272
final FlinkTypeFactory typeFactory = unwrapTypeFactory(relBuilder);

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHints.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.commons.lang3.StringUtils;
3838

3939
import java.util.Collections;
40+
import java.util.HashMap;
4041
import java.util.List;
4142
import java.util.Map;
4243
import java.util.Optional;
@@ -76,6 +77,29 @@ public static Map<String, String> getHintedOptions(List<RelHint> tableHints) {
7677
.orElse(Collections.emptyMap());
7778
}
7879

80+
/**
81+
* Merges the dynamic table options from {@code hints} and static table options from table
82+
* definition {@code props}.
83+
*
84+
* <p>The options in {@code hints} would override the ones in {@code props} if they have the
85+
* same option key.
86+
*
87+
* @param hints Dynamic table options, usually from the OPTIONS hint
88+
* @param props Static table options defined in DDL or connect API
89+
* @return New options with merged dynamic table options, or the old {@code props} if there is
90+
* no dynamic table options
91+
*/
92+
public static Map<String, String> mergeTableOptions(
93+
Map<String, String> hints, Map<String, String> props) {
94+
if (hints.size() == 0) {
95+
return props;
96+
}
97+
Map<String, String> newProps = new HashMap<>();
98+
newProps.putAll(props);
99+
newProps.putAll(hints);
100+
return Collections.unmodifiableMap(newProps);
101+
}
102+
79103
public static Optional<String> getTableAlias(RelNode node) {
80104
if (node instanceof Hintable) {
81105
Hintable aliasNode = (Hintable) node;

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import org.apache.flink.table.factories.DynamicTableSourceFactory;
3030
import org.apache.flink.table.factories.FactoryUtil;
3131
import org.apache.flink.table.module.Module;
32-
import org.apache.flink.table.operations.utils.HintUtils;
3332
import org.apache.flink.table.planner.calcite.FlinkContext;
3433
import org.apache.flink.table.planner.calcite.FlinkRelBuilder;
3534
import org.apache.flink.table.planner.catalog.CatalogSchemaTable;
@@ -148,7 +147,7 @@ private ContextResolvedTable computeContextResolvedTable(
148147
contextResolvedTable.getIdentifier()));
149148
}
150149
return contextResolvedTable.copy(
151-
HintUtils.mergeTableOptions(
150+
FlinkHints.mergeTableOptions(
152151
hintedOptions, contextResolvedTable.getResolvedTable().getOptions()));
153152
}
154153

flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -299,7 +299,7 @@ abstract class PlannerBase(
299299
case catalogSink: SinkModifyOperation =>
300300
val input = createRelBuilder.queryOperation(modifyOperation.getChild).build()
301301
val dynamicOptions = catalogSink.getDynamicOptions
302-
getTableSink(catalogSink.getContextResolvedTable).map {
302+
getTableSink(catalogSink.getContextResolvedTable, dynamicOptions).map {
303303
case (table, sink: TableSink[_]) =>
304304
// Legacy tables can't be anonymous
305305
val identifier = catalogSink.getContextResolvedTable.getIdentifier
@@ -451,7 +451,8 @@ abstract class PlannerBase(
451451
}
452452

453453
private def getTableSink(
454-
contextResolvedTable: ContextResolvedTable): Option[(ResolvedCatalogTable, Any)] = {
454+
contextResolvedTable: ContextResolvedTable,
455+
dynamicOptions: JMap[String, String]): Option[(ResolvedCatalogTable, Any)] = {
455456
contextResolvedTable.getTable[CatalogBaseTable] match {
456457
case connectorTable: ConnectorCatalogTable[_, _] =>
457458
val resolvedTable = contextResolvedTable.getResolvedTable[ResolvedCatalogTable]
@@ -461,7 +462,15 @@ abstract class PlannerBase(
461462
}
462463

463464
case regularTable: CatalogTable =>
464-
val resolvedTable = contextResolvedTable.getResolvedTable[ResolvedCatalogTable]
465+
val resolvedTable = {
466+
val resolvedTable = contextResolvedTable.getResolvedTable[ResolvedCatalogTable]
467+
if (dynamicOptions.nonEmpty) {
468+
resolvedTable.copy(
469+
FlinkHints.mergeTableOptions(dynamicOptions, resolvedTable.getOptions))
470+
} else {
471+
resolvedTable
472+
}
473+
}
465474
val catalog = toScala(contextResolvedTable.getCatalog)
466475
val objectIdentifier = contextResolvedTable.getIdentifier
467476
val isTemporary = contextResolvedTable.isTemporary

flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/schema/LegacyCatalogSourceTable.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import org.apache.flink.table.legacy.sources.TableSource
2828
import org.apache.flink.table.planner.JMap
2929
import org.apache.flink.table.planner.calcite.{FlinkRelBuilder, FlinkTypeFactory}
3030
import org.apache.flink.table.planner.catalog.CatalogSchemaTable
31-
import org.apache.flink.table.operations.utils.HintUtils
31+
import org.apache.flink.table.planner.hint.FlinkHints
3232
import org.apache.flink.table.planner.utils.ShortcutUtils.unwrapContext
3333
import org.apache.flink.table.sources.TableSourceValidation
3434
import org.apache.flink.table.types.logical.{LocalZonedTimestampType, TimestampKind, TimestampType}
@@ -171,7 +171,7 @@ class LegacyCatalogSourceTable[T](
171171
hintedOptions: JMap[String, String],
172172
conf: ReadableConfig): TableSource[T] = {
173173
val tableToFind = if (hintedOptions.nonEmpty) {
174-
catalogTable.copy(HintUtils.mergeTableOptions(hintedOptions, catalogTable.getOptions))
174+
catalogTable.copy(FlinkHints.mergeTableOptions(hintedOptions, catalogTable.getOptions))
175175
} else {
176176
catalogTable
177177
}

0 commit comments

Comments
 (0)