From 46206e3b70407262ed3715bb42fceb76f93171b1 Mon Sep 17 00:00:00 2001 From: Johnny Schmidt Date: Mon, 14 Oct 2024 11:15:43 -0700 Subject: [PATCH] Bulk Load CDK: AWS/S3 Configuration mix-ins, and noop S3V2 usage (#46744) --- .../load/command/DestinationConfiguration.kt | 46 ++++++++++ .../bulk/toolkits/load-aws/build.gradle | 4 + .../command/aws/AWSAccessKeySpecification.kt | 44 +++++++++ .../bulk/toolkits/load-s3/build.gradle | 7 ++ .../load/command/s3/S3BucketSpecification.kt | 92 +++++++++++++++++++ .../connectors/destination-s3-v2/build.gradle | 5 +- .../destination-s3-v2/metadata.yaml | 8 +- .../src/main/kotlin/S3V2Configuration.kt | 14 ++- .../src/main/kotlin/S3V2Specification.kt | 12 ++- .../destination/s3_v2/S3V2CheckTest.kt | 7 +- .../resources/expected-spec-cloud.json | 35 ++++++- .../resources/expected-spec-oss.json | 35 ++++++- 12 files changed, 300 insertions(+), 9 deletions(-) create mode 100644 airbyte-cdk/bulk/toolkits/load-aws/build.gradle create mode 100644 airbyte-cdk/bulk/toolkits/load-aws/src/main/kotlin/io/airbyte/cdk/load/command/aws/AWSAccessKeySpecification.kt create mode 100644 airbyte-cdk/bulk/toolkits/load-s3/build.gradle create mode 100644 airbyte-cdk/bulk/toolkits/load-s3/src/main/kotlin/io/airbyte/cdk/load/command/s3/S3BucketSpecification.kt diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/command/DestinationConfiguration.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/command/DestinationConfiguration.kt index 8efb51666485..404e53cfda11 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/command/DestinationConfiguration.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/command/DestinationConfiguration.kt @@ -11,6 +11,52 @@ import io.micronaut.context.annotation.Factory import jakarta.inject.Singleton import java.nio.file.Path +/** + * To implement a [DestinationConfiguration]: + * + * - Create a class `{MyDestination}Specification` extending [ConfigurationSpecification] + * + * - Add any mixin `...Specification`s from this package (the jackson annotations will be inherited) + * + * - Add any required custom fields to the spec w/ jackson annotations + * + * - Create a class `{MyDestination}Configuration` extending [DestinationConfiguration] + * + * - Add the corresponding mixin `...ConfigurationProvider`s for any added spec mixins + * + * - (Add overrides for any fields provided by the providers) + * + * - Add custom config to the configuration as needed + * + * - Implement `DestinationConfigurationFactory` as a @[Singleton], using the `to...Configuration` + * methods from the specs to map to the provided configuration fields + * + * - (Set your custom fields as needed.) + * + * - Add a @[Factory] injected with [DestinationConfiguration], returning a @[Singleton] downcast to + * your implementation; ie, + * + * ``` + * @Factory + * class MyDestinationConfigurationProvider( + * private val config: DestinationConfiguration + * ){ + * @Singleton + * fun destinationConfig(): MyDestinationConfiguration = + * config as MyDestinationConfiguration + * } + * ``` + * + * Now your configuration will be automatically parsed and available for injection. ie, + * + * ``` + * @Singleton + * class MyDestinationWriter( + * private val config: MyDestinationConfiguration // <- automatically injected by micronaut + * ): DestinationWriter { + * // ... + * ``` + */ abstract class DestinationConfiguration : Configuration { open val recordBatchSizeBytes: Long = 200L * 1024L * 1024L open val tmpFileDirectory: Path = Path.of("airbyte-cdk-load") diff --git a/airbyte-cdk/bulk/toolkits/load-aws/build.gradle b/airbyte-cdk/bulk/toolkits/load-aws/build.gradle new file mode 100644 index 000000000000..32b69385afe4 --- /dev/null +++ b/airbyte-cdk/bulk/toolkits/load-aws/build.gradle @@ -0,0 +1,4 @@ +dependencies { + implementation project(':airbyte-cdk:bulk:core:bulk-cdk-core-base') + implementation project(':airbyte-cdk:bulk:core:bulk-cdk-core-load') +} diff --git a/airbyte-cdk/bulk/toolkits/load-aws/src/main/kotlin/io/airbyte/cdk/load/command/aws/AWSAccessKeySpecification.kt b/airbyte-cdk/bulk/toolkits/load-aws/src/main/kotlin/io/airbyte/cdk/load/command/aws/AWSAccessKeySpecification.kt new file mode 100644 index 000000000000..17779147ad18 --- /dev/null +++ b/airbyte-cdk/bulk/toolkits/load-aws/src/main/kotlin/io/airbyte/cdk/load/command/aws/AWSAccessKeySpecification.kt @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.load.command.aws + +import com.fasterxml.jackson.annotation.JsonProperty +import com.fasterxml.jackson.annotation.JsonPropertyDescription +import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaInject +import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle + +/** + * Mix-in to a configuration to add AWS access key id and secret access key fields as properties. + * + * See [io.airbyte.cdk.load.command.DestinationConfiguration] for more details on how to use this + * interface. + */ +interface AWSAccessKeySpecification { + @get:JsonSchemaTitle("S3 Key ID") + @get:JsonPropertyDescription( + "The access key ID to access the S3 bucket. Airbyte requires Read and Write permissions to the given bucket. Read more here." + ) + @get:JsonProperty("access_key_id") + @get:JsonSchemaInject(json = """{"examples":["A012345678910EXAMPLE"]}""") + val accessKeyId: String + + @get:JsonSchemaTitle("S3 Access Key") + @get:JsonPropertyDescription( + "The corresponding secret to the access key ID. Read more here" + ) + @get:JsonProperty("secret_access_key") + @get:JsonSchemaInject(json = """{"examples":["a012345678910ABCDEFGH/AbCdEfGhEXAMPLEKEY"]}""") + val secretAccessKey: String + + fun toAWSAccessKeyConfiguration(): AWSAccessKeyConfiguration { + return AWSAccessKeyConfiguration(accessKeyId, secretAccessKey) + } +} + +data class AWSAccessKeyConfiguration(val accessKeyId: String, val secretAccessKey: String) + +interface AWSAccessKeyConfigurationProvider { + val awsAccessKeyConfiguration: AWSAccessKeyConfiguration +} diff --git a/airbyte-cdk/bulk/toolkits/load-s3/build.gradle b/airbyte-cdk/bulk/toolkits/load-s3/build.gradle new file mode 100644 index 000000000000..e2f3228e5791 --- /dev/null +++ b/airbyte-cdk/bulk/toolkits/load-s3/build.gradle @@ -0,0 +1,7 @@ +dependencies { + implementation project(':airbyte-cdk:bulk:core:bulk-cdk-core-base') + implementation project(':airbyte-cdk:bulk:core:bulk-cdk-core-load') + implementation project(':airbyte-cdk:bulk:toolkits:bulk-cdk-toolkit-load-aws') + + implementation("aws.sdk.kotlin:s3:1.0.0") +} diff --git a/airbyte-cdk/bulk/toolkits/load-s3/src/main/kotlin/io/airbyte/cdk/load/command/s3/S3BucketSpecification.kt b/airbyte-cdk/bulk/toolkits/load-s3/src/main/kotlin/io/airbyte/cdk/load/command/s3/S3BucketSpecification.kt new file mode 100644 index 000000000000..8b537653175a --- /dev/null +++ b/airbyte-cdk/bulk/toolkits/load-s3/src/main/kotlin/io/airbyte/cdk/load/command/s3/S3BucketSpecification.kt @@ -0,0 +1,92 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.load.command.s3 + +import com.fasterxml.jackson.annotation.JsonProperty +import com.fasterxml.jackson.annotation.JsonPropertyDescription +import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaInject +import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle + +enum class S3BucketRegion { + `af-south-1`, + `ap-east-1`, + `ap-northeast-1`, + `ap-northeast-2`, + `ap-northeast-3`, + `ap-south-1`, + `ap-south-2`, + `ap-southeast-1`, + `ap-southeast-2`, + `ap-southeast-3`, + `ap-southeast-4`, + `ca-central-1`, + `ca-west-1`, + `cn-north-1`, + `cn-northwest-1`, + `eu-central-1`, + `eu-central-2`, + `eu-north-1`, + `eu-south-1`, + `eu-south-2`, + `eu-west-1`, + `eu-west-2`, + `eu-west-3`, + `il-central-1`, + `me-central-1`, + `me-south-1`, + `sa-east-1`, + `us-east-1`, + `us-east-2`, + `us-gov-east-1`, + `us-gov-west-1`, + `us-west-1`, + `us-west-2` +} + +/** + * Mix-in to provide S3 bucket configuration fields as properties. + * + * See [io.airbyte.cdk.load.command.DestinationConfiguration] for more details on how to use this + * interface. + */ +interface S3BucketSpecification { + @get:JsonSchemaTitle("S3 Bucket Name") + @get:JsonPropertyDescription( + "The name of the S3 bucket. Read more here." + ) + @get:JsonProperty("s3_bucket_name") + @get:JsonSchemaInject(json = """{"examples":["airbyte_sync"]}""") + val s3BucketName: String + + @get:JsonSchemaTitle("S3 Bucket Region") + @get:JsonPropertyDescription( + "The region of the S3 bucket. See here for all region codes." + ) + @get:JsonProperty("s3_bucket_region", defaultValue = "") + @get:JsonSchemaInject(json = """{"examples":["us-east-1"]}""") + val s3BucketRegion: S3BucketRegion + + @get:JsonSchemaTitle("S3 Endpoint") + @get:JsonPropertyDescription( + "Your S3 endpoint url. Read more here" + ) + @get:JsonProperty("s3_endpoint", defaultValue = "", required = false) + @get:JsonSchemaInject(json = """{"examples":["http://localhost:9000"]}""") + val s3Endpoint: String? + + fun toS3BucketConfiguration(): S3BucketConfiguration { + return S3BucketConfiguration(s3BucketName, s3BucketRegion, s3Endpoint) + } +} + +data class S3BucketConfiguration( + val s3BucketName: String, + val s3BucketRegion: S3BucketRegion, + val s3Endpoint: String? +) + +interface S3BucketConfigurationProvider { + val s3BucketConfiguration: S3BucketConfiguration +} diff --git a/airbyte-integrations/connectors/destination-s3-v2/build.gradle b/airbyte-integrations/connectors/destination-s3-v2/build.gradle index 347cb34a5460..c77044ab3e47 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/build.gradle +++ b/airbyte-integrations/connectors/destination-s3-v2/build.gradle @@ -5,13 +5,14 @@ plugins { airbyteBulkConnector { core = 'load' - toolkits = [] + toolkits = ['load-aws', 'load-s3'] cdk = 'local' } application { mainClass = 'io.airbyte.integrations.destination.s3_v2.S3V2Destination' - applicationDefaultJvmArgs = ['-XX:+ExitOnOutOfMemoryError', '-XX:MaxRAMPercentage=75.0'] + + //applicationDefaultJvmArgs = ['-XX:+ExitOnOutOfMemoryError', '-XX:MaxRAMPercentage=75.0'] // Uncomment and replace to run locally //applicationDefaultJvmArgs = ['-XX:+ExitOnOutOfMemoryError', '-XX:MaxRAMPercentage=75.0', '--add-opens', 'java.base/sun.nio.ch=ALL-UNNAMED', '--add-opens', 'java.base/sun.security.action=ALL-UNNAMED', '--add-opens', 'java.base/java.lang=ALL-UNNAMED'] diff --git a/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml b/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml index c1a9e6fd7874..d35faf5c6764 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml +++ b/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml @@ -2,7 +2,7 @@ data: connectorSubtype: file connectorType: destination definitionId: d6116991-e809-4c7c-ae09-c64712df5b66 - dockerImageTag: 0.1.0 + dockerImageTag: 0.1.1 dockerRepository: airbyte/destination-s3-v2 githubIssueLabel: destination-s3-v2 icon: s3.svg @@ -25,4 +25,10 @@ data: connectorTestSuitesOptions: - suite: unitTests - suite: integrationTests + testSecrets: + - name: SECRET_DESTINATION-S3-V2-MINIMAL-REQUIRED-CONFIG + fileName: s3_dest_v2_minimal_required_config.json + secretStore: + type: GSM + alias: airbyte-connector-testing-secret-store metadataSpecVersion: "1.0" diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Configuration.kt b/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Configuration.kt index 85e53c5ca254..7be9b5e9b837 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Configuration.kt +++ b/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Configuration.kt @@ -6,16 +6,26 @@ package io.airbyte.integrations.destination.s3_v2 import io.airbyte.cdk.load.command.DestinationConfiguration import io.airbyte.cdk.load.command.DestinationConfigurationFactory +import io.airbyte.cdk.load.command.aws.AWSAccessKeyConfiguration +import io.airbyte.cdk.load.command.aws.AWSAccessKeyConfigurationProvider +import io.airbyte.cdk.load.command.s3.S3BucketConfiguration +import io.airbyte.cdk.load.command.s3.S3BucketConfigurationProvider import io.micronaut.context.annotation.Factory import jakarta.inject.Singleton -data object S3V2Configuration : DestinationConfiguration() +data class S3V2Configuration( + override val awsAccessKeyConfiguration: AWSAccessKeyConfiguration, + override val s3BucketConfiguration: S3BucketConfiguration +) : DestinationConfiguration(), AWSAccessKeyConfigurationProvider, S3BucketConfigurationProvider @Singleton class S3V2ConfigurationFactory : DestinationConfigurationFactory { override fun makeWithoutExceptionHandling(pojo: S3V2Specification): S3V2Configuration { - return S3V2Configuration + return S3V2Configuration( + awsAccessKeyConfiguration = pojo.toAWSAccessKeyConfiguration(), + s3BucketConfiguration = pojo.toS3BucketConfiguration() + ) } } diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Specification.kt b/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Specification.kt index c111ca986a7b..4a299eceecf2 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Specification.kt +++ b/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Specification.kt @@ -6,13 +6,23 @@ package io.airbyte.integrations.destination.s3_v2 import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle import io.airbyte.cdk.command.ConfigurationSpecification +import io.airbyte.cdk.load.command.aws.AWSAccessKeySpecification +import io.airbyte.cdk.load.command.s3.S3BucketRegion +import io.airbyte.cdk.load.command.s3.S3BucketSpecification import io.airbyte.cdk.load.spec.DestinationSpecificationExtension import io.airbyte.protocol.models.v0.DestinationSyncMode import jakarta.inject.Singleton @Singleton @JsonSchemaTitle("S3 V2 Destination Spec") -class S3V2Specification : ConfigurationSpecification() +class S3V2Specification : + ConfigurationSpecification(), AWSAccessKeySpecification, S3BucketSpecification { + override val accessKeyId: String = "" + override val secretAccessKey: String = "" + override val s3BucketName: String = "" + override val s3BucketRegion: S3BucketRegion = S3BucketRegion.`us-west-1` + override val s3Endpoint: String? = null +} @Singleton class S3V2SpecificationExtension : DestinationSpecificationExtension { diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2CheckTest.kt b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2CheckTest.kt index 2db7fd3e592b..04d722e82cda 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2CheckTest.kt +++ b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2CheckTest.kt @@ -13,7 +13,12 @@ class S3V2CheckTest : CheckIntegrationTest( S3V2Specification::class.java, successConfigFilenames = - listOf(CheckTestConfig("test-configs/default.json", TestDeploymentMode.CLOUD)), + listOf( + CheckTestConfig( + "secrets/s3_dest_v2_minimal_required_config.json", + TestDeploymentMode.CLOUD + ) + ), failConfigFilenamesAndFailureReasons = emptyMap() ) { @Test diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/resources/expected-spec-cloud.json b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/resources/expected-spec-cloud.json index 0562b559e6eb..27374908d608 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/resources/expected-spec-cloud.json +++ b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/resources/expected-spec-cloud.json @@ -5,7 +5,40 @@ "title" : "S3 V2 Destination Spec", "type" : "object", "additionalProperties" : true, - "properties" : { } + "properties" : { + "access_key_id" : { + "type" : "string", + "description" : "The access key ID to access the S3 bucket. Airbyte requires Read and Write permissions to the given bucket. Read more here.", + "title" : "S3 Key ID", + "examples" : [ "A012345678910EXAMPLE" ] + }, + "secret_access_key" : { + "type" : "string", + "description" : "The corresponding secret to the access key ID. Read more here", + "title" : "S3 Access Key", + "examples" : [ "a012345678910ABCDEFGH/AbCdEfGhEXAMPLEKEY" ] + }, + "s3_bucket_name" : { + "type" : "string", + "description" : "The name of the S3 bucket. Read more here.", + "title" : "S3 Bucket Name", + "examples" : [ "airbyte_sync" ] + }, + "s3_bucket_region" : { + "type" : "string", + "enum" : [ "af-south-1", "ap-east-1", "ap-northeast-1", "ap-northeast-2", "ap-northeast-3", "ap-south-1", "ap-south-2", "ap-southeast-1", "ap-southeast-2", "ap-southeast-3", "ap-southeast-4", "ca-central-1", "ca-west-1", "cn-north-1", "cn-northwest-1", "eu-central-1", "eu-central-2", "eu-north-1", "eu-south-1", "eu-south-2", "eu-west-1", "eu-west-2", "eu-west-3", "il-central-1", "me-central-1", "me-south-1", "sa-east-1", "us-east-1", "us-east-2", "us-gov-east-1", "us-gov-west-1", "us-west-1", "us-west-2" ], + "description" : "The region of the S3 bucket. See here for all region codes.", + "title" : "S3 Bucket Region", + "examples" : [ "us-east-1" ] + }, + "s3_endpoint" : { + "type" : "string", + "description" : "Your S3 endpoint url. Read more here", + "title" : "S3 Endpoint", + "examples" : [ "http://localhost:9000" ] + } + }, + "required" : [ "access_key_id", "secret_access_key", "s3_bucket_name", "s3_bucket_region" ] }, "supportsIncremental" : true, "supportsNormalization" : false, diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/resources/expected-spec-oss.json b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/resources/expected-spec-oss.json index 0562b559e6eb..27374908d608 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/resources/expected-spec-oss.json +++ b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/resources/expected-spec-oss.json @@ -5,7 +5,40 @@ "title" : "S3 V2 Destination Spec", "type" : "object", "additionalProperties" : true, - "properties" : { } + "properties" : { + "access_key_id" : { + "type" : "string", + "description" : "The access key ID to access the S3 bucket. Airbyte requires Read and Write permissions to the given bucket. Read more here.", + "title" : "S3 Key ID", + "examples" : [ "A012345678910EXAMPLE" ] + }, + "secret_access_key" : { + "type" : "string", + "description" : "The corresponding secret to the access key ID. Read more here", + "title" : "S3 Access Key", + "examples" : [ "a012345678910ABCDEFGH/AbCdEfGhEXAMPLEKEY" ] + }, + "s3_bucket_name" : { + "type" : "string", + "description" : "The name of the S3 bucket. Read more here.", + "title" : "S3 Bucket Name", + "examples" : [ "airbyte_sync" ] + }, + "s3_bucket_region" : { + "type" : "string", + "enum" : [ "af-south-1", "ap-east-1", "ap-northeast-1", "ap-northeast-2", "ap-northeast-3", "ap-south-1", "ap-south-2", "ap-southeast-1", "ap-southeast-2", "ap-southeast-3", "ap-southeast-4", "ca-central-1", "ca-west-1", "cn-north-1", "cn-northwest-1", "eu-central-1", "eu-central-2", "eu-north-1", "eu-south-1", "eu-south-2", "eu-west-1", "eu-west-2", "eu-west-3", "il-central-1", "me-central-1", "me-south-1", "sa-east-1", "us-east-1", "us-east-2", "us-gov-east-1", "us-gov-west-1", "us-west-1", "us-west-2" ], + "description" : "The region of the S3 bucket. See here for all region codes.", + "title" : "S3 Bucket Region", + "examples" : [ "us-east-1" ] + }, + "s3_endpoint" : { + "type" : "string", + "description" : "Your S3 endpoint url. Read more here", + "title" : "S3 Endpoint", + "examples" : [ "http://localhost:9000" ] + } + }, + "required" : [ "access_key_id", "secret_access_key", "s3_bucket_name", "s3_bucket_region" ] }, "supportsIncremental" : true, "supportsNormalization" : false,