Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions docs/content/docs/connectors/flink-sources/postgres-cdc.md
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,13 @@ The following format metadata can be exposed as read-only (VIRTUAL) columns in a
<td>TIMESTAMP_LTZ(3) NOT NULL</td>
<td>It indicates the time that the change was made in the database. <br>If the record is read from snapshot of the table instead of the change stream, the value is always 0.</td>
</tr>
<tr>
<td>row_kind</td>
<td>STRING NOT NULL</td>
<td>It indicates the row kind of the changelog,Note: The downstream SQL operator may fail to compare due to this new added column when processing the row retraction if
the source operator chooses to output the 'row_kind' column for each record. It is recommended to use this metadata column only in simple synchronization jobs.
<br>'+I' means INSERT message, '-D' means DELETE message, '-U' means UPDATE_BEFORE message and '+U' means UPDATE_AFTER message.</td>
</tr>
</tbody>
</table>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
package org.apache.flink.cdc.connectors.postgres.table;

import org.apache.flink.cdc.debezium.table.MetadataConverter;
import org.apache.flink.cdc.debezium.table.RowDataMetadataConverter;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.DataType;
Expand Down Expand Up @@ -95,6 +97,28 @@ public Object read(SourceRecord record) {
return TimestampData.fromEpochMillis(
(Long) sourceStruct.get(AbstractSourceInfo.TIMESTAMP_KEY));
}
}),

/**
* It indicates the row kind of the changelog. '+I' means INSERT message, '-D' means DELETE
* message, '-U' means UPDATE_BEFORE message and '+U' means UPDATE_AFTER message
*/
ROW_KIND(
"row_kind",
DataTypes.STRING().notNull(),
new RowDataMetadataConverter() {
private static final long serialVersionUID = 1L;

@Override
public Object read(RowData rowData) {
return StringData.fromString(rowData.getRowKind().shortString());
}

@Override
public Object read(SourceRecord record) {
throw new UnsupportedOperationException(
"Please call read(RowData rowData) method instead.");
}
});

private final String key;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,20 +165,24 @@ public void testConsumingAllEvents()
* The final database table looks like this:
*
* > SELECT * FROM products;
* +-----+--------------------+---------------------------------------------------------+--------+
* | id | name | description | weight |
* +-----+--------------------+---------------------------------------------------------+--------+
* | 101 | scooter | Small 2-wheel scooter | 3.14 |
* | 102 | car battery | 12V car battery | 8.1 |
* | 103 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from #40 to #3 | 0.8 |
* | 104 | hammer | 12oz carpenter's hammer | 0.75 |
* | 105 | hammer | 14oz carpenter's hammer | 0.875 |
* | 106 | hammer | 18oz carpenter hammer | 1 |
* | 107 | rocks | box of assorted rocks | 5.1 |
* | 108 | jacket | water resistent black wind breaker | 0.1 |
* | 109 | spare tire | 24 inch spare tire | 22.2 |
* | 110 | jacket | new water resistent white wind breaker | 0.5 |
* +-----+--------------------+---------------------------------------------------------+--------+
* +-----+--------------------+-------------------------------------------------
* --------+--------+
* | id | name | description | weight |
* +-----+--------------------+-------------------------------------------------
* --------+--------+
* | 101 | scooter | Small 2-wheel scooter | 3.14 |
* | 102 | car battery | 12V car battery | 8.1 |
* | 103 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from
* #40 to #3 | 0.8 |
* | 104 | hammer | 12oz carpenter's hammer | 0.75 |
* | 105 | hammer | 14oz carpenter's hammer | 0.875 |
* | 106 | hammer | 18oz carpenter hammer | 1 |
* | 107 | rocks | box of assorted rocks | 5.1 |
* | 108 | jacket | water resistent black wind breaker | 0.1 |
* | 109 | spare tire | 24 inch spare tire | 22.2 |
* | 110 | jacket | new water resistent white wind breaker | 0.5 |
* +-----+--------------------+-------------------------------------------------
* --------+--------+
* </pre>
*/

