Skip to content

Commit

Permalink
source-mysql-v2: check encryption for cloud deployments
Browse files Browse the repository at this point in the history
  • Loading branch information
postamar committed Oct 9, 2024
1 parent fb064cf commit 794fecc
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@ package io.airbyte.integrations.source.mysql
import io.airbyte.cdk.ConfigErrorException
import io.airbyte.cdk.command.CdcSourceConfiguration
import io.airbyte.cdk.command.ConfigurationSpecificationSupplier
import io.airbyte.cdk.command.FeatureFlag
import io.airbyte.cdk.command.JdbcSourceConfiguration
import io.airbyte.cdk.command.SourceConfiguration
import io.airbyte.cdk.command.SourceConfigurationFactory
import io.airbyte.cdk.ssh.SshConnectionOptions
import io.airbyte.cdk.ssh.SshNoTunnelMethod
import io.airbyte.cdk.ssh.SshTunnelMethodConfiguration
import io.github.oshai.kotlinlogging.KotlinLogging
import io.micronaut.context.annotation.Factory
import jakarta.inject.Inject
import jakarta.inject.Singleton
import java.net.URLDecoder
import java.nio.charset.StandardCharsets
Expand Down Expand Up @@ -67,8 +70,11 @@ enum class InvalidCdcCursorPositionBehavior {
}

