Skip to content

Commit 33018d5

Browse files
oli2vleonardBang
authored andcommitted
[FLINK-35067][cdc-connector][postgres] Adding metadata 'row_kind' for Postgres CDC Connector
1 parent 77785c1 commit 33018d5

File tree

4 files changed

+103
-60
lines changed

4 files changed

+103
-60
lines changed

docs/content/docs/connectors/flink-sources/postgres-cdc.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -387,6 +387,13 @@ The following format metadata can be exposed as read-only (VIRTUAL) columns in a
387387
<td>TIMESTAMP_LTZ(3) NOT NULL</td>
388388
<td>It indicates the time that the change was made in the database. <br>If the record is read from snapshot of the table instead of the change stream, the value is always 0.</td>
389389
</tr>
390+
<tr>
391+
<td>row_kind</td>
392+
<td>STRING NOT NULL</td>
393+
<td>It indicates the row kind of the changelog,Note: The downstream SQL operator may fail to compare due to this new added column when processing the row retraction if
394+
the source operator chooses to output the 'row_kind' column for each record. It is recommended to use this metadata column only in simple synchronization jobs.
395+
<br>'+I' means INSERT message, '-D' means DELETE message, '-U' means UPDATE_BEFORE message and '+U' means UPDATE_AFTER message.</td>
396+
</tr>
390397
</tbody>
391398
</table>
392399

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLReadableMetadata.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818
package org.apache.flink.cdc.connectors.postgres.table;
1919

2020
import org.apache.flink.cdc.debezium.table.MetadataConverter;
21+
import org.apache.flink.cdc.debezium.table.RowDataMetadataConverter;
2122
import org.apache.flink.table.api.DataTypes;
23+
import org.apache.flink.table.data.RowData;
2224
import org.apache.flink.table.data.StringData;
2325
import org.apache.flink.table.data.TimestampData;
2426
import org.apache.flink.table.types.DataType;
@@ -95,6 +97,28 @@ public Object read(SourceRecord record) {
9597
return TimestampData.fromEpochMillis(
9698
(Long) sourceStruct.get(AbstractSourceInfo.TIMESTAMP_KEY));
9799
}
100+
}),
101+
102+
/**
103+
* It indicates the row kind of the changelog. '+I' means INSERT message, '-D' means DELETE
104+
* message, '-U' means UPDATE_BEFORE message and '+U' means UPDATE_AFTER message
105+
*/
106+
ROW_KIND(
107+
"row_kind",
108+
DataTypes.STRING().notNull(),
109+
new RowDataMetadataConverter() {
110+
private static final long serialVersionUID = 1L;
111+
112+
@Override
113+
public Object read(RowData rowData) {
114+
return StringData.fromString(rowData.getRowKind().shortString());
115+
}
116+
117+
@Override
118+
public Object read(SourceRecord record) {
119+
throw new UnsupportedOperationException(
120+
"Please call read(RowData rowData) method instead.");
121+
}
98122
});
99123

100124
private final String key;

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java

