Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

source-postgres: read() throws config exception upon xmin wraparound #38875

Merged
merged 8 commits into from
Jun 5, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ corresponds to that version.

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.36.6 | 2024-06-04 | [\#38875](https://github.com/airbytehq/airbyte/pull/38875) | Add an overridable method to check database condition before reading data |
| 0.36.5 | 2024-06-01 | [\#38792](https://github.com/airbytehq/airbyte/pull/38792) | Throw config exception if no selectable table exists in user provided schemas |
| 0.36.4 | 2024-05-31 | [\#38824](https://github.com/airbytehq/airbyte/pull/38824) | Param marked as non-null to nullable in JdbcDestinationHandler for NPE fix |
| 0.36.2 | 2024-05-29 | [\#38538](https://github.com/airbytehq/airbyte/pull/38357) | Exit connector when encountering a config error. |
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.36.5
version=0.36.6
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ protected constructor(driverClassName: String) :

val database = createDatabase(config)

precheckDatabase(database, config)

logPreSyncDebugData(database, catalog)

val fullyQualifiedTableNameToInfo =
Expand Down Expand Up @@ -181,6 +183,14 @@ protected constructor(driverClassName: String) :
}
}

// Optional - perform some rechecks before read.
// For example, postgres source connector can check if the database has
// xmin wraparound
@Throws(SQLException::class)
protected open fun precheckDatabase(database: Database, config: JsonNode) {
/* no-op */
}

// Optional - perform any initialization logic before read. For example, source connector
// can choose to load up state manager here.
protected open fun initializeForStateManager(
Expand Down
4 changes: 2 additions & 2 deletions airbyte-integrations/connectors/source-postgres/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ java {
}

airbyteJavaConnector {
cdkVersionRequired = '0.36.5'
cdkVersionRequired = '0.36.6'
features = ['db-sources', 'datastore-postgres']
useLocalCdk = false
useLocalCdk = true
}

application {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerImageTag: 3.4.12
dockerImageTag: 3.4.13
dockerRepository: airbyte/source-postgres
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
githubIssueLabel: source-postgres
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,17 @@ public JdbcDatabase createDatabase(final JsonNode sourceConfig) throws SQLExcept
return database;
}

@Override
protected void precheckDatabase(final JdbcDatabase database, final JsonNode config) throws SQLException {
if (isXmin(config)) {
if (PostgresQueryUtils.getXminStatus(database).getNumWraparound() > 0) {
throw new ConfigErrorException("We detected XMIN transaction wraparound in the database, " +
"which makes this sync option inefficient and can lead to higher credit consumption. " +
"Please change the replication method to CDC or cursor based.");
}
}
}

public static Map<String, String> getConnectionProperties(final JsonNode config) {
final Map<String, String> customProperties =
config.has(JdbcUtils.JDBC_URL_PARAMS_KEY)
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/postgres.md
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ According to Postgres [documentation](https://www.postgresql.org/docs/14/datatyp

| Version | Date | Pull Request | Subject |
|---------|------------|----------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 3.4.13 | 2024-06-04 | [38875](https://github.com/airbytehq/airbyte/pull/38875) | read() throws config exception upon detecting transaction ID wraparound. |
| 3.4.12 | 2024-06-04 | [38836](https://github.com/airbytehq/airbyte/pull/38836) | check() throws config error upon detecting transaction ID wraparound. |
| 3.4.11 | 2024-06-04 | [38848](https://github.com/airbytehq/airbyte/pull/38848) | Improve UI message and doc on xmin |
| 3.4.10 | 2024-05-29 | [38584](https://github.com/airbytehq/airbyte/pull/38584) | Set is_resumable flag in discover. |
Expand Down
Loading