
Commit 956c4e0

feat: support per-table chunk key columns for incremental snapshot splitting
1 parent 83ef3de commit 956c4e0

14 files changed: +488 -49 lines

docs/content.zh/docs/connectors/pipeline-connectors/postgres.md

Lines changed: 15 additions & 10 deletions
@@ -31,7 +31,7 @@ Postgres CDC Pipeline 连接器允许从 Postgres 数据库读取快照数据和

 ## 示例

-从 Postgres 读取数据同步到 Doris 的 Pipeline 可以定义如下：
+从 Postgres 读取数据同步到 Fluss 的 Pipeline 可以定义如下：

 ```yaml
 source:
@@ -41,19 +41,23 @@ source:
   port: 5432
   username: admin
   password: pass
-  tables: adb.\.*.\.*, bdb.user_schema_[0-9].user_table_[0-9]+, [app|web].schema_\.*.order_\.*
+  # 需要确保所有的表来自同一个database
+  tables: adb.\.*.\.*
   decoding.plugin.name: pgoutput
   slot.name: pgtest

 sink:
-  type: doris
-  name: Doris Sink
-  fenodes: 127.0.0.1:8030
-  username: root
-  password: pass
+  type: fluss
+  name: Fluss Sink
+  bootstrap.servers: localhost:9123
+  # Security-related properties for the Fluss client
+  properties.client.security.protocol: sasl
+  properties.client.security.sasl.mechanism: PLAIN
+  properties.client.security.sasl.username: developer
+  properties.client.security.sasl.password: developer-pass

 pipeline:
-  name: Postgres to Doris Pipeline
+  name: Postgres to Fluss Pipeline
   parallelism: 4
 ```

@@ -105,8 +109,9 @@ pipeline:
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>需要监视的 Postgres 数据库的表名。表名支持正则表达式,以监视满足正则表达式的多个表。<br>
-          需要注意的是,点号(.)被视为数据库和表名的分隔符。 如果需要在正则表达式中使用点(.)来匹配任何字符,必须使用反斜杠对点进行转义。<br>
-          例如,adb.\.*.\.*, bdb.user_schema_[0-9].user_table_[0-9]+, [app|web].schema_\.*.order_\.*</td>
+          需要确保所有的表来自同一个数据库。<br>
+          需要注意的是,点号(.)被视为数据库、模式和表名的分隔符。 如果需要在正则表达式中使用点(.)来匹配任何字符,必须使用反斜杠对点进行转义。<br>
+          例如,bdb.user_schema_[0-9].user_table_[0-9]+, bdb.schema_\.*.order_\.*</td>
     </tr>
     <tr>
       <td>slot.name</td>

docs/content/docs/connectors/pipeline-connectors/postgres.md

Lines changed: 16 additions & 11 deletions
@@ -32,29 +32,33 @@ Note: Since the Postgres WAL log cannot parse table structure change records, Po

 ## Example

-An example of the pipeline for reading data from Postgres and sink to Doris can be defined as follows:
+An example of the pipeline for reading data from Postgres and sink to Fluss can be defined as follows:

 ```yaml
 source:
-  type: posgtres
+  type: postgres
   name: Postgres Source
   hostname: 127.0.0.1
   port: 5432
   username: admin
   password: pass
-  tables: adb.\.*.\.*, bdb.user_schema_[0-9].user_table_[0-9]+, [app|web].schema_\.*.order_\.*
+  # make sure all the tables share same database.
+  tables: adb.\.*.\.*
   decoding.plugin.name: pgoutput
   slot.name: pgtest

 sink:
-  type: doris
-  name: Doris Sink
-  fenodes: 127.0.0.1:8030
-  username: root
-  password: pass
+  type: fluss
+  name: Fluss Sink
+  bootstrap.servers: localhost:9123
+  # Security-related properties for the Fluss client
+  properties.client.security.protocol: sasl
+  properties.client.security.sasl.mechanism: PLAIN
+  properties.client.security.sasl.username: developer
+  properties.client.security.sasl.password: developer-pass

 pipeline:
-  name: Postgres to Doris Pipeline
+  name: Postgres to Fluss Pipeline
   parallelism: 4
 ```

