[FLINK-37856] Ensure sink option hints are present in compiled plan #26601

Merged
merged 1 commit into from
May 31, 2025

AHeise (Contributor) commented May 27, 2025

What is the purpose of the change

compilePlan and executeSql use two different code paths. The former uses DynamicTableSinkSpec to render the JSON plan, while the latter creates the DynamicTableSink directly. So far, the dynamic options were merged only on the second code path.

This commit eagerly merges the options into the ResolvedCatalogTable so that the combined options are available on both code paths. It retains all hint fields and parameters so that explain still outputs the same plan, with hints shown explicitly where present. The hints are no longer used explicitly when creating the DynamicTableSink because the ResolvedCatalogTable already contains them.

Brief change log

  • Extract mergeTableOptions to HintsUtil in table-api
  • Apply that eagerly in SinkModifyOperation
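
The merge step can be illustrated without Flink on the classpath. The following is a minimal sketch, not the actual HintsUtil code: the class name `OptionMergeSketch` and the sample option values are hypothetical, and it assumes that hint options take precedence over catalog options when keys collide.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class OptionMergeSketch {

    /**
     * Layers hint options on top of the table options.
     * Assumption: on a key conflict the hint value wins.
     */
    static Map<String, String> mergeTableOptions(
            Map<String, String> hints, Map<String, String> tableOptions) {
        if (hints.isEmpty()) {
            // Nothing to merge, so the original options are returned unchanged.
            return tableOptions;
        }
        Map<String, String> merged = new HashMap<>(tableOptions);
        merged.putAll(hints);
        return Collections.unmodifiableMap(merged);
    }

    public static void main(String[] args) {
        // Options as they might be stored in the catalog (illustrative values).
        Map<String, String> catalogOptions = new HashMap<>();
        catalogOptions.put("connector", "values");
        catalogOptions.put("sink.parallelism", "1");

        // Options supplied through an OPTIONS hint on the INSERT statement.
        Map<String, String> hints = new HashMap<>();
        hints.put("sink.parallelism", "2");

        Map<String, String> merged = mergeTableOptions(hints, catalogOptions);
        System.out.println("sink.parallelism = " + merged.get("sink.parallelism"));
        System.out.println("connector = " + merged.get("connector"));
    }
}
```

Doing this once, when the operation is constructed, means that every later consumer of the table's options sees the same merged view, which is the point of applying the merge eagerly.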

Verifying this change

This change added tests and can be verified as follows:

  • Added new integration test CompiledPlanITCase#testSinkTableWithHints

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

flinkbot (Collaborator) commented May 27, 2025

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

@@ -98,7 +100,7 @@ public SinkModifyOperation(
             boolean overwrite,
             Map<String, String> dynamicOptions,
             ModifyType modifyType) {
-        this.contextResolvedTable = contextResolvedTable;
+        this.contextResolvedTable = withDynamicOptions(contextResolvedTable, dynamicOptions);
Contributor

It seems we are mixing the api module with the planner module here. Ideally, all the logic should go into the planner. Can we perform the merging of options there?

Contributor Author

I can move it later but then there will be multiple code paths that perform merging.

+ "/*+ OPTIONS('sink.parallelism'='2', 'sink-insert-only'='false') */ "
+ "SELECT * FROM MyTable");

String expected = TableTestUtil.readFromResource("/jsonplan/testSinkTableWithHints.out");
Contributor

maybe rename the file for the source above to be symmetric?

AHeise force-pushed the FLINK-37856-sink-options branch 2 times, most recently from 722a4a9 to d13d28c, on May 27, 2025 13:11
@@ -258,6 +259,14 @@ private static RelNode convertSinkToRel(
             int[][] targetColumns,
             boolean isOverwrite,
             DynamicTableSink sink) {
+        if (!dynamicOptions.isEmpty()) {
Contributor

Are we sure this cannot be null? Maybe use CollectionUtils.isEmpty()?
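
For reference, a null-safe emptiness check on a map needs no extra dependency. The helper below is a hypothetical sketch (the class `NullSafeOptions` and both methods are illustrative names, not Flink APIs). Note that dynamicOptions is a Map according to the SinkModifyOperation constructor shown earlier; whether it can actually be null at this call site is not established in this thread.

```java
import java.util.Collections;
import java.util.Map;

public class NullSafeOptions {

    /** Returns true when the map is null or contains no entries. */
    static boolean isNullOrEmpty(Map<?, ?> map) {
        return map == null || map.isEmpty();
    }

    /** Normalizes a possibly-null map to an empty one so callers can skip null checks. */
    static Map<String, String> orEmpty(Map<String, String> map) {
        return map == null ? Collections.emptyMap() : map;
    }

    public static void main(String[] args) {
        Map<String, String> dynamicOptions = null;

        // The guard is safe even if no options were passed at all.
        if (!isNullOrEmpty(dynamicOptions)) {
            System.out.println("options present");
        } else {
            System.out.println("no options to apply");
        }

        // Alternatively, normalize once and keep the plain isEmpty() check.
        System.out.println(orEmpty(dynamicOptions).isEmpty());
    }
}
```

Normalizing at the constructor boundary would keep the existing `isEmpty()` guard valid everywhere downstream, at the cost of hiding a null that may indicate a caller bug.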

@@ -462,11 +462,14 @@ abstract class PlannerBase(
}

case regularTable: CatalogTable =>
val resolvedTable = contextResolvedTable.getResolvedTable[ResolvedCatalogTable]
val tableToFind = if (dynamicOptions.nonEmpty) {
resolvedTable.copy(FlinkHints.mergeTableOptions(dynamicOptions, resolvedTable.getOptions))
Contributor

Wouldn't it be the same to add "resolvedTable =" to the existing code before resolvedTable.copy, rather than adding the extra "val resolvedTable = {"?

Contributor Author

The extra val lets us use a val in Scala rather than a var for the result. I can change it to a var, but the code base seems to use mostly vals. That said, I'm new to the planner.

Contributor

That makes sense, @AHeise. val seems preferable in Scala.

AHeise force-pushed the FLINK-37856-sink-options branch from d13d28c to df0a31a on May 28, 2025 12:49
AHeise (Contributor Author) commented May 28, 2025

Squashed and reverted the changes to SqlDmlToOperationConverterTest.

AHeise force-pushed the FLINK-37856-sink-options branch from df0a31a to 19cc093 on May 28, 2025 21:56
AHeise (Contributor Author) commented May 29, 2025

@flinkbot run azure

AHeise force-pushed the FLINK-37856-sink-options branch from 19cc093 to d5cc05d on May 29, 2025 17:05