diff --git a/dinky-client/dinky-client-1.16/src/main/java/org/dinky/cdc/AbstractSinkBuilder.java b/dinky-client/dinky-client-1.16/src/main/java/org/dinky/cdc/AbstractSinkBuilder.java index 5fdf43e4d7..1b7c377a38 100644 --- a/dinky-client/dinky-client-1.16/src/main/java/org/dinky/cdc/AbstractSinkBuilder.java +++ b/dinky-client/dinky-client-1.16/src/main/java/org/dinky/cdc/AbstractSinkBuilder.java @@ -71,6 +71,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; + /** * AbstractCDCBuilder * @@ -350,8 +351,8 @@ protected Object convertValue(Object value, LogicalType logicalType) { @Override public String getSinkSchemaName(Table table) { String schemaName = table.getSchema(); - if (config.getSink().containsKey("sink.db")) { - schemaName = config.getSink().get("sink.db"); + if (config.getSink().containsKey(FlinkCDCConfig.SINK_DB)) { + schemaName = config.getSink().get(FlinkCDCConfig.SINK_DB); } return schemaName; } @@ -364,19 +365,19 @@ public String getSinkTableName(Table table) { tableName = table.getSchema() + "_" + tableName; } } - if (config.getSink().containsKey("table.prefix")) { - tableName = config.getSink().get("table.prefix") + tableName; + if (config.getSink().containsKey(FlinkCDCConfig.TABLE_PREFIX)) { + tableName = config.getSink().get(FlinkCDCConfig.TABLE_PREFIX) + tableName; } - if (config.getSink().containsKey("table.suffix")) { - tableName = tableName + config.getSink().get("table.suffix"); + if (config.getSink().containsKey(FlinkCDCConfig.TABLE_SUFFIX)) { + tableName = tableName + config.getSink().get(FlinkCDCConfig.TABLE_SUFFIX); } - if (config.getSink().containsKey("table.lower")) { - if (Boolean.valueOf(config.getSink().get("table.lower"))) { + if (config.getSink().containsKey(FlinkCDCConfig.TABLE_LOWER)) { + if (Boolean.valueOf(config.getSink().get(FlinkCDCConfig.TABLE_LOWER))) { tableName = tableName.toLowerCase(); } } - if (config.getSink().containsKey("table.upper")) { - if (Boolean.valueOf(config.getSink().get("table.upper"))) { + if (config.getSink().containsKey(FlinkCDCConfig.TABLE_UPPER)) { + if (Boolean.valueOf(config.getSink().get(FlinkCDCConfig.TABLE_UPPER))) { tableName = tableName.toUpperCase(); } } diff --git a/dinky-client/dinky-client-1.16/src/main/java/org/dinky/cdc/sql/SQLSinkBuilder.java b/dinky-client/dinky-client-1.16/src/main/java/org/dinky/cdc/sql/SQLSinkBuilder.java index 916048d6a5..1907ec492c 100644 --- a/dinky-client/dinky-client-1.16/src/main/java/org/dinky/cdc/sql/SQLSinkBuilder.java +++ b/dinky-client/dinky-client-1.16/src/main/java/org/dinky/cdc/sql/SQLSinkBuilder.java @@ -237,8 +237,8 @@ public DataStreamSource build( StreamExecutionEnvironment env, CustomTableEnvironment customTableEnvironment, DataStreamSource dataStreamSource) { - final String timeZone = config.getSink().get("timezone"); - config.getSink().remove("timezone"); + final String timeZone = config.getSink().get(FlinkCDCConfig.TIMEZONE); + config.getSink().remove(FlinkCDCConfig.TIMEZONE); if (Asserts.isNotNullString(timeZone)) { sinkTimeZone = ZoneId.of(timeZone); } diff --git a/dinky-client/dinky-client-base/pom.xml b/dinky-client/dinky-client-base/pom.xml index bdbfbe6414..00c49c54cf 100644 --- a/dinky-client/dinky-client-base/pom.xml +++ b/dinky-client/dinky-client-base/pom.xml @@ -39,6 +39,57 @@ org.dinky dinky-common + + + org.junit.jupiter + junit-jupiter + test + + + org.junit.vintage + junit-vintage-engine + test + + + org.assertj + assertj-core + test + + + org.mockito + mockito-core + jar + test + + + org.powermock + powermock-module-junit4 + jar + test + + + org.powermock + powermock-api-mockito2 + jar + test + + + org.mockito + mockito-core + + + + + org.hamcrest + hamcrest-all + jar + test + + + org.testcontainers + junit-jupiter + test + diff --git a/dinky-client/dinky-client-base/src/main/java/org/dinky/model/FlinkCDCConfig.java b/dinky-client/dinky-client-base/src/main/java/org/dinky/model/FlinkCDCConfig.java index 37c5a50300..ada0aa4311 100644 --- a/dinky-client/dinky-client-base/src/main/java/org/dinky/model/FlinkCDCConfig.java +++ b/dinky-client/dinky-client-base/src/main/java/org/dinky/model/FlinkCDCConfig.java @@ -21,6 +21,7 @@ import java.util.List; import java.util.Map; +import java.util.stream.Collectors; /** * FlinkCDCConfig @@ -30,6 +31,14 @@ */ public class FlinkCDCConfig { + public static final String SINK_DB = "sink.db"; + public static final String AUTO_CREATE = "auto.create"; + public static final String TABLE_PREFIX = "table.prefix"; + public static final String TABLE_SUFFIX = "table.suffix"; + public static final String TABLE_UPPER = "table.upper"; + public static final String TABLE_LOWER = "table.lower"; + public static final String COLUMN_REPLACE_LINE_BREAK = "column.replace.line-break"; + public static final String TIMEZONE = "timezone"; private String type; private String hostname; private Integer port; @@ -50,8 +59,6 @@ public class FlinkCDCConfig { private List schemaList; private String schemaFieldName; - public FlinkCDCConfig() {} - public FlinkCDCConfig( String type, String hostname, @@ -69,22 +76,23 @@ public FlinkCDCConfig( Map source, Map sink, Map jdbc) { - this.type = type; - this.hostname = hostname; - this.port = port; - this.username = username; - this.password = password; - this.checkpoint = checkpoint; - this.parallelism = parallelism; - this.database = database; - this.schema = schema; - this.table = table; - this.startupMode = startupMode; - this.split = split; - this.debezium = debezium; - this.source = source; - this.sink = sink; - this.jdbc = jdbc; + init( + type, + hostname, + port, + username, + password, + checkpoint, + parallelism, + database, + schema, + table, + startupMode, + split, + debezium, + source, + sink, + jdbc); } public void init( @@ -122,6 +130,32 @@ public void init( this.jdbc = jdbc; } + private boolean isSkip(String key) { + switch (key) { + case SINK_DB: + case AUTO_CREATE: + case TABLE_PREFIX: + case TABLE_SUFFIX: + case TABLE_UPPER: + case TABLE_LOWER: + case COLUMN_REPLACE_LINE_BREAK: + case TIMEZONE: + return true; + default: + return false; + } + } + + public String getSinkConfigurationString() { + List sinkConfiguration = + sink.entrySet().stream() + .filter(t -> !isSkip(t.getKey())) + .map(t -> String.format("'%s' = '%s'", t.getKey(), t.getValue())) + .collect(Collectors.toList()); + + return String.join(",\n", sinkConfiguration); + } + public String getType() { return type; } @@ -222,42 +256,6 @@ public void setSchemaTableNameList(List schemaTableNameList) { this.schemaTableNameList = schemaTableNameList; } - private boolean skip(String key) { - switch (key) { - case "sink.db": - case "auto.create": - case "table.prefix": - case "table.suffix": - case "table.upper": - case "table.lower": - case "column.replace.line-break": - case "timezone": - return true; - default: - return false; - } - } - - public String getSinkConfigurationString() { - StringBuilder sb = new StringBuilder(); - int index = 0; - for (Map.Entry entry : sink.entrySet()) { - if (skip(entry.getKey())) { - continue; - } - if (index > 0) { - sb.append(","); - } - sb.append("'"); - sb.append(entry.getKey()); - sb.append("' = '"); - sb.append(entry.getValue()); - sb.append("'\n"); - index++; - } - return sb.toString(); - } - public void setSink(Map sink) { this.sink = sink; } diff --git a/dinky-client/dinky-client-base/src/main/java/org/dinky/model/LineageRel.java b/dinky-client/dinky-client-base/src/main/java/org/dinky/model/LineageRel.java index b597ee652f..347b6298c4 100644 --- a/dinky-client/dinky-client-base/src/main/java/org/dinky/model/LineageRel.java +++ b/dinky-client/dinky-client-base/src/main/java/org/dinky/model/LineageRel.java @@ -27,21 +27,21 @@ */ public class LineageRel { - private String sourceCatalog; + private final String sourceCatalog; - private String sourceDatabase; + private final String sourceDatabase; - private String sourceTable; + private final String sourceTable; - private String sourceColumn; + private final String sourceColumn; - private String targetCatalog; + private final String targetCatalog; - private String targetDatabase; + private final String targetDatabase; - private String targetTable; + private final String targetTable; - private String targetColumn; + private final String targetColumn; private static final String DELIMITER = "."; diff --git a/dinky-client/dinky-client-base/src/main/java/org/dinky/utils/FlinkBaseUtil.java b/dinky-client/dinky-client-base/src/main/java/org/dinky/utils/FlinkBaseUtil.java index 5f41fd9908..52fef99534 100644 --- a/dinky-client/dinky-client-base/src/main/java/org/dinky/utils/FlinkBaseUtil.java +++ b/dinky-client/dinky-client-base/src/main/java/org/dinky/utils/FlinkBaseUtil.java @@ -150,15 +150,12 @@ public static String convertSinkColumnType(String type, FlinkCDCConfig config) { } public static String getColumnProcessing(Column column, FlinkCDCConfig config) { - if ("true".equals(config.getSink().get("column.replace.line-break")) + if ("true".equals(config.getSink().get(FlinkCDCConfig.COLUMN_REPLACE_LINE_BREAK)) && ColumnType.STRING.equals(column.getJavaType())) { - return "REGEXP_REPLACE(`" - + column.getName() - + "`, '\\n', '') AS `" - + column.getName() - + "`"; + return String.format( + "REGEXP_REPLACE(`%s`, '\\n', '') AS `%s`", column.getName(), column.getName()); } else { - return "`" + column.getName() + "`"; + return String.format("`%s`", column.getName()); } } } diff --git a/dinky-client/dinky-client-base/src/test/java/org/dinky/model/FlinkCDCConfigTest.java b/dinky-client/dinky-client-base/src/test/java/org/dinky/model/FlinkCDCConfigTest.java new file mode 100644 index 0000000000..0e37da1fa4 --- /dev/null +++ b/dinky-client/dinky-client-base/src/test/java/org/dinky/model/FlinkCDCConfigTest.java @@ -0,0 +1,65 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.model; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; + +import java.util.HashMap; +import java.util.Map; + +import org.junit.jupiter.api.Test; + +/** */ +class FlinkCDCConfigTest { + + @Test + void getSinkConfigurationString() { + Map sinkConfig = new HashMap<>(); + sinkConfig.put(FlinkCDCConfig.SINK_DB, "sink_db"); + sinkConfig.put("propertyOne", "propertyOneValue"); + sinkConfig.put("propertyTwo", "propertyTwoValue"); + + FlinkCDCConfig flinkCDCConfig = + new FlinkCDCConfig( + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + sinkConfig, + null); + String sinkConfigureStr = flinkCDCConfig.getSinkConfigurationString(); + assertThat( + sinkConfigureStr, + equalTo( + "'propertyOne' = 'propertyOneValue',\n" + + "'propertyTwo' = 'propertyTwoValue'")); + } +} diff --git a/dinky-client/dinky-client-base/src/test/java/org/dinky/utils/FlinkBaseUtilTest.java b/dinky-client/dinky-client-base/src/test/java/org/dinky/utils/FlinkBaseUtilTest.java new file mode 100644 index 0000000000..9a17bd5072 --- /dev/null +++ b/dinky-client/dinky-client-base/src/test/java/org/dinky/utils/FlinkBaseUtilTest.java @@ -0,0 +1,48 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.utils; + +import static org.junit.jupiter.api.Assertions.*; + +import org.junit.jupiter.api.Test; + +/** */ +class FlinkBaseUtilTest { + + @Test + void getParamsFromArgs() {} + + @Test + void getCDCSqlInsert() { + // FlinkBaseUtil.getCDCSqlInsert("TableName", "SourceName", "") + } + + @Test + void getFlinkDDL() {} + + @Test + void getSinkConfigurationString() {} + + @Test + void convertSinkColumnType() {} + + @Test + void getColumnProcessing() {} +} diff --git a/dinky-common/src/main/java/org/dinky/model/Column.java b/dinky-common/src/main/java/org/dinky/model/Column.java index c0e7f7ab3a..b8298c10e8 100644 --- a/dinky-common/src/main/java/org/dinky/model/Column.java +++ b/dinky-common/src/main/java/org/dinky/model/Column.java @@ -21,6 +21,7 @@ import java.io.Serializable; +import lombok.Builder; import lombok.Getter; import lombok.Setter; @@ -32,6 +33,7 @@ */ @Setter @Getter +@Builder public class Column implements Serializable { private static final long serialVersionUID = 6438514547501611599L; @@ -58,10 +60,11 @@ public String getFlinkType() { return flinkType; } + Integer defaultPrecision = precision; if (precision == null || precision == 0) { - return String.format("%s(%d,%d)", flinkType, 38, scale); + defaultPrecision = 38; } - return String.format("%s(%d,%d)", flinkType, precision, scale); + return String.format("%s(%d,%d)", flinkType, defaultPrecision, scale); } } diff --git a/dinky-common/src/main/java/org/dinky/model/Table.java b/dinky-common/src/main/java/org/dinky/model/Table.java index b7e046f3ff..0bc7d21670 100644 --- a/dinky-common/src/main/java/org/dinky/model/Table.java +++ b/dinky-common/src/main/java/org/dinky/model/Table.java @@ -24,9 +24,12 @@ import java.beans.Transient; import java.io.Serializable; +import java.text.MessageFormat; import java.util.ArrayList; import java.util.Date; import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; import lombok.Getter; import lombok.Setter; @@ -97,71 +100,46 @@ public static Table build(String name, String schema, List columns) { @Transient public String getFlinkTableWith(String flinkConfig) { - String tableWithSql = ""; if (Asserts.isNotNullString(flinkConfig)) { - tableWithSql = SqlUtil.replaceAllParam(flinkConfig, "schemaName", schema); - tableWithSql = SqlUtil.replaceAllParam(tableWithSql, "tableName", name); + return SqlUtil.replaceAllParam( + flinkConfig, Map.of("schemaName", schema, "tableName", name)); } - return tableWithSql; - } - - @Transient - public String getFlinkTableSql(String flinkConfig) { - return getFlinkDDL(flinkConfig, name); + return ""; } @Transient public String getFlinkDDL(String flinkConfig, String tableName) { - StringBuilder sb = new StringBuilder(); - sb.append("CREATE TABLE IF NOT EXISTS " + tableName + " (\n"); - List pks = new ArrayList<>(); - for (int i = 0; i < columns.size(); i++) { - String type = columns.get(i).getFlinkType(); - sb.append(" "); - if (i > 0) { - sb.append(","); - } - sb.append("`" + columns.get(i).getName() + "` " + type); - if (Asserts.isNotNullString(columns.get(i).getComment())) { - if (columns.get(i).getComment().contains("\'") - | columns.get(i).getComment().contains("\"")) { - sb.append( - " COMMENT '" - + columns.get(i).getComment().replaceAll("\"|'", "") - + "'"); - } else { - sb.append(" COMMENT '" + columns.get(i).getComment() + "'"); - } - } - sb.append("\n"); - if (columns.get(i).isKeyFlag()) { - pks.add(columns.get(i).getName()); - } - } - StringBuilder pksb = new StringBuilder("PRIMARY KEY ( "); - for (int i = 0; i < pks.size(); i++) { - if (i > 0) { - pksb.append(","); - } - pksb.append("`" + pks.get(i) + "`"); - } - pksb.append(" ) NOT ENFORCED\n"); - if (pks.size() > 0) { - sb.append(" ,"); - sb.append(pksb); - } - sb.append(")"); - if (Asserts.isNotNullString(comment)) { - if (comment.contains("\'") | comment.contains("\"")) { - sb.append(" COMMENT '" + comment.replaceAll("\"|'", "") + "'\n"); - } else { - sb.append(" COMMENT '" + comment + "'\n"); - } - } - sb.append(" WITH (\n"); - sb.append(flinkConfig); - sb.append(")\n"); - return sb.toString(); + String columnStrs = + columns.stream() + .map( + column -> { + String comment = ""; + if (Asserts.isNotNullString(column.getComment())) { + comment = + String.format( + " COMMENT '%s'", + column.getComment().replaceAll("\"|'", "")); + } + return String.format( + " `%s` %s%s", + column.getName(), column.getFlinkType(), comment); + }) + .collect(Collectors.joining(",\n")); + + String primaryKeyStr = + columns.stream() + .filter(Column::isKeyFlag) + .map(Column::getName) + .map(t -> String.format("`%s`", t)) + .collect( + Collectors.joining( + ",", ",\n PRIMARY KEY ( ", " ) NOT ENFORCED\n")); + + String result = + MessageFormat.format( + "CREATE TABLE IF NOT EXISTS {0} (\n{1}{2}) WITH (\n{3})\n", + tableName, columnStrs, primaryKeyStr, flinkConfig); + return result; } @Transient @@ -220,6 +198,22 @@ public String getFlinkTableSql(String catalogName, String flinkConfig) { return sb.toString(); } + @Override + public Object clone() { + Table table = null; + try { + table = (Table) super.clone(); + } catch (CloneNotSupportedException e) { + e.printStackTrace(); + } + return table; + } + + @Transient + public String getFlinkTableSql(String flinkConfig) { + return getFlinkDDL(flinkConfig, name); + } + @Transient public String getSqlSelect(String catalogName) { StringBuilder sb = new StringBuilder("SELECT\n"); @@ -262,15 +256,4 @@ public String getCDCSqlInsert(String targetName, String sourceName) { sb.append(sourceName); return sb.toString(); } - - @Override - public Object clone() { - Table table = null; - try { - table = (Table) super.clone(); - } catch (CloneNotSupportedException e) { - e.printStackTrace(); - } - return table; - } } diff --git a/dinky-common/src/main/java/org/dinky/utils/SqlUtil.java b/dinky-common/src/main/java/org/dinky/utils/SqlUtil.java index 74923ab530..24e3c0b1d8 100644 --- a/dinky-common/src/main/java/org/dinky/utils/SqlUtil.java +++ b/dinky-common/src/main/java/org/dinky/utils/SqlUtil.java @@ -22,6 +22,8 @@ import org.dinky.assertion.Asserts; import org.dinky.model.SystemConfiguration; +import java.util.Map; + /** * SqlUtil * @@ -64,4 +66,20 @@ public static String removeNote(String sql) { public static String replaceAllParam(String sql, String name, String value) { return sql.replaceAll("\\$\\{" + name + "\\}", value); } + + /** + * replace sql context with values params, map's key is origin variable express by `${key}`, + * value is replacement. for example, if key="name", value="replacement", and sql is "${name}", + * the result will be "replacement". + * + * @param sql sql context + * @param values replacement + * @return replace variable result + */ + public static String replaceAllParam(String sql, Map values) { + for (Map.Entry entry : values.entrySet()) { + sql = replaceAllParam(sql, entry.getKey(), entry.getValue()); + } + return sql; + } } diff --git a/dinky-common/src/test/java/org/dinky/model/TableTest.java b/dinky-common/src/test/java/org/dinky/model/TableTest.java new file mode 100644 index 0000000000..9f87e4c5c4 --- /dev/null +++ b/dinky-common/src/test/java/org/dinky/model/TableTest.java @@ -0,0 +1,93 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.model; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; + +import java.util.Arrays; +import java.util.List; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +/** */ +class TableTest { + + private Table table; + private String flinkConfig; + + @BeforeEach + void setUp() { + List columns = + Arrays.asList( + Column.builder() + .name("column1") + .type("int") + .javaType(ColumnType.INT) + .comment("comment abc") + .keyFlag(true) + .build(), + Column.builder() + .name("column2") + .type("varchar") + .javaType(ColumnType.STRING) + .comment("comment 'abc'") + .keyFlag(true) + .build(), + Column.builder() + .name("column3") + .type("double") + .javaType(ColumnType.DOUBLE) + .comment("comment \"abc\"") + .build()); + + table = new Table("TableNameOrigin", "SchemaOrigin", columns); + + flinkConfig = + "${schemaName}=schemaName, ${tableName}=tableName, ${abc}=abc, ${}=null, bcd=bcd"; + } + + @Test + void getFlinkDDL() { + String result = table.getFlinkDDL(flinkConfig, "NewTableName"); + assertThat( + result, + equalTo( + "CREATE TABLE IF NOT EXISTS NewTableName (\n" + + " `column1` INT NOT NULL COMMENT 'comment abc',\n" + + " `column2` STRING COMMENT 'comment abc',\n" + + " `column3` DOUBLE NOT NULL COMMENT 'comment abc',\n" + + " PRIMARY KEY ( `column1`,`column2` ) NOT ENFORCED\n" + + ") WITH (\n" + + "${schemaName}=schemaName, ${tableName}=tableName, ${abc}=abc, ${}=null, bcd=bcd)\n")); + } + + @Test + void getFlinkTableWith() { + String result = table.getFlinkTableWith(flinkConfig); + assertThat( + result, + equalTo( + "SchemaOrigin=schemaName, TableNameOrigin=tableName, ${abc}=abc, ${}=null, " + + "bcd=bcd")); + System.out.println(result); + } +} diff --git a/dinky-executor/src/main/java/org/dinky/trans/ddl/CreateCDCSourceOperation.java b/dinky-executor/src/main/java/org/dinky/trans/ddl/CreateCDCSourceOperation.java index 6a6282b2bd..527ec39ce1 100644 --- a/dinky-executor/src/main/java/org/dinky/trans/ddl/CreateCDCSourceOperation.java +++ b/dinky-executor/src/main/java/org/dinky/trans/ddl/CreateCDCSourceOperation.java @@ -228,19 +228,20 @@ public TableResult build(Executor executor) { Driver checkAndCreateSinkSchema(FlinkCDCConfig config, String schemaName) throws Exception { Map sink = config.getSink(); - String autoCreate = sink.get("auto.create"); + String autoCreate = sink.get(FlinkCDCConfig.AUTO_CREATE); if (!Asserts.isEqualsIgnoreCase(autoCreate, "true") || Asserts.isNullString(schemaName)) { return null; } String url = sink.get("url"); - String schema = SqlUtil.replaceAllParam(sink.get("sink.db"), "schemaName", schemaName); + String schema = + SqlUtil.replaceAllParam(sink.get(FlinkCDCConfig.SINK_DB), "schemaName", schemaName); Driver driver = Driver.build( sink.get("connector"), url, sink.get("username"), sink.get("password")); if (null != driver && !driver.existSchema(schema)) { driver.createSchema(schema); } - sink.put("sink.db", schema); + sink.put(FlinkCDCConfig.SINK_DB, schema); sink.put("url", url + "/" + schema); return driver; }