Commit 0abdc1e

apply review comment suggestion
xiaom committed Jun 7, 2023
1 parent 00b3db2 commit 0abdc1e
Showing 10 changed files with 50 additions and 56 deletions.
@@ -26,8 +26,8 @@
import com.ververica.cdc.connectors.base.source.assigner.splitter.JdbcSourceChunkSplitter;
import com.ververica.cdc.connectors.base.source.meta.split.SnapshotSplit;
import com.ververica.cdc.connectors.base.utils.ObjectUtils;
-import com.ververica.cdc.connectors.postgres.source.utils.PgQueryUtils;
-import com.ververica.cdc.connectors.postgres.source.utils.PgTypeUtils;
+import com.ververica.cdc.connectors.postgres.source.utils.PostgresQueryUtils;
+import com.ververica.cdc.connectors.postgres.source.utils.PostgresTypeUtils;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
@@ -162,14 +162,14 @@ public Collection<SnapshotSplit> generateSplits(TableId tableId) {
@Override
public Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, String columnName)
throws SQLException {
-return PgQueryUtils.queryMinMax(jdbc, tableId, columnName);
+return PostgresQueryUtils.queryMinMax(jdbc, tableId, columnName);
}

@Override
public Object queryMin(
JdbcConnection jdbc, TableId tableId, String columnName, Object excludedLowerBound)
throws SQLException {
-return PgQueryUtils.queryMin(jdbc, tableId, columnName, excludedLowerBound);
+return PostgresQueryUtils.queryMin(jdbc, tableId, columnName, excludedLowerBound);
}

@Override
@@ -180,7 +180,7 @@ public Object queryNextChunkMax(
int chunkSize,
Object includedLowerBound)
throws SQLException {
-return PgQueryUtils.queryNextChunkMax(
+return PostgresQueryUtils.queryNextChunkMax(
jdbc, tableId, columnName, chunkSize, includedLowerBound);
}

@@ -190,18 +190,19 @@ public Object queryNextChunkMax(

@Override
public Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId) throws SQLException {
-return PgQueryUtils.queryApproximateRowCnt(jdbc, tableId);
+return PostgresQueryUtils.queryApproximateRowCnt(jdbc, tableId);
}

@Override
public String buildSplitScanQuery(
TableId tableId, RowType splitKeyType, boolean isFirstSplit, boolean isLastSplit) {
-return PgQueryUtils.buildSplitScanQuery(tableId, splitKeyType, isFirstSplit, isLastSplit);
+return PostgresQueryUtils.buildSplitScanQuery(
+tableId, splitKeyType, isFirstSplit, isLastSplit);
}

@Override
public DataType fromDbzColumn(Column splitColumn) {
-return PgTypeUtils.fromDbzColumn(splitColumn);
+return PostgresTypeUtils.fromDbzColumn(splitColumn);
}

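For orientation on what the renamed utility does: chunk splitting starts from a min/max probe on the split column. A minimal sketch of such a probe, assuming Debezium's JdbcConnection.queryAndMap and plain aggregate SQL (the query text and quoting are illustrative, not this commit's code):

import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.TableId;

import java.sql.SQLException;

public final class QueryMinMaxSketch {

    /** Probes MIN/MAX of the split column; chunk boundaries are derived from the result. */
    public static Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, String columnName)
            throws SQLException {
        // identifier quoting omitted for brevity; a real implementation must quote for Postgres
        final String query =
                String.format(
                        "SELECT MIN(%s), MAX(%s) FROM %s", columnName, columnName, tableId);
        return jdbc.queryAndMap(
                query,
                rs -> {
                    rs.next(); // an aggregate query returns exactly one row
                    return new Object[] {rs.getObject(1), rs.getObject(2)};
                });
    }
}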
@@ -32,7 +32,7 @@
import com.ververica.cdc.connectors.postgres.source.fetch.PostgresScanFetchTask;
import com.ververica.cdc.connectors.postgres.source.fetch.PostgresSourceFetchTaskContext;
import com.ververica.cdc.connectors.postgres.source.fetch.PostgresStreamFetchTask;
-import com.ververica.cdc.connectors.postgres.source.utils.PgSchema;
+import com.ververica.cdc.connectors.postgres.source.utils.CustomPostgresSchema;
import com.ververica.cdc.connectors.postgres.source.utils.TableDiscoveryUtils;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.connection.PostgresConnection;
@@ -55,7 +55,7 @@ public class PostgresDialect implements JdbcDataSourceDialect {
private static final long serialVersionUID = 1L;
private final PostgresSourceConfig sourceConfig;

-private transient PgSchema schema;
+private transient CustomPostgresSchema schema;

@Nullable private PostgresStreamFetchTask streamFetchTask;

@@ -153,7 +153,7 @@ public JdbcConnectionPoolFactory getPooledDataSourceFactory() {
@Override
public TableChange queryTableSchema(JdbcConnection jdbc, TableId tableId) {
if (schema == null) {
-schema = new PgSchema((PostgresConnection) jdbc, sourceConfig);
+schema = new CustomPostgresSchema((PostgresConnection) jdbc, sourceConfig);
}
return schema.getTableSchema(tableId);
}
@@ -25,7 +25,7 @@
import com.ververica.cdc.connectors.base.source.meta.wartermark.WatermarkKind;
import com.ververica.cdc.connectors.base.source.reader.external.FetchTask;
import com.ververica.cdc.connectors.postgres.source.offset.PostgresOffset;
-import com.ververica.cdc.connectors.postgres.source.utils.PgQueryUtils;
+import com.ververica.cdc.connectors.postgres.source.utils.PostgresQueryUtils;
import io.debezium.config.Configuration;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.PostgresOffsetContext;
@@ -87,8 +87,8 @@ public void execute(Context context) throws Exception {
PostgresSourceFetchTaskContext ctx = (PostgresSourceFetchTaskContext) context;
taskRunning = true;

-SnapshotSplitReadTask snapshotSplitReadTask =
-new SnapshotSplitReadTask(
+PostgresSnapshotSplitReadTask snapshotSplitReadTask =
+new PostgresSnapshotSplitReadTask(
ctx.getConnection(),
ctx.getDbzConnectorConfig(),
ctx.getDatabaseSchema(),
@@ -216,8 +216,9 @@ public boolean isRunning() {
}

/** A SnapshotChangeEventSource implementation for Postgres to read snapshot split. */
-public static class SnapshotSplitReadTask extends AbstractSnapshotChangeEventSource {
-private static final Logger LOG = LoggerFactory.getLogger(SnapshotSplitReadTask.class);
+public static class PostgresSnapshotSplitReadTask extends AbstractSnapshotChangeEventSource {
+private static final Logger LOG =
+LoggerFactory.getLogger(PostgresSnapshotSplitReadTask.class);

private final PostgresConnection jdbcConnection;
private final PostgresConnectorConfig connectorConfig;
@@ -228,7 +229,7 @@ public static class SnapshotSplitReadTask extends AbstractSnapshotChangeEventSource {
private final SnapshotProgressListener snapshotProgressListener;
private final Clock clock;

-public SnapshotSplitReadTask(
+public PostgresSnapshotSplitReadTask(
PostgresConnection jdbcConnection,
PostgresConnectorConfig connectorConfig,
PostgresSchema databaseSchema,
@@ -310,7 +311,7 @@ private void createDataEventsForTable(
table.id());

final String selectSql =
-PgQueryUtils.buildSplitScanQuery(
+PostgresQueryUtils.buildSplitScanQuery(
snapshotSplit.getTableId(),
snapshotSplit.getSplitKeyType(),
snapshotSplit.getSplitStart() == null,
@@ -322,7 +323,7 @@ private void createDataEventsForTable(
selectSql);

try (PreparedStatement selectStatement =
-PgQueryUtils.readTableSplitDataStatement(
+PostgresQueryUtils.readTableSplitDataStatement(
jdbcConnection,
selectSql,
snapshotSplit.getSplitStart() == null,
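The buildSplitScanQuery/readTableSplitDataStatement pair above is what each snapshot split executes against the table. As a hedged illustration of the statement shapes involved (table and key names are invented, and the exact predicates may differ from the real query builder):

/** Hypothetical shapes of the per-split scan SQL for a split key "id". */
public final class SplitScanQueryShapes {
    // the first split has no lower bound, the last split no upper bound
    static final String FIRST_SPLIT = "SELECT * FROM public.orders WHERE id <= ?";
    static final String MIDDLE_SPLIT = "SELECT * FROM public.orders WHERE id >= ? AND id <= ?";
    static final String LAST_SPLIT = "SELECT * FROM public.orders WHERE id >= ?";
    // a table that fits in a single split is read without bounds
    static final String SINGLE_SPLIT = "SELECT * FROM public.orders";
}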
@@ -16,7 +16,6 @@

package com.ververica.cdc.connectors.postgres.source.fetch;

-import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.types.logical.RowType;

import com.ververica.cdc.connectors.base.config.JdbcSourceConfig;
@@ -25,10 +24,11 @@
import com.ververica.cdc.connectors.base.source.meta.offset.Offset;
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
import com.ververica.cdc.connectors.base.source.reader.external.JdbcSourceFetchTaskContext;
+import com.ververica.cdc.connectors.postgres.source.PostgresChunkSplitter;
import com.ververica.cdc.connectors.postgres.source.PostgresDialect;
import com.ververica.cdc.connectors.postgres.source.offset.PostgresOffset;
import com.ververica.cdc.connectors.postgres.source.offset.PostgresOffsetFactory;
-import com.ververica.cdc.connectors.postgres.source.utils.PgTypeUtils;
+import com.ververica.cdc.connectors.postgres.source.utils.PostgresTypeUtils;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.PostgresErrorHandler;
@@ -59,7 +59,6 @@
import org.slf4j.LoggerFactory;

import java.sql.SQLException;
-import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;

@@ -200,17 +199,9 @@ public PostgresSchema getDatabaseSchema() {

@Override
public RowType getSplitType(Table table) {
-List<Column> primaryKeys = table.primaryKeyColumns();
-if (primaryKeys.isEmpty()) {
-throw new ValidationException(
-String.format(
-"Incremental snapshot for tables requires primary key,"
-+ " but table %s doesn't have primary key.",
-table.id()));
-}
-
-// use first field in primary key as the split key
-return PgTypeUtils.getSplitType(primaryKeys.get(0));
+Column splitColumn =
+PostgresChunkSplitter.getSplitColumn(table, sourceConfig.getChunkKeyColumn());
+return PostgresTypeUtils.getSplitType(splitColumn);
}

@Override
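The removed primary-key validation does not disappear: it moves behind PostgresChunkSplitter.getSplitColumn, which can also honor a configured chunk key column instead of always taking the first primary-key field. A sketch of what such a selection helper plausibly does (the actual method may differ; only the primary-key check is taken from the removed code above):

import io.debezium.relational.Column;
import io.debezium.relational.Table;

import javax.annotation.Nullable;

import java.util.List;

public final class SplitColumnSketch {

    /** Picks the split column: the configured chunk key if any, else the first PK field. */
    public static Column getSplitColumn(Table table, @Nullable String chunkKeyColumn) {
        List<Column> primaryKeys = table.primaryKeyColumns();
        if (primaryKeys.isEmpty()) {
            throw new IllegalStateException(
                    String.format(
                            "Incremental snapshot for tables requires primary key,"
                                    + " but table %s doesn't have primary key.",
                            table.id()));
        }
        if (chunkKeyColumn != null) {
            // honor an explicitly configured chunk key if it is part of the primary key
            return primaryKeys.stream()
                    .filter(column -> chunkKeyColumn.equals(column.name()))
                    .findFirst()
                    .orElseThrow(
                            () ->
                                    new IllegalStateException(
                                            String.format(
                                                    "Chunk key column '%s' is not in primary key of table %s.",
                                                    chunkKeyColumn, table.id())));
        }
        // default: use the first field in the primary key as the split key
        return primaryKeys.get(0);
    }
}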
@@ -36,15 +36,16 @@
import java.util.Map;
import java.util.Objects;

-/** A PgSchema similar to PostgresSchema with customization. */
-public class PgSchema {
+/** A CustomPostgresSchema similar to PostgresSchema with customization. */
+public class CustomPostgresSchema {

// cache the schema for each table
private final Map<TableId, TableChange> schemasByTableId = new HashMap<>();
private final PostgresConnection jdbcConnection;
private final PostgresConnectorConfig dbzConfig;

-public PgSchema(PostgresConnection jdbcConnection, PostgresSourceConfig sourceConfig) {
+public CustomPostgresSchema(
+PostgresConnection jdbcConnection, PostgresSourceConfig sourceConfig) {
this.jdbcConnection = jdbcConnection;
this.dbzConfig = sourceConfig.getDbzConnectorConfig();
}
@@ -53,7 +54,7 @@ public TableChange getTableSchema(TableId tableId) {
// read schema from cache first
if (!schemasByTableId.containsKey(tableId)) {
try {
-schemasByTableId.put(tableId, Objects.requireNonNull(readTableSchema(tableId)));
+readTableSchema(tableId);
} catch (SQLException e) {
throw new FlinkRuntimeException("Failed to read table schema", e);
}
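Note the changed call site above: readTableSchema is now expected to populate the schemasByTableId cache itself, rather than returning a value for the caller to put. A generic, self-contained sketch of that cache-as-side-effect contract (names are illustrative, not this class's code):

import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Minimal analogue of getTableSchema/readTableSchema after this change. */
public class SchemaCacheSketch<K, V> {
    private final Map<K, V> cache = new HashMap<>();

    /** Like getTableSchema: check the cache first, loading on a miss. */
    public V get(K key) {
        if (!cache.containsKey(key)) {
            try {
                load(key); // populates the cache as a side effect, like readTableSchema
            } catch (SQLException e) {
                throw new RuntimeException("Failed to read table schema", e);
            }
        }
        return cache.get(key);
    }

    /** Like readTableSchema: loads the value and caches it itself. */
    protected void load(K key) throws SQLException {
        cache.put(key, Objects.requireNonNull(fetch(key)));
    }

    /** Hypothetical fetch; the real class reads the schema over a Postgres connection. */
    protected V fetch(K key) throws SQLException {
        throw new SQLException("not implemented in this sketch");
    }
}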
@@ -33,11 +33,11 @@
import static com.ververica.cdc.connectors.base.utils.SourceRecordUtils.rowToArray;

/** Query-related Utilities for Postgres CDC source. */
-public class PgQueryUtils {
+public class PostgresQueryUtils {

-private static final Logger LOG = LoggerFactory.getLogger(PgQueryUtils.class);
+private static final Logger LOG = LoggerFactory.getLogger(PostgresQueryUtils.class);

-private PgQueryUtils() {}
+private PostgresQueryUtils() {}

public static Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, String columnName)
throws SQLException {
@@ -27,7 +27,7 @@
import static org.apache.flink.table.api.DataTypes.ROW;

/** A utility class for converting Postgres types to Flink types. */
-public class PgTypeUtils {
+public class PostgresTypeUtils {
private static final String PG_SMALLSERIAL = "smallserial";
private static final String PG_SERIAL = "serial";
private static final String PG_BIGSERIAL = "bigserial";
@@ -24,7 +24,6 @@

import java.sql.SQLException;
import java.util.ArrayList;
-import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
@@ -40,18 +39,13 @@ public static List<TableId> listTables(
Set<TableId> allTableIds =
jdbc.readTableNames(database, null, null, new String[] {"TABLE"});

-Set<TableId> capturedTables = new HashSet<>();
-
-for (TableId tableId : allTableIds) {
-if (tableFilters.dataCollectionFilter().isIncluded(tableId)) {
-LOG.debug("Adding table {} to the list of captured tables", tableId);
-capturedTables.add(tableId);
-} else {
-LOG.debug(
-"Ignoring table {} as it's not included in the filter configuration",
-tableId);
-}
-}
+Set<TableId> capturedTables =
+allTableIds.stream()
+.filter(t -> tableFilters.dataCollectionFilter().isIncluded(t))
+.collect(Collectors.toSet());
+LOG.info(
+"Postgres captured tables : {} .",
+capturedTables.stream().map(t -> t.toString()).collect(Collectors.joining(",")));

return capturedTables.stream().sorted().collect(Collectors.toCollection(ArrayList::new));
}
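The rewrite above replaces the imperative include/exclude loop with a declarative filter, then logs the capture set once and returns it sorted so split assignment stays deterministic. A self-contained analogue of the pattern (sample data and predicate are invented):

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public final class TableFilterSketch {

    static List<String> listTables(Set<String> allTables, Predicate<String> included) {
        // keep only the tables accepted by the include predicate
        Set<String> captured =
                allTables.stream().filter(included).collect(Collectors.toSet());
        System.out.println("captured tables: " + String.join(",", captured));
        // sort for a deterministic, repeatable order
        return captured.stream().sorted().collect(Collectors.toCollection(ArrayList::new));
    }

    public static void main(String[] args) {
        System.out.println(
                listTables(
                        Set.of("public.orders", "public.users", "audit.log"),
                        table -> table.startsWith("public.")));
    }
}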
@@ -90,7 +90,12 @@ public static PostgresConnection.PostgresValueConverterBuilder newPostgresValueC
}

// modified from
-// io.debezium.connector.postgresql.PostgresConnectorTask.createReplicationConnection
+// io.debezium.connector.postgresql.PostgresConnectorTask.createReplicationConnection.
+// pass connectorConfig instead of maxRetries and retryDelay as parameters.
+// - old: ReplicationConnection createReplicationConnection(PostgresTaskContext taskContext,
+// boolean doSnapshot, int maxRetries, Duration retryDelay)
+// - new: ReplicationConnection createReplicationConnection(PostgresTaskContext taskContext,
+// boolean doSnapshot, PostgresConnectorConfig connectorConfig)
public static ReplicationConnection createReplicationConnection(
PostgresTaskContext taskContext,
boolean doSnapshot,
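Per the comment above, the refactor folds the retry parameters into the connector config they originate from. A hedged sketch of how the new overload can delegate to the old one (the maxRetries()/retryDelay() accessors on PostgresConnectorConfig are assumptions, and the surrounding class is omitted):

// Hypothetical delegation from the new signature to the old one; not this commit's code.
public static ReplicationConnection createReplicationConnection(
        PostgresTaskContext taskContext,
        boolean doSnapshot,
        PostgresConnectorConfig connectorConfig) {
    return createReplicationConnection(
            taskContext,
            doSnapshot,
            connectorConfig.maxRetries(), // assumed accessor
            connectorConfig.retryDelay()); // assumed accessor
}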
@@ -50,7 +50,8 @@ public static PostgresOffset currentOffset(PostgresConnection jdbcConnection) {
try {
jdbcConnection.commit();
} catch (SQLException e) {
-throw new FlinkRuntimeException(e);
+throw new FlinkRuntimeException(
+"JDBC connection fails to commit: " + e.getMessage(), e);
}

return new PostgresOffset(lsn, txId, Instant.MIN);
