
[Postgres] Use Incremental Snapshot Framework for Postgres CDC Connector #1823

Closed · wants to merge 7 commits

Conversation

@xiaom (Contributor) commented Dec 12, 2022

Co-Authored-By: Yaroslav Tkachenko 260702+sap1ens@users.noreply.github.com

Hey @leonardBang,

This is our first PR resolving #1163 😄

The core DataStream functionality is implemented under the package com.ververica.cdc.connectors.postgres.source, with a layout similar to the MySQL/Oracle incremental snapshot implementations.

source
├── PostgresChunkSplitter.java
├── PostgresConnectionPoolFactory.java
├── PostgresDialect.java
├── PostgresSourceBuilder.java
├── config
│   ├── PostgresSourceConfig.java
│   ├── PostgresSourceConfigFactory.java
│   └── PostgresSourceOptions.java
├── fetch
│   ├── PostgresScanFetchTask.java
│   ├── PostgresSourceFetchTaskContext.java
│   └── PostgresStreamFetchTask.java
├── offset
│   ├── PostgresOffset.java
│   └── PostgresOffsetFactory.java
└── utils
    ├── PgQueryUtils.java
    ├── PgSchema.java
    ├── PgTypeUtils.java
    └── TableDiscoveryUtils.java

The corresponding Table API support can be enabled by setting scan.incremental.snapshot.enabled=true.
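For illustration, enabling it in a Flink SQL DDL might look like the sketch below. Only scan.incremental.snapshot.enabled comes from this PR; the table schema and the remaining connection options are placeholder values following the existing postgres-cdc connector's option names:

```sql
CREATE TABLE shipments (
    shipment_id INT,
    order_id INT,
    is_arrived BOOLEAN,
    PRIMARY KEY (shipment_id) NOT ENFORCED
) WITH (
    'connector' = 'postgres-cdc',
    'hostname' = 'localhost',
    'port' = '5432',
    'username' = 'postgres',
    'password' = 'postgres',
    'database-name' = 'postgres',
    'schema-name' = 'public',
    'table-name' = 'shipments',
    'slot.name' = 'flink',
    -- opt in to the incremental snapshot framework added by this PR
    'scan.incremental.snapshot.enabled' = 'true'
);
```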

A few notes:

package io.debezium.connector.postgresql

This package exists mostly to work around access restrictions on some Debezium classes we need to use directly.

  • Utils.java: a utility class to access some package-private methods of Debezium
  • PostgresObjectFactory.java: a factory that creates various Debezium objects whose constructors need package-private access
  • PostgresConnection.java: copied from Debezium 1.6.4-final and modified to support injecting a connection factory backed by a Hikari connection pool

Major changes to CDC-base

  • JdbcSourceConfig: add a new field List<String> schemaList to make it compatible with PostgreSQL and Debezium's terminology (see Debezium's TableId)
  • DataSourceDialect: extend the CheckpointListener interface so that we can add a customized hook to commit offsets
  • SourceSplitSerializer: fixed a deserialization bug and added a check of the useCatalogBeforeSchema flag (true by default)
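As a rough illustration of the CheckpointListener extension, the sketch below uses stand-in interfaces rather than Flink's or the connector's real classes (all names here are hypothetical): the dialect gets a default no-op notifyCheckpointComplete, and the Postgres dialect overrides it to commit the replication offset once a checkpoint completes.

```java
import java.util.ArrayList;
import java.util.List;

/** Stand-in for Flink's CheckpointListener; sketch only. */
interface CheckpointListenerSketch {
    void notifyCheckpointComplete(long checkpointId);
}

/** Stand-in for cdc-base's DataSourceDialect, extended with a default no-op hook. */
interface DataSourceDialectSketch extends CheckpointListenerSketch {
    @Override
    default void notifyCheckpointComplete(long checkpointId) {
        // Most dialects need no action when a checkpoint completes.
    }
}

/** The Postgres dialect overrides the hook to commit the replication offset. */
class PostgresDialectSketch implements DataSourceDialectSketch {
    final List<String> committedOffsets = new ArrayList<>();

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        // In the real connector this would confirm the flushed LSN on the
        // replication slot so Postgres can reclaim WAL; here we just record it.
        committedOffsets.add("commit@" + checkpointId);
    }
}
```

The default method keeps existing dialects source-compatible; only dialects that need to acknowledge progress to the database override the hook.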

Other changes not related to PostgreSQL

  • Fix a bug where the high watermark is not set properly in various *ScanFetchTask

Notes on Approximate Count query for Postgres

We use the following query in the chunk splitter to estimate the approximate row count:

SELECT reltuples::bigint FROM pg_class WHERE oid = to_regclass('your_table_id')

The query requires a prior run of VACUUM or ANALYZE to produce a good estimate. On any PostgreSQL instance with autovacuum enabled, you won't need to worry about this.
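Concretely (the table name here is just an example), you can refresh the statistics manually and then check the estimate:

```sql
-- Refresh planner statistics; autovacuum normally does this for you
ANALYZE public.shipments;

-- Cheap row-count estimate used by the chunk splitter
SELECT reltuples::bigint AS estimated_rows
FROM pg_class
WHERE oid = to_regclass('public.shipments');
```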

We are also actively working on supporting the scan.newly-added-table.enabled feature as @sap1ens mentioned here.

Appreciate any feedback!

@leonardBang leonardBang self-requested a review December 12, 2022 06:20
@xiaom (Contributor, Author) commented Dec 16, 2022

Update:

  • fix some issues in JdbcSourceFetchTaskContext and set proper default implementations for several methods (see e040533)

@leonardBang (Contributor)

@xiaom We planned to hold a contributor sync meeting to discuss the 2.4 roadmap. Are you interested in joining? Please contact me if you'd like to.

@xiaom (Contributor, Author) commented Feb 8, 2023

Hey @leonardBang, thanks for the invitation! Yes, I am interested; I will DM you on Twitter.

@1032851561

Does it support snapshotting newly added tables? I need this feature. Does it work well?

@sap1ens (Contributor) commented Apr 10, 2023

Does it support snapshotting newly added tables? I need this feature. Does it work well?

There is a separate PR for that: #1838

@1032851561

Does it support snapshotting newly added tables? I need this feature. Does it work well?

There is a separate PR for that: #1838

Nice, I hope it gets merged as soon as possible.

@ruanhang1993 ruanhang1993 self-requested a review April 20, 2023 06:27
@ruanhang1993 (Contributor) left a comment


@xiaom , thanks for your work. I left some comments.

@ruanhang1993 (Contributor) commented May 23, 2023

Hi, @xiaom.
Do you have time to rebase onto the master branch and make some updates?
Please @ me when you update this PR and need me to review. Thanks ~

@xiaom (Contributor, Author) commented May 24, 2023

Hi @ruanhang1993, thanks for the review! I will find some time to update the PR either later this week or next week.

@ruanhang1993 (Contributor)

Hi, @xiaom .

Is there any update on this PR?
We plan to release version 2.4.0 on June 14th, and this feature is part of that release.
If any update is pushed, I will review again as soon as possible.

Thanks a lot~

@xiaom (Contributor, Author) commented Jun 5, 2023

Apologies for the delay in updating the PR; some unexpected personal commitments came up.
I'll do my best to get this PR updated.

@xiaom (Contributor, Author) commented Jun 7, 2023

Hey @ruanhang1993, I've addressed some comments in this commit for you to review. Let me know what you think. I have not rebased the branch yet. I will do it in the next update.


Also, I'd like to point out a caveat of this feature for any potential users: its scalability with large tables is not ideal.

In the snapshotting phase, backfill tasks are created to capture new data changes. However, for larger tables, since snapshotting takes longer, WAL also grows larger and backfilling tasks will take significantly more time.

Unlike MySQL, where the process can be parallelized through additional binlog readers, this isn't straightforward for Postgres. Achieving similar parallelism would require more replication slots, a resource whose limited availability makes it inadvisable to overuse.

In light of this, we implement a snapshot-only reader (with option snapshot.mode=initial_only) coupled with a stream reader (snapshot.mode=never) and some dedupe processes in our production environment. This approach allows us to parallelize snapshotting without increasing the number of replication slots. Just want to mention this in case anyone wants to use similar strategies.
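To make this workaround concrete, here is a sketch of the two-source setup in Flink SQL. The snapshot.mode values are the ones mentioned above; passing them through as 'debezium.snapshot.mode' table options, along with the table schema and connection details, are assumptions for illustration only. Downstream, the two streams are unioned and deduplicated on the primary key.

```sql
-- Source 1: snapshot-only reader; can be parallelized freely because it
-- does not need to hold a replication slot open after the snapshot finishes
CREATE TABLE orders_snapshot (
    order_id INT,
    status STRING,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'postgres-cdc',
    'hostname' = 'localhost',
    'port' = '5432',
    'username' = 'postgres',
    'password' = 'postgres',
    'database-name' = 'postgres',
    'schema-name' = 'public',
    'table-name' = 'orders',
    'debezium.snapshot.mode' = 'initial_only'
);

-- Source 2: stream-only reader; the single replication slot lives here
CREATE TABLE orders_stream (
    order_id INT,
    status STRING,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'postgres-cdc',
    'hostname' = 'localhost',
    'port' = '5432',
    'username' = 'postgres',
    'password' = 'postgres',
    'database-name' = 'postgres',
    'schema-name' = 'public',
    'table-name' = 'orders',
    'slot.name' = 'flink',
    'debezium.snapshot.mode' = 'never'
);

-- A dedupe step over the union of both tables (e.g. keeping the latest
-- change per order_id) reconciles rows seen by both readers.
```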

@ruanhang1993 (Contributor)

Hi, @xiaom.
Thanks for the quick reply. I have replied to the unclear comments.

About the problem you mentioned: the snapshot phase for big tables is indeed a common pain point.
Snapshot-only plus starting from a specific binlog position is a good idea, but this approach also has some limits.

  • If the snapshot phase runs with a single parallelism, we have to keep the binlogs alive long enough.
  • If the snapshot phase runs with multiple parallelisms, we can only provide at-least-once semantics, so the sink must support idempotent operations, and we still need to make sure the binlogs stay alive.

Issue #1687 for MySQL targets this usage.

@xiaom (Contributor, Author) commented Jun 9, 2023

Hey @ruanhang1993,

I've rebased the PR.

Also, thanks for mentioning various solutions for parallelized snapshotting. Good to know that this is a common pain point.

@ruanhang1993 (Contributor) left a comment

Hi, @xiaom. I have reviewed the cdc-base part and will review the pg cdc part later.
Would you mind taking a look at the failed CI? Thanks ~

Review thread on docs/content/connectors/postgres-cdc.md (outdated, resolved)
@@ -358,6 +359,8 @@ private void writeTableIds(Collection<TableId> tableIds, DataOutputSerializer out)
    final int size = tableIds.size();
    out.writeInt(size);
    for (TableId tableId : tableIds) {
        boolean useCatalogBeforeSchema = SerializerUtils.shouldUseCatalogBeforeSchema(tableId);
        out.writeBoolean(useCatalogBeforeSchema);
Comment from a reviewer (Contributor):

This will make the state from 2.3.0 unreadable in 2.4.0.
We should bump the state serializer version and use version-specific logic.
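A version-gated deserializer along the lines suggested might look like this sketch (class names, version numbers, and the serialized layout are all hypothetical): state written by the new version carries the useCatalogBeforeSchema flag, while state read under the old version falls back to the previous default of true.

```java
import java.io.*;

/** Sketch only: names, versions, and format are hypothetical. */
class SplitSerializerSketch {
    static final int VERSION_OLD = 3; // e.g. 2.3.0: table id only
    static final int VERSION_NEW = 4; // e.g. 2.4.0: flag, then table id

    /** Old-style format: table id only, no flag. */
    static byte[] serializeLegacy(String tableId) {
        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
             DataOutputStream out = new DataOutputStream(bos)) {
            out.writeUTF(tableId);
            out.flush();
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** New format: write the flag before the table id. */
    static byte[] serialize(String tableId, boolean useCatalogBeforeSchema) {
        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
             DataOutputStream out = new DataOutputStream(bos)) {
            out.writeBoolean(useCatalogBeforeSchema);
            out.writeUTF(tableId);
            out.flush();
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Gate the read on the serializer version so old state stays readable. */
    static String deserialize(byte[] state, int version) {
        try (DataInputStream in =
                 new DataInputStream(new ByteArrayInputStream(state))) {
            boolean useCatalogBeforeSchema =
                    version >= VERSION_NEW ? in.readBoolean() : true; // old default
            // Tag the result so the demo surfaces which branch was taken
            return (useCatalogBeforeSchema ? "catalog-first|" : "schema-first|")
                    + in.readUTF();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The key point is that the version recorded with the state, not the bytes themselves, decides whether the flag is expected, so upgraded jobs can restore snapshots taken by the previous release.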

@ruanhang1993 (Contributor)

Hi, @xiaom.
I have finished reviewing the PR. Please take a look at it and make the CI pass; then we can merge this PR.
Thanks ~

@xiaom (Contributor, Author) commented Jun 13, 2023

I've addressed some of the review feedback (marked with a 👍 emoji) as part 1 in 0861f46. I will continue with the remaining comments and fix CI later.

5 participants