From f519db1dba3f0ade8a1775d6c75fd279f5271b12 Mon Sep 17 00:00:00 2001 From: Gireesh Sreepathi Date: Wed, 22 May 2024 15:56:22 -0700 Subject: [PATCH] Destinations CDK: Minor cleanup for snowflake (#38572) --- airbyte-cdk/java/airbyte-cdk/README.md | 1 + .../src/main/resources/version.properties | 2 +- .../destination/jdbc/JdbcCheckOperations.kt | 112 ++++++++++++++++++ .../jdbc/copy/CopyConsumerFactory.kt | 1 + .../destination/jdbc/copy/CopyDestination.kt | 1 + 5 files changed, 116 insertions(+), 1 deletion(-) create mode 100644 airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/JdbcCheckOperations.kt diff --git a/airbyte-cdk/java/airbyte-cdk/README.md b/airbyte-cdk/java/airbyte-cdk/README.md index 002c787d89ce..486498725aa6 100644 --- a/airbyte-cdk/java/airbyte-cdk/README.md +++ b/airbyte-cdk/java/airbyte-cdk/README.md @@ -174,6 +174,7 @@ corresponds to that version. | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 0.35.8 | 2024-05-22 | [\#38572](https://github.com/airbytehq/airbyte/pull/38572) | Add a temporary static method to decouple SnowflakeDestination from AbstractJdbcDestination | | 0.35.7 | 2024-05-20 | [\#38357](https://github.com/airbytehq/airbyte/pull/38357) | Decouple create namespace from per stream operation interface. | | 0.35.6 | 2024-05-17 | [\#38107](https://github.com/airbytehq/airbyte/pull/38107) | New interfaces for Destination connectors to plug into AsyncStreamConsumer | | 0.35.5 | 2024-05-17 | [\#38204](https://github.com/airbytehq/airbyte/pull/38204) | add assume-role authentication to s3 | diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties index bc89ad19a78c..979d376b3a22 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties @@ -1 +1 @@ -version=0.35.7 +version=0.35.8 diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/JdbcCheckOperations.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/JdbcCheckOperations.kt new file mode 100644 index 000000000000..9eed1c4b195e --- /dev/null +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/JdbcCheckOperations.kt @@ -0,0 +1,112 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.integrations.destination.jdbc + +import io.airbyte.cdk.db.jdbc.JdbcDatabase +import io.airbyte.cdk.db.jdbc.JdbcUtils +import io.airbyte.cdk.integrations.destination.NamingConventionTransformer +import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage +import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteRecordMessage +import io.airbyte.commons.exceptions.ConnectionErrorException +import io.airbyte.commons.json.Jsons +import java.sql.Connection +import java.sql.ResultSet +import java.sql.SQLException +import java.util.* + +object JdbcCheckOperations { + + /** + * Verifies if provided creds has enough permissions. Steps are: 1. Create schema if not exists. + * 2. Create test table. 3. Insert dummy record to newly created table if "attemptInsert" set to + * true. + * 4. Delete table created on step 2. + * + * @param outputSchema + * - schema to tests against. + * @param database + * - database to tests against. + * @param namingResolver + * - naming resolver. + * @param sqlOps + * - SqlOperations object + * @param attemptInsert + * - set true if need to make attempt to insert dummy records to newly created table. Set false + * to skip insert step. + */ + @JvmStatic + @Throws(Exception::class) + fun attemptTableOperations( + outputSchema: String?, + database: JdbcDatabase, + namingResolver: NamingConventionTransformer, + sqlOps: SqlOperations, + attemptInsert: Boolean + ) { + // verify we have write permissions on the target schema by creating a table with a + // random name, + // then dropping that table + try { + // Get metadata from the database to see whether connection is possible + database.bufferedResultSetQuery( + { conn: Connection -> conn.metaData.catalogs }, + { queryContext: ResultSet? -> + JdbcUtils.defaultSourceOperations.rowToJson(queryContext!!) + }, + ) + + // verify we have write permissions on the target schema by creating a table with a + // random name, + // then dropping that table + val outputTableName = + namingResolver.getIdentifier( + "_airbyte_connection_test_" + + UUID.randomUUID().toString().replace("-".toRegex(), ""), + ) + sqlOps.createSchemaIfNotExists(database, outputSchema) + sqlOps.createTableIfNotExists(database, outputSchema, outputTableName) + // verify if user has permission to make SQL INSERT queries + try { + if (attemptInsert) { + sqlOps.insertRecords( + database, + listOf(dummyRecord), + outputSchema, + outputTableName, + ) + } + } finally { + sqlOps.dropTableIfExists(database, outputSchema, outputTableName) + } + } catch (e: SQLException) { + if (Objects.isNull(e.cause) || e.cause !is SQLException) { + throw ConnectionErrorException(e.sqlState, e.errorCode, e.message, e) + } else { + val cause = e.cause as SQLException? + throw ConnectionErrorException(e.sqlState, cause!!.errorCode, cause.message, e) + } + } catch (e: Exception) { + throw Exception(e) + } + } + + private val dummyRecord: PartialAirbyteMessage + /** + * Generates a dummy AirbyteRecordMessage with random values. + * + * @return AirbyteRecordMessage object with dummy values that may be used to test insert + * permission. + */ + get() { + val dummyDataToInsert = Jsons.deserialize("{ \"field1\": true }") + return PartialAirbyteMessage() + .withRecord( + PartialAirbyteRecordMessage() + .withStream("stream1") + .withEmittedAt(1602637589000L), + ) + .withSerialized(dummyDataToInsert.toString()) + } +} diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/CopyConsumerFactory.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/CopyConsumerFactory.kt index c1a1a9d9a98e..ad844f5d1505 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/CopyConsumerFactory.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/CopyConsumerFactory.kt @@ -20,6 +20,7 @@ import javax.sql.DataSource import org.slf4j.Logger import org.slf4j.LoggerFactory +// TODO: Delete this class, this is only used in StarburstGalaxyDestination object CopyConsumerFactory { private val LOGGER: Logger = LoggerFactory.getLogger(CopyConsumerFactory::class.java) diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/CopyDestination.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/CopyDestination.kt index f8f11b717d1c..0895df3cd18a 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/CopyDestination.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/CopyDestination.kt @@ -20,6 +20,7 @@ import javax.sql.DataSource import org.slf4j.Logger import org.slf4j.LoggerFactory +// TODO: Delete this class, this is only used in StarburstGalaxyDestination abstract class CopyDestination : BaseConnector, Destination { /** * The default database schema field in the destination config is "schema". To change it, pass