Skip to content

Commit

Permalink
refactor: Table getFlinkDDL function, and extract FlinkCDCConfig conf…
Browse files Browse the repository at this point in the history
…igure constant variable.
  • Loading branch information
leechor committed Feb 1, 2023
1 parent 33a987b commit 1d686be
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ public FlinkCDCConfig(
Map<String, String> source,
Map<String, String> sink,
Map<String, String> jdbc) {
init(type,
init(
type,
hostname,
port,
username,
Expand All @@ -86,7 +87,8 @@ public FlinkCDCConfig(
database,
schema,
table,
startupMode, split,
startupMode,
split,
debezium,
source,
sink,
Expand Down Expand Up @@ -145,15 +147,16 @@ private boolean isSkip(String key) {
}

public String getSinkConfigurationString() {
List<String> sinkConfiguration = sink.entrySet().stream()
.filter(t -> !isSkip(t.getKey()))
.map(t -> String.format("'%s' = '%s'", t.getKey(), t.getValue()))
.collect(Collectors.toList());
List<String> sinkConfiguration =
sink.entrySet().stream()
.filter(t -> !isSkip(t.getKey()))
.map(t -> String.format("'%s' = '%s'", t.getKey(), t.getValue()))
.collect(Collectors.toList());

return String.join(",\n", sinkConfiguration);
}

public String getType() {
public String getType() {
return type;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@ public static String convertSinkColumnType(String type, FlinkCDCConfig config) {
public static String getColumnProcessing(Column column, FlinkCDCConfig config) {
if ("true".equals(config.getSink().get(FlinkCDCConfig.COLUMN_REPLACE_LINE_BREAK))
&& ColumnType.STRING.equals(column.getJavaType())) {
return String.format("REGEXP_REPLACE(`%s`, '\\n', '') AS `%s`", column.getName(), column.getName());
return String.format(
"REGEXP_REPLACE(`%s`, '\\n', '') AS `%s`", column.getName(), column.getName());
} else {
return String.format("`%s`", column.getName());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,33 @@
package org.dinky.model;
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.dinky.model;

import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;

import java.util.HashMap;
import java.util.Map;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import org.junit.jupiter.api.Test;

/**
*
*/
/** */
class FlinkCDCConfigTest {

@Test
Expand All @@ -21,24 +37,29 @@ void getSinkConfigurationString() {
sinkConfig.put("propertyOne", "propertyOneValue");
sinkConfig.put("propertyTwo", "propertyTwoValue");

FlinkCDCConfig flinkCDCConfig = new FlinkCDCConfig(null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
sinkConfig,
null);
FlinkCDCConfig flinkCDCConfig =
new FlinkCDCConfig(
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
sinkConfig,
null);
String sinkConfigureStr = flinkCDCConfig.getSinkConfigurationString();
assertThat(sinkConfigureStr, equalTo("'propertyOne' = 'propertyOneValue',\n" +
"'propertyTwo' = 'propertyTwoValue'"));
assertThat(
sinkConfigureStr,
equalTo(
"'propertyOne' = 'propertyOneValue',\n"
+ "'propertyTwo' = 'propertyTwoValue'"));
}
}
Original file line number Diff line number Diff line change
@@ -1,36 +1,48 @@
package org.dinky.utils;
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

import org.junit.jupiter.api.Test;
package org.dinky.utils;

import static org.junit.jupiter.api.Assertions.*;

/**
*
*/
import org.junit.jupiter.api.Test;

/** */
class FlinkBaseUtilTest {

@Test
void getParamsFromArgs() {
}
void getParamsFromArgs() {}

@Test
void getCDCSqlInsert() {
// FlinkBaseUtil.getCDCSqlInsert("TableName", "SourceName", "")
// FlinkBaseUtil.getCDCSqlInsert("TableName", "SourceName", "")
}

@Test
void getFlinkDDL() {
}
void getFlinkDDL() {}

@Test
void getSinkConfigurationString() {
}
void getSinkConfigurationString() {}

@Test
void convertSinkColumnType() {
}
void convertSinkColumnType() {}

@Test
void getColumnProcessing() {
}
void getColumnProcessing() {}
}
8 changes: 4 additions & 4 deletions dinky-common/src/main/java/org/dinky/utils/SqlUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,13 @@ public static String replaceAllParam(String sql, String name, String value) {
}

/**
* replace sql context with values params, map's key is origin variable express by `${key}`, value is
* replacement.
* for example, if key="name", value="replacement", and sql is "${name}", the result will be "replacement".
* replace sql context with values params, map's key is origin variable express by `${key}`,
* value is replacement. for example, if key="name", value="replacement", and sql is "${name}",
* the result will be "replacement".
*
* @param sql sql context
* @param values replacement
* @return replace variable result
* @return replace variable result
*/
public static String replaceAllParam(String sql, Map<String, String> values) {
for (Map.Entry<String, String> entry : values.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,8 @@ Driver checkAndCreateSinkSchema(FlinkCDCConfig config, String schemaName) throws
return null;
}
String url = sink.get("url");
String schema = SqlUtil.replaceAllParam(sink.get(FlinkCDCConfig.SINK_DB), "schemaName", schemaName);
String schema =
SqlUtil.replaceAllParam(sink.get(FlinkCDCConfig.SINK_DB), "schemaName", schemaName);
Driver driver =
Driver.build(
sink.get("connector"), url, sink.get("username"), sink.get("password"));
Expand Down

0 comments on commit 1d686be

Please sign in to comment.