-
Notifications
You must be signed in to change notification settings - Fork 13.6k
[FLINK-37856] Ensure sink option hints are present in compiled plan #26601
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
@@ -98,7 +100,7 @@ public SinkModifyOperation( | |||
boolean overwrite, | |||
Map<String, String> dynamicOptions, | |||
ModifyType modifyType) { | |||
this.contextResolvedTable = contextResolvedTable; | |||
this.contextResolvedTable = withDynamicOptions(contextResolvedTable, dynamicOptions); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems we are mixing the api
module with the planner
module here. Ideally, all the logic should go into the planner. Can we perform the merging of options there?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can move it later but then there will be multiple code paths that perform merging.
+ "/*+ OPTIONS('sink.parallelism'='2', 'sink-insert-only'='false') */ " | ||
+ "SELECT * FROM MyTable"); | ||
|
||
String expected = TableTestUtil.readFromResource("/jsonplan/testSinkTableWithHints.out"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe rename the file for the source above to be symmetric?
722a4a9
to
d13d28c
Compare
@@ -258,6 +259,14 @@ private static RelNode convertSinkToRel( | |||
int[][] targetColumns, | |||
boolean isOverwrite, | |||
DynamicTableSink sink) { | |||
if (!dynamicOptions.isEmpty()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are we sure this cannot be null? Maybe use CollectionUtils.isEmpty()?
@@ -462,11 +462,14 @@ abstract class PlannerBase( | |||
} | |||
|
|||
case regularTable: CatalogTable => | |||
val resolvedTable = contextResolvedTable.getResolvedTable[ResolvedCatalogTable] | |||
val tableToFind = if (dynamicOptions.nonEmpty) { | |||
resolvedTable.copy(FlinkHints.mergeTableOptions(dynamicOptions, resolvedTable.getOptions)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would it not be the same to add resolvedTable =
to the existing code before the resolvedTable.copy
rather than adding the extra val resolvedTable = {
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The extra val allows us to use val in scala rather than vars for the result. I can change to var but I found the code base uses mostly vals but I'm new to the planner.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that makes sense @AHeise val seems preferable in Scala
d13d28c
to
df0a31a
Compare
Squashed and reverted the changes to |
df0a31a
to
19cc093
Compare
@flinkbot run azure |
19cc093
to
d5cc05d
Compare
compilePlan and executeSql use two different code paths. The former uses DynamicTableSinkSpec to render the json, the latter directly creates the DynamicSinkTable. Merging the dynamic options so far only happened on the second code path. This commit eagerly merges the options into the ResolvedCatalogTable such that the combined options can be used in both code paths. The commit still retains all hints fields and parameters to ensure that explain still outputs the same plan where hints are explicitly added if available. The hints are not explicitly used while creating the DynamicSinkTable anymore because the ResolvedCatalogTable already contains them.
d5cc05d
to
309acc8
Compare
What is the purpose of the change
compilePlan and executeSql use two different code paths. The former uses DynamicTableSinkSpec to render the json, the latter directly creates the DynamicSinkTable. Merging the dynamic options so far only happened on the second code path.
This commit eagerly merges the options into the ResolvedCatalogTable such that the combined options can be used in both code paths. The commit still retains all hints fields and parameters to ensure that explain still outputs the same plan where hints are explicitly added if available. The hints are not explicitly used while creating the DynamicSinkTable anymore because the ResolvedCatalogTable already contains them.
Brief change log
Verifying this change
This change added tests and can be verified as follows:
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: (yes / no)Documentation