Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 18 additions & 18 deletions flink-table/flink-sql-client/src/test/resources/sql/view.q
Original file line number Diff line number Diff line change
Expand Up @@ -59,39 +59,39 @@ create temporary view if not exists v2 as select * from v1;

# test show create a temporary view
show create view v1;
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| result |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| result |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| CREATE TEMPORARY VIEW `default_catalog`.`default_database`.`v1` (
`user`,
`product`,
`amount`,
`ts`,
`ptime`
)
AS SELECT *
FROM `default_catalog`.`default_database`.`orders`
AS SELECT `orders`.`user`, `orders`.`product`, `orders`.`amount`, `orders`.`ts`, `orders`.`ptime`
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since now validated query is used, star is expanded

FROM `default_catalog`.`default_database`.`orders` AS `orders`
|
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row in set
!ok

# test show create a temporary view reference another view
show create view v2;
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| result |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| result |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| CREATE TEMPORARY VIEW `default_catalog`.`default_database`.`v2` (
`user`,
`product`,
`amount`,
`ts`,
`ptime`
)
AS SELECT *
FROM `default_catalog`.`default_database`.`v1`
AS SELECT `v1`.`user`, `v1`.`product`, `v1`.`amount`, `v1`.`ts`, `v1`.`ptime`
FROM `default_catalog`.`default_database`.`v1` AS `v1`
|
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row in set
!ok

Expand Down Expand Up @@ -142,20 +142,20 @@ create view permanent_v1 as select * from orders;

# test show create a permanent view
show create view permanent_v1;
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| result |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| result |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| CREATE VIEW `default_catalog`.`default_database`.`permanent_v1` (
`user`,
`product`,
`amount`,
`ts`,
`ptime`
)
AS SELECT *
FROM `default_catalog`.`default_database`.`orders`
AS SELECT `orders`.`user`, `orders`.`product`, `orders`.`amount`, `orders`.`ts`, `orders`.`ptime`
FROM `default_catalog`.`default_database`.`orders` AS `orders`
|
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row in set
!ok

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1088,7 +1088,7 @@ void testAlterMaterializedTableAsQueryInFullMode() throws Exception {
String.format(
"SELECT `tmp`.`user_id`, `tmp`.`shop_id`, `tmp`.`ds`, COUNT(`tmp`.`order_id`) AS `order_cnt`, SUM(`tmp`.`order_amount`) AS `order_amount_sum`\n"
+ "FROM (SELECT `my_source`.`user_id`, `my_source`.`shop_id`, `my_source`.`order_created_at` AS `ds`, `my_source`.`order_id`, 1 AS `order_amount`\n"
+ "FROM `%s`.`test_db`.`my_source`) AS `tmp`\n"
+ "FROM `%s`.`test_db`.`my_source` AS `my_source`) AS `tmp`\n"
+ "GROUP BY ROW(`tmp`.`user_id`, `tmp`.`shop_id`, `tmp`.`ds`)",
fileSystemCatalogName));
// the refresh handler in full mode should be the same as the old one
Expand Down Expand Up @@ -1228,7 +1228,7 @@ void testAlterMaterializedTableAsQueryInFullModeWithSuspendStatus() throws Excep
String.format(
"SELECT `tmp`.`user_id`, `tmp`.`shop_id`, `tmp`.`ds`, COUNT(`tmp`.`order_id`) AS `order_cnt`, SUM(`tmp`.`order_amount`) AS `order_amount_sum`\n"
+ "FROM (SELECT `my_source`.`user_id`, `my_source`.`shop_id`, `my_source`.`order_created_at` AS `ds`, `my_source`.`order_id`, 1 AS `order_amount`\n"
+ "FROM `%s`.`test_db`.`my_source`) AS `tmp`\n"
+ "FROM `%s`.`test_db`.`my_source` AS `my_source`) AS `tmp`\n"
+ "GROUP BY ROW(`tmp`.`user_id`, `tmp`.`shop_id`, `tmp`.`ds`)",
fileSystemCatalogName));