@Singleton
class MysqlSourceConfigurationFactory :
class MysqlSourceConfigurationFactory @Inject constructor(val featureFlags: Set<FeatureFlag>) :
SourceConfigurationFactory<MysqlSourceConfigurationSpecification, MysqlSourceConfiguration> {

constructor() : this(emptySet())

override fun makeWithoutExceptionHandling(
pojo: MysqlSourceConfigurationSpecification,
): MysqlSourceConfiguration {
Expand Down Expand Up @@ -98,7 +104,18 @@ class MysqlSourceConfigurationFactory :
val encryption: Encryption = pojo.getEncryptionValue()
val jdbcEncryption =
when (encryption) {
is EncryptionPreferred -> MysqlJdbcEncryption(sslMode = SSLMode.PREFERRED)
is EncryptionPreferred -> {
if (
featureFlags.contains(FeatureFlag.AIRBYTE_CLOUD_DEPLOYMENT) &&
sshTunnel is SshNoTunnelMethod
) {
throw ConfigErrorException(
"Connection from Airbyte Cloud requires " +
"SSL encryption or an SSH tunnel."
)
}
MysqlJdbcEncryption(sslMode = SSLMode.PREFERRED)
}
is EncryptionRequired -> MysqlJdbcEncryption(sslMode = SSLMode.REQUIRED)
is SslVerifyCertificate ->
MysqlJdbcEncryption(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,17 +286,17 @@ class SslVerifyIdentity : Encryption {

@ConfigurationProperties("$CONNECTOR_CONFIG_PREFIX.ssl_mode")
class MicronautPropertiesFriendlyEncryption {
var encryptionMethod: String = "preferred"
var mode: String = "preferred"
var sslCertificate: String? = null

@JsonValue
fun asEncryption(): Encryption =
when (encryptionMethod) {
when (mode) {
"preferred" -> EncryptionPreferred
"required" -> EncryptionRequired
"verify_ca" -> SslVerifyCertificate().also { it.sslCertificate = sslCertificate!! }
"verify_identity" -> SslVerifyIdentity().also { it.sslCertificate = sslCertificate!! }
else -> throw ConfigErrorException("invalid value $encryptionMethod")
else -> throw ConfigErrorException("invalid value $mode")
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
/* Copyright (c) 2024 Airbyte, Inc., all rights reserved. */
package io.airbyte.integrations.source.mysql

import io.airbyte.cdk.ConfigErrorException
import io.airbyte.cdk.command.AIRBYTE_CLOUD_ENV
import io.airbyte.cdk.command.ConfigurationSpecificationSupplier
import io.airbyte.cdk.command.SourceConfigurationFactory
import io.airbyte.cdk.ssh.SshNoTunnelMethod
import io.airbyte.cdk.ssh.SshPasswordAuthTunnelMethod
import io.micronaut.context.annotation.Property
import io.micronaut.context.env.Environment
import io.micronaut.test.extensions.junit5.annotation.MicronautTest
Expand All @@ -13,7 +14,7 @@ import java.time.Duration
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test

@MicronautTest(environments = [Environment.TEST], rebuildContext = true)
@MicronautTest(environments = [Environment.TEST, AIRBYTE_CLOUD_ENV], rebuildContext = true)
class MysqlSourceConfigurationTest {
@Inject
lateinit var pojoSupplier:
Expand All @@ -24,7 +25,16 @@ class MysqlSourceConfigurationTest {
SourceConfigurationFactory<MysqlSourceConfigurationSpecification, MysqlSourceConfiguration>

@Test
@Property(name = "airbyte.connector.config.json", value = CONFIG)
@Property(name = "airbyte.connector.config.host", value = "localhost")
@Property(name = "airbyte.connector.config.port", value = "12345")
@Property(name = "airbyte.connector.config.username", value = "FOO")
@Property(name = "airbyte.connector.config.password", value = "BAR")
@Property(name = "airbyte.connector.config.database", value = "SYSTEM")
@Property(name = "airbyte.connector.config.ssl_mode.mode", value = "required")
@Property(
name = "airbyte.connector.config.jdbc_url_params",
value = "theAnswerToLiveAndEverything=42&sessionVariables=max_execution_time=10000&foo=bar&"
)
fun testParseJdbcParameters() {
val pojo: MysqlSourceConfigurationSpecification = pojoSupplier.get()

Expand All @@ -33,7 +43,7 @@ class MysqlSourceConfigurationTest {
Assertions.assertEquals(config.realHost, "localhost")
Assertions.assertEquals(config.realPort, 12345)
Assertions.assertEquals(config.namespaces, setOf("SYSTEM"))
Assertions.assertTrue(config.sshTunnel is SshPasswordAuthTunnelMethod)
Assertions.assertTrue(config.sshTunnel is SshNoTunnelMethod)

Assertions.assertEquals(config.jdbcProperties["user"], "FOO")
Assertions.assertEquals(config.jdbcProperties["password"], "BAR")
Expand All @@ -46,6 +56,19 @@ class MysqlSourceConfigurationTest {
Assertions.assertEquals(config.jdbcProperties["foo"], "bar")
}

@Test
@Property(name = "airbyte.connector.config.host", value = "localhost")
@Property(name = "airbyte.connector.config.port", value = "12345")
@Property(name = "airbyte.connector.config.username", value = "FOO")
@Property(name = "airbyte.connector.config.password", value = "BAR")
@Property(name = "airbyte.connector.config.database", value = "SYSTEM")
fun testAirbyteCloudDeployment() {
val pojo: MysqlSourceConfigurationSpecification = pojoSupplier.get()
Assertions.assertThrows(ConfigErrorException::class.java) {
factory.makeWithoutExceptionHandling(pojo)
}
}

@Test
@Property(name = "airbyte.connector.config.json", value = CONFIG_V1)
fun testParseConfigFromV1() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
/* Copyright (c) 2024 Airbyte, Inc., all rights reserved. */
package io.airbyte.integrations.source.mysql

import io.airbyte.cdk.command.FeatureFlag
import io.airbyte.cdk.command.SourceConfigurationFactory
import io.micronaut.context.annotation.Primary
import io.micronaut.context.annotation.Requires
Expand All @@ -11,12 +12,12 @@ import java.time.Duration
@Singleton
@Requires(env = [Environment.TEST])
@Primary
class MysqlSourceTestConfigurationFactory :
class MysqlSourceTestConfigurationFactory(val featureFlags: Set<FeatureFlag>) :
SourceConfigurationFactory<MysqlSourceConfigurationSpecification, MysqlSourceConfiguration> {
override fun makeWithoutExceptionHandling(
pojo: MysqlSourceConfigurationSpecification,
): MysqlSourceConfiguration =
MysqlSourceConfigurationFactory()
MysqlSourceConfigurationFactory(featureFlags)
.makeWithoutExceptionHandling(pojo)
.copy(
maxConcurrency = 1,
Expand Down

0 comments on commit 794fecc

Please sign in to comment.