Lines changed: 61 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -165,20 +165,24 @@ public void testConsumingAllEvents()
165165
* The final database table looks like this:
166166
*
167167
* > SELECT * FROM products;
168-
* +-----+--------------------+---------------------------------------------------------+--------+
169-
* | id | name | description | weight |
170-
* +-----+--------------------+---------------------------------------------------------+--------+
171-
* | 101 | scooter | Small 2-wheel scooter | 3.14 |
172-
* | 102 | car battery | 12V car battery | 8.1 |
173-
* | 103 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from #40 to #3 | 0.8 |
174-
* | 104 | hammer | 12oz carpenter's hammer | 0.75 |
175-
* | 105 | hammer | 14oz carpenter's hammer | 0.875 |
176-
* | 106 | hammer | 18oz carpenter hammer | 1 |
177-
* | 107 | rocks | box of assorted rocks | 5.1 |
178-
* | 108 | jacket | water resistent black wind breaker | 0.1 |
179-
* | 109 | spare tire | 24 inch spare tire | 22.2 |
180-
* | 110 | jacket | new water resistent white wind breaker | 0.5 |
181-
* +-----+--------------------+---------------------------------------------------------+--------+
168+
* +-----+--------------------+-------------------------------------------------
169+
* --------+--------+
170+
* | id | name | description | weight |
171+
* +-----+--------------------+-------------------------------------------------
172+
* --------+--------+
173+
* | 101 | scooter | Small 2-wheel scooter | 3.14 |
174+
* | 102 | car battery | 12V car battery | 8.1 |
175+
* | 103 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from
176+
* #40 to #3 | 0.8 |
177+
* | 104 | hammer | 12oz carpenter's hammer | 0.75 |
178+
* | 105 | hammer | 14oz carpenter's hammer | 0.875 |
179+
* | 106 | hammer | 18oz carpenter hammer | 1 |
180+
* | 107 | rocks | box of assorted rocks | 5.1 |
181+
* | 108 | jacket | water resistent black wind breaker | 0.1 |
182+
* | 109 | spare tire | 24 inch spare tire | 22.2 |
183+
* | 110 | jacket | new water resistent white wind breaker | 0.5 |
184+
* +-----+--------------------+-------------------------------------------------
185+
* --------+--------+
182186
* </pre>
183187
*/
184188

@@ -193,7 +197,7 @@ public void testConsumingAllEvents()
193197
"spare tire,22.200"
194198
};
195199

196-
List<String> actual = TestValuesTableFactory.getResultsAsStrings("sink");
200+
List<String> actual = TestValuesTableFactory.getResults("sink");
197201
assertThat(actual, containsInAnyOrder(expected));
198202

