Skip to content

Commit

Permalink
CDK: throw config exception if no selectable table exists in user pro…
Browse files Browse the repository at this point in the history
…vided schemas (#38792)

fixes airbytehq/airbyte-internal-issues#2252

"When we don't have permission to access the schema, the check succeeds, but refreshing the schema returns no tables. We should probably throw an error if the user configures the source schema as, e.g., public but doesn't grant our user permission to access that schema."

This patch includes a check that will throw a configuration error if a user does not have selectable tables from any of the provided schemas (in UI).If schemas actually contain no table, we would let the user proceed.
  • Loading branch information
theyueli authored May 31, 2024
1 parent 82aca52 commit 06b1b55
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import io.airbyte.cdk.integrations.source.relationaldb.RelationalDbQueryUtils
import io.airbyte.cdk.integrations.source.relationaldb.RelationalDbQueryUtils.enquoteIdentifier
import io.airbyte.cdk.integrations.source.relationaldb.TableInfo
import io.airbyte.cdk.integrations.source.relationaldb.state.StateManager
import io.airbyte.commons.exceptions.ConfigErrorException
import io.airbyte.commons.functional.CheckedConsumer
import io.airbyte.commons.functional.CheckedFunction
import io.airbyte.commons.json.Jsons
Expand Down Expand Up @@ -251,6 +252,45 @@ abstract class AbstractJdbcSource<Datatype>(
)
}

/**
* Checks that current user can SELECT from the tables in the schemas. We can override this
* function if it takes too long to finish for a particular database source connector.
*/
@Throws(Exception::class)
protected open fun checkUserHasPrivileges(config: JsonNode?, database: JdbcDatabase) {
var schemas = ArrayList<String>()
if (config!!.has(JdbcUtils.SCHEMAS_KEY) && config[JdbcUtils.SCHEMAS_KEY].isArray) {
for (schema in config[JdbcUtils.SCHEMAS_KEY]) {
schemas.add(schema.asText())
}
}
// if UI has schemas specified, check if the user has select access to any table
if (schemas.isNotEmpty()) {
for (schema in schemas) {
LOGGER.info {
"Checking if the user can perform select to any table in schema: $schema"
}
val tablesOfSchema = database.metaData.getTables(null, schema, "%", null)
if (tablesOfSchema.next()) {
var privileges =
getPrivilegesTableForCurrentUser<JdbcPrivilegeDto>(database, schema)
if (privileges.isEmpty()) {
LOGGER.info { "No table from schema $schema is accessible for the user." }
throw ConfigErrorException(
"User lacks privileges to SELECT from any of the tables in schema $schema"
)
}
} else {
LOGGER.info { "Schema $schema does not contain any table." }
}
}
} else {
LOGGER.info {
"No schema has been provided at the moment, skip table permission check."
}
}
}

