Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bulk-cdk: add feature flag environments #46692

Merged
merged 15 commits into from
Oct 18, 2024
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
package io.airbyte.cdk

import io.airbyte.cdk.command.ConnectorCommandLinePropertySource
import io.airbyte.cdk.command.FeatureFlag
import io.airbyte.cdk.command.MetadataYamlPropertySource
import io.micronaut.configuration.picocli.MicronautFactory
import io.micronaut.context.ApplicationContext
import io.micronaut.context.RuntimeBeanDefinition
import io.micronaut.context.env.CommandLinePropertySource
import io.micronaut.context.env.Environment
import io.micronaut.context.env.MapPropertySource
import io.micronaut.core.cli.CommandLine as MicronautCommandLine
import java.nio.file.Path
import kotlin.system.exitProcess
Expand All @@ -21,9 +21,11 @@ import picocli.CommandLine.Model.UsageMessageSpec
class AirbyteSourceRunner(
/** CLI args. */
args: Array<out String>,
/** Environment variables. */
systemEnv: Map<String, String> = System.getenv(),
/** Micronaut bean definition overrides, used only for tests. */
vararg testBeanDefinitions: RuntimeBeanDefinition<*>,
) : AirbyteConnectorRunner("source", args, testBeanDefinitions) {
) : AirbyteConnectorRunner("source", args, systemEnv, testBeanDefinitions) {
companion object {
@JvmStatic
fun run(vararg args: String) {
Expand All @@ -36,10 +38,11 @@ class AirbyteSourceRunner(
class AirbyteDestinationRunner(
/** CLI args. */
args: Array<out String>,
testEnvironments: Map<String, String> = emptyMap(),
/** Environment variables. */
systemEnv: Map<String, String> = System.getenv(),
/** Micronaut bean definition overrides, used only for tests. */
vararg testBeanDefinitions: RuntimeBeanDefinition<*>,
) : AirbyteConnectorRunner("destination", args, testBeanDefinitions, testEnvironments) {
) : AirbyteConnectorRunner("destination", args, systemEnv, testBeanDefinitions) {
companion object {
@JvmStatic
fun run(vararg args: String) {
Expand All @@ -55,10 +58,13 @@ class AirbyteDestinationRunner(
sealed class AirbyteConnectorRunner(
val connectorType: String,
val args: Array<out String>,
systemEnv: Map<String, String>,
val testBeanDefinitions: Array<out RuntimeBeanDefinition<*>>,
val testProperties: Map<String, String> = emptyMap(),
) {
val envs: Array<String> = arrayOf(Environment.CLI, connectorType)

val envs: Array<String> =
arrayOf(Environment.CLI, connectorType) +
FeatureFlag.active(systemEnv).map { it.micronautEnvironmentName }

inline fun <reified R : Runnable> run() {
val picocliCommandLineFactory = PicocliCommandLineFactory(this)
Expand All @@ -73,7 +79,6 @@ sealed class AirbyteConnectorRunner(
ApplicationContext.builder(R::class.java, *envs)
.propertySources(
*listOfNotNull(
MapPropertySource("additional_properties", testProperties),
airbytePropertySource,
commandLinePropertySource,
MetadataYamlPropertySource(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.command

import io.micronaut.context.annotation.Factory
import io.micronaut.context.env.Environment
import jakarta.inject.Singleton
import java.util.EnumSet

/**
* An enum of all feature flags, currently these are set via environment vars.
*
* Micronaut can inject a Set<FeatureFlag> singleton of all active feature flags.
*/
enum class FeatureFlag(
val micronautEnvironmentName: String,
val envVar: EnvVar,
val requiredEnvVarValue: String,
private val transformActualValue: (String) -> String = { it }
) {

/** [AIRBYTE_CLOUD_DEPLOYMENT] is active when the connector is running in Airbyte Cloud. */
AIRBYTE_CLOUD_DEPLOYMENT(
micronautEnvironmentName = AIRBYTE_CLOUD_ENV,
envVar = EnvVar.DEPLOYMENT_MODE,
requiredEnvVarValue = "CLOUD",
transformActualValue = { it.trim().uppercase() },
);

/** Environment variable binding shell declaration which activates the feature flag. */
val envVarBindingDeclaration: String
get() = "${envVar.name}=$requiredEnvVarValue"

enum class EnvVar(val defaultValue: String = "") {
DEPLOYMENT_MODE
}

companion object {
internal fun active(systemEnv: Map<String, String>): List<FeatureFlag> =
entries.filter { featureFlag: FeatureFlag ->
val envVar: EnvVar = featureFlag.envVar
val envVarValue: String = systemEnv[envVar.name] ?: envVar.defaultValue
featureFlag.transformActualValue(envVarValue) == featureFlag.requiredEnvVarValue
}
}

@Factory
private class MicronautFactory {

@Singleton
fun active(environment: Environment): Set<FeatureFlag> =
EnumSet.noneOf(FeatureFlag::class.java).apply {
addAll(
FeatureFlag.entries.filter {
environment.activeNames.contains(it.micronautEnvironmentName)
}
)
}
}
}

const val AIRBYTE_CLOUD_ENV = "airbyte-cloud"
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,12 @@ data object CliRunner {
config: ConfigurationSpecification? = null,
catalog: ConfiguredAirbyteCatalog? = null,
state: List<AirbyteStateMessage>? = null,
vararg featureFlags: FeatureFlag,
): CliRunnable {
val out = CliRunnerOutputStream()
val runnable: Runnable =
makeRunnable(op, config, catalog, state) { args: Array<String> ->
AirbyteSourceRunner(args, out.beanDefinition)
AirbyteSourceRunner(args, featureFlags.systemEnv, out.beanDefinition)
}
return CliRunnable(runnable, out.results)
}
Expand All @@ -52,7 +53,7 @@ data object CliRunner {
catalog: ConfiguredAirbyteCatalog? = null,
state: List<AirbyteStateMessage>? = null,
inputStream: InputStream,
testProperties: Map<String, String> = emptyMap(),
vararg featureFlags: FeatureFlag,
): CliRunnable {
val inputBeanDefinition: RuntimeBeanDefinition<InputStream> =
RuntimeBeanDefinition.builder(InputStream::class.java) { inputStream }
Expand All @@ -63,7 +64,7 @@ data object CliRunner {
makeRunnable(op, config, catalog, state) { args: Array<String> ->
AirbyteDestinationRunner(
args,
testProperties,
featureFlags.systemEnv,
inputBeanDefinition,
out.beanDefinition
)
Expand All @@ -77,6 +78,7 @@ data object CliRunner {
config: ConfigurationSpecification? = null,
catalog: ConfiguredAirbyteCatalog? = null,
state: List<AirbyteStateMessage>? = null,
featureFlags: Set<FeatureFlag> = setOf(),
vararg input: AirbyteMessage,
): CliRunnable {
val inputJsonBytes: ByteArray =
Expand All @@ -88,7 +90,7 @@ data object CliRunner {
baos.toByteArray()
}
val inputStream: InputStream = ByteArrayInputStream(inputJsonBytes)
return destination(op, config, catalog, state, inputStream)
return destination(op, config, catalog, state, inputStream, *featureFlags.toTypedArray())
}

private fun makeRunnable(
Expand Down Expand Up @@ -120,6 +122,9 @@ data object CliRunner {
}
}

private val Array<out FeatureFlag>.systemEnv: Map<String, String>
get() = toSet().map { it.envVar.name to it.requiredEnvVarValue }.toMap()

private fun inputFile(contents: Any?): Path? =
contents?.let {
Files.createTempFile(null, null).also { file ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
package io.airbyte.cdk.load.check

import io.airbyte.cdk.command.ConfigurationSpecification
import io.airbyte.cdk.command.FeatureFlag
import io.airbyte.cdk.command.ValidatedJsonUtils
import io.airbyte.cdk.load.test.util.FakeDataDumper
import io.airbyte.cdk.load.test.util.IntegrationTest
import io.airbyte.cdk.load.test.util.NoopDestinationCleaner
import io.airbyte.cdk.load.test.util.NoopExpectedRecordMapper
import io.airbyte.cdk.load.test.util.destination_process.TestDeploymentMode
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus
import io.airbyte.protocol.models.v0.AirbyteMessage
import java.nio.charset.StandardCharsets
Expand All @@ -23,7 +23,7 @@ import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll

data class CheckTestConfig(val configPath: String, val deploymentMode: TestDeploymentMode)
data class CheckTestConfig(val configPath: String, val featureFlags: Set<FeatureFlag> = emptySet())

open class CheckIntegrationTest<T : ConfigurationSpecification>(
val configurationClass: Class<T>,
Expand All @@ -37,14 +37,14 @@ open class CheckIntegrationTest<T : ConfigurationSpecification>(
) {
@Test
open fun testSuccessConfigs() {
for ((path, deploymentMode) in successConfigFilenames) {
for ((path, featureFlags) in successConfigFilenames) {
val fileContents = Files.readString(Path.of(path), StandardCharsets.UTF_8)
val config = ValidatedJsonUtils.parseOne(configurationClass, fileContents)
val process =
destinationProcessFactory.createDestinationProcess(
"check",
config = config,
deploymentMode = deploymentMode,
featureFlags = featureFlags.toTypedArray(),
)
runBlocking { process.run() }
val messages = process.readMessages()
Expand All @@ -66,14 +66,14 @@ open class CheckIntegrationTest<T : ConfigurationSpecification>(
@Test
open fun testFailConfigs() {
for ((checkTestConfig, failurePattern) in failConfigFilenamesAndFailureReasons) {
val (path, deploymentMode) = checkTestConfig
val (path, featureFlags) = checkTestConfig
val fileContents = Files.readString(Path.of(path))
val config = ValidatedJsonUtils.parseOne(configurationClass, fileContents)
val process =
destinationProcessFactory.createDestinationProcess(
"check",
config = config,
deploymentMode = deploymentMode,
featureFlags = featureFlags.toTypedArray(),
)
runBlocking { process.run() }
val messages = process.readMessages()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ import com.deblock.jsondiff.matcher.LenientJsonObjectPartialMatcher
import com.deblock.jsondiff.matcher.StrictJsonArrayPartialMatcher
import com.deblock.jsondiff.matcher.StrictPrimitivePartialMatcher
import com.deblock.jsondiff.viewer.OnlyErrorDiffViewer
import io.airbyte.cdk.command.FeatureFlag
import io.airbyte.cdk.load.test.util.FakeDataDumper
import io.airbyte.cdk.load.test.util.IntegrationTest
import io.airbyte.cdk.load.test.util.NoopDestinationCleaner
import io.airbyte.cdk.load.test.util.NoopExpectedRecordMapper
import io.airbyte.cdk.load.test.util.destination_process.TestDeploymentMode
import io.airbyte.cdk.util.Jsons
import io.airbyte.protocol.models.v0.AirbyteMessage
import java.nio.file.Files
Expand All @@ -42,27 +42,26 @@ abstract class SpecTest :
) {
@Test
fun testSpecOss() {
testSpec(TestDeploymentMode.OSS)
testSpec("expected-spec-oss.json")
}

@Test
fun testSpecCloud() {
testSpec(TestDeploymentMode.CLOUD)
testSpec("expected-spec-cloud.json", FeatureFlag.AIRBYTE_CLOUD_DEPLOYMENT)
}

private fun testSpec(deploymentMode: TestDeploymentMode) {
val expectedSpecFilename = "expected-spec-${deploymentMode.name.lowercase()}.json"
private fun testSpec(
expectedSpecFilename: String,
vararg featureFlags: FeatureFlag,
) {
val expectedSpecPath = Path.of("src/test-integration/resources", expectedSpecFilename)

if (!Files.exists(expectedSpecPath)) {
Files.createFile(expectedSpecPath)
}
val expectedSpec = Files.readString(expectedSpecPath)
val process =
destinationProcessFactory.createDestinationProcess(
"spec",
deploymentMode = deploymentMode,
)
destinationProcessFactory.createDestinationProcess("spec", featureFlags = featureFlags)
runBlocking { process.run() }
val messages = process.readMessages()
val specMessages = messages.filter { it.type == AirbyteMessage.Type.SPEC }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package io.airbyte.cdk.load.test.util.destination_process

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
import io.airbyte.cdk.command.ConfigurationSpecification
import io.airbyte.cdk.command.FeatureFlag
import io.airbyte.cdk.load.test.util.IntegrationTest
import io.airbyte.protocol.models.v0.AirbyteMessage
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
Expand Down Expand Up @@ -40,11 +41,6 @@ interface DestinationProcess {
suspend fun shutdown()
}

enum class TestDeploymentMode {
CLOUD,
OSS
}

@SuppressFBWarnings("NP_NONNULL_RETURN_VIOLATION", "good old lateinit")
abstract class DestinationProcessFactory {
/**
Expand All @@ -58,6 +54,6 @@ abstract class DestinationProcessFactory {
command: String,
config: ConfigurationSpecification? = null,
catalog: ConfiguredAirbyteCatalog? = null,
deploymentMode: TestDeploymentMode = TestDeploymentMode.OSS,
vararg featureFlags: FeatureFlag,
): DestinationProcess
}
Loading
Loading