diff --git a/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/PostgresChunkSplitter.java b/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/PostgresChunkSplitter.java index d1dd29cd888..37e70dc7c74 100644 --- a/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/PostgresChunkSplitter.java +++ b/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/PostgresChunkSplitter.java @@ -26,8 +26,8 @@ import com.ververica.cdc.connectors.base.source.assigner.splitter.JdbcSourceChunkSplitter; import com.ververica.cdc.connectors.base.source.meta.split.SnapshotSplit; import com.ververica.cdc.connectors.base.utils.ObjectUtils; -import com.ververica.cdc.connectors.postgres.source.utils.PgQueryUtils; -import com.ververica.cdc.connectors.postgres.source.utils.PgTypeUtils; +import com.ververica.cdc.connectors.postgres.source.utils.PostgresQueryUtils; +import com.ververica.cdc.connectors.postgres.source.utils.PostgresTypeUtils; import io.debezium.jdbc.JdbcConnection; import io.debezium.relational.Column; import io.debezium.relational.Table; @@ -162,14 +162,14 @@ public Collection generateSplits(TableId tableId) { @Override public Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, String columnName) throws SQLException { - return PgQueryUtils.queryMinMax(jdbc, tableId, columnName); + return PostgresQueryUtils.queryMinMax(jdbc, tableId, columnName); } @Override public Object queryMin( JdbcConnection jdbc, TableId tableId, String columnName, Object excludedLowerBound) throws SQLException { - return PgQueryUtils.queryMin(jdbc, tableId, columnName, excludedLowerBound); + return PostgresQueryUtils.queryMin(jdbc, tableId, columnName, excludedLowerBound); } @Override @@ -180,7 +180,7 @@ public Object queryNextChunkMax( int chunkSize, Object includedLowerBound) throws SQLException { - return PgQueryUtils.queryNextChunkMax( + return PostgresQueryUtils.queryNextChunkMax( jdbc, tableId, columnName, chunkSize, includedLowerBound); } @@ -190,18 +190,19 @@ public Object queryNextChunkMax( @Override public Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId) throws SQLException { - return PgQueryUtils.queryApproximateRowCnt(jdbc, tableId); + return PostgresQueryUtils.queryApproximateRowCnt(jdbc, tableId); } @Override public String buildSplitScanQuery( TableId tableId, RowType splitKeyType, boolean isFirstSplit, boolean isLastSplit) { - return PgQueryUtils.buildSplitScanQuery(tableId, splitKeyType, isFirstSplit, isLastSplit); + return PostgresQueryUtils.buildSplitScanQuery( + tableId, splitKeyType, isFirstSplit, isLastSplit); } @Override public DataType fromDbzColumn(Column splitColumn) { - return PgTypeUtils.fromDbzColumn(splitColumn); + return PostgresTypeUtils.fromDbzColumn(splitColumn); } /** diff --git a/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/PostgresDialect.java b/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/PostgresDialect.java index b408a449f43..1d518886811 100644 --- a/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/PostgresDialect.java +++ b/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/PostgresDialect.java @@ -32,7 +32,7 @@ import com.ververica.cdc.connectors.postgres.source.fetch.PostgresScanFetchTask; import com.ververica.cdc.connectors.postgres.source.fetch.PostgresSourceFetchTaskContext; import com.ververica.cdc.connectors.postgres.source.fetch.PostgresStreamFetchTask; -import com.ververica.cdc.connectors.postgres.source.utils.PgSchema; +import com.ververica.cdc.connectors.postgres.source.utils.CustomPostgresSchema; import com.ververica.cdc.connectors.postgres.source.utils.TableDiscoveryUtils; import io.debezium.connector.postgresql.PostgresConnectorConfig; import io.debezium.connector.postgresql.connection.PostgresConnection; @@ -55,7 +55,7 @@ public class PostgresDialect implements JdbcDataSourceDialect { private static final long serialVersionUID = 1L; private final PostgresSourceConfig sourceConfig; - private transient PgSchema schema; + private transient CustomPostgresSchema schema; @Nullable private PostgresStreamFetchTask streamFetchTask; @@ -153,7 +153,7 @@ public JdbcConnectionPoolFactory getPooledDataSourceFactory() { @Override public TableChange queryTableSchema(JdbcConnection jdbc, TableId tableId) { if (schema == null) { - schema = new PgSchema((PostgresConnection) jdbc, sourceConfig); + schema = new CustomPostgresSchema((PostgresConnection) jdbc, sourceConfig); } return schema.getTableSchema(tableId); } diff --git a/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java b/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java index 75627aea27f..4ce0a05c18b 100644 --- a/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java +++ b/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java @@ -25,7 +25,7 @@ import com.ververica.cdc.connectors.base.source.meta.wartermark.WatermarkKind; import com.ververica.cdc.connectors.base.source.reader.external.FetchTask; import com.ververica.cdc.connectors.postgres.source.offset.PostgresOffset; -import com.ververica.cdc.connectors.postgres.source.utils.PgQueryUtils; +import com.ververica.cdc.connectors.postgres.source.utils.PostgresQueryUtils; import io.debezium.config.Configuration; import io.debezium.connector.postgresql.PostgresConnectorConfig; import io.debezium.connector.postgresql.PostgresOffsetContext; @@ -87,8 +87,8 @@ public void execute(Context context) throws Exception { PostgresSourceFetchTaskContext ctx = (PostgresSourceFetchTaskContext) context; taskRunning = true; - SnapshotSplitReadTask snapshotSplitReadTask = - new SnapshotSplitReadTask( + PostgresSnapshotSplitReadTask snapshotSplitReadTask = + new PostgresSnapshotSplitReadTask( ctx.getConnection(), ctx.getDbzConnectorConfig(), ctx.getDatabaseSchema(), @@ -216,8 +216,9 @@ public boolean isRunning() { } /** A SnapshotChangeEventSource implementation for Postgres to read snapshot split. */ - public static class SnapshotSplitReadTask extends AbstractSnapshotChangeEventSource { - private static final Logger LOG = LoggerFactory.getLogger(SnapshotSplitReadTask.class); + public static class PostgresSnapshotSplitReadTask extends AbstractSnapshotChangeEventSource { + private static final Logger LOG = + LoggerFactory.getLogger(PostgresSnapshotSplitReadTask.class); private final PostgresConnection jdbcConnection; private final PostgresConnectorConfig connectorConfig; @@ -228,7 +229,7 @@ public static class SnapshotSplitReadTask extends AbstractSnapshotChangeEventSou private final SnapshotProgressListener snapshotProgressListener; private final Clock clock; - public SnapshotSplitReadTask( + public PostgresSnapshotSplitReadTask( PostgresConnection jdbcConnection, PostgresConnectorConfig connectorConfig, PostgresSchema databaseSchema, @@ -310,7 +311,7 @@ private void createDataEventsForTable( table.id()); final String selectSql = - PgQueryUtils.buildSplitScanQuery( + PostgresQueryUtils.buildSplitScanQuery( snapshotSplit.getTableId(), snapshotSplit.getSplitKeyType(), snapshotSplit.getSplitStart() == null, @@ -322,7 +323,7 @@ private void createDataEventsForTable( selectSql); try (PreparedStatement selectStatement = - PgQueryUtils.readTableSplitDataStatement( + PostgresQueryUtils.readTableSplitDataStatement( jdbcConnection, selectSql, snapshotSplit.getSplitStart() == null, diff --git a/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java b/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java index ce42dff5fff..677089fddda 100644 --- a/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java +++ b/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java @@ -16,7 +16,6 @@ package com.ververica.cdc.connectors.postgres.source.fetch; -import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.types.logical.RowType; import com.ververica.cdc.connectors.base.config.JdbcSourceConfig; @@ -25,10 +24,11 @@ import com.ververica.cdc.connectors.base.source.meta.offset.Offset; import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase; import com.ververica.cdc.connectors.base.source.reader.external.JdbcSourceFetchTaskContext; +import com.ververica.cdc.connectors.postgres.source.PostgresChunkSplitter; import com.ververica.cdc.connectors.postgres.source.PostgresDialect; import com.ververica.cdc.connectors.postgres.source.offset.PostgresOffset; import com.ververica.cdc.connectors.postgres.source.offset.PostgresOffsetFactory; -import com.ververica.cdc.connectors.postgres.source.utils.PgTypeUtils; +import com.ververica.cdc.connectors.postgres.source.utils.PostgresTypeUtils; import io.debezium.connector.base.ChangeEventQueue; import io.debezium.connector.postgresql.PostgresConnectorConfig; import io.debezium.connector.postgresql.PostgresErrorHandler; @@ -59,7 +59,6 @@ import org.slf4j.LoggerFactory; import java.sql.SQLException; -import java.util.List; import java.util.Objects; import java.util.concurrent.atomic.AtomicReference; @@ -200,17 +199,9 @@ public PostgresSchema getDatabaseSchema() { @Override public RowType getSplitType(Table table) { - List primaryKeys = table.primaryKeyColumns(); - if (primaryKeys.isEmpty()) { - throw new ValidationException( - String.format( - "Incremental snapshot for tables requires primary key," - + " but table %s doesn't have primary key.", - table.id())); - } - - // use first field in primary key as the split key - return PgTypeUtils.getSplitType(primaryKeys.get(0)); + Column splitColumn = + PostgresChunkSplitter.getSplitColumn(table, sourceConfig.getChunkKeyColumn()); + return PostgresTypeUtils.getSplitType(splitColumn); } @Override diff --git a/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/utils/PgSchema.java b/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/utils/CustomPostgresSchema.java similarity index 93% rename from flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/utils/PgSchema.java rename to flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/utils/CustomPostgresSchema.java index 2c7cb9601fe..6fa15797961 100644 --- a/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/utils/PgSchema.java +++ b/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/utils/CustomPostgresSchema.java @@ -36,15 +36,16 @@ import java.util.Map; import java.util.Objects; -/** A PgSchema similar to PostgresSchema with customization. */ -public class PgSchema { +/** A CustomPostgresSchema similar to PostgresSchema with customization. */ +public class CustomPostgresSchema { // cache the schema for each table private final Map schemasByTableId = new HashMap<>(); private final PostgresConnection jdbcConnection; private final PostgresConnectorConfig dbzConfig; - public PgSchema(PostgresConnection jdbcConnection, PostgresSourceConfig sourceConfig) { + public CustomPostgresSchema( + PostgresConnection jdbcConnection, PostgresSourceConfig sourceConfig) { this.jdbcConnection = jdbcConnection; this.dbzConfig = sourceConfig.getDbzConnectorConfig(); } @@ -53,7 +54,7 @@ public TableChange getTableSchema(TableId tableId) { // read schema from cache first if (!schemasByTableId.containsKey(tableId)) { try { - schemasByTableId.put(tableId, Objects.requireNonNull(readTableSchema(tableId))); + readTableSchema(tableId); } catch (SQLException e) { throw new FlinkRuntimeException("Failed to read table schema", e); } diff --git a/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/utils/PgQueryUtils.java b/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/utils/PostgresQueryUtils.java similarity index 98% rename from flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/utils/PgQueryUtils.java rename to flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/utils/PostgresQueryUtils.java index b2f4d04d453..8d091cee42e 100644 --- a/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/utils/PgQueryUtils.java +++ b/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/utils/PostgresQueryUtils.java @@ -33,11 +33,11 @@ import static com.ververica.cdc.connectors.base.utils.SourceRecordUtils.rowToArray; /** Query-related Utilities for Postgres CDC source. */ -public class PgQueryUtils { +public class PostgresQueryUtils { - private static final Logger LOG = LoggerFactory.getLogger(PgQueryUtils.class); + private static final Logger LOG = LoggerFactory.getLogger(PostgresQueryUtils.class); - private PgQueryUtils() {} + private PostgresQueryUtils() {} public static Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, String columnName) throws SQLException { diff --git a/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/utils/PgTypeUtils.java b/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/utils/PostgresTypeUtils.java similarity index 99% rename from flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/utils/PgTypeUtils.java rename to flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/utils/PostgresTypeUtils.java index 1032e41307d..4b662fad2f8 100644 --- a/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/utils/PgTypeUtils.java +++ b/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/utils/PostgresTypeUtils.java @@ -27,7 +27,7 @@ import static org.apache.flink.table.api.DataTypes.ROW; /** A utility class for converting Postgres types to Flink types. */ -public class PgTypeUtils { +public class PostgresTypeUtils { private static final String PG_SMALLSERIAL = "smallserial"; private static final String PG_SERIAL = "serial"; private static final String PG_BIGSERIAL = "bigserial"; diff --git a/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/utils/TableDiscoveryUtils.java b/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/utils/TableDiscoveryUtils.java index 70a0d4ea279..634159a33b3 100644 --- a/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/utils/TableDiscoveryUtils.java +++ b/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/utils/TableDiscoveryUtils.java @@ -24,7 +24,6 @@ import java.sql.SQLException; import java.util.ArrayList; -import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.stream.Collectors; @@ -40,18 +39,13 @@ public static List listTables( Set allTableIds = jdbc.readTableNames(database, null, null, new String[] {"TABLE"}); - Set capturedTables = new HashSet<>(); - - for (TableId tableId : allTableIds) { - if (tableFilters.dataCollectionFilter().isIncluded(tableId)) { - LOG.debug("Adding table {} to the list of captured tables", tableId); - capturedTables.add(tableId); - } else { - LOG.debug( - "Ignoring table {} as it's not included in the filter configuration", - tableId); - } - } + Set capturedTables = + allTableIds.stream() + .filter(t -> tableFilters.dataCollectionFilter().isIncluded(t)) + .collect(Collectors.toSet()); + LOG.info( + "Postgres captured tables : {} .", + capturedTables.stream().map(t -> t.toString()).collect(Collectors.joining(","))); return capturedTables.stream().sorted().collect(Collectors.toCollection(ArrayList::new)); } diff --git a/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/PostgresObjectFactory.java b/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/PostgresObjectFactory.java index a2325acc357..9b91e2b553f 100644 --- a/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/PostgresObjectFactory.java +++ b/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/PostgresObjectFactory.java @@ -90,7 +90,12 @@ public static PostgresConnection.PostgresValueConverterBuilder newPostgresValueC } // modified from - // io.debezium.connector.postgresql.PostgresConnectorTask.createReplicationConnection + // io.debezium.connector.postgresql.PostgresConnectorTask.createReplicationConnection. + // pass connectorConfig instead of maxRetries and retryDelay as parameters. + // - old: ReplicationConnection createReplicationConnection(PostgresTaskContext taskContext, + // boolean doSnapshot, int maxRetries, Duration retryDelay) + // - new: ReplicationConnection createReplicationConnection(PostgresTaskContext taskContext, + // boolean doSnapshot, PostgresConnectorConfig connectorConfig) public static ReplicationConnection createReplicationConnection( PostgresTaskContext taskContext, boolean doSnapshot, diff --git a/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/Utils.java b/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/Utils.java index 6cde40549b6..da2c74fc23e 100644 --- a/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/Utils.java +++ b/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/Utils.java @@ -50,7 +50,8 @@ public static PostgresOffset currentOffset(PostgresConnection jdbcConnection) { try { jdbcConnection.commit(); } catch (SQLException e) { - throw new FlinkRuntimeException(e); + throw new FlinkRuntimeException( + "JDBC connection fails to commit: " + e.getMessage(), e); } return new PostgresOffset(lsn, txId, Instant.MIN);