Skip to content

Commit 10eb564

Browse files
适配MSSQL CDC
1 parent 607f13d commit 10eb564

File tree

12 files changed

+274
-110
lines changed

12 files changed

+274
-110
lines changed

easy-flink-cdc-boot-starter/pom.xml

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
<maven.compiler.source>8</maven.compiler.source>
1717
<maven.compiler.target>8</maven.compiler.target>
1818
<scala.version>2.12</scala.version>
19-
<flink.version>1.13.6</flink.version>
19+
<flink.version>1.18.0</flink.version>
2020
</properties>
2121

2222
<dependencies>
@@ -47,7 +47,7 @@
4747
</dependency>
4848
<dependency>
4949
<groupId>org.apache.flink</groupId>
50-
<artifactId>flink-clients_2.12</artifactId>
50+
<artifactId>flink-clients</artifactId>
5151
<version>${flink.version}</version>
5252
<exclusions>
5353
<exclusion>
@@ -69,7 +69,7 @@
6969
</dependency>
7070
<dependency>
7171
<groupId>org.apache.flink</groupId>
72-
<artifactId>flink-streaming-java_2.12</artifactId>
72+
<artifactId>flink-streaming-java</artifactId>
7373
<version>${flink.version}</version>
7474
<exclusions>
7575
<exclusion>
@@ -80,9 +80,10 @@
8080
</dependency>
8181
<!--mysql -cdc-->
8282
<dependency>
83-
<groupId>com.ververica</groupId>
83+
<groupId>org.apache.flink</groupId>
8484
<artifactId>flink-connector-mysql-cdc</artifactId>
85-
<version>2.0.0</version>
85+
<!-- 请使用已发布的版本依赖,snapshot 版本的依赖需要本地自行编译。 -->
86+
<version>3.1.0</version>
8687
</dependency>
8788
<dependency>
8889
<groupId>com.google.guava</groupId>
@@ -93,6 +94,17 @@
9394
<groupId>com.alibaba</groupId>
9495
<artifactId>fastjson</artifactId>
9596
</dependency>
97+
<!-- sqlserver cdc -->
98+
<dependency>
99+
<groupId>org.apache.flink</groupId>
100+
<artifactId>flink-connector-sqlserver-cdc</artifactId>
101+
<version>3.2.0</version>
102+
</dependency>
103+
<dependency>
104+
<groupId>com.microsoft.sqlserver</groupId>
105+
<artifactId>mssql-jdbc</artifactId>
106+
<version>9.4.1.jre8</version>
107+
</dependency>
96108

97109
<dependency>
98110
<groupId>org.springframework.boot</groupId>

easy-flink-cdc-boot-starter/src/main/java/com/esflink/starter/common/data/DataChangeInfo.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
package com.esflink.starter.common.data;
22

33

4+
import java.io.Serializable;
5+
46
/**
57
* 数据变更对象
68
*
79
* @author zhouhongyin
810
* @since 2023/3/5 22:23
911
*/
10-
public class DataChangeInfo {
12+
public class DataChangeInfo implements Serializable {
1113
/**
1214
* 变更前数据
1315
*/
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
package com.esflink.starter.common.data;

import com.alibaba.fastjson.JSONObject;
import com.google.common.base.CaseFormat;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.List;
import java.util.Optional;

/**
 * Debezium deserialization schema that converts raw SQL Server CDC
 * {@link SourceRecord}s into {@link DataChangeInfo} change events.
 *
 * @author xlh
 * @date 2024/12/19
 */
public class MssqlDeserialization implements DebeziumDeserializationSchema<DataChangeInfo> {

    public static final String TS_MS = "ts_ms";
    public static final String BIN_FILE = "file";
    public static final String POS = "pos";
    public static final String BEFORE = "before";
    public static final String AFTER = "after";
    public static final String SOURCE = "source";
    public static final DataChangeInfo.EventType READ = DataChangeInfo.EventType.READ;
    public static final DataChangeInfo.EventType CREATE = DataChangeInfo.EventType.CREATE;
    public static final DataChangeInfo.EventType UPDATE = DataChangeInfo.EventType.UPDATE;
    public static final DataChangeInfo.EventType DELETE = DataChangeInfo.EventType.DELETE;

