Skip to content

Commit 983aa25

Browse files
committed
[FLINK-35633][cli] Verify pipeline definition early
1 parent 323fc05 commit 983aa25

30 files changed

+1075
-38
lines changed

flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliExecutor.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.flink.cdc.cli;
1919

20+
import org.apache.flink.cdc.cli.guard.PipelineDefGuard;
2021
import org.apache.flink.cdc.cli.parser.PipelineDefinitionParser;
2122
import org.apache.flink.cdc.cli.parser.YamlPipelineDefinitionParser;
2223
import org.apache.flink.cdc.cli.utils.FlinkEnvironmentUtils;
@@ -64,6 +65,9 @@ public PipelineExecution.ExecutionInfo run() throws Exception {
6465
PipelineDef pipelineDef =
6566
pipelineDefinitionParser.parse(pipelineDefPath, globalPipelineConfig);
6667

68+
// Verify pipeline definitions
69+
PipelineDefGuard.verify(pipelineDef);
70+
6771
// Create composer
6872
PipelineComposer composer = getComposer();
6973

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.cli.guard;
19+
20+
import org.apache.flink.cdc.common.event.TableId;
21+
import org.apache.flink.cdc.common.exceptions.GuardVerificationException;
22+
23+
import org.apache.calcite.config.Lex;
24+
import org.apache.calcite.sql.parser.SqlParser;
25+
import org.apache.calcite.sql.validate.SqlConformanceEnum;
26+
27+
/** Verification guard for common fields. */
28+
public class CommonDefGuard {
29+
public static void verifyTableQualifier(String rawTableQualifier)
30+
throws GuardVerificationException {
31+
String tableQualifier = rawTableQualifier.replace("\\.", "A");
32+
if (tableQualifier.isEmpty()) {
33+
throw new GuardVerificationException(tableQualifier, "Empty table qualifier.");
34+
}
35+
if (tableQualifier.charAt(0) == '.') {
36+
throw new GuardVerificationException(
37+
rawTableQualifier,
38+
"Dot (.) was used as delimiter between schema and table name, which should not present at the beginning. Other usages in RegExp should be escaped with backslash (\\).");
39+
}
40+
if (tableQualifier.charAt(tableQualifier.length() - 1) == '.') {
41+
throw new GuardVerificationException(
42+
rawTableQualifier,
43+
"Dot (.) was used as delimiter between schema and table name, which should not present at the end. Other usages in RegExp should be escaped with backslash (\\).");
44+
}
45+
46+
int delimiterDotCount = 0;
47+
for (int i = 0; i < tableQualifier.length(); i++) {
48+
// Accessing charAt(i - 1) is safe here
49+
// since tableQualifier[0] can't be '.'
50+
if (tableQualifier.charAt(i) == '.') {
51+
delimiterDotCount++;
52+
}
53+
}
54+
55+
// delimiter dot must present exactly once
56+
if (delimiterDotCount > 2) {
57+
throw new GuardVerificationException(
58+
rawTableQualifier,
59+
"Dot (.) was used as delimiter between schema and table name. Other usages in RegExp should be escaped with backslash (\\).");
60+
}
61+
62+
try {
63+
TableId.parse(tableQualifier);
64+
} catch (IllegalArgumentException e) {
65+
throw new GuardVerificationException(
66+
rawTableQualifier, "Illegal table qualifier " + tableQualifier);
67+
}
68+
}
69+
70+
public static SqlParser getCalciteParser(String sql) {
71+
return SqlParser.create(
72+
sql,
73+
SqlParser.Config.DEFAULT
74+
.withConformance(SqlConformanceEnum.MYSQL_5)
75+
.withCaseSensitive(true)
76+
.withLex(Lex.JAVA));
77+
}
78+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.cli.guard;
19+
20+
import org.apache.flink.cdc.common.exceptions.GuardVerificationException;
21+
import org.apache.flink.cdc.composer.definition.PipelineDef;
22+
23+
import org.slf4j.Logger;
24+
import org.slf4j.LoggerFactory;
25+
26+
/** Syntax guard for overall pipeline definition. */
27+
public class PipelineDefGuard {
28+
29+
private static final Logger LOG = LoggerFactory.getLogger(PipelineDefGuard.class);
30+
31+
public static void verify(PipelineDef pipelineDef) throws GuardVerificationException {
32+
LOG.info("Verifying pipeline definition {}", pipelineDef);
33+
34+
pipelineDef.getTransforms().forEach(TransformDefGuard::verify);
35+
pipelineDef.getRoute().forEach(RouteDefGuard::verify);
36+
}
37+
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.cli.guard;
19+
20+
import org.apache.flink.cdc.common.exceptions.GuardVerificationException;
21+
import org.apache.flink.cdc.composer.definition.RouteDef;
22+
23+
import org.slf4j.Logger;
24+
import org.slf4j.LoggerFactory;
25+
26+
import java.util.Optional;
27+
28+
import static org.apache.flink.cdc.cli.guard.CommonDefGuard.verifyTableQualifier;
29+
30+
/** Syntax guard for route definition. */
31+
public class RouteDefGuard {
32+
private static final Logger LOG = LoggerFactory.getLogger(RouteDefGuard.class);
33+
34+
public static void verify(RouteDef routeDef) throws GuardVerificationException {
35+
LOG.info("Verifying route definition {}", routeDef);
36+
37+
Optional<String> replaceSymbol = routeDef.getReplaceSymbol();
38+
39+
verifyTableQualifier(routeDef.getSourceTable());
40+
if (replaceSymbol.isPresent()) {
41+
if (!routeDef.getSinkTable().contains(replaceSymbol.get())) {
42+
throw new GuardVerificationException(
43+
routeDef,
44+
"Replace symbol is specified but not present in sink definition.");
45+
}
46+
verifyTableQualifier(
47+
routeDef.getSinkTable().replace(replaceSymbol.get(), "SomeValidTableName"));
48+
} else {
49+
verifyTableQualifier(routeDef.getSinkTable());
50+
}
51+
}
52+
}
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.cli.guard;
19+
20+
import org.apache.flink.cdc.common.exceptions.GuardVerificationException;
21+
import org.apache.flink.cdc.composer.definition.TransformDef;
22+
23+
import org.apache.flink.shaded.guava31.com.google.common.base.Strings;
24+
25+
import org.apache.calcite.sql.SqlBasicCall;
26+
import org.apache.calcite.sql.SqlIdentifier;
27+
import org.apache.calcite.sql.SqlKind;
28+
import org.apache.calcite.sql.SqlNode;
29+
import org.apache.calcite.sql.SqlSelect;
30+
import org.apache.calcite.sql.parser.SqlParseException;
31+
import org.slf4j.Logger;
32+
import org.slf4j.LoggerFactory;
33+
34+
import java.util.Arrays;
35+
36+
import static org.apache.flink.cdc.cli.guard.CommonDefGuard.getCalciteParser;
37+
import static org.apache.flink.cdc.cli.guard.CommonDefGuard.verifyTableQualifier;
38+
39+
/** Syntax guard for transform definition. */
40+
public class TransformDefGuard {
41+
42+
private static final Logger LOG = LoggerFactory.getLogger(TransformDefGuard.class);
43+
44+
public static void verify(TransformDef transformDef) throws GuardVerificationException {
45+
LOG.info("Verifying transform definition {}", transformDef);
46+
47+
verifyTableQualifier(transformDef.getSourceTable());
48+
49+
verifyKeys(transformDef.getPrimaryKeys());
50+
verifyKeys(transformDef.getPartitionKeys());
51+
52+
transformDef.getProjection().ifPresent(TransformDefGuard::verifyProjectionRule);
53+
transformDef.getFilter().ifPresent(TransformDefGuard::verifyFilterRule);
54+
}
55+
56+
private static void verifyKeys(String keys) throws GuardVerificationException {
57+
if (Strings.isNullOrEmpty(keys)) {
58+
return;
59+
}
60+
Arrays.stream(keys.split(","))
61+
.map(String::trim)
62+
.forEach(TransformDefGuard::verifySqlIdentifier);
63+
}
64+
65+
private static void verifyProjectionRule(String projectionRule)
66+
throws GuardVerificationException {
67+
try {
68+
SqlNode selectNode =
69+
getCalciteParser("select " + projectionRule + " from tb").parseQuery();
70+
if (selectNode instanceof SqlSelect) {
71+
for (SqlNode node : ((SqlSelect) selectNode).getSelectList().getList()) {
72+
if (node instanceof SqlBasicCall) {
73+
SqlBasicCall call = (SqlBasicCall) node;
74+
if (!(SqlKind.AS.equals(call.getOperator().getKind())
75+
&& call.getOperandList().size() == 2
76+
&& call.getOperandList().get(1) instanceof SqlIdentifier)) {
77+
throw new GuardVerificationException(
78+
projectionRule,
79+
String.format(
80+
"%s is neither a column identifier nor an aliased expression. Expected: <Expression> AS <Identifier>",
81+
node));
82+
}
83+
} else if (!(node instanceof SqlIdentifier)) {
84+
throw new GuardVerificationException(
85+
projectionRule,
86+
String.format(
87+
"%s is neither a column identifier nor an aliased expression. Expected: <Expression> AS <Identifier>",
88+
node));
89+
}
90+
}
91+
}
92+
} catch (SqlParseException e) {
93+
throw new GuardVerificationException(
94+
projectionRule, projectionRule + " is not a valid projection rule");
95+
}
96+
}
97+
98+
private static void verifyFilterRule(String filterRule) throws GuardVerificationException {
99+
try {
100+
SqlNode selectNode =
101+
getCalciteParser("select * from tb where " + filterRule).parseQuery();
102+
} catch (SqlParseException e) {
103+
throw new GuardVerificationException(
104+
filterRule, filterRule + " is not a valid filter rule");
105+
}
106+
}
107+
108+
private static void verifySqlIdentifier(String identifier) throws GuardVerificationException {
109+
try {
110+
SqlNode selectNode = getCalciteParser("select " + identifier + " from tb").parseQuery();
111+
if (selectNode instanceof SqlSelect) {
112+
if (!((SqlSelect) selectNode)
113+
.getSelectList().getList().stream()
114+
.allMatch(node -> node instanceof SqlIdentifier)) {
115+
throw new GuardVerificationException(
116+
identifier, identifier + " is not a valid SQL identifier");
117+
}
118+
} else {
119+
throw new GuardVerificationException(
120+
identifier, identifier + " is not a valid SQL identifier");
121+
}
122+
} catch (SqlParseException e) {
123+
throw new GuardVerificationException(
124+
identifier, identifier + " is not a valid SQL identifier");
125+
}
126+
}
127+
}

flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.flink.cdc.cli.parser;
1919

2020
import org.apache.flink.cdc.common.configuration.Configuration;
21+
import org.apache.flink.cdc.common.exceptions.GuardVerificationException;
2122
import org.apache.flink.cdc.common.utils.StringUtils;
2223
import org.apache.flink.cdc.composer.definition.PipelineDef;
2324
import org.apache.flink.cdc.composer.definition.RouteDef;
@@ -95,6 +96,15 @@ public PipelineDef parse(Path pipelineDefPath, Configuration globalPipelineConfi
9596
SINK_KEY));
9697

9798
// Transforms are optional
99+
Optional.ofNullable(root.get(TRANSFORM_KEY))
100+
.ifPresent(
101+
node -> {
102+
if (!node.isArray()) {
103+
throw new GuardVerificationException(
104+
node,
105+
"Transform rules should be an array. Maybe missed a hyphen in YAML?");
106+
}
107+
});
98108
List<TransformDef> transformDefs = new ArrayList<>();
99109
Optional.ofNullable(root.get(TRANSFORM_KEY))
100110
.ifPresent(
@@ -103,7 +113,18 @@ public PipelineDef parse(Path pipelineDefPath, Configuration globalPipelineConfi
103113
transform -> transformDefs.add(toTransformDef(transform))));
104114

105115
// Routes are optional
116+
Optional.ofNullable(root.get(ROUTE_KEY))
117+
.ifPresent(
118+
node -> {
119+
if (!node.isArray()) {
120+
throw new GuardVerificationException(
121+
node,
122+
"Route rules should be an array. Maybe missed a hyphen in YAML?");
123+
}
124+
});
125+
106126
List<RouteDef> routeDefs = new ArrayList<>();
127+
107128
Optional.ofNullable(root.get(ROUTE_KEY))
108129
.ifPresent(node -> node.forEach(route -> routeDefs.add(toRouteDef(route))));
109130

0 commit comments

Comments
 (0)