feat: flink1.15 add CDCSOURCE for pg2sr (DataLinkDC#1933)
boolean-dev authored May 5, 2023
1 parent 6cf1681 commit cfd3ad0
Showing 5 changed files with 296 additions and 0 deletions.
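For context, this commit wires a new CDC source keyword (postgres-cdc) and a new sink keyword (datastream-starrocks) into Dinky's whole-database sync, so a Postgres-to-StarRocks (pg2sr) pipeline can be declared with a single CDCSOURCE statement. A minimal sketch follows; it is illustrative only: the hostnames, ports, credentials, and database/schema/table names are placeholders, and the exact option keys should be checked against the Dinky documentation for this version.

EXECUTE CDCSOURCE pg2sr WITH (
  'connector' = 'postgres-cdc',
  'hostname' = '127.0.0.1',
  'port' = '5432',
  'username' = 'postgres',
  'password' = 'postgres',
  'database-name' = 'mydb',
  'schema-name' = 'public',
  'table-name' = 'public\..*',
  'sink.connector' = 'datastream-starrocks',
  'sink.jdbc-url' = 'jdbc:mysql://127.0.0.1:9030',
  'sink.load-url' = '127.0.0.1:8030',
  'sink.username' = 'root',
  'sink.password' = ''
);

This lines up with the diff below: jdbc-url, load-url, username, and password are read from the sink map by the new StarrocksSinkBuilder, which also forwards any sink-map keys starting with sink. to the StarRocks connector so its options can be tuned from the statement, while the new PostgresCDCBuilder additionally honors decoding.plugin.name and slot.name from the source options.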
2 changes: 2 additions & 0 deletions CDCBuilderFactory.java
@@ -22,6 +22,7 @@
import org.dinky.assertion.Asserts;
import org.dinky.cdc.mysql.MysqlCDCBuilder;
import org.dinky.cdc.oracle.OracleCDCBuilder;
import org.dinky.cdc.postgres.PostgresCDCBuilder;
import org.dinky.exception.FlinkClientException;
import org.dinky.model.FlinkCDCConfig;

@@ -41,6 +42,7 @@ public class CDCBuilderFactory {
{
put(MysqlCDCBuilder.KEY_WORD, () -> new MysqlCDCBuilder());
put(OracleCDCBuilder.KEY_WORD, () -> new OracleCDCBuilder());
put(PostgresCDCBuilder.KEY_WORD, () -> new PostgresCDCBuilder());
}
};
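For context, a supplier map like the one above is consumed by a keyword lookup when the CDCSOURCE statement is parsed. A sketch of the shape of that lookup follows; it is not part of this diff, and the method name, the map field, and the assumption that FlinkCDCConfig.getType() carries the connector keyword are illustrative only:

public static CDCBuilder buildCDCBuilder(FlinkCDCConfig config) {
    // Look up the builder supplier by connector keyword, e.g. "postgres-cdc".
    Supplier<CDCBuilder> supplier = CDC_BUILDER_MAP.get(config.getType());
    if (supplier == null) {
        throw new FlinkClientException("Unsupported CDC source type: " + config.getType());
    }
    return supplier.get().create(config);
}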

2 changes: 2 additions & 0 deletions SinkBuilderFactory.java
@@ -26,6 +26,7 @@
import org.dinky.cdc.kafka.KafkaSinkBuilder;
import org.dinky.cdc.kafka.KafkaSinkJsonBuilder;
import org.dinky.cdc.sql.SQLSinkBuilder;
import org.dinky.cdc.starrocks.StarrocksSinkBuilder;
import org.dinky.exception.FlinkClientException;
import org.dinky.model.FlinkCDCConfig;

@@ -51,6 +52,7 @@ public class SinkBuilderFactory {
put(
DorisSchemaEvolutionSinkBuilder.KEY_WORD,
() -> new DorisSchemaEvolutionSinkBuilder());
put(StarrocksSinkBuilder.KEY_WORD, () -> new StarrocksSinkBuilder());
}
};

142 changes: 142 additions & 0 deletions PostgresCDCBuilder.java
@@ -0,0 +1,142 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.dinky.cdc.postgres;

import org.dinky.assertion.Asserts;
import org.dinky.cdc.AbstractCDCBuilder;
import org.dinky.cdc.CDCBuilder;
import org.dinky.constant.ClientConstant;
import org.dinky.constant.FlinkParamConstant;
import org.dinky.model.FlinkCDCConfig;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import com.ververica.cdc.connectors.postgres.PostgreSQLSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

/**
* PostgresCDCBuilder
*
* @author mengyejiang
* @since 2022/8/21 10:00
*/
public class PostgresCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {

public static final String KEY_WORD = "postgres-cdc";
private static final String METADATA_TYPE = "PostgreSql";

public PostgresCDCBuilder() {}

public PostgresCDCBuilder(FlinkCDCConfig config) {
super(config);
}

@Override
public String getHandle() {
return KEY_WORD;
}

@Override
public CDCBuilder create(FlinkCDCConfig config) {
return new PostgresCDCBuilder(config);
}

@Override
public DataStreamSource<String> build(StreamExecutionEnvironment env) {

String decodingPluginName = config.getSource().get("decoding.plugin.name");
String slotName = config.getSource().get("slot.name");

Properties debeziumProperties = new Properties();
for (Map.Entry<String, String> entry : config.getDebezium().entrySet()) {
if (Asserts.isNotNullString(entry.getKey())
&& Asserts.isNotNullString(entry.getValue())) {
debeziumProperties.setProperty(entry.getKey(), entry.getValue());
}
}

PostgreSQLSource.Builder<String> sourceBuilder =
PostgreSQLSource.<String>builder()
.hostname(config.getHostname())
.port(config.getPort())
.database(config.getDatabase())
.username(config.getUsername())
.password(config.getPassword());
String schema = config.getSchema();
if (Asserts.isNotNullString(schema)) {
String[] schemas = schema.split(FlinkParamConstant.SPLIT);
sourceBuilder.schemaList(schemas);
} else {
sourceBuilder.schemaList(new String[0]);
}
List<String> schemaTableNameList = config.getSchemaTableNameList();
if (Asserts.isNotNullCollection(schemaTableNameList)) {
sourceBuilder.tableList(
schemaTableNameList.toArray(new String[schemaTableNameList.size()]));
} else {
sourceBuilder.tableList(new String[0]);
}

sourceBuilder.deserializer(new JsonDebeziumDeserializationSchema());
sourceBuilder.debeziumProperties(debeziumProperties);

if (Asserts.isNotNullString(decodingPluginName)) {
sourceBuilder.decodingPluginName(decodingPluginName);
}

if (Asserts.isNotNullString(slotName)) {
sourceBuilder.slotName(slotName);
}

return env.addSource(sourceBuilder.build(), "Postgres CDC Source");
}

public Map<String, Map<String, String>> parseMetaDataConfigs() {
Map<String, Map<String, String>> allConfigMap = new HashMap<>();
List<String> schemaList = getSchemaList();
for (String schema : schemaList) {
Map<String, String> configMap = new HashMap<>();
configMap.put(ClientConstant.METADATA_TYPE, METADATA_TYPE);
StringBuilder sb = new StringBuilder("jdbc:postgresql://");
sb.append(config.getHostname());
sb.append(":");
sb.append(config.getPort());
sb.append("/");
sb.append(config.getDatabase());
configMap.put(ClientConstant.METADATA_NAME, sb.toString());
configMap.put(ClientConstant.METADATA_URL, sb.toString());
configMap.put(ClientConstant.METADATA_USERNAME, config.getUsername());
configMap.put(ClientConstant.METADATA_PASSWORD, config.getPassword());
allConfigMap.put(schema, configMap);
}
return allConfigMap;
}

@Override
public String getSchema() {
return config.getSchema();
}
}
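Worth noting: the JsonDebeziumDeserializationSchema above means the resulting DataStreamSource<String> carries Debezium-style change-event JSON. Roughly (fields abbreviated, values hypothetical):

{
  "before": null,
  "after": {"id": 1, "name": "alice"},
  "source": {"db": "mydb", "schema": "public", "table": "users"},
  "op": "c",
  "ts_ms": 1683270000000
}

with "op" being "c", "u", "d", or "r" for create, update, delete, and snapshot read respectively.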
130 changes: 130 additions & 0 deletions StarrocksSinkBuilder.java
@@ -0,0 +1,130 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.dinky.cdc.starrocks;

import org.dinky.cdc.AbstractSinkBuilder;
import org.dinky.cdc.SinkBuilder;
import org.dinky.model.Column;
import org.dinky.model.FlinkCDCConfig;
import org.dinky.model.Table;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.utils.TypeConversions;

import java.io.Serializable;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import com.starrocks.connector.flink.row.sink.StarRocksTableRowTransformer;
import com.starrocks.connector.flink.table.sink.StarRocksDynamicSinkFunction;
import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;

public class StarrocksSinkBuilder extends AbstractSinkBuilder implements Serializable {

public static final String KEY_WORD = "datastream-starrocks";
private static final long serialVersionUID = 8330362249137431824L;

public StarrocksSinkBuilder() {}

public StarrocksSinkBuilder(FlinkCDCConfig config) {
super(config);
}

@Override
public String getHandle() {
return KEY_WORD;
}

@Override
public SinkBuilder create(FlinkCDCConfig config) {
return new StarrocksSinkBuilder(config);
}

@Override
public void addSink(
StreamExecutionEnvironment env,
DataStream<RowData> rowDataDataStream,
Table table,
List<String> columnNameList,
List<LogicalType> columnTypeList) {
try {
List<Column> columns = table.getColumns();
List<String> primaryKeys = new LinkedList<>();
String[] columnNames = new String[columns.size()];
for (int i = 0; i < columns.size(); i++) {
Column column = columns.get(i);
if (column.isKeyFlag()) {
primaryKeys.add(column.getName());
}
columnNames[i] = column.getName();
}
String[] primaryKeyArrays = primaryKeys.toArray(new String[0]);
DataType[] dataTypes = new DataType[columnTypeList.size()];
for (int i = 0; i < columnTypeList.size(); i++) {
LogicalType logicalType = columnTypeList.get(i);
String columnName = columnNameList.get(i);
if (primaryKeys.contains(columnName)) {
logicalType = logicalType.copy(false);
}
dataTypes[i] = TypeConversions.fromLogicalToDataType(logicalType);
}
TableSchema tableSchema =
TableSchema.builder()
.primaryKey(primaryKeyArrays)
.fields(columnNames, dataTypes)
.build();
Map<String, String> sink = config.getSink();
StarRocksSinkOptions.Builder builder =
StarRocksSinkOptions.builder()
.withProperty("jdbc-url", sink.get("jdbc-url"))
.withProperty("load-url", sink.get("load-url"))
.withProperty("username", sink.get("username"))
.withProperty("password", sink.get("password"))
.withProperty("table-name", getSinkTableName(table))
.withProperty("database-name", getSinkSchemaName(table))
.withProperty("sink.properties.format", "json")
.withProperty("sink.properties.strip_outer_array", "true")
// Parallelism is fixed at 1; with higher parallelism, extra care is needed to keep change events ordered.
.withProperty("sink.parallelism", "1");
sink.forEach(
(key, value) -> {
if (key.startsWith("sink.")) {
builder.withProperty(key, value);
}
});
StarRocksDynamicSinkFunction<RowData> starrocksSinkFunction =
new StarRocksDynamicSinkFunction<RowData>(
builder.build(),
tableSchema,
new StarRocksTableRowTransformer(TypeInformation.of(RowData.class)));
rowDataDataStream.addSink(starrocksSinkFunction);
logger.info("handler connector name:{} sink successful.....", getHandle());
} catch (Exception ex) {
logger.error("handler connector name:{} sink ex:", getHandle(), ex);
}
}
}
20 changes: 20 additions & 0 deletions dinky-flink/dinky-flink-1.15/pom.xml
@@ -149,6 +149,26 @@
<artifactId>flink-sql-connector-oracle-cdc</artifactId>
<version>${flinkcdc.version}</version>
</dependency>

<!-- starrocks-connector-->
<dependency>
<groupId>com.starrocks</groupId>
<artifactId>flink-connector-starrocks</artifactId>
<version>1.2.6_flink-1.15</version>
<exclusions>
<exclusion>
<groupId>com.github.jsqlparser</groupId>
<artifactId>jsqlparser</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- postgres-connector-->
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-sql-connector-postgres-cdc</artifactId>
<version>${flinkcdc.version}</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