Expand Down Expand Up @@ -1313,11 +1313,11 @@ void testAlterMaterializedTableAsQueryInContinuousMode(@TempDir Path temporaryPa
assertThat(newTable.getDefinitionQuery())
.isEqualTo(
String.format(
"SELECT COALESCE(`tmp`.`user_id`, 0) AS `user_id`, `tmp`.`shop_id`, COALESCE(`tmp`.`ds`, '') AS `ds`, SUM(`tmp`.`payment_amount_cents`) AS `payed_buy_fee_sum`, SUM(1) AS `pv`\n"
+ "FROM (SELECT `datagenSource`.`user_id`, `datagenSource`.`shop_id`, `DATE_FORMAT`(`datagenSource`.`order_created_at`, 'yyyy-MM-dd') AS `ds`, `datagenSource`.`payment_amount_cents`\n"
+ "FROM `%s`.`test_db`.`datagenSource`) AS `tmp`\n"
"SELECT COALESCE(`tmp`.`user_id`, CAST(0 AS BIGINT)) AS `user_id`, `tmp`.`shop_id`, COALESCE(`tmp`.`ds`, '') AS `ds`, SUM(`tmp`.`payment_amount_cents`) AS `payed_buy_fee_sum`, SUM(1) AS `pv`\n"
+ "FROM (SELECT `datagenSource`.`user_id`, `datagenSource`.`shop_id`, DATE_FORMAT(`datagenSource`.`order_created_at`, 'yyyy-MM-dd') AS `ds`, `datagenSource`.`payment_amount_cents`\n"
+ "FROM `%s`.`%s`.`datagenSource` AS `datagenSource`) AS `tmp`\n"
+ "GROUP BY ROW(`tmp`.`user_id`, `tmp`.`shop_id`, `tmp`.`ds`)",
fileSystemCatalogName));
fileSystemCatalogName, TEST_DEFAULT_DATABASE));
assertThat(oldTable.getSerializedRefreshHandler())
.isNotEqualTo(newTable.getSerializedRefreshHandler());

