Skip to content

Commit

Permalink
Merge branch 'dev' of https://github.com//DataLinkDC/dlink into dev
Browse files Browse the repository at this point in the history
  • Loading branch information
walkhan committed Feb 10, 2022
2 parents 79356c8 + 0a474c0 commit b153cd3
Show file tree
Hide file tree
Showing 7 changed files with 110 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,26 @@ public static void buildMySqlCDC(StreamExecutionEnvironment env, FlinkCDCConfig
if(Asserts.isNotNull(config.getTable())&&config.getTable().size()>0){
sourceBuilder.tableList(config.getTable().toArray(new String[0]));
}
MySqlSource<String> sourceFunction = sourceBuilder
.deserializer(new JsonDebeziumDeserializationSchema())
.startupOptions(StartupOptions.latest())
.build();
MySqlSourceBuilder<String> builder = sourceBuilder
.deserializer(new JsonDebeziumDeserializationSchema());
if(Asserts.isNotNullString(config.getStartupMode())){
switch (config.getStartupMode().toUpperCase()){
case "INITIAL":
builder.startupOptions(StartupOptions.initial());
break;
case "EARLIEST":
builder.startupOptions(StartupOptions.earliest());
break;
case "LATEST":
builder.startupOptions(StartupOptions.latest());
break;
default:
builder.startupOptions(StartupOptions.latest());
}
}else {
builder.startupOptions(StartupOptions.latest());
}
MySqlSource<String> sourceFunction = builder.build();
DataStreamSource<String> streamSource = env.fromSource(sourceFunction, WatermarkStrategy.noWatermarks(), "MySQL Source");
streamSource.addSink(getKafkaProducer(config.getBrokers(),config.getTopic()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,26 @@ public static void buildMySqlCDC(StreamExecutionEnvironment env, FlinkCDCConfig
if(Asserts.isNotNull(config.getTable())&&config.getTable().size()>0){
sourceBuilder.tableList(config.getTable().toArray(new String[0]));
}
MySqlSource<String> sourceFunction = sourceBuilder
.deserializer(new JsonDebeziumDeserializationSchema())
.startupOptions(StartupOptions.latest())
.build();
MySqlSourceBuilder<String> builder = sourceBuilder
.deserializer(new JsonDebeziumDeserializationSchema());
if(Asserts.isNotNullString(config.getStartupMode())){
switch (config.getStartupMode().toUpperCase()){
case "INITIAL":
builder.startupOptions(StartupOptions.initial());
break;
case "EARLIEST":
builder.startupOptions(StartupOptions.earliest());
break;
case "LATEST":
builder.startupOptions(StartupOptions.latest());
break;
default:
builder.startupOptions(StartupOptions.latest());
}
}else {
builder.startupOptions(StartupOptions.latest());
}
MySqlSource<String> sourceFunction = builder.build();
DataStreamSource<String> streamSource = env.fromSource(sourceFunction, WatermarkStrategy.noWatermarks(), "MySQL Source");
streamSource.addSink(getKafkaProducer(config.getBrokers(),config.getTopic()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,26 @@ public static void buildMySqlCDC(StreamExecutionEnvironment env, FlinkCDCConfig
if(Asserts.isNotNull(config.getTable())&&config.getTable().size()>0){
sourceBuilder.tableList(config.getTable().toArray(new String[0]));
}
MySqlSource<String> sourceFunction = sourceBuilder
.deserializer(new JsonDebeziumDeserializationSchema())
.startupOptions(StartupOptions.latest())
.build();
MySqlSourceBuilder<String> builder = sourceBuilder
.deserializer(new JsonDebeziumDeserializationSchema());
if(Asserts.isNotNullString(config.getStartupMode())){
switch (config.getStartupMode().toUpperCase()){
case "INITIAL":
builder.startupOptions(StartupOptions.initial());
break;
case "EARLIEST":
builder.startupOptions(StartupOptions.earliest());
break;
case "LATEST":
builder.startupOptions(StartupOptions.latest());
break;
default:
builder.startupOptions(StartupOptions.latest());
}
}else {
builder.startupOptions(StartupOptions.latest());
}
MySqlSource<String> sourceFunction = builder.build();
DataStreamSource<String> streamSource = env.fromSource(sourceFunction, WatermarkStrategy.noWatermarks(), "MySQL Source");
streamSource.addSink(getKafkaProducer(config.getBrokers(),config.getTopic()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,26 @@ public static void buildMySqlCDC(StreamExecutionEnvironment env, FlinkCDCConfig
if(Asserts.isNotNull(config.getTable())&&config.getTable().size()>0){
sourceBuilder.tableList(config.getTable().toArray(new String[0]));
}
MySqlSource<String> sourceFunction = sourceBuilder
.deserializer(new JsonDebeziumDeserializationSchema())
.startupOptions(StartupOptions.latest())
.build();
MySqlSourceBuilder<String> builder = sourceBuilder
.deserializer(new JsonDebeziumDeserializationSchema());
if(Asserts.isNotNullString(config.getStartupMode())){
switch (config.getStartupMode().toUpperCase()){
case "INITIAL":
builder.startupOptions(StartupOptions.initial());
break;
case "EARLIEST":
builder.startupOptions(StartupOptions.earliest());
break;
case "LATEST":
builder.startupOptions(StartupOptions.latest());
break;
default:
builder.startupOptions(StartupOptions.latest());
}
}else {
builder.startupOptions(StartupOptions.latest());
}
MySqlSource<String> sourceFunction = builder.build();
DataStreamSource<String> streamSource = env.fromSource(sourceFunction, WatermarkStrategy.noWatermarks(), "MySQL Source");
streamSource.addSink(getKafkaProducer(config.getBrokers(),config.getTopic()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@ public class FlinkCDCConfig {
private Integer parallelism;
private List<String> database;
private List<String> table;
private String startupMode;
private String topic;
private String brokers;

public FlinkCDCConfig() {
}

public FlinkCDCConfig(String hostname, int port, String username, String password, int checkpoint, int parallelism, List<String> database, List<String> table, String topic, String brokers) {
public FlinkCDCConfig(String hostname, int port, String username, String password, int checkpoint, int parallelism, List<String> database, List<String> table, String startupMode, String topic, String brokers) {
this.hostname = hostname;
this.port = port;
this.username = username;
Expand All @@ -33,6 +34,7 @@ public FlinkCDCConfig(String hostname, int port, String username, String passwor
this.parallelism = parallelism;
this.database = database;
this.table = table;
this.startupMode = startupMode;
this.topic = topic;
this.brokers = brokers;
}
Expand All @@ -45,11 +47,11 @@ public void setHostname(String hostname) {
this.hostname = hostname;
}

public int getPort() {
public Integer getPort() {
return port;
}

public void setPort(int port) {
public void setPort(Integer port) {
this.port = port;
}

Expand All @@ -69,19 +71,19 @@ public void setPassword(String password) {
this.password = password;
}

public int getCheckpoint() {
public Integer getCheckpoint() {
return checkpoint;
}

public void setCheckpoint(int checkpoint) {
public void setCheckpoint(Integer checkpoint) {
this.checkpoint = checkpoint;
}

public int getParallelism() {
public Integer getParallelism() {
return parallelism;
}

public void setParallelism(int parallelism) {
public void setParallelism(Integer parallelism) {
this.parallelism = parallelism;
}

Expand Down Expand Up @@ -116,4 +118,12 @@ public String getBrokers() {
public void setBrokers(String brokers) {
this.brokers = brokers;
}

public String getStartupMode() {
return startupMode;
}

public void setStartupMode(String startupMode) {
this.startupMode = startupMode;
}
}
28 changes: 12 additions & 16 deletions dlink-executor/src/main/java/com/dlink/trans/ddl/CDCSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,11 @@ public class CDCSource {
private Integer parallelism;
private List<String> database;
private List<String> table;
private String startupMode;
private String topic;
private String brokers;

public CDCSource(String statement, String name, String hostname, Integer port, String username, String password, Integer checkpoint, Integer parallelism, String topic, String brokers) {
public CDCSource(String statement, String name, String hostname, Integer port, String username, String password, Integer checkpoint, Integer parallelism, String startupMode, String topic, String brokers) {
this.statement = statement;
this.name = name;
this.hostname = hostname;
Expand All @@ -41,21 +42,7 @@ public CDCSource(String statement, String name, String hostname, Integer port, S
this.password = password;
this.checkpoint = checkpoint;
this.parallelism = parallelism;
this.topic = topic;
this.brokers = brokers;
}

public CDCSource(String statement, String name, String hostname, Integer port, String username, String password, Integer checkpoint, Integer parallelism, List<String> database, List<String> table, String topic, String brokers) {
this.statement = statement;
this.name = name;
this.hostname = hostname;
this.port = port;
this.username = username;
this.password = password;
this.checkpoint = checkpoint;
this.parallelism = parallelism;
this.database = database;
this.table = table;
this.startupMode = startupMode;
this.topic = topic;
this.brokers = brokers;
}
Expand All @@ -71,6 +58,7 @@ public static CDCSource build(String statement) {
config.get("password"),
Integer.valueOf(config.get("checkpoint")),
Integer.valueOf(config.get("parallelism")),
config.get("startup"),
config.get("topic"),
config.get("brokers")
);
Expand Down Expand Up @@ -190,4 +178,12 @@ public String getBrokers() {
public void setBrokers(String brokers) {
this.brokers = brokers;
}

public String getStartupMode() {
return startupMode;
}

public void setStartupMode(String startupMode) {
this.startupMode = startupMode;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public void build(Executor executor) {
CDCSource cdcSource = CDCSource.build(statement);
FlinkCDCConfig config = new FlinkCDCConfig(cdcSource.getHostname(),cdcSource.getPort(),cdcSource.getUsername()
,cdcSource.getPassword(),cdcSource.getCheckpoint(),cdcSource.getParallelism(),cdcSource.getDatabase(),cdcSource.getTable()
,cdcSource.getTopic(),cdcSource.getBrokers());
,cdcSource.getStartupMode(),cdcSource.getTopic(),cdcSource.getBrokers());
try {
FlinkCDCMergeBuilder.buildMySqlCDC(executor.getStreamExecutionEnvironment(),config);
} catch (Exception e) {
Expand Down

0 comments on commit b153cd3

Please sign in to comment.