Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: add actionType and refact clinet-base and common Table class #1588

Merged
merged 14 commits into from
Feb 1, 2023
Prev Previous commit
Next Next commit
refactor: Table getFlinkDDL function, and extract FlinkCDCConfig conf…
…igure constant variable.
  • Loading branch information
leechor committed Feb 1, 2023
commit 33a987b762e297c610df8af6cfbcfe4cc086dfb4
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* AbstractCDCBuilder
*
Expand Down Expand Up @@ -350,8 +351,8 @@ protected Object convertValue(Object value, LogicalType logicalType) {
@Override
public String getSinkSchemaName(Table table) {
String schemaName = table.getSchema();
if (config.getSink().containsKey("sink.db")) {
schemaName = config.getSink().get("sink.db");
if (config.getSink().containsKey(FlinkCDCConfig.SINK_DB)) {
schemaName = config.getSink().get(FlinkCDCConfig.SINK_DB);
}
return schemaName;
}
Expand All @@ -364,19 +365,19 @@ public String getSinkTableName(Table table) {
tableName = table.getSchema() + "_" + tableName;
}
}
if (config.getSink().containsKey("table.prefix")) {
tableName = config.getSink().get("table.prefix") + tableName;
if (config.getSink().containsKey(FlinkCDCConfig.TABLE_PREFIX)) {
tableName = config.getSink().get(FlinkCDCConfig.TABLE_PREFIX) + tableName;
}
if (config.getSink().containsKey("table.suffix")) {
tableName = tableName + config.getSink().get("table.suffix");
if (config.getSink().containsKey(FlinkCDCConfig.TABLE_SUFFIX)) {
tableName = tableName + config.getSink().get(FlinkCDCConfig.TABLE_SUFFIX);
}
if (config.getSink().containsKey("table.lower")) {
if (Boolean.valueOf(config.getSink().get("table.lower"))) {
if (config.getSink().containsKey(FlinkCDCConfig.TABLE_LOWER)) {
if (Boolean.valueOf(config.getSink().get(FlinkCDCConfig.TABLE_LOWER))) {
tableName = tableName.toLowerCase();
}
}
if (config.getSink().containsKey("table.upper")) {
if (Boolean.valueOf(config.getSink().get("table.upper"))) {
if (config.getSink().containsKey(FlinkCDCConfig.TABLE_UPPER)) {
if (Boolean.valueOf(config.getSink().get(FlinkCDCConfig.TABLE_UPPER))) {
tableName = tableName.toUpperCase();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,8 @@ public DataStreamSource build(
StreamExecutionEnvironment env,
CustomTableEnvironment customTableEnvironment,
DataStreamSource<String> dataStreamSource) {
final String timeZone = config.getSink().get("timezone");
config.getSink().remove("timezone");
final String timeZone = config.getSink().get(FlinkCDCConfig.TIMEZONE);
config.getSink().remove(FlinkCDCConfig.TIMEZONE);
if (Asserts.isNotNullString(timeZone)) {
sinkTimeZone = ZoneId.of(timeZone);
}
Expand Down
51 changes: 51 additions & 0 deletions dinky-client/dinky-client-base/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,57 @@
<groupId>org.dinky</groupId>
<artifactId>dinky-common</artifactId>
</dependency>
<!-- test dependencies -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<type>jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-module-junit4</artifactId>
<type>jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-api-mockito2</artifactId>
<type>jar</type>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>
<type>jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
* FlinkCDCConfig
Expand All @@ -30,6 +31,14 @@
*/
public class FlinkCDCConfig {

public static final String SINK_DB = "sink.db";
public static final String AUTO_CREATE = "auto.create";
public static final String TABLE_PREFIX = "table.prefix";
public static final String TABLE_SUFFIX = "table.suffix";
public static final String TABLE_UPPER = "table.upper";
public static final String TABLE_LOWER = "table.lower";
public static final String COLUMN_REPLACE_LINE_BREAK = "column.replace.line-break";
public static final String TIMEZONE = "timezone";
private String type;
private String hostname;
private Integer port;
Expand All @@ -50,8 +59,6 @@ public class FlinkCDCConfig {
private List<Schema> schemaList;
private String schemaFieldName;

public FlinkCDCConfig() {}

public FlinkCDCConfig(
String type,
String hostname,
Expand All @@ -69,22 +76,21 @@ public FlinkCDCConfig(
Map<String, String> source,
Map<String, String> sink,
Map<String, String> jdbc) {
this.type = type;
this.hostname = hostname;
this.port = port;
this.username = username;
this.password = password;
this.checkpoint = checkpoint;
this.parallelism = parallelism;
this.database = database;
this.schema = schema;
this.table = table;
this.startupMode = startupMode;
this.split = split;
this.debezium = debezium;
this.source = source;
this.sink = sink;
this.jdbc = jdbc;
init(type,
hostname,
port,
username,
password,
checkpoint,
parallelism,
database,
schema,
table,
startupMode, split,
debezium,
source,
sink,
jdbc);
}

public void init(
Expand Down Expand Up @@ -122,7 +128,32 @@ public void init(
this.jdbc = jdbc;
}

public String getType() {
private boolean isSkip(String key) {
switch (key) {
case SINK_DB:
case AUTO_CREATE:
case TABLE_PREFIX:
case TABLE_SUFFIX:
case TABLE_UPPER:
case TABLE_LOWER:
case COLUMN_REPLACE_LINE_BREAK:
case TIMEZONE:
return true;
default:
return false;
}
}

public String getSinkConfigurationString() {
List<String> sinkConfiguration = sink.entrySet().stream()
.filter(t -> !isSkip(t.getKey()))
.map(t -> String.format("'%s' = '%s'", t.getKey(), t.getValue()))
.collect(Collectors.toList());

return String.join(",\n", sinkConfiguration);
}

public String getType() {
return type;
}

Expand Down Expand Up @@ -222,42 +253,6 @@ public void setSchemaTableNameList(List<String> schemaTableNameList) {
this.schemaTableNameList = schemaTableNameList;
}

private boolean skip(String key) {
switch (key) {
case "sink.db":
case "auto.create":
case "table.prefix":
case "table.suffix":
case "table.upper":
case "table.lower":
case "column.replace.line-break":
case "timezone":
return true;
default:
return false;
}
}

public String getSinkConfigurationString() {
StringBuilder sb = new StringBuilder();
int index = 0;
for (Map.Entry<String, String> entry : sink.entrySet()) {
if (skip(entry.getKey())) {
continue;
}
if (index > 0) {
sb.append(",");
}
sb.append("'");
sb.append(entry.getKey());
sb.append("' = '");
sb.append(entry.getValue());
sb.append("'\n");
index++;
}
return sb.toString();
}

public void setSink(Map<String, String> sink) {
this.sink = sink;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,21 @@
*/
public class LineageRel {

private String sourceCatalog;
private final String sourceCatalog;

private String sourceDatabase;
private final String sourceDatabase;

private String sourceTable;
private final String sourceTable;

private String sourceColumn;
private final String sourceColumn;

private String targetCatalog;
private final String targetCatalog;

private String targetDatabase;
private final String targetDatabase;

private String targetTable;
private final String targetTable;

private String targetColumn;
private final String targetColumn;

private static final String DELIMITER = ".";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,15 +150,11 @@ public static String convertSinkColumnType(String type, FlinkCDCConfig config) {
}

public static String getColumnProcessing(Column column, FlinkCDCConfig config) {
if ("true".equals(config.getSink().get("column.replace.line-break"))
if ("true".equals(config.getSink().get(FlinkCDCConfig.COLUMN_REPLACE_LINE_BREAK))
&& ColumnType.STRING.equals(column.getJavaType())) {
return "REGEXP_REPLACE(`"
+ column.getName()
+ "`, '\\n', '') AS `"
+ column.getName()
+ "`";
return String.format("REGEXP_REPLACE(`%s`, '\\n', '') AS `%s`", column.getName(), column.getName());
} else {
return "`" + column.getName() + "`";
return String.format("`%s`", column.getName());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package org.dinky.model;


import org.junit.jupiter.api.Test;

import java.util.HashMap;
import java.util.Map;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;

/**
*
*/
class FlinkCDCConfigTest {

@Test
void getSinkConfigurationString() {
Map<String, String> sinkConfig = new HashMap<>();
sinkConfig.put(FlinkCDCConfig.SINK_DB, "sink_db");
sinkConfig.put("propertyOne", "propertyOneValue");
sinkConfig.put("propertyTwo", "propertyTwoValue");

FlinkCDCConfig flinkCDCConfig = new FlinkCDCConfig(null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
sinkConfig,
null);
String sinkConfigureStr = flinkCDCConfig.getSinkConfigurationString();
assertThat(sinkConfigureStr, equalTo("'propertyOne' = 'propertyOneValue',\n" +
"'propertyTwo' = 'propertyTwoValue'"));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package org.dinky.utils;

import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.*;

/**
*
*/
class FlinkBaseUtilTest {

@Test
void getParamsFromArgs() {
}

@Test
void getCDCSqlInsert() {
// FlinkBaseUtil.getCDCSqlInsert("TableName", "SourceName", "")
}

@Test
void getFlinkDDL() {
}

@Test
void getSinkConfigurationString() {
}

@Test
void convertSinkColumnType() {
}

@Test
void getColumnProcessing() {
}
}
Loading