    /**
     * Deserializes one raw change record into a {@link DataChangeInfo} JSON holder
     * and emits it downstream.
     *
     * @param sourceRecord raw Debezium record (its value is a Kafka Connect Struct)
     * @param collector    collector receiving the converted event
     */
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<DataChangeInfo> collector) {
        String topic = sourceRecord.topic();
        String[] fields = topic.split("\\.");
        // SQL Server CDC topics have four segments: <server>.<database>.<schema>.<table>
        // (MySQL has three). A fixed index 2 would return the schema name (e.g. "dbo"),
        // so take the LAST segment as the table name instead, and guard short topics.
        String database = fields.length > 1 ? fields[1] : "";
        String tableName = fields[fields.length - 1];
        Struct struct = (Struct) sourceRecord.value();
        final Struct source = struct.getStruct(SOURCE);
        DataChangeInfo dataChangeInfo = new DataChangeInfo();
        dataChangeInfo.setBeforeData(getJsonObject(struct, BEFORE).toJSONString());
        dataChangeInfo.setAfterData(getJsonObject(struct, AFTER).toJSONString());
        // Operation type: CREATE / UPDATE / DELETE (snapshot rows arrive as READ).
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        String type = operation.toString().toUpperCase();
        DataChangeInfo.EventType eventType = handleEventType(type);
        dataChangeInfo.setEventType(eventType);
        // The SQL Server source block carries LSN fields (change_lsn/commit_lsn) rather
        // than MySQL's binlog "file"/"pos". Struct#get throws a DataException for fields
        // missing from the schema, so these lookups must be schema-checked first —
        // otherwise every MSSQL record would fail here.
        dataChangeInfo.setFileName(Optional.ofNullable(readSourceField(source, BIN_FILE)).map(Object::toString).orElse(""));
        dataChangeInfo.setFilePos(Optional.ofNullable(readSourceField(source, POS)).map(x -> Integer.parseInt(x.toString())).orElse(0));
        dataChangeInfo.setDatabase(database);
        dataChangeInfo.setTableName(tableName);
        // Change timestamp from the source block; fall back to wall-clock time.
        Object value = readSourceField(source, TS_MS);
        dataChangeInfo.setChangeTime(Optional.ofNullable(value).map(x -> Long.parseLong(x.toString())).orElseGet(System::currentTimeMillis));
        collector.collect(dataChangeInfo);
    }

    /**
     * Reads an optional field from the Debezium source struct. Returns {@code null}
     * instead of letting {@link Struct#get} throw a DataException when the field is
     * absent from the connector's schema.
     */
    private static Object readSourceField(Struct source, String fieldName) {
        if (source == null || source.schema().field(fieldName) == null) {
            return null;
        }
        return source.get(fieldName);
    }

    /**
     * Maps the Debezium operation name onto {@link DataChangeInfo.EventType}.
     * Snapshot READs are reported as CREATE; unknown operations yield {@code null}.
     */
    private DataChangeInfo.EventType handleEventType(String type) {
        DataChangeInfo.EventType eventType = null;
        if (type.equals(CREATE.getName()) || type.equals(READ.getName())) {
            eventType = CREATE;
        } else if (type.equals(UPDATE.getName())) {
            eventType = UPDATE;
        } else if (type.equals(DELETE.getName())) {
            eventType = DELETE;
        }
        return eventType;
    }

    /**
     * Extracts the before/after row image from the envelope as a JSON object,
     * converting lower_underscore column names to lowerCamel keys. Returns an empty
     * object when the image is absent (e.g. "before" on inserts).
     *
     * @param value        envelope struct of the change record
     * @param fieldElement image name, {@link #BEFORE} or {@link #AFTER}
     */
    private JSONObject getJsonObject(Struct value, String fieldElement) {
        Struct element = value.getStruct(fieldElement);
        JSONObject jsonObject = new JSONObject();
        if (element != null) {
            Schema afterSchema = element.schema();
            List<Field> fieldList = afterSchema.fields();
            for (Field field : fieldList) {
                Object afterValue = element.get(field);
                jsonObject.put(CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, field.name()), afterValue);
            }
        }
        return jsonObject;
    }

    @Override
    public TypeInformation<DataChangeInfo> getProducedType() {
        return TypeInformation.of(DataChangeInfo.class);
    }
}

easy-flink-cdc-boot-starter/src/main/java/com/esflink/starter/common/data/MysqlDeserialization.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@
33
import com.alibaba.fastjson.JSONObject;
44
import com.esflink.starter.common.data.DataChangeInfo.EventType;
55
import com.google.common.base.CaseFormat;
6-
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
76
import io.debezium.data.Envelope;
87
import org.apache.flink.api.common.typeinfo.TypeInformation;
8+
import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
99
import org.apache.flink.util.Collector;
1010
import org.apache.kafka.connect.data.Field;
1111
import org.apache.kafka.connect.data.Schema;

easy-flink-cdc-boot-starter/src/main/java/com/esflink/starter/configuration/FlinkJobConfiguration.java

