Skip to content

Commit

Permalink
[DB source errors] : Handle common transient errors (#38104)
Browse files Browse the repository at this point in the history
  • Loading branch information
akashkulk authored May 14, 2024
1 parent 59cdc36 commit fcbfaa3
Show file tree
Hide file tree
Showing 11 changed files with 29 additions and 6 deletions.
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ corresponds to that version.

| Version | Date | Pull Request | Subject |
|:--------| :--------- | :--------------------------------------------------------- |:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.35.2 | 2024-05-13 | [\#38104](https://github.com/airbytehq/airbyte/pull/38104) | Handle transient error messages |
| 0.35.0 | 2024-05-13 | [\#38127](https://github.com/airbytehq/airbyte/pull/38127) | Destinations: Populate generation/sync ID on StreamConfig |
| 0.34.4 | 2024-05-10 | [\#37712](https://github.com/airbytehq/airbyte/pull/37712) | make sure the exceptionHandler always terminates |
| 0.34.4 | 2024-05-10 | [\#37712](https://github.com/airbytehq/airbyte/pull/37712) | make sure the exceptionHandler always terminates |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import io.airbyte.commons.exceptions.ConfigErrorException
import io.airbyte.commons.exceptions.ConnectionErrorException
import io.airbyte.commons.exceptions.TransientErrorException
import io.airbyte.commons.functional.Either
import java.io.EOFException
import java.sql.SQLException
import java.sql.SQLSyntaxErrorException
import java.util.stream.Collectors
Expand All @@ -25,6 +26,8 @@ object ConnectorExceptionUtil {
const val RECOVERY_CONNECTION_ERROR_MESSAGE: String =
"We're having issues syncing from a Postgres replica that is configured as a hot standby server. " +
"Please see https://go.airbyte.com/pg-hot-standby-error-message for options and workarounds"
const val DATABASE_CONNECTION_ERROR: String =
"Encountered an error while connecting to the database error"

@JvmField val HTTP_AUTHENTICATION_ERROR_CODES: List<Int> = ImmutableList.of(401, 403)

Expand All @@ -35,7 +38,10 @@ object ConnectorExceptionUtil {
}

fun isTransientError(e: Throwable?): Boolean {
return isTransientErrorException(e) || isRecoveryConnectionException(e)
return isTransientErrorException(e) ||
isRecoveryConnectionException(e) ||
isTransientEOFException(e) ||
isTransientSQLException(e)
}

fun getDisplayMessage(e: Throwable?): String? {
Expand All @@ -49,6 +55,8 @@ object ConnectorExceptionUtil {
RECOVERY_CONNECTION_ERROR_MESSAGE
} else if (isUnknownColumnInFieldListException(e)) {
e!!.message
} else if (isTransientError(e)) {
DATABASE_CONNECTION_ERROR
} else {
String.format(
COMMON_EXCEPTION_MESSAGE_TEMPLATE,
Expand Down Expand Up @@ -137,6 +145,16 @@ object ConnectorExceptionUtil {
return e is ConnectionErrorException
}

private fun isTransientEOFException(e: Throwable?): Boolean {
return (e is EOFException) &&
e.message!!.lowercase().contains("connection was unexpectedly lost")
}

private fun isTransientSQLException(e: Throwable?): Boolean {
return (e is SQLException) &&
e.message!!.lowercase().contains("An I/O error occurred while sending to the backend")
}

private fun isRecoveryConnectionException(e: Throwable?): Boolean {
return e is SQLException &&
e.message!!.lowercase().contains("due to conflict with recovery")
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.35.1
version=0.35.2
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.33.1'
cdkVersionRequired = '0.35.2'
features = ['db-sources']
useLocalCdk = false
}
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
dockerImageTag: 3.4.2
dockerImageTag: 3.4.3
dockerRepository: airbyte/source-mysql
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
githubIssueLabel: source-mysql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.junit.jupiter.api.Test;

@Order(2)
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH")
class MySqlJdbcSourceAcceptanceTest extends JdbcSourceAcceptanceTest<MySqlSource, MySQLTestDatabase> {

protected static final String USERNAME_WITHOUT_PERMISSION = "new_user";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.junit.jupiter.api.Order;

@Order(3)
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH")
class MySqlSslJdbcSourceAcceptanceTest extends MySqlJdbcSourceAcceptanceTest {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ java {
}

airbyteJavaConnector {
cdkVersionRequired = '0.35.1'
cdkVersionRequired = '0.35.2'
features = ['db-sources', 'datastore-postgres']
useLocalCdk = false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerImageTag: 3.4.2
dockerImageTag: 3.4.3
dockerRepository: airbyte/source-postgres
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
githubIssueLabel: source-postgres
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/mysql.md
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ Any database or table encoding combination of charset and collation is supported

| Version | Date | Pull Request | Subject |
|:--------|:-----------| :--------------------------------------------------------- |:------------------------------------------------------------------------------------------------------------------------------------------------|
| 3.4.3 | 2024-05-13 | [38104](https://github.com/airbytehq/airbyte/pull/38104) | Handle transient error messages. |
| 3.4.2 | 2024-05-07 | [38046](https://github.com/airbytehq/airbyte/pull/38046) | Resumeable refresh should run only if there is source defined pk. |
| 3.4.1 | 2024-05-03 | [37824](https://github.com/airbytehq/airbyte/pull/37824) | Fixed a bug on Resumeable full refresh where cursor based source throw NPE. |
| 3.4.0 | 2024-05-02 | [36932](https://github.com/airbytehq/airbyte/pull/36932) | Resumeable full refresh. Note please upgrade your platform - minimum platform version is 0.58.0. |
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/postgres.md
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,7 @@ According to Postgres [documentation](https://www.postgresql.org/docs/14/datatyp

| Version | Date | Pull Request | Subject |
|---------|------------|----------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 3.4.3 | 2024-05-13 | [38104](https://github.com/airbytehq/airbyte/pull/38104) | Handle transient error messages. |
| 3.4.2 | 2024-05-10 | [38171](https://github.com/airbytehq/airbyte/pull/38171) | Bug fix on final state setup. |
| 3.4.1 | 2024-05-10 | [38130](https://github.com/airbytehq/airbyte/pull/38130) | Bug fix on old PG where ctid column not found when stream is a view. |
| 3.4.0 | 2024-04-29 | [37112](https://github.com/airbytehq/airbyte/pull/37112) | resumeable full refresh. |
Expand Down

0 comments on commit fcbfaa3

Please sign in to comment.