diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/jdbc/AbstractJdbcSource.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/jdbc/AbstractJdbcSource.kt index 3615f0225546..bf3aef37a175 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/jdbc/AbstractJdbcSource.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/jdbc/AbstractJdbcSource.kt @@ -47,6 +47,7 @@ import io.airbyte.cdk.integrations.source.relationaldb.RelationalDbQueryUtils import io.airbyte.cdk.integrations.source.relationaldb.RelationalDbQueryUtils.enquoteIdentifier import io.airbyte.cdk.integrations.source.relationaldb.TableInfo import io.airbyte.cdk.integrations.source.relationaldb.state.StateManager +import io.airbyte.commons.exceptions.ConfigErrorException import io.airbyte.commons.functional.CheckedConsumer import io.airbyte.commons.functional.CheckedFunction import io.airbyte.commons.json.Jsons @@ -251,6 +252,45 @@ abstract class AbstractJdbcSource( ) } + /** + * Checks that current user can SELECT from the tables in the schemas. We can override this + * function if it takes too long to finish for a particular database source connector. + */ + @Throws(Exception::class) + protected open fun checkUserHasPrivileges(config: JsonNode?, database: JdbcDatabase) { + var schemas = ArrayList() + if (config!!.has(JdbcUtils.SCHEMAS_KEY) && config[JdbcUtils.SCHEMAS_KEY].isArray) { + for (schema in config[JdbcUtils.SCHEMAS_KEY]) { + schemas.add(schema.asText()) + } + } + // if UI has schemas specified, check if the user has select access to any table + if (schemas.isNotEmpty()) { + for (schema in schemas) { + LOGGER.info { + "Checking if the user can perform select to any table in schema: $schema" + } + val tablesOfSchema = database.metaData.getTables(null, schema, "%", null) + if (tablesOfSchema.next()) { + var privileges = + getPrivilegesTableForCurrentUser(database, schema) + if (privileges.isEmpty()) { + LOGGER.info { "No table from schema $schema is accessible for the user." } + throw ConfigErrorException( + "User lacks privileges to SELECT from any of the tables in schema $schema" + ) + } + } else { + LOGGER.info { "Schema $schema does not contain any table." } + } + } + } else { + LOGGER.info { + "No schema has been provided at the moment, skip table permission check." + } + } + } + /** * Configures a list of operations that can be used to check the connection to the source. * @@ -270,9 +310,10 @@ abstract class AbstractJdbcSource( CheckedFunction { connection: Connection -> connection.metaData.catalogs }, CheckedFunction { queryResult: ResultSet -> sourceOperations.rowToJson(queryResult) - } + }, ) - } + }, + CheckedConsumer { database: JdbcDatabase -> checkUserHasPrivileges(config, database) }, ) } diff --git a/airbyte-integrations/connectors/source-postgres/build.gradle b/airbyte-integrations/connectors/source-postgres/build.gradle index 902dbb055594..903b52838723 100644 --- a/airbyte-integrations/connectors/source-postgres/build.gradle +++ b/airbyte-integrations/connectors/source-postgres/build.gradle @@ -14,7 +14,7 @@ java { airbyteJavaConnector { cdkVersionRequired = '0.36.3' features = ['db-sources', 'datastore-postgres'] - useLocalCdk = false + useLocalCdk = true } application { diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java index 14ef8038eff7..abdc49164e42 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java @@ -172,6 +172,11 @@ void testCheckReplicationAccessReplicationPrivilege() throws Exception { testdb .with("CREATE USER %s PASSWORD '%s';", cleanUserReplicationName, testdb.getPassword()) .with("ALTER USER %s REPLICATION;", cleanUserReplicationName) + // the following GRANT statements guarantees check will not fail at table permission check stage + .with("GRANT SELECT ON ALL TABLES IN SCHEMA %s TO %s;", modelsSchema(), cleanUserReplicationName) + .with("GRANT USAGE ON SCHEMA %s TO %s;", modelsSchema(), cleanUserReplicationName) + .with("GRANT SELECT ON ALL TABLES IN SCHEMA %s TO %s;", randomSchema(), cleanUserReplicationName) + .with("GRANT USAGE ON SCHEMA %s TO %s;", randomSchema(), cleanUserReplicationName) .onClose("DROP OWNED BY %s;", cleanUserReplicationName) .onClose("DROP USER %s;", cleanUserReplicationName); final JsonNode testConfig = config(); @@ -185,6 +190,11 @@ void testCheckWithoutReplicationPermission() throws Exception { final var cleanUserVanillaName = testdb.withNamespace("vanilla_user"); testdb .with("CREATE USER %s PASSWORD '%s';", cleanUserVanillaName, testdb.getPassword()) + // the following GRANT statements guarantees check will not fail at table permission check stage + .with("GRANT SELECT ON ALL TABLES IN SCHEMA %s TO %s;", modelsSchema(), cleanUserVanillaName) + .with("GRANT USAGE ON SCHEMA %s TO %s;", modelsSchema(), cleanUserVanillaName) + .with("GRANT SELECT ON ALL TABLES IN SCHEMA %s TO %s;", randomSchema(), cleanUserVanillaName) + .with("GRANT USAGE ON SCHEMA %s TO %s;", randomSchema(), cleanUserVanillaName) .onClose("DROP OWNED BY %s;", cleanUserVanillaName) .onClose("DROP USER %s;", cleanUserVanillaName); final JsonNode testConfig = config(); diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java index b0954e55862a..602df0c00eae 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java @@ -11,9 +11,7 @@ import static io.airbyte.integrations.source.postgres.utils.PostgresUnitTestsUtil.setEmittedAtToNull; import static org.assertj.core.api.AssertionsForClassTypes.assertThat; import static org.assertj.core.api.AssertionsForClassTypes.catchThrowable; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.*; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; @@ -33,6 +31,7 @@ import io.airbyte.commons.util.MoreIterators; import io.airbyte.integrations.source.postgres.PostgresTestDatabase.BaseImage; import io.airbyte.integrations.source.postgres.PostgresTestDatabase.ContainerModifier; +import io.airbyte.protocol.models.AirbyteConnectionStatus; import io.airbyte.protocol.models.Field; import io.airbyte.protocol.models.JsonSchemaPrimitiveUtil.JsonSchemaPrimitive; import io.airbyte.protocol.models.JsonSchemaType; @@ -59,10 +58,7 @@ import java.util.stream.Collectors; import org.jooq.DSLContext; import org.jooq.SQLDialect; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.*; class PostgresSourceTest { @@ -251,6 +247,23 @@ public void testCanReadUtf8() throws Exception { } } + @Test + void testCheckPrivilegesToSelectTable() throws Exception { + testdb.query(ctx -> { + ctx.execute("DROP TABLE id_and_name;"); + ctx.fetch("CREATE TABLE id_and_name(id INTEGER, name VARCHAR(200));"); + ctx.fetch("INSERT INTO id_and_name (id, name) VALUES (1,'John'), (2, 'Alfred'), (3, 'Alex');"); + ctx.fetch("CREATE USER test_user_0 password '123';"); + ctx.fetch("CREATE USER test_user_1 password '123';"); + ctx.fetch("GRANT SELECT ON TABLE id_and_name TO test_user_1;"); + return null; + }); + final JsonNode configUser1 = getConfig("test_user_1", "123"); + assertThat(source().check(configUser1).getStatus().equals(AirbyteConnectionStatus.Status.SUCCEEDED.toString())); + final JsonNode configUser0 = getConfig("test_user_0", "123"); + assertThat(source().check(configUser0).getMessage().contains("User lacks privileges to SELECT from any of the tables in schema public")); + } + @Test void testUserDoesntHasPrivilegesToSelectTable() throws Exception { testdb.query(ctx -> {