Expand Down Expand Up @@ -1413,8 +1413,8 @@ void testAlterMaterializedTableAsQueryInContinuousModeWithSuspendStatus(
.isEqualTo(
String.format(
"SELECT `tmp`.`user_id`, `tmp`.`shop_id`, `tmp`.`ds`, SUM(`tmp`.`payment_amount_cents`) AS `payed_buy_fee_sum`, SUM(1) AS `pv`\n"
+ "FROM (SELECT `datagenSource`.`user_id`, `datagenSource`.`shop_id`, `DATE_FORMAT`(`datagenSource`.`order_created_at`, 'yyyy-MM-dd') AS `ds`, `datagenSource`.`payment_amount_cents`\n"
+ "FROM `%s`.`test_db`.`datagenSource`) AS `tmp`\n"
+ "FROM (SELECT `datagenSource`.`user_id`, `datagenSource`.`shop_id`, DATE_FORMAT(`datagenSource`.`order_created_at`, 'yyyy-MM-dd') AS `ds`, `datagenSource`.`payment_amount_cents`\n"
+ "FROM `%s`.`test_db`.`datagenSource` AS `datagenSource`) AS `tmp`\n"
+ "GROUP BY ROW(`tmp`.`user_id`, `tmp`.`shop_id`, `tmp`.`ds`)",
fileSystemCatalogName));
assertThat(oldTable.getSerializedRefreshHandler())
Expand Down
12 changes: 6 additions & 6 deletions flink-table/flink-sql-gateway/src/test/resources/sql/view.q
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ CREATE TEMPORARY VIEW `default_catalog`.`default_database`.`v1` (
`ts`,
`ptime`
)
AS SELECT *
FROM `default_catalog`.`default_database`.`orders`
AS SELECT `orders`.`user`, `orders`.`product`, `orders`.`amount`, `orders`.`ts`, `orders`.`ptime`
FROM `default_catalog`.`default_database`.`orders` AS `orders`
!ok

# test show create a temporary view reference another view
Expand All @@ -105,8 +105,8 @@ CREATE TEMPORARY VIEW `default_catalog`.`default_database`.`v2` (
`ts`,
`ptime`
)
AS SELECT *
FROM `default_catalog`.`default_database`.`v1`
AS SELECT `v1`.`user`, `v1`.`product`, `v1`.`amount`, `v1`.`ts`, `v1`.`ptime`
FROM `default_catalog`.`default_database`.`v1` AS `v1`
!ok

show tables;
Expand Down Expand Up @@ -178,8 +178,8 @@ CREATE VIEW `default_catalog`.`default_database`.`permanent_v1` (
`ts`,
`ptime`
)
AS SELECT *
FROM `default_catalog`.`default_database`.`orders`
AS SELECT `orders`.`user`, `orders`.`product`, `orders`.`amount`, `orders`.`ts`, `orders`.`ptime`
FROM `default_catalog`.`default_database`.`orders` AS `orders`
!ok

# remove permanent_v1 view
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,16 +167,20 @@
* Default implementation of {@link SqlValidator}, the class was copied over because of
* CALCITE-4554.
*
* <p>Lines 197 ~ 200, Flink improves error message for functions without appropriate arguments in
* <p>Lines 202 ~ 205, Flink improves error message for functions without appropriate arguments in
* handleUnresolvedFunction.
*
* <p>Lines 2012 ~ 2032, Flink improves error message for functions without appropriate arguments in
* <p>Lines 1270 ~ 1272, CALCITE-7217, should be removed after upgrading Calcite to 1.41.0.
*
* <p>Lines 2031 ~ 2045, Flink improves error message for functions without appropriate arguments in
* handleUnresolvedFunction at {@link SqlValidatorImpl#handleUnresolvedFunction}.
*
* <p>Lines 3840 ~ 3844, 6511 ~ 6517 Flink improves Optimize the retrieval of sub-operands in
* <p>Lines 2571 ~ 2588, CALCITE-7217, should be removed after upgrading Calcite to 1.41.0.
*
* <p>Lines 3895 ~ 3899, 6574 ~ 6580 Flink improves Optimize the retrieval of sub-operands in
* SqlCall when using NamedParameters at {@link SqlValidatorImpl#checkRollUp}.
*
* <p>Lines 5246 ~ 5252, FLINK-24352 Add null check for temporal table check on SqlSnapshot.
* <p>Lines 5315 ~ 5321, FLINK-24352 Add null check for temporal table check on SqlSnapshot.
*/
public class SqlValidatorImpl implements SqlValidatorWithHints {
// ~ Static fields/initializers ---------------------------------------------
Expand Down Expand Up @@ -1263,6 +1267,9 @@ private SqlValidatorScope getScopeOrThrow(SqlNode node) {
}
// fall through
case TABLE_REF:
// ----- FLINK MODIFICATION BEGIN -----
case LATERAL:
// ----- FLINK MODIFICATION END -----
case SNAPSHOT:
case OVER:
case COLLECTION_TABLE:
Expand Down Expand Up @@ -2561,7 +2568,9 @@ private SqlNode registerFrom(
return newNode;

case LATERAL:
return registerFrom(
// ----- FLINK MODIFICATION BEGIN -----
SqlBasicCall sbc = (SqlBasicCall) node;
registerFrom(
parentScope,
usingScope,
register,
Expand All @@ -2571,6 +2580,12 @@ private SqlNode registerFrom(
extendList,
forceNullable,
true);
// Put the usingScope which is a JoinScope,
// in order to make visible the left items
// of the JOIN tree.
scopes.put(node, usingScope);
return sbc;
// ----- FLINK MODIFICATION END -----

case COLLECTION_TABLE:
call = (SqlCall) node;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,18 +245,19 @@
* <p>FLINK modifications are at lines
*
* <ol>
* <li>Added in FLINK-29081, FLINK-28682, FLINK-33395: Lines 685 ~ 702
* <li>Added in Flink-24024: Lines 1452 ~ 1458
* <li>Added in Flink-24024: Lines 1472 ~ 1511
* <li>Added in Flink-37269: Lines 2249 ~ 2271
* <li>Added in FLINK-28682: Lines 2382 ~ 2399
* <li>Added in FLINK-28682: Lines 2436 ~ 2464
* <li>Added in FLINK-32474: Lines 2521 ~ 2523
* <li>Added in FLINK-32474: Lines 2527 ~ 2529
* <li>Added in FLINK-32474: Lines 2545 ~ 2547
* <li>Added in FLINK-32474: Lines 2960 ~ 2972
* <li>Added in FLINK-32474: Lines 3073 ~ 3107
* <li>Added in FLINK-34312: Lines 5937 ~ 5948
* <li>Added in FLINK-29081, FLINK-28682, FLINK-33395: Lines 686 ~ 703
* <li>Added in Flink-24024: Lines 1453 ~ 1459
* <li>Added in Flink-24024: Lines 1473 ~ 1512
* <li>Added in Flink-37269: Lines 2250 ~ 2272
* <li>Added in FLINK-28682: Lines 2383 ~ 2400
* <li>Added in FLINK-28682: Lines 2437 ~ 2465
* <li>Added in FLINK-32474: Lines 2522 ~ 2524
* <li>Added in FLINK-32474: Lines 2528 ~ 2530
* <li>Added in FLINK-32474: Lines 2546 ~ 2548
* <li>Added in CALCITE-7217: Lines 2587 ~ 2595, should be dropped with upgrade to Calcite 1.41.0
* <li>Added in FLINK-32474: Lines 2970 ~ 2982
* <li>Added in FLINK-32474: Lines 3083 ~ 3117
* <li>Added in FLINK-34312: Lines 5947 ~ 5958
* </ol>
*
* <p>In official extension point (i.e. {@link #convertExtendedExpression(SqlNode, Blackboard)}):
Expand Down Expand Up @@ -2583,6 +2584,15 @@ protected void convertFrom(
convertCollectionTable(bb, call2);
return;

// ----- FLINK MODIFICATION BEGIN -----
case LATERAL:
call = (SqlCall) from;
// Extract and analyze lateral part of join call.
assert call.getOperandList().size() == 1;
final SqlCall callLateral = call.operand(0);
convertFrom(bb, callLateral, fieldNames);
return;
// ----- FLINK MODIFICATION END -----
default:
throw new AssertionError("not a join operator " + from);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,10 @@ public Operation convertSqlNode(
context.toQuotedSqlString(sqlAlterMaterializedTableAsQuery.getAsQuery());
SqlNode validatedQuery =
context.getSqlValidator().validate(sqlAlterMaterializedTableAsQuery.getAsQuery());
// The LATERAL operator was eliminated during sql validation, thus the unparsed SQL
// does not contain LATERAL which is problematic,
// the issue was resolved in CALCITE-4077
// (always treat the table function as implicitly LATERAL).
String definitionQuery = context.expandSqlIdentifiers(originalQuery);
String definitionQuery = context.toQuotedSqlString(validatedQuery);
PlannerQueryOperation queryOperation =
new PlannerQueryOperation(
context.toRelRoot(validatedQuery).project(), () -> originalQuery);
context.toRelRoot(validatedQuery).project(), () -> definitionQuery);

ResolvedCatalogMaterializedTable oldTable =
getResolvedMaterializedTable(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,19 +108,13 @@ public Operation convertSqlNode(

// get query schema and definition query
SqlNode selectQuery = sqlCreateMaterializedTable.getAsQuery();
String originalQuery = context.toQuotedSqlString(selectQuery);
SqlNode validateQuery = context.getSqlValidator().validate(selectQuery);
SqlNode validatedQuery = context.getSqlValidator().validate(selectQuery);

// The LATERAL operator was eliminated during sql validation, thus the unparsed SQL
// does not contain LATERAL which is problematic,
// the issue was resolved in CALCITE-4077
// (always treat the table function as implicitly LATERAL).
String definitionQuery = context.expandSqlIdentifiers(originalQuery);
String definitionQuery = context.toQuotedSqlString(validatedQuery);

PlannerQueryOperation queryOperation =
new PlannerQueryOperation(
context.toRelRoot(validateQuery).project(),
() -> context.toQuotedSqlString(validateQuery));
context.toRelRoot(validatedQuery).project(), () -> definitionQuery);

// get schema
ResolvedSchema resolvedSchema = queryOperation.getResolvedSchema();
Expand Down
Loading