Lines changed: 45 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import com.esflink.starter.annotation.FlinkSink;
44
import com.esflink.starter.common.data.DataChangeInfo;
55
import com.esflink.starter.common.data.FlinkJobSink;
6+
import com.esflink.starter.common.data.MssqlDeserialization;
67
import com.esflink.starter.common.data.MysqlDeserialization;
78
import com.esflink.starter.constants.BaseEsConstants;
89
import com.esflink.starter.holder.FlinkJobBus;
@@ -15,9 +16,10 @@
1516
import com.esflink.starter.properties.EasyFlinkProperties;
1617
import com.esflink.starter.properties.FlinkJobProperties;
1718
import com.esflink.starter.prox.FlinkSinkProxy;
18-
import com.ververica.cdc.connectors.mysql.MySqlSource;
19-
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
20-
import com.ververica.cdc.debezium.DebeziumSourceFunction;
19+
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
20+
import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
21+
import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
22+
import org.apache.flink.cdc.connectors.sqlserver.source.SqlServerSourceBuilder;
2123
import org.apache.flink.streaming.api.datastream.DataStream;
2224
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
2325
import org.slf4j.Logger;
@@ -99,12 +101,19 @@ private void initFlinkJob(FlinkJobProperties flinkProperty) throws Exception {
99101

100102
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
101103
env.setParallelism(1);
102-
103-
DebeziumSourceFunction<DataChangeInfo> dataChangeInfoMySqlSource = buildDataChangeSource(flinkProperty, flinkJobIdentity);
104-
DataStream<DataChangeInfo> streamSource = env
105-
.addSource(dataChangeInfoMySqlSource)
106-
.setParallelism(1);
107-
104+
DataStream<DataChangeInfo> streamSource = null;
105+
if ("MYSQL".equalsIgnoreCase(flinkProperty.getDbType())) {
106+
MySqlSource<DataChangeInfo> dataChangeInfoMySqlSource = buildMysqlDataChangeSource(flinkProperty, flinkJobIdentity);
107+
streamSource = env
108+
.fromSource(dataChangeInfoMySqlSource, WatermarkStrategy.noWatermarks(), "MYSQLIncrementalSource")
109+
.setParallelism(1);
110+
}
111+
if ("MSSQL".equalsIgnoreCase(flinkProperty.getDbType())) {
112+
SqlServerSourceBuilder.SqlServerIncrementalSource<DataChangeInfo> dataChangeInfoMySqlSource = buildMssqlDataChangeSource(flinkProperty, flinkJobIdentity);
113+
streamSource = env
114+
.fromSource(dataChangeInfoMySqlSource, WatermarkStrategy.noWatermarks(), "MSSQLIncrementalSource")
115+
.setParallelism(1);
116+
}
108117
FlinkJobSink sinkProxyInstance = (FlinkJobSink) Proxy.newProxyInstance(
109118
FlinkJobSink.class.getClassLoader(),
110119
new Class<?>[]{FlinkJobSink.class},
@@ -117,13 +126,13 @@ private void initFlinkJob(FlinkJobProperties flinkProperty) throws Exception {
117126
}
118127

119128
/**
120-
* 构造变更数据源
129+
* 构造MYSQL变更数据源
121130
*/
122-
private DebeziumSourceFunction<DataChangeInfo> buildDataChangeSource(FlinkJobProperties flinkJobProperties, FlinkJobIdentity flinkJobIdentity) {
131+
private MySqlSource<DataChangeInfo> buildMysqlDataChangeSource(FlinkJobProperties flinkJobProperties, FlinkJobIdentity flinkJobIdentity) {
123132
MetaManager metaManager = FlinkJobBus.getMetaManager();
124133
LogPosition cursor = metaManager.getCursor(flinkJobIdentity);
125-
StartupOptions startupOptions = null;
126134

135+
StartupOptions startupOptions = null;
127136
// 有 cursor 信息,默认 TIMESTAMP 方式启动
128137
if (cursor != null) {
129138
startupOptions = StartupOptions.timestamp(cursor.getStartupTimestampMillis() + 1);
@@ -136,18 +145,39 @@ private DebeziumSourceFunction<DataChangeInfo> buildDataChangeSource(FlinkJobPro
136145
.tableList(flinkJobProperties.getTableList())
137146
.username(flinkJobProperties.getUsername())
138147
.password(flinkJobProperties.getPassword())
139-
//.serverId(flinkJobProperties.getServerId())
140-
141148
/*initial初始化快照,即全量导入后增量导入(检测更新数据写入)
142149
* latest:只进行增量导入(不读取历史变化)
143150
* timestamp:指定时间戳进行数据导入(大于等于指定时间错读取数据)
144151
*/
145-
.startupOptions(startupOptions != null ? startupOptions : flinkJobProperties.getStartupOptions())
152+
.startupOptions(startupOptions != null ? startupOptions : flinkJobProperties.getMysqlStartupOptions())
146153
.deserializer(new MysqlDeserialization())
147154
.serverTimeZone(flinkJobProperties.getServerTimeZone())
148155
.build();
149156
}
150157

158+
/**
159+
* 构造MSSQL变更数据源
160+
*/
161+
private SqlServerSourceBuilder.SqlServerIncrementalSource<DataChangeInfo> buildMssqlDataChangeSource(FlinkJobProperties flinkJobProperties, FlinkJobIdentity flinkJobIdentity) {
162+
MetaManager metaManager = FlinkJobBus.getMetaManager();
163+
LogPosition cursor = metaManager.getCursor(flinkJobIdentity);
164+
org.apache.flink.cdc.connectors.base.options.StartupOptions startupOptions = null;
165+
// 有 cursor 信息,默认 TIMESTAMP 方式启动
166+
if (cursor != null) {
167+
startupOptions = org.apache.flink.cdc.connectors.base.options.StartupOptions.timestamp(cursor.getStartupTimestampMillis() + 1);
168+
}
169+
return SqlServerSourceBuilder.SqlServerIncrementalSource.<DataChangeInfo>builder()
170+
.hostname(flinkJobProperties.getHostname())
171+
.port(Integer.parseInt(flinkJobProperties.getPort()))
172+
.databaseList(flinkJobProperties.getDatabaseList())
173+
.tableList(flinkJobProperties.getTableList())
174+
.username(flinkJobProperties.getUsername())
175+
.password(flinkJobProperties.getPassword())
176+
.deserializer(new MssqlDeserialization())
177+
.startupOptions(startupOptions != null ? startupOptions : flinkJobProperties.getMssqlStartupOptions())
178+
.build();
179+
}
180+
151181

152182
@Override
153183
public int getOrder() {

easy-flink-cdc-boot-starter/src/main/java/com/esflink/starter/meta/LogPosition.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
package com.esflink.starter.meta;
22

3-
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
4-
53
import java.io.Serializable;
64

75
/**

easy-flink-cdc-boot-starter/src/main/java/com/esflink/starter/properties/FlinkJobProperties.java

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,6 @@
22

33

44
import com.esflink.starter.configuration.FlinkJobPropertiesConfiguration;
5-
import com.ververica.cdc.connectors.mysql.MySqlSource;
6-
import com.ververica.cdc.connectors.mysql.table.StartupMode;
7-
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
85

96
/**
107
* flink job 配置信息
@@ -20,6 +17,11 @@ public class FlinkJobProperties {
2017

2118
private String port;
2219

20+
/**
21+
* 数据库类型:MSSQL、MYSQL
22+
*/
23+
private String dbType;
24+
2325
/**
2426
* <p>数据库名</p>
2527
* <p>example: database1,database2</p>
@@ -157,16 +159,37 @@ public void setServerId(Integer serverId) {
157159
this.serverId = serverId;
158160
}
159161

160-
public StartupOptions getStartupOptions() {
162+
public String getDbType() {
163+
return dbType;
164+
}
165+
166+
public void setDbType(String dbType) {
167+
this.dbType = dbType;
168+
}
169+
170+
public org.apache.flink.cdc.connectors.mysql.table.StartupOptions getMysqlStartupOptions() {
171+
String startupMode = this.startupMode;
172+
173+
switch (startupMode.toUpperCase()) {
174+
case "INITIAL":
175+
return org.apache.flink.cdc.connectors.mysql.table.StartupOptions.initial();
176+
case "TIMESTAMP":
177+
return org.apache.flink.cdc.connectors.mysql.table.StartupOptions.timestamp(startupTimestampMillis);
178+
default:
179+
return org.apache.flink.cdc.connectors.mysql.table.StartupOptions.latest();
180+
}
181+
}
182+
183+
public org.apache.flink.cdc.connectors.base.options.StartupOptions getMssqlStartupOptions() {
161184
String startupMode = this.startupMode;
162185

163186
switch (startupMode.toUpperCase()) {
164187
case "INITIAL":
165-
return StartupOptions.initial();
188+
return org.apache.flink.cdc.connectors.base.options.StartupOptions.initial();
166189
case "TIMESTAMP":
167-
return StartupOptions.timestamp(startupTimestampMillis);
190+
return org.apache.flink.cdc.connectors.base.options.StartupOptions.timestamp(startupTimestampMillis);
168191
default:
169-
return StartupOptions.latest();
192+
return org.apache.flink.cdc.connectors.base.options.StartupOptions.latest();
170193
}
171194
}
172195
}

0 commit comments

Comments
 (0)