Skip to content

Commit 9a2c0ca

Browse files
committed
[FLINK-38493][table] Port Calcite's fix for LITERAL is lost while validation
1 parent 1f34a07 commit 9a2c0ca

File tree

11 files changed

+102
-89
lines changed

11 files changed

+102
-89
lines changed

flink-table/flink-sql-client/src/test/resources/sql/view.q

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -59,39 +59,39 @@ create temporary view if not exists v2 as select * from v1;
5959

6060
# test show create a temporary view
6161
show create view v1;
62-
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
63-
| result |
64-
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
62+
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
63+
| result |
64+
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
6565
| CREATE TEMPORARY VIEW `default_catalog`.`default_database`.`v1` (
6666
`user`,
6767
`product`,
6868
`amount`,
6969
`ts`,
7070
`ptime`
7171
)
72-
AS SELECT *
73-
FROM `default_catalog`.`default_database`.`orders`
72+
AS SELECT `orders`.`user`, `orders`.`product`, `orders`.`amount`, `orders`.`ts`, `orders`.`ptime`
73+
FROM `default_catalog`.`default_database`.`orders` AS `orders`
7474
|
75-
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
75+
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
7676
1 row in set
7777
!ok
7878

7979
# test show create a temporary view reference another view
8080
show create view v2;
81-
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
82-
| result |
83-
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
81+
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
82+
| result |
83+
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
8484
| CREATE TEMPORARY VIEW `default_catalog`.`default_database`.`v2` (
8585
`user`,
8686
`product`,
8787
`amount`,
8888
`ts`,
8989
`ptime`
9090
)
91-
AS SELECT *
92-
FROM `default_catalog`.`default_database`.`v1`
91+
AS SELECT `v1`.`user`, `v1`.`product`, `v1`.`amount`, `v1`.`ts`, `v1`.`ptime`
92+
FROM `default_catalog`.`default_database`.`v1` AS `v1`
9393
|
94-
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
94+
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
9595
1 row in set
9696
!ok
9797

@@ -142,20 +142,20 @@ create view permanent_v1 as select * from orders;
142142

143143
# test show create a permanent view
144144
show create view permanent_v1;
145-
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
146-
| result |
147-
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
145+
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
146+
| result |
147+
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
148148
| CREATE VIEW `default_catalog`.`default_database`.`permanent_v1` (
149149
`user`,
150150
`product`,
151151
`amount`,
152152
`ts`,
153153
`ptime`
154154
)
155-
AS SELECT *
156-
FROM `default_catalog`.`default_database`.`orders`
155+
AS SELECT `orders`.`user`, `orders`.`product`, `orders`.`amount`, `orders`.`ts`, `orders`.`ptime`
156+
FROM `default_catalog`.`default_database`.`orders` AS `orders`
157157
|
158-
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
158+
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
159159
1 row in set
160160
!ok
161161

flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1088,7 +1088,7 @@ void testAlterMaterializedTableAsQueryInFullMode() throws Exception {
10881088
String.format(
10891089
"SELECT `tmp`.`user_id`, `tmp`.`shop_id`, `tmp`.`ds`, COUNT(`tmp`.`order_id`) AS `order_cnt`, SUM(`tmp`.`order_amount`) AS `order_amount_sum`\n"
10901090
+ "FROM (SELECT `my_source`.`user_id`, `my_source`.`shop_id`, `my_source`.`order_created_at` AS `ds`, `my_source`.`order_id`, 1 AS `order_amount`\n"
1091-
+ "FROM `%s`.`test_db`.`my_source`) AS `tmp`\n"
1091+
+ "FROM `%s`.`test_db`.`my_source` AS `my_source`) AS `tmp`\n"
10921092
+ "GROUP BY ROW(`tmp`.`user_id`, `tmp`.`shop_id`, `tmp`.`ds`)",
10931093
fileSystemCatalogName));
10941094
// the refresh handler in full mode should be the same as the old one
@@ -1228,7 +1228,7 @@ void testAlterMaterializedTableAsQueryInFullModeWithSuspendStatus() throws Excep
12281228
String.format(
12291229
"SELECT `tmp`.`user_id`, `tmp`.`shop_id`, `tmp`.`ds`, COUNT(`tmp`.`order_id`) AS `order_cnt`, SUM(`tmp`.`order_amount`) AS `order_amount_sum`\n"
12301230
+ "FROM (SELECT `my_source`.`user_id`, `my_source`.`shop_id`, `my_source`.`order_created_at` AS `ds`, `my_source`.`order_id`, 1 AS `order_amount`\n"
1231-
+ "FROM `%s`.`test_db`.`my_source`) AS `tmp`\n"
1231+
+ "FROM `%s`.`test_db`.`my_source` AS `my_source`) AS `tmp`\n"
12321232
+ "GROUP BY ROW(`tmp`.`user_id`, `tmp`.`shop_id`, `tmp`.`ds`)",
12331233
fileSystemCatalogName));
12341234