@@ -106,9 +110,10 @@ pipeline:
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>Table name of the Postgres database to monitor. The table-name also supports regular expressions to monitor multiple tables that satisfy the regular expressions. <br>
-          It is important to note that the dot (.) is treated as a delimiter for database and table names.
+          All the tables are required to share same database. <br>
+          It is important to note that the dot (.) is treated as a delimiter for database, schema and table names.
           If there is a need to use a dot (.) in a regular expression to match any character, it is necessary to escape the dot with a backslash.<br>
-          例如,adb.\.*.\.*, bdb.user_schema_[0-9].user_table_[0-9]+, [app|web].schema_\.*.order_\.*</td>
+          for example: bdb.user_schema_[0-9].user_table_[0-9]+, bdb.schema_\.*.order_\.*</td>
     </tr>
     <tr>
       <td>slot.name</td>

flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/config/JdbcSourceConfig.java

Lines changed: 7 additions & 5 deletions
@@ -19,13 +19,15 @@

 import org.apache.flink.cdc.connectors.base.options.StartupOptions;
 import org.apache.flink.cdc.connectors.base.source.IncrementalSource;
+import org.apache.flink.table.catalog.ObjectPath;

 import io.debezium.config.Configuration;
 import io.debezium.relational.RelationalDatabaseConnectorConfig;
 import io.debezium.relational.RelationalTableFilters;

 import java.time.Duration;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;

 /**
@@ -46,7 +48,7 @@ public abstract class JdbcSourceConfig extends BaseSourceConfig {
     protected final Duration connectTimeout;
     protected final int connectMaxRetries;
     protected final int connectionPoolSize;
-    protected final String chunkKeyColumn;
+    protected final Map<ObjectPath, String> chunkKeyColumns;

     public JdbcSourceConfig(
             StartupOptions startupOptions,
@@ -71,7 +73,7 @@ public JdbcSourceConfig(
             Duration connectTimeout,
             int connectMaxRetries,
             int connectionPoolSize,
-            String chunkKeyColumn,
+            Map<ObjectPath, String> chunkKeyColumns,
             boolean skipSnapshotBackfill,
             boolean isScanNewlyAddedTableEnabled,
             boolean assignUnboundedChunkFirst) {
@@ -101,7 +103,7 @@ public JdbcSourceConfig(
         this.connectTimeout = connectTimeout;
         this.connectMaxRetries = connectMaxRetries;
         this.connectionPoolSize = connectionPoolSize;
-        this.chunkKeyColumn = chunkKeyColumn;
+        this.chunkKeyColumns = chunkKeyColumns;
     }

     public abstract RelationalDatabaseConnectorConfig getDbzConnectorConfig();
@@ -154,8 +156,8 @@ public int getConnectionPoolSize() {
         return connectionPoolSize;
     }

-    public String getChunkKeyColumn() {
-        return chunkKeyColumn;
+    public Map<ObjectPath, String> getChunkKeyColumns() {
+        return chunkKeyColumns;
     }

     @Override

flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/config/JdbcSourceConfigFactory.java

Lines changed: 15 additions & 3 deletions
@@ -22,10 +22,13 @@
 import org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions;
 import org.apache.flink.cdc.connectors.base.options.SourceOptions;
 import org.apache.flink.cdc.connectors.base.options.StartupOptions;
+import org.apache.flink.table.catalog.ObjectPath;

 import java.time.Duration;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;

 /** A {@link Factory} to provide {@link SourceConfig} of JDBC data source. */