/**
* Configures a list of operations that can be used to check the connection to the source.
*
Expand All @@ -270,9 +310,10 @@ abstract class AbstractJdbcSource<Datatype>(
CheckedFunction { connection: Connection -> connection.metaData.catalogs },
CheckedFunction { queryResult: ResultSet ->
sourceOperations.rowToJson(queryResult)
}
},
)
}
},
CheckedConsumer { database: JdbcDatabase -> checkUserHasPrivileges(config, database) },
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ java {
airbyteJavaConnector {
cdkVersionRequired = '0.36.3'
features = ['db-sources', 'datastore-postgres']
useLocalCdk = false
useLocalCdk = true
}

application {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,11 @@ void testCheckReplicationAccessReplicationPrivilege() throws Exception {
testdb
.with("CREATE USER %s PASSWORD '%s';", cleanUserReplicationName, testdb.getPassword())
.with("ALTER USER %s REPLICATION;", cleanUserReplicationName)
// the following GRANT statements guarantees check will not fail at table permission check stage
.with("GRANT SELECT ON ALL TABLES IN SCHEMA %s TO %s;", modelsSchema(), cleanUserReplicationName)
.with("GRANT USAGE ON SCHEMA %s TO %s;", modelsSchema(), cleanUserReplicationName)
.with("GRANT SELECT ON ALL TABLES IN SCHEMA %s TO %s;", randomSchema(), cleanUserReplicationName)
.with("GRANT USAGE ON SCHEMA %s TO %s;", randomSchema(), cleanUserReplicationName)
.onClose("DROP OWNED BY %s;", cleanUserReplicationName)
.onClose("DROP USER %s;", cleanUserReplicationName);
final JsonNode testConfig = config();
Expand All @@ -185,6 +190,11 @@ void testCheckWithoutReplicationPermission() throws Exception {
final var cleanUserVanillaName = testdb.withNamespace("vanilla_user");
testdb
.with("CREATE USER %s PASSWORD '%s';", cleanUserVanillaName, testdb.getPassword())
// the following GRANT statements guarantees check will not fail at table permission check stage
.with("GRANT SELECT ON ALL TABLES IN SCHEMA %s TO %s;", modelsSchema(), cleanUserVanillaName)
.with("GRANT USAGE ON SCHEMA %s TO %s;", modelsSchema(), cleanUserVanillaName)
.with("GRANT SELECT ON ALL TABLES IN SCHEMA %s TO %s;", randomSchema(), cleanUserVanillaName)
.with("GRANT USAGE ON SCHEMA %s TO %s;", randomSchema(), cleanUserVanillaName)
.onClose("DROP OWNED BY %s;", cleanUserVanillaName)
.onClose("DROP USER %s;", cleanUserVanillaName);
final JsonNode testConfig = config();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,7 @@
import static io.airbyte.integrations.source.postgres.utils.PostgresUnitTestsUtil.setEmittedAtToNull;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.assertj.core.api.AssertionsForClassTypes.catchThrowable;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.*;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
Expand All @@ -33,6 +31,7 @@
import io.airbyte.commons.util.MoreIterators;
import io.airbyte.integrations.source.postgres.PostgresTestDatabase.BaseImage;
import io.airbyte.integrations.source.postgres.PostgresTestDatabase.ContainerModifier;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaPrimitiveUtil.JsonSchemaPrimitive;
import io.airbyte.protocol.models.JsonSchemaType;
Expand All @@ -59,10 +58,7 @@
import java.util.stream.Collectors;
import org.jooq.DSLContext;
import org.jooq.SQLDialect;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.*;

class PostgresSourceTest {

Expand Down Expand Up @@ -251,6 +247,23 @@ public void testCanReadUtf8() throws Exception {
}
}

@Test
void testCheckPrivilegesToSelectTable() throws Exception {
testdb.query(ctx -> {
ctx.execute("DROP TABLE id_and_name;");
ctx.fetch("CREATE TABLE id_and_name(id INTEGER, name VARCHAR(200));");
ctx.fetch("INSERT INTO id_and_name (id, name) VALUES (1,'John'), (2, 'Alfred'), (3, 'Alex');");
ctx.fetch("CREATE USER test_user_0 password '123';");
ctx.fetch("CREATE USER test_user_1 password '123';");
ctx.fetch("GRANT SELECT ON TABLE id_and_name TO test_user_1;");
return null;
});
final JsonNode configUser1 = getConfig("test_user_1", "123");
assertThat(source().check(configUser1).getStatus().equals(AirbyteConnectionStatus.Status.SUCCEEDED.toString()));
final JsonNode configUser0 = getConfig("test_user_0", "123");
assertThat(source().check(configUser0).getMessage().contains("User lacks privileges to SELECT from any of the tables in schema public"));
}

@Test
void testUserDoesntHasPrivilegesToSelectTable() throws Exception {
testdb.query(ctx -> {
Expand Down

0 comments on commit 06b1b55

Please sign in to comment.