@@ -1313,11 +1313,11 @@ void testAlterMaterializedTableAsQueryInContinuousMode(@TempDir Path temporaryPa
13131313
assertThat(newTable.getDefinitionQuery())
13141314
.isEqualTo(
13151315
String.format(
1316-
"SELECT COALESCE(`tmp`.`user_id`, 0) AS `user_id`, `tmp`.`shop_id`, COALESCE(`tmp`.`ds`, '') AS `ds`, SUM(`tmp`.`payment_amount_cents`) AS `payed_buy_fee_sum`, SUM(1) AS `pv`\n"
1317-
+ "FROM (SELECT `datagenSource`.`user_id`, `datagenSource`.`shop_id`, `DATE_FORMAT`(`datagenSource`.`order_created_at`, 'yyyy-MM-dd') AS `ds`, `datagenSource`.`payment_amount_cents`\n"
1318-
+ "FROM `%s`.`test_db`.`datagenSource`) AS `tmp`\n"
1316+
"SELECT COALESCE(`tmp`.`user_id`, CAST(0 AS BIGINT)) AS `user_id`, `tmp`.`shop_id`, COALESCE(`tmp`.`ds`, '') AS `ds`, SUM(`tmp`.`payment_amount_cents`) AS `payed_buy_fee_sum`, SUM(1) AS `pv`\n"
1317+
+ "FROM (SELECT `datagenSource`.`user_id`, `datagenSource`.`shop_id`, DATE_FORMAT(`datagenSource`.`order_created_at`, 'yyyy-MM-dd') AS `ds`, `datagenSource`.`payment_amount_cents`\n"
1318+
+ "FROM `%s`.`%s`.`datagenSource` AS `datagenSource`) AS `tmp`\n"
13191319
+ "GROUP BY ROW(`tmp`.`user_id`, `tmp`.`shop_id`, `tmp`.`ds`)",
1320-
fileSystemCatalogName));
1320+
fileSystemCatalogName, TEST_DEFAULT_DATABASE));
13211321
assertThat(oldTable.getSerializedRefreshHandler())
13221322
.isNotEqualTo(newTable.getSerializedRefreshHandler());
13231323