Expand Down Expand Up @@ -246,7 +250,8 @@ public void testStartupFromLatestOffset() throws Exception {

// async submit job
TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM debezium_source");
// wait for the source startup, we don't have a better way to wait it, use sleep for now
// wait for the source startup, we don't have a better way to wait it, use sleep
// for now
Thread.sleep(10000L);

try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
Expand Down Expand Up @@ -469,6 +474,7 @@ public void testMetadataColumns() throws Throwable {
+ " db_name STRING METADATA FROM 'database_name' VIRTUAL,"
+ " schema_name STRING METADATA VIRTUAL,"
+ " table_name STRING METADATA VIRTUAL,"
+ " row_kind STRING METADATA FROM 'row_kind' VIRTUAL,"
+ " id INT NOT NULL,"
+ " name STRING,"
+ " description STRING,"
Expand Down Expand Up @@ -501,6 +507,7 @@ public void testMetadataColumns() throws Throwable {
+ " database_name STRING,"
+ " schema_name STRING,"
+ " table_name STRING,"
+ " row_kind STRING,"
+ " id INT,"
+ " name STRING,"
+ " description STRING,"
Expand Down Expand Up @@ -546,52 +553,52 @@ public void testMetadataColumns() throws Throwable {
Arrays.asList(
"+I("
+ databaseName
+ ",inventory,products,101,scooter,Small 2-wheel scooter,3.140)",
+ ",inventory,products,+I,101,scooter,Small 2-wheel scooter,3.140)",
"+I("
+ databaseName
+ ",inventory,products,102,car battery,12V car battery,8.100)",
+ ",inventory,products,+I,102,car battery,12V car battery,8.100)",
"+I("
+ databaseName
+ ",inventory,products,103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.800)",
+ ",inventory,products,+I,103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.800)",
"+I("
+ databaseName
+ ",inventory,products,104,hammer,12oz carpenter's hammer,0.750)",
+ ",inventory,products,+I,104,hammer,12oz carpenter's hammer,0.750)",
"+I("
+ databaseName
+ ",inventory,products,105,hammer,14oz carpenter's hammer,0.875)",
+ ",inventory,products,+I,105,hammer,14oz carpenter's hammer,0.875)",
"+I("
+ databaseName
+ ",inventory,products,106,hammer,16oz carpenter's hammer,1.000)",
+ ",inventory,products,+I,106,hammer,16oz carpenter's hammer,1.000)",
"+I("
+ databaseName
+ ",inventory,products,107,rocks,box of assorted rocks,5.300)",
+ ",inventory,products,+I,107,rocks,box of assorted rocks,5.300)",
"+I("
+ databaseName
+ ",inventory,products,108,jacket,water resistent black wind breaker,0.100)",
+ ",inventory,products,+I,108,jacket,water resistent black wind breaker,0.100)",
"+I("
+ databaseName
+ ",inventory,products,109,spare tire,24 inch spare tire,22.200)",
+ ",inventory,products,+I,109,spare tire,24 inch spare tire,22.200)",
"+I("
+ databaseName
+ ",inventory,products,110,jacket,water resistent white wind breaker,0.200)",
+ ",inventory,products,+I,110,jacket,water resistent white wind breaker,0.200)",
"+I("
+ databaseName
+ ",inventory,products,111,scooter,Big 2-wheel scooter ,5.180)",
+ ",inventory,products,+I,111,scooter,Big 2-wheel scooter ,5.180)",
"+U("
+ databaseName
+ ",inventory,products,106,hammer,18oz carpenter hammer,1.000)",
+ ",inventory,products,+U,106,hammer,18oz carpenter hammer,1.000)",
"+U("
+ databaseName
+ ",inventory,products,107,rocks,box of assorted rocks,5.100)",
+ ",inventory,products,+U,107,rocks,box of assorted rocks,5.100)",
"+U("
+ databaseName
+ ",inventory,products,110,jacket,new water resistent white wind breaker,0.500)",
+ ",inventory,products,+U,110,jacket,new water resistent white wind breaker,0.500)",
"+U("
+ databaseName
+ ",inventory,products,111,scooter,Big 2-wheel scooter ,5.170)",
+ ",inventory,products,+U,111,scooter,Big 2-wheel scooter ,5.170)",
"-D("
+ databaseName
+ ",inventory,products,111,scooter,Big 2-wheel scooter ,5.170)");
+ ",inventory,products,-D,111,scooter,Big 2-wheel scooter ,5.170)");
List<String> actual = TestValuesTableFactory.getRawResultsAsStrings("sink");
Collections.sort(actual);
Collections.sort(expected);
Expand Down Expand Up @@ -679,20 +686,24 @@ public void testUpsertMode() throws Exception {
* The final database table looks like this:
*
* > SELECT * FROM products;
* +-----+--------------------+---------------------------------------------------------+--------+
* | id | name | description | weight |
* +-----+--------------------+---------------------------------------------------------+--------+
* | 101 | scooter | Small 2-wheel scooter | 3.14 |
* | 102 | car battery | 12V car battery | 8.1 |
* | 103 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from #40 to #3 | 0.8 |
* | 104 | hammer | 12oz carpenter's hammer | 0.75 |
* | 105 | hammer | 14oz carpenter's hammer | 0.875 |
* | 106 | hammer | 18oz carpenter hammer | 1 |
* | 107 | rocks | box of assorted rocks | 5.1 |
* | 108 | jacket | water resistent black wind breaker | 0.1 |
* | 109 | spare tire | 24 inch spare tire | 22.2 |
* | 110 | jacket | new water resistent white wind breaker | 0.5 |
* +-----+--------------------+---------------------------------------------------------+--------+
* +-----+--------------------+-------------------------------------------------
* --------+--------+
* | id | name | description | weight |
* +-----+--------------------+-------------------------------------------------
* --------+--------+
* | 101 | scooter | Small 2-wheel scooter | 3.14 |
* | 102 | car battery | 12V car battery | 8.1 |
* | 103 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from
* #40 to #3 | 0.8 |
* | 104 | hammer | 12oz carpenter's hammer | 0.75 |
* | 105 | hammer | 14oz carpenter's hammer | 0.875 |
* | 106 | hammer | 18oz carpenter hammer | 1 |
* | 107 | rocks | box of assorted rocks | 5.1 |
* | 108 | jacket | water resistent black wind breaker | 0.1 |
* | 109 | spare tire | 24 inch spare tire | 22.2 |
* | 110 | jacket | new water resistent white wind breaker | 0.5 |
* +-----+--------------------+-------------------------------------------------
* --------+--------+
* </pre>
*/

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,19 +50,19 @@
import java.util.Map;
import java.util.Properties;

import static org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.CONNECTION_POOL_SIZE;
import static org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_MAX_RETRIES;
import static org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_TIMEOUT;
import static org.apache.flink.cdc.connectors.base.options.SourceOptions.CHUNK_META_GROUP_SIZE;
import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED;
import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP;
import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED;
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.CHUNK_META_GROUP_SIZE;
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.CONNECTION_POOL_SIZE;
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.CONNECT_MAX_RETRIES;
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.CONNECT_TIMEOUT;
import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.HEARTBEAT_INTERVAL;
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_LSN_COMMIT_CHECKPOINTS_DELAY;
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
import static org.apache.flink.cdc.connectors.utils.AssertUtils.assertProducedTypeOfSourceFunction;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
Expand Down Expand Up @@ -100,6 +100,7 @@ public class PostgreSQLTableFactoryTest {
Column.physical("name", DataTypes.STRING()),
Column.physical("count", DataTypes.DECIMAL(38, 18)),
Column.metadata("time", DataTypes.TIMESTAMP_LTZ(3), "op_ts", true),
Column.metadata("row_kind", DataTypes.STRING(), "row_kind", true),
Column.metadata(
"database_name", DataTypes.STRING(), "database_name", true),
Column.metadata("schema_name", DataTypes.STRING(), "schema_name", true),
Expand Down Expand Up @@ -211,7 +212,7 @@ public void testMetadataColumns() {
DynamicTableSource actualSource = createTableSource(SCHEMA_WITH_METADATA, properties);
PostgreSQLTableSource postgreSQLTableSource = (PostgreSQLTableSource) actualSource;
postgreSQLTableSource.applyReadableMetadata(
Arrays.asList("op_ts", "database_name", "schema_name", "table_name"),
Arrays.asList("row_kind", "op_ts", "database_name", "schema_name", "table_name"),
SCHEMA_WITH_METADATA.toSourceRowDataType());
actualSource = postgreSQLTableSource.copy();
PostgreSQLTableSource expectedSource =
Expand Down Expand Up @@ -246,7 +247,7 @@ public void testMetadataColumns() {
SCAN_LSN_COMMIT_CHECKPOINTS_DELAY.defaultValue());
expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType();
expectedSource.metadataKeys =
Arrays.asList("op_ts", "database_name", "schema_name", "table_name");
Arrays.asList("row_kind", "op_ts", "database_name", "schema_name", "table_name");

assertEquals(expectedSource, actualSource);

Expand Down
Loading