Fix garbled characters and incorrect time zone, and add DataStream implementations (DataLinkDC#845)
* fix: MySQL CDC JsonDebeziumDeserializationSchema: specify UTF-8 as the character encoding when converting bytes to a string

* fix: MySQL CDC SQLSinkBuilder: the time zone was incorrect when converting the MySQL datetime type; it should be set to UTC (see the conversion sketch below)

* [Feature]:
   1: add a datastream-starrocks implementation; the official StarRocks connector jar currently only goes up to the Flink 1.14 release
   2: add a datastream-kafka-json implementation that extracts only the required record data and adds extended fields such as op, is_deleted, schemaName, tableName and ts_ms for downstream business use

* [optimization]: refactor and deduplicate repeated code
zhongjingq authored Aug 10, 2022
1 parent a42c7a0 commit 20471f3
Showing 20 changed files with 1,259 additions and 3 deletions.
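
A note on the second fix above: the SQLSinkBuilder change itself is not among the files excerpted below, so the following is only a minimal, hypothetical sketch of the issue it describes. A Debezium MySQL datetime arrives as an epoch value with no zone attached; decoding it with the JVM default zone shifts the wall-clock time, while decoding it in UTC does not. The class name and the epoch-millisecond assumption are illustrative, not the project's actual code.

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

public class DatetimeZoneSketch {
    public static void main(String[] args) {
        // Hypothetical epoch-millisecond value, as a MySQL CDC datetime field might deliver it.
        long epochMillis = 1660096800000L;

        // Problematic: decoding with the JVM default zone shifts the wall-clock value
        // whenever the server is not running in UTC.
        LocalDateTime withDefaultZone = Instant.ofEpochMilli(epochMillis)
                .atZone(ZoneId.systemDefault())
                .toLocalDateTime();

        // Intended behaviour per the commit message: interpret the value in UTC.
        LocalDateTime withUtc = Instant.ofEpochMilli(epochMillis)
                .atZone(ZoneId.of("UTC"))
                .toLocalDateTime();

        System.out.println(withDefaultZone + " vs " + withUtc);
    }
}
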
6 changes: 6 additions & 0 deletions dlink-client/dlink-client-1.13/pom.xml
@@ -37,5 +37,11 @@
<artifactId>dlink-connector-jdbc-1.13</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.starrocks</groupId>
<artifactId>flink-connector-starrocks</artifactId>
<version>1.2.3_flink-1.13_2.11</version>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
@@ -24,7 +24,9 @@
import com.dlink.cdc.doris.DorisSinkBuilder;
import com.dlink.cdc.hudi.HudiSinkBuilder;
import com.dlink.cdc.kafka.KafkaSinkBuilder;
import com.dlink.cdc.kafka.KafkaSinkJsonBuilder;
import com.dlink.cdc.sql.SQLSinkBuilder;
import com.dlink.cdc.starrocks.StarrocksSinkBuilder;
import com.dlink.exception.FlinkClientException;
import com.dlink.model.FlinkCDCConfig;

@@ -38,7 +40,9 @@ public class SinkBuilderFactory {

private static SinkBuilder[] sinkBuilders = {
new KafkaSinkBuilder(),
new KafkaSinkJsonBuilder(),
new DorisSinkBuilder(),
new StarrocksSinkBuilder(),
new HudiSinkBuilder(),
new SQLSinkBuilder(),
};
@@ -0,0 +1,193 @@
package com.dlink.cdc.kafka;

import com.dlink.assertion.Asserts;
import com.dlink.cdc.AbstractSinkBuilder;
import com.dlink.cdc.CDCBuilder;
import com.dlink.cdc.SinkBuilder;
import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Schema;
import com.dlink.model.Table;
import com.dlink.utils.ObjectConvertUtil;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.*;
import org.apache.flink.util.Collector;

import javax.xml.bind.DatatypeConverter;
import java.io.Serializable;
import java.math.BigDecimal;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

/**
 * @className: com.dlink.cdc.kafka.KafkaSinkJsonBuilder
 */
public class KafkaSinkJsonBuilder extends AbstractSinkBuilder implements SinkBuilder, Serializable {

private final static String KEY_WORD = "datastream-kafka-json";
private transient ObjectMapper objectMapper;

public KafkaSinkJsonBuilder() {
}

public KafkaSinkJsonBuilder(FlinkCDCConfig config) {
super(config);
}

@Override
public String getHandle() {
return KEY_WORD;
}

@Override
public SinkBuilder create(FlinkCDCConfig config) {
return new KafkaSinkJsonBuilder(config);
}

@Override
public DataStreamSource build(
CDCBuilder cdcBuilder,
StreamExecutionEnvironment env,
CustomTableEnvironment customTableEnvironment,
DataStreamSource<String> dataStreamSource) {
try {
SingleOutputStreamOperator<Map> mapOperator = dataStreamSource.map(new MapFunction<String, Map>() {
@Override
public Map map(String value) throws Exception {
if (objectMapper == null) {
initializeObjectMapper();
}
return objectMapper.readValue(value, Map.class);
}
});
final List<Schema> schemaList = config.getSchemaList();
final String schemaFieldName = config.getSchemaFieldName();
if (Asserts.isNotNullCollection(schemaList)) {
for (Schema schema : schemaList) {
for (Table table : schema.getTables()) {
final String tableName = table.getName();
final String schemaName = table.getSchema();
SingleOutputStreamOperator<Map> filterOperator = mapOperator.filter(new FilterFunction<Map>() {
@Override
public boolean filter(Map value) throws Exception {
LinkedHashMap source = (LinkedHashMap) value.get("source");
return tableName.equals(source.get("table").toString())
&& schemaName.equals(source.get(schemaFieldName).toString());
}
});
String topic = getSinkTableName(table);
if (Asserts.isNotNullString(config.getSink().get("topic"))) {
topic = config.getSink().get("topic");
}
List<String> columnNameList = new LinkedList<>();
List<LogicalType> columnTypeList = new LinkedList<>();
buildColumn(columnNameList, columnTypeList, table.getColumns());
SingleOutputStreamOperator<String> stringOperator = filterOperator.process(new ProcessFunction<Map, String>() {
@Override
public void processElement(Map value, Context context, Collector<String> collector) throws Exception {
Map after = null;
Map before = null;
String ts_ms = value.get("ts_ms").toString();
try {
switch (value.get("op").toString()) {
case "r":
case "c":
after = (Map) value.get("after");
convertAttr(columnNameList, columnTypeList, after, value.get("op").toString(), 0, schemaName, tableName, ts_ms);
break;
case "u":
before = (Map) value.get("before");
convertAttr(columnNameList, columnTypeList, before, value.get("op").toString(), 1, schemaName, tableName, ts_ms);

after = (Map) value.get("after");
convertAttr(columnNameList, columnTypeList, after, value.get("op").toString(), 0, schemaName, tableName, ts_ms);
break;
case "d":
before = (Map) value.get("before");
convertAttr(columnNameList, columnTypeList, before, value.get("op").toString(), 1, schemaName, tableName, ts_ms);
break;
}
} catch (Exception e) {
logger.error("SchemaTable: {}.{} - Exception:", schemaName, tableName, e);
throw e;
}
if (objectMapper == null) {
initializeObjectMapper();
}
if (before != null) {
collector.collect(objectMapper.writeValueAsString(before));
}
if (after != null) {
collector.collect(objectMapper.writeValueAsString(after));
}
}
});
stringOperator.addSink(new FlinkKafkaProducer<String>(config.getSink().get("brokers"),
topic,
new SimpleStringSchema()));
}
}
}
} catch (Exception ex) {
logger.error("kafka sink error:", ex);
}
return dataStreamSource;
}

private void initializeObjectMapper() {
this.objectMapper = new ObjectMapper();
JavaTimeModule javaTimeModule = new JavaTimeModule();
// Hack time module to allow 'Z' at the end of string (i.e. javascript json's)
javaTimeModule.addDeserializer(LocalDateTime.class, new LocalDateTimeDeserializer(DateTimeFormatter.ISO_DATE_TIME));
objectMapper.registerModule(javaTimeModule);
objectMapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
}

@Override
public void addSink(
StreamExecutionEnvironment env,
DataStream<RowData> rowDataDataStream,
Table table,
List<String> columnNameList,
List<LogicalType> columnTypeList) {
}

@Override
protected Object convertValue(Object value, LogicalType logicalType) {
return ObjectConvertUtil.convertValue(value, logicalType);
}

private void convertAttr(List<String> columnNameList, List<LogicalType> columnTypeList, Map value, String op, int is_deleted,
String schemaName, String tableName, String ts_ms) {
for (int i = 0; i < columnNameList.size(); i++) {
String columnName = columnNameList.get(i);
Object columnNameValue = value.remove(columnName);
Object columnNameNewVal = convertValue(columnNameValue, columnTypeList.get(i));
value.put(columnName, columnNameNewVal);
}
value.put("__op", op);
value.put("is_deleted", is_deleted);
value.put("db", schemaName);
value.put("table", tableName);
value.put("ts_ms", ts_ms);
}
}
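
For reference, the enrichment performed by convertAttr above can be reproduced with a small standalone snippet. Everything below is illustrative: the row values, database and table names are made up, and only jackson-databind is needed on the classpath.

import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.LinkedHashMap;
import java.util.Map;

public class KafkaJsonRecordSketch {
    public static void main(String[] args) throws Exception {
        // A converted "after" image of one row (values already passed through convertValue).
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("id", 1);
        row.put("name", "张三");

        // Fields appended by KafkaSinkJsonBuilder#convertAttr; db/table/ts_ms values are made up.
        row.put("__op", "c");
        row.put("is_deleted", 0);
        row.put("db", "test_db");
        row.put("table", "user");
        row.put("ts_ms", "1660096800000");

        // Roughly the JSON string handed to FlinkKafkaProducer, e.g.
        // {"id":1,"name":"张三","__op":"c","is_deleted":0,"db":"test_db","table":"user","ts_ms":"1660096800000"}
        System.out.println(new ObjectMapper().writeValueAsString(row));
    }
}

The enriched map is serialized and sent to Kafka by the build method above, which falls back to the table's sink name as the topic unless a "topic" entry is present in the sink configuration.
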
@@ -121,7 +121,7 @@ public DataStreamSource<String> build(StreamExecutionEnvironment env) {
sourceBuilder.tableList(new String[0]);
}

sourceBuilder.deserializer(new JsonDebeziumDeserializationSchema());
sourceBuilder.deserializer(new MysqlJsonDebeziumDeserializationSchema());
sourceBuilder.debeziumProperties(debeziumProperties);
sourceBuilder.jdbcProperties(jdbcProperties);

@@ -0,0 +1,64 @@
package com.dlink.cdc.mysql;

import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.storage.ConverterType;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/**
* @version 1.0
* @className: com.dlink.cdc.mysql.MysqlJsonDebeziumDeserializationSchema
* @Description:
* @author: jack zhong
* @date 8/2/22 1:43 PM
*/
public class MysqlJsonDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {
private static final long serialVersionUID = 1L;
private transient JsonConverter jsonConverter;
private final Boolean includeSchema;
private Map<String, Object> customConverterConfigs;

public MysqlJsonDebeziumDeserializationSchema() {
this(false);
}

public MysqlJsonDebeziumDeserializationSchema(Boolean includeSchema) {
this.includeSchema = includeSchema;
}

public MysqlJsonDebeziumDeserializationSchema(Boolean includeSchema, Map<String, Object> customConverterConfigs) {
this.includeSchema = includeSchema;
this.customConverterConfigs = customConverterConfigs;
}

public void deserialize(SourceRecord record, Collector<String> out) throws Exception {
if (this.jsonConverter == null) {
this.initializeJsonConverter();
}
byte[] bytes = this.jsonConverter.fromConnectData(record.topic(), record.valueSchema(), record.value());
out.collect(new String(bytes, StandardCharsets.UTF_8));
}

private void initializeJsonConverter() {
this.jsonConverter = new JsonConverter();
HashMap<String, Object> configs = new HashMap<>(2);
configs.put("converter.type", ConverterType.VALUE.getName());
configs.put("schemas.enable", this.includeSchema);
if (this.customConverterConfigs != null) {
configs.putAll(this.customConverterConfigs);
}

this.jsonConverter.configure(configs);
}

public TypeInformation<String> getProducedType() {
return BasicTypeInfo.STRING_TYPE_INFO;
}
}
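
Compared with the stock Flink CDC JSON deserialization schema, the functional change in this class is that deserialize decodes the Connect JSON bytes explicitly as UTF-8 instead of relying on the platform default charset, the behaviour the first fix in the commit message addresses. The snippet below is a hypothetical demonstration of the difference; ISO-8859-1 simply stands in for any non-UTF-8 JVM default.

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

public class Utf8DecodeSketch {
    public static void main(String[] args) {
        // Debezium's JsonConverter produces UTF-8 encoded bytes.
        byte[] bytes = "{\"name\":\"张三\"}".getBytes(StandardCharsets.UTF_8);

        // Decoding with a non-UTF-8 charset (standing in for a non-UTF-8 JVM default)
        // corrupts the multi-byte characters.
        System.out.println(new String(bytes, Charset.forName("ISO-8859-1")));

        // Decoding explicitly as UTF-8, as deserialize() above now does, keeps the payload intact.
        System.out.println(new String(bytes, StandardCharsets.UTF_8));
    }
}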