@@ -1413,8 +1413,8 @@ void testAlterMaterializedTableAsQueryInContinuousModeWithSuspendStatus(
14131413
.isEqualTo(
14141414
String.format(
14151415
"SELECT `tmp`.`user_id`, `tmp`.`shop_id`, `tmp`.`ds`, SUM(`tmp`.`payment_amount_cents`) AS `payed_buy_fee_sum`, SUM(1) AS `pv`\n"
1416-
+ "FROM (SELECT `datagenSource`.`user_id`, `datagenSource`.`shop_id`, `DATE_FORMAT`(`datagenSource`.`order_created_at`, 'yyyy-MM-dd') AS `ds`, `datagenSource`.`payment_amount_cents`\n"
1417-
+ "FROM `%s`.`test_db`.`datagenSource`) AS `tmp`\n"
1416+
+ "FROM (SELECT `datagenSource`.`user_id`, `datagenSource`.`shop_id`, DATE_FORMAT(`datagenSource`.`order_created_at`, 'yyyy-MM-dd') AS `ds`, `datagenSource`.`payment_amount_cents`\n"
1417+
+ "FROM `%s`.`test_db`.`datagenSource` AS `datagenSource`) AS `tmp`\n"
14181418
+ "GROUP BY ROW(`tmp`.`user_id`, `tmp`.`shop_id`, `tmp`.`ds`)",
14191419
fileSystemCatalogName));
14201420
assertThat(oldTable.getSerializedRefreshHandler())

flink-table/flink-sql-gateway/src/test/resources/sql/view.q

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,8 @@ CREATE TEMPORARY VIEW `default_catalog`.`default_database`.`v1` (
9191
`ts`,
9292
`ptime`
9393
)
94-
AS SELECT *
95-
FROM `default_catalog`.`default_database`.`orders`
94+
AS SELECT `orders`.`user`, `orders`.`product`, `orders`.`amount`, `orders`.`ts`, `orders`.`ptime`
95+
FROM `default_catalog`.`default_database`.`orders` AS `orders`
9696
!ok
9797

9898
# test show create a temporary view reference another view
@@ -105,8 +105,8 @@ CREATE TEMPORARY VIEW `default_catalog`.`default_database`.`v2` (
105105
`ts`,
106106
`ptime`
107107
)
108-
AS SELECT *
109-
FROM `default_catalog`.`default_database`.`v1`
108+
AS SELECT `v1`.`user`, `v1`.`product`, `v1`.`amount`, `v1`.`ts`, `v1`.`ptime`
109+
FROM `default_catalog`.`default_database`.`v1` AS `v1`
110110
!ok
111111

112112
show tables;
@@ -178,8 +178,8 @@ CREATE VIEW `default_catalog`.`default_database`.`permanent_v1` (
178178
`ts`,
179179
`ptime`
180180
)
181-
AS SELECT *
182-
FROM `default_catalog`.`default_database`.`orders`
181+
AS SELECT `orders`.`user`, `orders`.`product`, `orders`.`amount`, `orders`.`ts`, `orders`.`ptime`
182+
FROM `default_catalog`.`default_database`.`orders` AS `orders`
183183
!ok
184184

185185
# remove permanent_v1 view

flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -167,16 +167,20 @@
167167
* Default implementation of {@link SqlValidator}, the class was copied over because of
168168
* CALCITE-4554.
169169
*
170-
* <p>Lines 197 ~ 200, Flink improves error message for functions without appropriate arguments in
170+
* <p>Lines 202 ~ 205, Flink improves error message for functions without appropriate arguments in
171171
* handleUnresolvedFunction.
172172
*
173-
* <p>Lines 2012 ~ 2032, Flink improves error message for functions without appropriate arguments in
173+
* <p>Lines 1270 ~ 1272, CALCITE-7217, should be removed after upgrading Calcite to 1.41.0.
174+
*
175+
* <p>Lines 2031 ~ 2045, Flink improves error message for functions without appropriate arguments in
174176
* handleUnresolvedFunction at {@link SqlValidatorImpl#handleUnresolvedFunction}.
175177
*
176-
* <p>Lines 3840 ~ 3844, 6511 ~ 6517 Flink improves Optimize the retrieval of sub-operands in
178+
* <p>Lines 2571 ~ 2588, CALCITE-7217, should be removed after upgrading Calcite to 1.41.0.
179+
*
180+
* <p>Lines 3895 ~ 3899, 6574 ~ 6580 Flink improves Optimize the retrieval of sub-operands in
177181
* SqlCall when using NamedParameters at {@link SqlValidatorImpl#checkRollUp}.
178182
*
179-
* <p>Lines 5246 ~ 5252, FLINK-24352 Add null check for temporal table check on SqlSnapshot.
183+
* <p>Lines 5315 ~ 5321, FLINK-24352 Add null check for temporal table check on SqlSnapshot.
180184
*/
181185
public class SqlValidatorImpl implements SqlValidatorWithHints {
182186
// ~ Static fields/initializers ---------------------------------------------
@@ -1263,6 +1267,9 @@ private SqlValidatorScope getScopeOrThrow(SqlNode node) {
12631267
}
12641268
// fall through
12651269
case TABLE_REF:
1270+
// ----- FLINK MODIFICATION BEGIN -----
1271+
case LATERAL:
1272+
// ----- FLINK MODIFICATION END -----
12661273
case SNAPSHOT:
12671274
case OVER:
12681275
case COLLECTION_TABLE:
@@ -2561,7 +2568,9 @@ private SqlNode registerFrom(
25612568
return newNode;
25622569

25632570
case LATERAL:
2564-
return registerFrom(
2571+
// ----- FLINK MODIFICATION BEGIN -----
2572+
SqlBasicCall sbc = (SqlBasicCall) node;
2573+
registerFrom(
25652574
parentScope,
25662575
usingScope,
25672576
register,
@@ -2571,6 +2580,12 @@ private SqlNode registerFrom(
25712580
extendList,
25722581
forceNullable,
25732582
true);
2583+
// Put the usingScope which is a JoinScope,
2584+
// in order to make visible the left items
2585+
// of the JOIN tree.
2586+
scopes.put(node, usingScope);
2587+
return sbc;
2588+
// ----- FLINK MODIFICATION END -----
25742589

25752590
case COLLECTION_TABLE:
25762591
call = (SqlCall) node;

flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -245,18 +245,19 @@
245245
* <p>FLINK modifications are at lines
246246
*
247247
* <ol>
248-
* <li>Added in FLINK-29081, FLINK-28682, FLINK-33395: Lines 685 ~ 702
249-
* <li>Added in Flink-24024: Lines 1452 ~ 1458
250-
* <li>Added in Flink-24024: Lines 1472 ~ 1511
251-
* <li>Added in Flink-37269: Lines 2249 ~ 2271
252-
* <li>Added in FLINK-28682: Lines 2382 ~ 2399
253-
* <li>Added in FLINK-28682: Lines 2436 ~ 2464
254-
* <li>Added in FLINK-32474: Lines 2521 ~ 2523
255-
* <li>Added in FLINK-32474: Lines 2527 ~ 2529
256-
* <li>Added in FLINK-32474: Lines 2545 ~ 2547
257-
* <li>Added in FLINK-32474: Lines 2960 ~ 2972
258-
* <li>Added in FLINK-32474: Lines 3073 ~ 3107
259-
* <li>Added in FLINK-34312: Lines 5937 ~ 5948
248+
* <li>Added in FLINK-29081, FLINK-28682, FLINK-33395: Lines 686 ~ 703
249+
* <li>Added in Flink-24024: Lines 1453 ~ 1459
250+
* <li>Added in Flink-24024: Lines 1473 ~ 1512
251+
* <li>Added in Flink-37269: Lines 2250 ~ 2272
252+
* <li>Added in FLINK-28682: Lines 2383 ~ 2400
253+
* <li>Added in FLINK-28682: Lines 2437 ~ 2465
254+
* <li>Added in FLINK-32474: Lines 2522 ~ 2524
255+
* <li>Added in FLINK-32474: Lines 2528 ~ 2530
256+
* <li>Added in FLINK-32474: Lines 2546 ~ 2548
257+
* <li>Added in FLINK-32474: Lines 2587 ~ 2595
258+
* <li>Added in FLINK-32474: Lines 2970 ~ 2982
259+
* <li>Added in FLINK-32474: Lines 3083 ~ 3117
260+
* <li>Added in FLINK-34312: Lines 5947 ~ 5958
260261
* </ol>
261262
*
262263
* <p>In official extension point (i.e. {@link #convertExtendedExpression(SqlNode, Blackboard)}):
@@ -2583,6 +2584,15 @@ protected void convertFrom(
25832584
convertCollectionTable(bb, call2);
25842585
return;
25852586

2587+
// ----- FLINK MODIFICATION BEGIN -----
2588+
case LATERAL:
2589+
call = (SqlCall) from;
2590+
// Extract and analyze lateral part of join call.
2591+
assert call.getOperandList().size() == 1;
2592+
final SqlCall callLateral = call.operand(0);
2593+
convertFrom(bb, callLateral, fieldNames);
2594+
return;
2595+
// ----- FLINK MODIFICATION END -----
25862596
default:
25872597
throw new AssertionError("not a join operator " + from);
25882598
}

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterMaterializedTableAsQueryConverter.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -52,14 +52,10 @@ public Operation convertSqlNode(
5252
context.toQuotedSqlString(sqlAlterMaterializedTableAsQuery.getAsQuery());
5353
SqlNode validatedQuery =
5454
context.getSqlValidator().validate(sqlAlterMaterializedTableAsQuery.getAsQuery());
55-
// The LATERAL operator was eliminated during sql validation, thus the unparsed SQL
56-
// does not contain LATERAL which is problematic,
57-
// the issue was resolved in CALCITE-4077
58-
// (always treat the table function as implicitly LATERAL).
59-
String definitionQuery = context.expandSqlIdentifiers(originalQuery);
55+
String definitionQuery = context.toQuotedSqlString(validatedQuery);
6056
PlannerQueryOperation queryOperation =
6157
new PlannerQueryOperation(
62-
context.toRelRoot(validatedQuery).project(), () -> originalQuery);
58+
context.toRelRoot(validatedQuery).project(), () -> definitionQuery);
6359

6460
ResolvedCatalogMaterializedTable oldTable =
6561
getResolvedMaterializedTable(

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateMaterializedTableConverter.java

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -108,19 +108,13 @@ public Operation convertSqlNode(
108108

109109
// get query schema and definition query
110110
SqlNode selectQuery = sqlCreateMaterializedTable.getAsQuery();
111-
String originalQuery = context.toQuotedSqlString(selectQuery);
112-
SqlNode validateQuery = context.getSqlValidator().validate(selectQuery);
111+
SqlNode validatedQuery = context.getSqlValidator().validate(selectQuery);
113112

114-
// The LATERAL operator was eliminated during sql validation, thus the unparsed SQL
115-
// does not contain LATERAL which is problematic,
116-
// the issue was resolved in CALCITE-4077
117-
// (always treat the table function as implicitly LATERAL).
118-
String definitionQuery = context.expandSqlIdentifiers(originalQuery);
113+
String definitionQuery = context.toQuotedSqlString(validatedQuery);
119114

120115
PlannerQueryOperation queryOperation =
121116
new PlannerQueryOperation(
122-
context.toRelRoot(validateQuery).project(),
123-
() -> context.toQuotedSqlString(validateQuery));
117+
context.toRelRoot(validatedQuery).project(), () -> definitionQuery);
124118

125119
// get schema
126120
ResolvedSchema resolvedSchema = queryOperation.getResolvedSchema();

0 commit comments

Comments
 (0)