@@ -55,7 +58,7 @@ public abstract class JdbcSourceConfigFactory implements Factory<JdbcSourceConfi
     protected int connectMaxRetries = JdbcSourceOptions.CONNECT_MAX_RETRIES.defaultValue();
     protected int connectionPoolSize = JdbcSourceOptions.CONNECTION_POOL_SIZE.defaultValue();
     protected Properties dbzProperties;
-    protected String chunkKeyColumn;
+    protected Map<ObjectPath, String> chunkKeyColumns = new HashMap<>();
     protected boolean skipSnapshotBackfill =
             JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue();
     protected boolean scanNewlyAddedTableEnabled =
@@ -198,8 +201,17 @@ public JdbcSourceConfigFactory debeziumProperties(Properties properties) {
      * The chunk key of table snapshot, captured tables are split into multiple chunks by the chunk
      * key column when read the snapshot of table.
      */
-    public JdbcSourceConfigFactory chunkKeyColumn(String chunkKeyColumn) {
-        this.chunkKeyColumn = chunkKeyColumn;
+    public JdbcSourceConfigFactory chunkKeyColumn(ObjectPath objectPath, String chunkKeyColumn) {
+        this.chunkKeyColumns.put(objectPath, chunkKeyColumn);
+        return this;
+    }
+
+    /**
+     * The chunk key of table snapshot, captured tables are split into multiple chunks by the chunk
+     * key column when read the snapshot of table.
+     */
+    public JdbcSourceConfigFactory chunkKeyColumn(Map<ObjectPath, String> chunkKeyColumns) {
+        this.chunkKeyColumns.putAll(chunkKeyColumns);
         return this;
     }

flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
2525
import org.apache.flink.cdc.connectors.base.source.utils.JdbcChunkUtils;
2626
import org.apache.flink.cdc.connectors.base.utils.ObjectUtils;
27+
import org.apache.flink.table.catalog.ObjectPath;
2728
import org.apache.flink.table.types.DataType;
2829
import org.apache.flink.table.types.logical.LogicalTypeRoot;
2930
import org.apache.flink.table.types.logical.RowType;
@@ -253,11 +254,13 @@ protected double calculateDistributionFactor(
253254
* Get the column which is seen as chunk key.
254255
*
255256
* @param table table identity.
256-
* @param chunkKeyColumn column name which is seen as chunk key, if chunkKeyColumn is null, use
257-
* primary key instead. @Column the column which is seen as chunk key.
257+
* @param chunkKeyColumns column name which is seen as chunk key, if chunkKeyColumns is null,
258+
* use primary key instead.
259+
* @return the column which is seen as chunk key.
258260
*/
259-
protected Column getSplitColumn(Table table, @Nullable String chunkKeyColumn) {
260-
return JdbcChunkUtils.getSplitColumn(table, chunkKeyColumn);
261+
protected Column getSplitColumn(
262+
Table table, @Nullable Map<ObjectPath, String> chunkKeyColumns) {
263+
return JdbcChunkUtils.getSplitColumn(table, chunkKeyColumns);
261264
}
262265

263266
/** ChunkEnd less than or equal to max. */
@@ -360,7 +363,7 @@ private void analyzeTable(TableId tableId) {
360363
try {
361364
currentSchema = dialect.queryTableSchema(jdbcConnection, tableId);
362365
currentSplittingTable = Objects.requireNonNull(currentSchema).getTable();
363-
splitColumn = getSplitColumn(currentSplittingTable, sourceConfig.getChunkKeyColumn());
366+
splitColumn = getSplitColumn(currentSplittingTable, sourceConfig.getChunkKeyColumns());
364367
splitType = getSplitType(splitColumn);
365368
minMaxOfSplitColumn = queryMinMax(jdbcConnection, tableId, splitColumn);
366369
approximateRowCnt = queryApproximateRowCnt(jdbcConnection, tableId);

flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/JdbcSourceFetchTaskContext.java

Lines changed: 3 additions & 1 deletion
@@ -37,6 +37,8 @@
 import io.debezium.util.SchemaNameAdjuster;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.source.SourceRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 import java.time.Instant;
 import java.util.Collection;
@@ -47,7 +49,7 @@
 /** The context for fetch task that fetching data of snapshot split from JDBC data source. */
 @Internal
 public abstract class JdbcSourceFetchTaskContext implements FetchTask.Context {
-
+    private static final Logger LOG = LoggerFactory.getLogger(JdbcSourceFetchTaskContext.class);
     protected final JdbcSourceConfig sourceConfig;
     protected final JdbcDataSourceDialect dataSourceDialect;
     protected CommonConnectorConfig dbzConnectorConfig;

flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/utils/JdbcChunkUtils.java

Lines changed: 50 additions & 2 deletions
@@ -18,15 +18,21 @@
 package org.apache.flink.cdc.connectors.base.source.utils;

 import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ObjectPath;

 import io.debezium.jdbc.JdbcConnection;
 import io.debezium.relational.Column;
 import io.debezium.relational.Table;
+import io.debezium.relational.TableId;
+import io.debezium.relational.Tables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 import javax.annotation.Nullable;

 import java.sql.SQLException;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;

@@ -35,6 +41,8 @@
 /** Utilities to split chunks of table. */
 public class JdbcChunkUtils {

+    private static final Logger LOG = LoggerFactory.getLogger(JdbcChunkUtils.class);
+
     /**
      * Query the maximum and minimum value of the column in the table. e.g. query string <code>
      * SELECT MIN(%s) FROM %s WHERE %s > ?</code>
@@ -100,15 +108,55 @@ public static Object queryMin(
                 });
     }

+    // Write createTableFilter method here to avoid the dependency on DebeziumUtils
+    private static Tables.TableFilter createTableFilter(String schemaName, String tableName) {
+        return new Tables.TableFilter() {
+            @Override
+            public boolean isIncluded(TableId tableId) {
+                final String catalog = tableId.catalog();
+                final String schema = tableId.schema();
+                final String table = tableId.table();
+
+                if (schemaName != null && !schemaName.equalsIgnoreCase(schema)) {
+                    return false;
+                }
+
+                if (tableName != null && !tableName.equalsIgnoreCase(table)) {
+                    return false;
+                }
+
+                return true;
+            }
+        };
+    }
+
+    @Nullable
+    private static String findChunkKeyColumn(
+            TableId tableId, Map<ObjectPath, String> chunkKeyColumns) {
+        String schemaName = tableId.schema();
+
+        for (ObjectPath table : chunkKeyColumns.keySet()) {
+            Tables.TableFilter filter = createTableFilter(schemaName, table.getObjectName());
+            if (filter.isIncluded(tableId)) {
+                String chunkKeyColumn = chunkKeyColumns.get(table);
+                return chunkKeyColumn;
+            }
+        }
+
+        return null;
+    }
+
     /**
      * Get the column which is seen as chunk key.
      *
      * @param table table identity.
-     * @param chunkKeyColumn column name which is seen as chunk key, if chunkKeyColumn is null, use
+     * @param chunkKeyColumns column name which is seen as chunk key, if chunkKeyColumn is null, use
      *     primary key instead. @Column the column which is seen as chunk key.
      */
-    public static Column getSplitColumn(Table table, @Nullable String chunkKeyColumn) {
+    public static Column getSplitColumn(
+            Table table, @Nullable Map<ObjectPath, String> chunkKeyColumns) {
         List<Column> primaryKeys = table.primaryKeyColumns();
+        String chunkKeyColumn = findChunkKeyColumn(table.id(), chunkKeyColumns);
         if (primaryKeys.isEmpty() && chunkKeyColumn == null) {
             throw new ValidationException(
                     "To use incremental snapshot, 'scan.incremental.snapshot.chunk.key-column' must be set when the table doesn't have primary keys.");
