Skip to content

Commit

Permalink
cdk-cleanup-snowflake
Browse files Browse the repository at this point in the history
  • Loading branch information
gisripa committed May 22, 2024
1 parent 5b7873a commit 3ac8c86
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 1 deletion.
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ corresponds to that version.

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.35.8 | 2024-05-22 | [\#38572](https://github.com/airbytehq/airbyte/pull/38572) | Add a temporary static method to decouple SnowflakeDestination from AbstractJdbcDestination |
| 0.35.7 | 2024-05-20 | [\#38357](https://github.com/airbytehq/airbyte/pull/38357) | Decouple create namespace from per stream operation interface. |
| 0.35.6 | 2024-05-17 | [\#38107](https://github.com/airbytehq/airbyte/pull/38107) | New interfaces for Destination connectors to plug into AsyncStreamConsumer |
| 0.35.5 | 2024-05-17 | [\#38204](https://github.com/airbytehq/airbyte/pull/38204) | add assume-role authentication to s3 |
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.35.7
version=0.35.8
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.integrations.destination.jdbc

import io.airbyte.cdk.db.jdbc.JdbcDatabase
import io.airbyte.cdk.db.jdbc.JdbcUtils
import io.airbyte.cdk.integrations.destination.NamingConventionTransformer
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteRecordMessage
import io.airbyte.commons.exceptions.ConnectionErrorException
import io.airbyte.commons.json.Jsons
import java.sql.Connection
import java.sql.ResultSet
import java.sql.SQLException
import java.util.*

object JdbcCheckOperations {

/**
* Verifies if provided creds has enough permissions. Steps are: 1. Create schema if not exists.
* 2. Create test table. 3. Insert dummy record to newly created table if "attemptInsert" set to
* true.
* 4. Delete table created on step 2.
*
* @param outputSchema
* - schema to tests against.
* @param database
* - database to tests against.
* @param namingResolver
* - naming resolver.
* @param sqlOps
* - SqlOperations object
* @param attemptInsert
* - set true if need to make attempt to insert dummy records to newly created table. Set false
* to skip insert step.
*/
@JvmStatic
@Throws(Exception::class)
fun attemptTableOperations(
outputSchema: String?,
database: JdbcDatabase,
namingResolver: NamingConventionTransformer,
sqlOps: SqlOperations,
attemptInsert: Boolean
) {
// verify we have write permissions on the target schema by creating a table with a
// random name,
// then dropping that table
try {
// Get metadata from the database to see whether connection is possible
database.bufferedResultSetQuery(
{ conn: Connection -> conn.metaData.catalogs },
{ queryContext: ResultSet? ->
JdbcUtils.defaultSourceOperations.rowToJson(queryContext!!)
},
)

// verify we have write permissions on the target schema by creating a table with a
// random name,
// then dropping that table
val outputTableName =
namingResolver.getIdentifier(
"_airbyte_connection_test_" +
UUID.randomUUID().toString().replace("-".toRegex(), ""),
)
sqlOps.createSchemaIfNotExists(database, outputSchema)
sqlOps.createTableIfNotExists(database, outputSchema, outputTableName)
// verify if user has permission to make SQL INSERT queries
try {
if (attemptInsert) {
sqlOps.insertRecords(
database,
listOf(dummyRecord),
outputSchema,
outputTableName,
)
}
} finally {
sqlOps.dropTableIfExists(database, outputSchema, outputTableName)
}
} catch (e: SQLException) {
if (Objects.isNull(e.cause) || e.cause !is SQLException) {
throw ConnectionErrorException(e.sqlState, e.errorCode, e.message, e)
} else {
val cause = e.cause as SQLException?
throw ConnectionErrorException(e.sqlState, cause!!.errorCode, cause.message, e)
}
} catch (e: Exception) {
throw Exception(e)
}
}

private val dummyRecord: PartialAirbyteMessage
/**
* Generates a dummy AirbyteRecordMessage with random values.
*
* @return AirbyteRecordMessage object with dummy values that may be used to test insert
* permission.
*/
get() {
val dummyDataToInsert = Jsons.deserialize("{ \"field1\": true }")
return PartialAirbyteMessage()
.withRecord(
PartialAirbyteRecordMessage()
.withStream("stream1")
.withEmittedAt(1602637589000L),
)
.withSerialized(dummyDataToInsert.toString())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import javax.sql.DataSource
import org.slf4j.Logger
import org.slf4j.LoggerFactory

// TODO: Delete this class, this is only used in StarburstGalaxyDestination
object CopyConsumerFactory {
private val LOGGER: Logger = LoggerFactory.getLogger(CopyConsumerFactory::class.java)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import javax.sql.DataSource
import org.slf4j.Logger
import org.slf4j.LoggerFactory

// TODO: Delete this class, this is only used in StarburstGalaxyDestination
abstract class CopyDestination : BaseConnector, Destination {
/**
* The default database schema field in the destination config is "schema". To change it, pass
Expand Down

0 comments on commit 3ac8c86

Please sign in to comment.