Skip to content

Commit 48cf2d1

Browse files
Merge pull request taozhi8833998#2122 from taozhi8833998/feat-create-table-with-flink
feat: support with clause in create table stmt for flink
2 parents 9cc0b34 + bba901e commit 48cf2d1

File tree

3 files changed

+37
-29
lines changed

3 files changed

+37
-29
lines changed

pegjs/flinksql.pegjs

Lines changed: 18 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -643,8 +643,18 @@ create_db_stmt
643643
}
644644
}
645645
}
646-
647-
646+
with_table_option
647+
= k:ident_without_kw_type __ KW_ASSIGIN_EQUAL __ v:ident_without_kw_type {
648+
return {
649+
keyword: k,
650+
symbol: '=',
651+
value: v
652+
}
653+
}
654+
with_table_options
655+
= head:with_table_option tail:(__ COMMA __ with_table_option)* {
656+
return createList(head, tail);
657+
}
648658
create_table_stmt
649659
= a:KW_CREATE __
650660
tp:KW_TEMPORARY? __
@@ -653,27 +663,10 @@ create_table_stmt
653663
t:table_ref_list __
654664
c:create_table_definition __
655665
to:table_options? __
666+
wr:(KW_WITH __ LPAREN __ with_table_options __ RPAREN)? __
656667
ir: (KW_IGNORE / KW_REPLACE)? __
657668
as: KW_AS? __
658669
qe: union_stmt? {
659-
/*
660-
export type create_table_stmt_node = create_table_stmt_node_simple | create_table_stmt_node_like;
661-
export interface create_table_stmt_node_base {
662-
type: 'create';
663-
keyword: 'table';
664-
temporary?: 'temporary';
665-
if_not_exists?: 'if not exists';
666-
table: table_ref_list;
667-
}
668-
export interface create_table_stmt_node_simple extends create_table_stmt_node_base{
669-
ignore_replace?: 'ignore' | 'replace';
670-
as?: 'as';
671-
query_expr?: union_stmt_node;
672-
create_definition?: create_table_definition;
673-
table_options?: table_options;
674-
}
675-
=> AstStatement<create_table_stmt_node>
676-
*/
677670
if(t) t.forEach(tt => tableList.add(`create::${tt.db}::${tt.table}`));
678671
return {
679672
tableList: Array.from(tableList),
@@ -688,7 +681,8 @@ create_table_stmt
688681
as: as && as[0].toLowerCase(),
689682
query_expr: qe && qe.ast,
690683
create_definitions: c,
691-
table_options: to
684+
table_options: to,
685+
with: wr && wr[4],
692686
}
693687
}
694688
}
@@ -697,14 +691,8 @@ create_table_stmt
697691
KW_TABLE __
698692
ife:if_not_exists_stmt? __
699693
t:table_ref_list __
694+
wr:(KW_WITH __ LPAREN __ with_table_options __ RPAREN)? __
700695
lt:create_like_table {
701-
/*
702-
703-
export interface create_table_stmt_node_like extends create_table_stmt_node_base{
704-
like: create_like_table;
705-
}
706-
=> AstStatement<create_table_stmt_node>;
707-
*/
708696
if(t) t.forEach(tt => tableList.add(`create::${tt.db}::${tt.table}`));
709697
return {
710698
tableList: Array.from(tableList),
@@ -715,7 +703,8 @@ create_table_stmt
715703
temporary: tp && tp[0].toLowerCase(),
716704
if_not_exists:ife,
717705
table: t,
718-
like: lt
706+
like: lt,
707+
with: wr && wr[4],
719708
}
720709
}
721710
}

src/create.js

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ function createTableToSQL(stmt) {
7474
replace: orReplace,
7575
partition_of: partitionOf,
7676
query_expr: queryExpr,
77+
with: withExpr,
7778
} = stmt
7879
const sql = [toUpper(type), toUpper(orReplace), toUpper(temporary), toUpper(keyword), toUpper(ifNotExists), tablesToSQL(table)]
7980
if (like) {
@@ -85,6 +86,10 @@ function createTableToSQL(stmt) {
8586
if (partitionOf) return sql.concat([createTablePartitionOfToSQL(partitionOf)]).filter(hasVal).join(' ')
8687
if (createDefinition) sql.push(`(${createDefinition.map(createDefinitionToSQL).join(', ')})`)
8788
if (tableOptions) sql.push(tableOptions.map(tableOptionToSQL).join(' '))
89+
if (withExpr) {
90+
const withSQL = withExpr.map(withExprItem => [literalToSQL(withExprItem.keyword), toUpper(withExprItem.symbol), literalToSQL(withExprItem.value)].join(' ')).join(', ')
91+
sql.push(`WITH (${withSQL})`)
92+
}
8893
sql.push(toUpper(ignoreReplace), toUpper(as))
8994
if (queryExpr) sql.push(unionToSQL(queryExpr))
9095
return sql.filter(hasVal).join(' ')

test/flink.spec.js

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -412,6 +412,20 @@ describe('Flink', () => {
412412
"SELECT `name`, `eventTime`, `eventDetail` FROM (SELECT concat('AK中文信息') AS `name`, CAST(`event_time` AS VARCHAR) AS `eventTime`, JSON_OBJECT('risk-tag' VALUE `risk_tag`, 'abc' VALUE (10 * 2), 'user-agent' VALUE JSON_OBJECT('city' VALUE 'New York' ON NULL NULL, 'postalCode' VALUE '10001' ON NULL ABSENT)) AS `eventDetail` FROM `check_risk`)"
413413
]
414414
},
415+
{
416+
title: "create table",
417+
sql: [
418+
"CREATE TABLE Orders (`user` BIGINT)",
419+
"CREATE TABLE `Orders` (`user` BIGINT)",
420+
],
421+
},
422+
{
423+
title: "create table with options",
424+
sql: [
425+
"CREATE TABLE Orders (`user` BIGINT) WITH ('connector' = 'kafka')",
426+
"CREATE TABLE `Orders` (`user` BIGINT) WITH ('connector' = 'kafka')",
427+
],
428+
}
415429
];
416430

417431
SQL_LIST.forEach(sqlInfo => {

0 commit comments

Comments
 (0)