199203
result.getJobClient().get().cancel().get();
@@ -246,7 +250,8 @@ public void testStartupFromLatestOffset() throws Exception {
246250

247251
// async submit job
248252
TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM debezium_source");
249-
// wait for the source startup, we don't have a better way to wait it, use sleep for now
253+
// wait for the source startup, we don't have a better way to wait it, use sleep
254+
// for now
250255
Thread.sleep(10000L);
251256

252257
try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
@@ -266,7 +271,7 @@ public void testStartupFromLatestOffset() throws Exception {
266271
String[] expected =
267272
new String[] {"110,jacket,new water resistent white wind breaker,0.500"};
268273

269-
List<String> actual = TestValuesTableFactory.getResultsAsStrings("sink");
274+
List<String> actual = TestValuesTableFactory.getResults("sink");
270275
assertThat(actual, containsInAnyOrder(expected));
271276

272277
result.getJobClient().get().cancel().get();
@@ -454,7 +459,7 @@ public void testAllTypes() throws Throwable {
454459
"+I(1,[50],32767,65535,2147483647,5.5,6.6,123.12345,404.4,true,Hello World,a,abc,abcd..xyz,2020-07-17T18:00:22.123,2020-07-17T18:00:22.123456,2020-07-17,18:00:22,500,{\"hexewkb\":\"0105000020e610000001000000010200000002000000a779c7293a2465400b462575025a46c0c66d3480b7fc6440c3d32b65195246c0\",\"srid\":4326},{\"hexewkb\":\"0101000020730c00001c7c613255de6540787aa52c435c42c0\",\"srid\":3187})",
455460
"-D(1,[50],32767,65535,2147483647,5.5,6.6,123.12345,404.4,true,Hello World,a,abc,abcd..xyz,2020-07-17T18:00:22.123,2020-07-17T18:00:22.123456,2020-07-17,18:00:22,500,{\"hexewkb\":\"0105000020e610000001000000010200000002000000a779c7293a2465400b462575025a46c0c66d3480b7fc6440c3d32b65195246c0\",\"srid\":4326},{\"hexewkb\":\"0101000020730c00001c7c613255de6540787aa52c435c42c0\",\"srid\":3187})",
456461
"+I(1,[50],0,65535,2147483647,5.5,6.6,123.12345,404.4,true,Hello World,a,abc,abcd..xyz,2020-07-17T18:00:22.123,2020-07-17T18:00:22.123456,2020-07-17,18:00:22,500,{\"hexewkb\":\"0105000020e610000001000000010200000002000000a779c7293a2465400b462575025a46c0c66d3480b7fc6440c3d32b65195246c0\",\"srid\":4326},{\"hexewkb\":\"0101000020730c00001c7c613255de6540787aa52c435c42c0\",\"srid\":3187})");
457-
List<String> actual = TestValuesTableFactory.getRawResultsAsStrings("sink");
462+
List<String> actual = TestValuesTableFactory.getRawResults("sink");
458463
assertEquals(expected, actual);
459464

460465
result.getJobClient().get().cancel().get();
@@ -469,6 +474,7 @@ public void testMetadataColumns() throws Throwable {
469474
+ " db_name STRING METADATA FROM 'database_name' VIRTUAL,"
470475
+ " schema_name STRING METADATA VIRTUAL,"
471476
+ " table_name STRING METADATA VIRTUAL,"
477+
+ " row_kind STRING METADATA FROM 'row_kind' VIRTUAL,"
472478
+ " id INT NOT NULL,"
473479
+ " name STRING,"
474480
+ " description STRING,"
@@ -501,6 +507,7 @@ public void testMetadataColumns() throws Throwable {
501507
+ " database_name STRING,"
502508
+ " schema_name STRING,"
503509
+ " table_name STRING,"
510+
+ " row_kind STRING,"
504511
+ " id INT,"
505512
+ " name STRING,"
506513
+ " description STRING,"
@@ -546,53 +553,53 @@ public void testMetadataColumns() throws Throwable {
546553
Arrays.asList(
547554
"+I("
548555
+ databaseName
549-
+ ",inventory,products,101,scooter,Small 2-wheel scooter,3.140)",
556+
+ ",inventory,products,+I,101,scooter,Small 2-wheel scooter,3.140)",
550557
"+I("
551558
+ databaseName
552-
+ ",inventory,products,102,car battery,12V car battery,8.100)",
559+
+ ",inventory,products,+I,102,car battery,12V car battery,8.100)",
553560
"+I("
554561
+ databaseName
555-
+ ",inventory,products,103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.800)",
562+
+ ",inventory,products,+I,103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.800)",
556563
"+I("
557564
+ databaseName
558-
+ ",inventory,products,104,hammer,12oz carpenter's hammer,0.750)",
565+
+ ",inventory,products,+I,104,hammer,12oz carpenter's hammer,0.750)",
559566
"+I("
560567
+ databaseName
561-
+ ",inventory,products,105,hammer,14oz carpenter's hammer,0.875)",
568+
+ ",inventory,products,+I,105,hammer,14oz carpenter's hammer,0.875)",
562569
"+I("
563570
+ databaseName
564-
+ ",inventory,products,106,hammer,16oz carpenter's hammer,1.000)",
571+
+ ",inventory,products,+I,106,hammer,16oz carpenter's hammer,1.000)",
565572
"+I("
566573
+ databaseName
567-
+ ",inventory,products,107,rocks,box of assorted rocks,5.300)",
574+
+ ",inventory,products,+I,107,rocks,box of assorted rocks,5.300)",
568575
"+I("
569576
+ databaseName
570-
+ ",inventory,products,108,jacket,water resistent black wind breaker,0.100)",
577+
+ ",inventory,products,+I,108,jacket,water resistent black wind breaker,0.100)",
571578
"+I("
572579
+ databaseName
573-
+ ",inventory,products,109,spare tire,24 inch spare tire,22.200)",
580+
+ ",inventory,products,+I,109,spare tire,24 inch spare tire,22.200)",
574581
"+I("
575582
+ databaseName
576-
+ ",inventory,products,110,jacket,water resistent white wind breaker,0.200)",
583+
+ ",inventory,products,+I,110,jacket,water resistent white wind breaker,0.200)",
577584
"+I("
578585
+ databaseName
579-
+ ",inventory,products,111,scooter,Big 2-wheel scooter ,5.180)",
586+
+ ",inventory,products,+I,111,scooter,Big 2-wheel scooter ,5.180)",
580587
"+U("
581588
+ databaseName
582-
+ ",inventory,products,106,hammer,18oz carpenter hammer,1.000)",
589+
+ ",inventory,products,+U,106,hammer,18oz carpenter hammer,1.000)",
583590
"+U("
584591
+ databaseName
585-
+ ",inventory,products,107,rocks,box of assorted rocks,5.100)",
592+
+ ",inventory,products,+U,107,rocks,box of assorted rocks,5.100)",
586593
"+U("
587594
+ databaseName
588-
+ ",inventory,products,110,jacket,new water resistent white wind breaker,0.500)",
595+
+ ",inventory,products,+U,110,jacket,new water resistent white wind breaker,0.500)",
589596
"+U("
590597
+ databaseName
591-
+ ",inventory,products,111,scooter,Big 2-wheel scooter ,5.170)",
598+
+ ",inventory,products,+U,111,scooter,Big 2-wheel scooter ,5.170)",
592599
"-D("
593600
+ databaseName
594-
+ ",inventory,products,111,scooter,Big 2-wheel scooter ,5.170)");
595-
List<String> actual = TestValuesTableFactory.getRawResultsAsStrings("sink");
601+
+ ",inventory,products,-D,111,scooter,Big 2-wheel scooter ,5.170)");
602+
List<String> actual = TestValuesTableFactory.getRawResults("sink");
596603
Collections.sort(actual);
597604
Collections.sort(expected);
598605
assertEquals(expected, actual);
@@ -679,20 +686,24 @@ public void testUpsertMode() throws Exception {
679686
* The final database table looks like this:
680687
*
681688
* > SELECT * FROM products;
682-
* +-----+--------------------+---------------------------------------------------------+--------+
683-
* | id | name | description | weight |
684-
* +-----+--------------------+---------------------------------------------------------+--------+
685-
* | 101 | scooter | Small 2-wheel scooter | 3.14 |
686-
* | 102 | car battery | 12V car battery | 8.1 |
687-
* | 103 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from #40 to #3 | 0.8 |
688-
* | 104 | hammer | 12oz carpenter's hammer | 0.75 |
689-
* | 105 | hammer | 14oz carpenter's hammer | 0.875 |
690-
* | 106 | hammer | 18oz carpenter hammer | 1 |
691-
* | 107 | rocks | box of assorted rocks | 5.1 |
692-
* | 108 | jacket | water resistent black wind breaker | 0.1 |
693-
* | 109 | spare tire | 24 inch spare tire | 22.2 |
694-
* | 110 | jacket | new water resistent white wind breaker | 0.5 |
695-
* +-----+--------------------+---------------------------------------------------------+--------+
689+
* +-----+--------------------+-------------------------------------------------
690+
* --------+--------+
691+
* | id | name | description | weight |
692+
* +-----+--------------------+-------------------------------------------------
693+
* --------+--------+
694+
* | 101 | scooter | Small 2-wheel scooter | 3.14 |
695+
* | 102 | car battery | 12V car battery | 8.1 |
696+
* | 103 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from
697+
* #40 to #3 | 0.8 |
698+
* | 104 | hammer | 12oz carpenter's hammer | 0.75 |
699+
* | 105 | hammer | 14oz carpenter's hammer | 0.875 |
700+
* | 106 | hammer | 18oz carpenter hammer | 1 |
701+
* | 107 | rocks | box of assorted rocks | 5.1 |
702+
* | 108 | jacket | water resistent black wind breaker | 0.1 |
703+
* | 109 | spare tire | 24 inch spare tire | 22.2 |
704+
* | 110 | jacket | new water resistent white wind breaker | 0.5 |
705+
* +-----+--------------------+-------------------------------------------------
706+
* --------+--------+
696707
* </pre>
697708
*/
698709

@@ -707,7 +718,7 @@ public void testUpsertMode() throws Exception {
707718
"spare tire,22.200"
708719
};
709720

710-
List<String> actual = TestValuesTableFactory.getResultsAsStrings("sink");
721+
List<String> actual = TestValuesTableFactory.getResults("sink");
711722
assertThat(actual, containsInAnyOrder(expected));
712723

713724
result.getJobClient().get().cancel().get();

0 commit comments

Comments
 (0)