From 6050e74a95acaa3c6c189abc2ca190e97825c7fe Mon Sep 17 00:00:00 2001 From: Johnny Schmidt Date: Fri, 11 Oct 2024 10:55:55 -0700 Subject: [PATCH] Bulk Load CDK: GZip Compression and S3V2 Usage --- ...DestinationRecordToAirbyteValueWithMeta.kt | 2 + .../airbyte/cdk/load/file/StreamProcessor.kt | 26 ++++ .../toolkits/load-object-storage/build.gradle | 2 + .../ObjectStorageCompressionSpecification.kt | 86 ++++++++++++ .../ObjectStorageFormatSpecification.kt | 51 ++++++- .../object_storage/ObjectStorageClient.kt | 11 +- .../ObjectStoragePathFactory.kt | 18 ++- .../cdk/load/ObjectStorageDataDumper.kt | 62 +++++++++ .../bulk/toolkits/load-s3/build.gradle | 1 + .../load/command/s3/S3PathSpecification.kt | 9 ++ .../io/airbyte/cdk/load/file/s3/S3Client.kt | 100 +++++--------- .../cdk/load/file/s3/S3MultipartUpload.kt | 83 ++++++++++++ .../destination-s3-v2/metadata.yaml | 7 +- .../src/main/kotlin/S3V2Checker.kt | 11 +- .../src/main/kotlin/S3V2Configuration.kt | 25 ++-- .../src/main/kotlin/S3V2Writer.kt | 8 +- .../destination/s3_v2/S3V2CheckTest.kt | 6 +- .../destination/s3_v2/S3V2DataDumper.kt | 32 +++++ .../destination/s3_v2/S3V2TestUtils.kt | 7 +- .../destination/s3_v2/S3V2WriteTest.kt | 65 ++------- .../resources/expected-spec-cloud.json | 128 +++++++++++++----- .../resources/expected-spec-oss.json | 128 +++++++++++++----- 22 files changed, 646 insertions(+), 222 deletions(-) create mode 100644 airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/file/StreamProcessor.kt create mode 100644 airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/command/object_storage/ObjectStorageCompressionSpecification.kt create mode 100644 airbyte-cdk/bulk/toolkits/load-object-storage/src/testFixtures/kotlin/io/airbyte/cdk/load/ObjectStorageDataDumper.kt create mode 100644 airbyte-cdk/bulk/toolkits/load-s3/src/main/kotlin/io/airbyte/cdk/load/file/s3/S3MultipartUpload.kt create mode 100644 airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2DataDumper.kt diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/DestinationRecordToAirbyteValueWithMeta.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/DestinationRecordToAirbyteValueWithMeta.kt index 0c80d9b6f1e9..84392b1503c3 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/DestinationRecordToAirbyteValueWithMeta.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/DestinationRecordToAirbyteValueWithMeta.kt @@ -7,10 +7,12 @@ package io.airbyte.cdk.load.data import io.airbyte.cdk.load.command.DestinationCatalog import io.airbyte.cdk.load.message.DestinationRecord import io.airbyte.cdk.load.message.DestinationRecord.Meta +import io.micronaut.context.annotation.Secondary import jakarta.inject.Singleton import java.util.* @Singleton +@Secondary class DestinationRecordToAirbyteValueWithMeta(private val catalog: DestinationCatalog) { fun decorate(record: DestinationRecord): ObjectValue { val streamActual = catalog.getStream(record.stream.name, record.stream.namespace) diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/file/StreamProcessor.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/file/StreamProcessor.kt new file mode 100644 index 000000000000..1edc10a2c366 --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/file/StreamProcessor.kt @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.load.file + +import java.io.ByteArrayOutputStream +import java.util.zip.GZIPOutputStream + +interface StreamProcessor { + val wrapper: (ByteArrayOutputStream) -> T + val partFinisher: T.() -> Unit + val extension: String? +} + +data object NoopProcessor : StreamProcessor { + override val wrapper: (ByteArrayOutputStream) -> ByteArrayOutputStream = { it } + override val partFinisher: ByteArrayOutputStream.() -> Unit = {} + override val extension: String? = null +} + +data object GZIPProcessor : StreamProcessor { + override val wrapper: (ByteArrayOutputStream) -> GZIPOutputStream = { GZIPOutputStream(it) } + override val partFinisher: GZIPOutputStream.() -> Unit = { finish() } + override val extension: String = "gz" +} diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/build.gradle b/airbyte-cdk/bulk/toolkits/load-object-storage/build.gradle index 32b69385afe4..889fd7a5852f 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/build.gradle +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/build.gradle @@ -1,4 +1,6 @@ dependencies { implementation project(':airbyte-cdk:bulk:core:bulk-cdk-core-base') implementation project(':airbyte-cdk:bulk:core:bulk-cdk-core-load') + + testFixturesImplementation testFixtures(project(":airbyte-cdk:bulk:core:bulk-cdk-core-load")) } diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/command/object_storage/ObjectStorageCompressionSpecification.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/command/object_storage/ObjectStorageCompressionSpecification.kt new file mode 100644 index 000000000000..e1a5e6c64f52 --- /dev/null +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/command/object_storage/ObjectStorageCompressionSpecification.kt @@ -0,0 +1,86 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.load.command.object_storage + +import com.fasterxml.jackson.annotation.JsonProperty +import com.fasterxml.jackson.annotation.JsonPropertyDescription +import com.fasterxml.jackson.annotation.JsonSubTypes +import com.fasterxml.jackson.annotation.JsonTypeInfo +import com.fasterxml.jackson.annotation.JsonValue +import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle +import io.airbyte.cdk.load.file.GZIPProcessor +import io.airbyte.cdk.load.file.NoopProcessor +import io.airbyte.cdk.load.file.StreamProcessor +import java.io.ByteArrayOutputStream +import java.io.OutputStream + +/** + * Mix-in to provide file format configuration. + * + * The specification is intended to be applied to file formats that are compatible with file-level + * compression (csv, jsonl) and does not need to be added to the destination spec directly. The + * [ObjectStorageCompressionConfigurationProvider] can be added to the top-level + * [io.airbyte.cdk.load.command.DestinationConfiguration] and initialized directly with + * [ObjectStorageFormatSpecificationProvider.toObjectStorageFormatConfiguration]. (See the comments + * on [io.airbyte.cdk.load.command.DestinationConfiguration] for more details.) + */ +interface ObjectStorageCompressionSpecificationProvider { + @get:JsonSchemaTitle("Compression") + @get:JsonPropertyDescription( + "Whether the output files should be compressed. If compression is selected, the output filename will have an extra extension (GZIP: \".jsonl.gz\").", + ) + @get:JsonProperty("compression") + val compression: ObjectStorageCompressionSpecification + + fun toCompressionConfiguration(): ObjectStorageCompressionConfiguration<*> { + return when (compression) { + is NoCompressionSpecification -> ObjectStorageCompressionConfiguration(NoopProcessor) + is GZIPCompressionSpecification -> ObjectStorageCompressionConfiguration(GZIPProcessor) + } + } + + companion object { + fun getNoCompressionConfiguration(): + ObjectStorageCompressionConfiguration { + return ObjectStorageCompressionConfiguration(NoopProcessor) + } + } +} + +@JsonTypeInfo( + use = JsonTypeInfo.Id.NAME, + include = JsonTypeInfo.As.EXISTING_PROPERTY, + property = "compression_type", +) +@JsonSubTypes( + JsonSubTypes.Type(value = NoCompressionSpecification::class, name = "No Compression"), + JsonSubTypes.Type(value = GZIPCompressionSpecification::class, name = "GZIP"), +) +sealed class ObjectStorageCompressionSpecification( + @JsonProperty("compression_type") open val compressionType: Type +) { + enum class Type(@get:JsonValue val typeName: String) { + NoCompression("No Compression"), + GZIP("GZIP"), + } +} + +@JsonSchemaTitle("No Compression") +class NoCompressionSpecification( + @JsonProperty("compression_type") override val compressionType: Type = Type.NoCompression +) : ObjectStorageCompressionSpecification(compressionType) + +@JsonSchemaTitle("GZIP") +class GZIPCompressionSpecification( + @JsonProperty("compression_type") override val compressionType: Type = Type.GZIP +) : ObjectStorageCompressionSpecification(compressionType) + +data class ObjectStorageCompressionConfiguration( + val compressor: StreamProcessor +) + +interface ObjectStorageCompressionConfigurationProvider { + val objectStorageCompressionConfiguration: ObjectStorageCompressionConfiguration +} diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/command/object_storage/ObjectStorageFormatSpecification.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/command/object_storage/ObjectStorageFormatSpecification.kt index 9f51099e8d23..f8e57af85562 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/command/object_storage/ObjectStorageFormatSpecification.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/command/object_storage/ObjectStorageFormatSpecification.kt @@ -8,13 +8,24 @@ import com.fasterxml.jackson.annotation.JsonProperty import com.fasterxml.jackson.annotation.JsonPropertyDescription import com.fasterxml.jackson.annotation.JsonSubTypes import com.fasterxml.jackson.annotation.JsonTypeInfo +import com.fasterxml.jackson.annotation.JsonValue import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle +/** + * Mix-in to provide file format configuration. + * + * NOTE: This assumes a fixed set of file formats. If you need to support a different set, clone the + * [ObjectStorageFormatSpecification] class with a new set of enums. + * + * See [io.airbyte.cdk.load.command.DestinationConfiguration] for more details on how to use this + * interface. + */ interface ObjectStorageFormatSpecificationProvider { @get:JsonSchemaTitle("Output Format") @get:JsonPropertyDescription( "Format of the data output. See here for more details", ) + @get:JsonProperty("format") val format: ObjectStorageFormatSpecification fun toObjectStorageFormatConfiguration(): ObjectStorageFormatConfiguration { @@ -25,6 +36,15 @@ interface ObjectStorageFormatSpecificationProvider { is ParquetFormatSpecification -> ParquetFormatConfiguration() } } + + fun toCompressionConfiguration(): ObjectStorageCompressionConfiguration<*> { + return when (format) { + is ObjectStorageCompressionSpecificationProvider -> + (format as ObjectStorageCompressionSpecificationProvider) + .toCompressionConfiguration() + else -> ObjectStorageCompressionSpecificationProvider.getNoCompressionConfiguration() + } + } } @JsonTypeInfo( @@ -38,21 +58,40 @@ interface ObjectStorageFormatSpecificationProvider { JsonSubTypes.Type(value = AvroFormatSpecification::class, name = "AVRO"), JsonSubTypes.Type(value = ParquetFormatSpecification::class, name = "PARQUET") ) -sealed class ObjectStorageFormatSpecification { - @JsonSchemaTitle("Format Type") @JsonProperty("format_type") val formatType: String = "JSONL" +sealed class ObjectStorageFormatSpecification( + @JsonSchemaTitle("Format Type") @JsonProperty("format_type") open val formatType: Type +) { + enum class Type(@get:JsonValue val typeName: String) { + JSONL("JSONL"), + CSV("CSV"), + AVRO("AVRO"), + PARQUET("PARQUET") + } } @JsonSchemaTitle("JSON Lines: Newline-delimited JSON") -class JsonFormatSpecification : ObjectStorageFormatSpecification() +class JsonFormatSpecification( + @JsonProperty("format_type") override val formatType: Type = Type.JSONL +) : ObjectStorageFormatSpecification(formatType), ObjectStorageCompressionSpecificationProvider { + override val compression: ObjectStorageCompressionSpecification = NoCompressionSpecification() +} @JsonSchemaTitle("CSV: Comma-Separated Values") -class CSVFormatSpecification : ObjectStorageFormatSpecification() +class CSVFormatSpecification( + @JsonProperty("format_type") override val formatType: Type = Type.CSV +) : ObjectStorageFormatSpecification(formatType), ObjectStorageCompressionSpecificationProvider { + override val compression: ObjectStorageCompressionSpecification = NoCompressionSpecification() +} @JsonSchemaTitle("Avro: Apache Avro") -class AvroFormatSpecification : ObjectStorageFormatSpecification() +class AvroFormatSpecification( + @JsonProperty("format_type") override val formatType: Type = Type.AVRO +) : ObjectStorageFormatSpecification(formatType) @JsonSchemaTitle("Parquet: Columnar Storage") -class ParquetFormatSpecification : ObjectStorageFormatSpecification() +class ParquetFormatSpecification( + @JsonProperty("format_type") override val formatType: Type = Type.PARQUET +) : ObjectStorageFormatSpecification(formatType) interface OutputFormatConfigurationProvider { val outputFormat: ObjectStorageFormatConfiguration diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStorageClient.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStorageClient.kt index 7ddaa5e2b583..1d2e7faa66de 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStorageClient.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStorageClient.kt @@ -4,7 +4,10 @@ package io.airbyte.cdk.load.file.object_storage +import io.airbyte.cdk.load.file.NoopProcessor +import io.airbyte.cdk.load.file.StreamProcessor import java.io.InputStream +import java.io.OutputStream import kotlinx.coroutines.flow.Flow interface ObjectStorageClient, U : ObjectStorageStreamingUploadWriter> { @@ -13,7 +16,13 @@ interface ObjectStorageClient, U : ObjectStorageStreamingUpl suspend fun get(key: String, block: (InputStream) -> U): U suspend fun put(key: String, bytes: ByteArray): T suspend fun delete(remoteObject: T) - suspend fun streamingUpload(key: String, collector: suspend U.() -> Unit): T + suspend fun streamingUpload(key: String, block: suspend (U) -> Unit): T = + streamingUpload(key, NoopProcessor, block) + suspend fun streamingUpload( + key: String, + streamProcessor: StreamProcessor, + block: suspend (U) -> Unit + ): T } interface ObjectStorageStreamingUploadWriter { diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStoragePathFactory.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStoragePathFactory.kt index 1d5d93cca17e..a40a0b8ea44e 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStoragePathFactory.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStoragePathFactory.kt @@ -5,6 +5,7 @@ package io.airbyte.cdk.load.file.object_storage import io.airbyte.cdk.load.command.DestinationStream +import io.airbyte.cdk.load.command.object_storage.ObjectStorageCompressionConfigurationProvider import io.airbyte.cdk.load.command.object_storage.ObjectStorageFormatConfigurationProvider import io.airbyte.cdk.load.command.object_storage.ObjectStoragePathConfigurationProvider import io.airbyte.cdk.load.file.TimeProvider @@ -23,11 +24,21 @@ import java.util.* class ObjectStoragePathFactory( pathConfigProvider: ObjectStoragePathConfigurationProvider, formatConfigProvider: ObjectStorageFormatConfigurationProvider? = null, + compressionConfigProvider: ObjectStorageCompressionConfigurationProvider<*>? = null, timeProvider: TimeProvider? = null, ) { private val loadedAt = timeProvider?.let { Instant.ofEpochMilli(it.currentTimeMillis()) } private val pathConfig = pathConfigProvider.objectStoragePathConfiguration - private val defaultExtension = formatConfigProvider?.objectStorageFormatConfiguration?.extension + private val fileFormatExtension = + formatConfigProvider?.objectStorageFormatConfiguration?.extension + private val compressionExtension = + compressionConfigProvider?.objectStorageCompressionConfiguration?.compressor?.extension + private val defaultExtension = + if (fileFormatExtension != null && compressionExtension != null) { + "$fileFormatExtension.$compressionExtension" + } else { + fileFormatExtension ?: compressionExtension + } inner class VariableContext( val stream: DestinationStream, @@ -143,8 +154,9 @@ class ObjectStoragePathFactory( fun from(config: T, timeProvider: TimeProvider? = null): ObjectStoragePathFactory where T : ObjectStoragePathConfigurationProvider, - T : ObjectStorageFormatConfigurationProvider { - return ObjectStoragePathFactory(config, config, timeProvider) + T : ObjectStorageFormatConfigurationProvider, + T : ObjectStorageCompressionConfigurationProvider<*> { + return ObjectStoragePathFactory(config, config, config, timeProvider) } } diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/testFixtures/kotlin/io/airbyte/cdk/load/ObjectStorageDataDumper.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/testFixtures/kotlin/io/airbyte/cdk/load/ObjectStorageDataDumper.kt new file mode 100644 index 000000000000..82396476119f --- /dev/null +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/testFixtures/kotlin/io/airbyte/cdk/load/ObjectStorageDataDumper.kt @@ -0,0 +1,62 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.load + +import io.airbyte.cdk.load.command.DestinationStream +import io.airbyte.cdk.load.command.object_storage.ObjectStorageCompressionConfiguration +import io.airbyte.cdk.load.data.toAirbyteValue +import io.airbyte.cdk.load.file.GZIPProcessor +import io.airbyte.cdk.load.file.NoopProcessor +import io.airbyte.cdk.load.file.object_storage.ObjectStorageClient +import io.airbyte.cdk.load.file.object_storage.ObjectStoragePathFactory +import io.airbyte.cdk.load.file.object_storage.RemoteObject +import io.airbyte.cdk.load.test.util.OutputRecord +import io.airbyte.cdk.load.test.util.toOutputRecord +import io.airbyte.cdk.load.util.deserializeToNode +import java.io.InputStream +import java.util.zip.GZIPInputStream +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.toList +import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.withContext + +class ObjectStorageDataDumper( + private val stream: DestinationStream, + private val client: ObjectStorageClient<*, *>, + private val pathFactory: ObjectStoragePathFactory, + private val compressionConfig: ObjectStorageCompressionConfiguration<*>? = null +) { + fun dump(): List { + val prefix = pathFactory.getFinalDirectory(stream).toString() + return runBlocking { + withContext(Dispatchers.IO) { + client + .list(prefix) + .map { listedObject: RemoteObject<*> -> + client.get(listedObject.key) { objectData: InputStream -> + when (compressionConfig?.compressor) { + is GZIPProcessor -> GZIPInputStream(objectData) + is NoopProcessor, + null -> objectData + else -> error("Unsupported compressor") + } + .bufferedReader() + .lineSequence() + .map { line -> + line + .deserializeToNode() + .toAirbyteValue(stream.schemaWithMeta) + .toOutputRecord() + } + .toList() + } + } + .toList() + .flatten() + } + } + } +} diff --git a/airbyte-cdk/bulk/toolkits/load-s3/build.gradle b/airbyte-cdk/bulk/toolkits/load-s3/build.gradle index eb4e0b973098..00336f267d49 100644 --- a/airbyte-cdk/bulk/toolkits/load-s3/build.gradle +++ b/airbyte-cdk/bulk/toolkits/load-s3/build.gradle @@ -4,5 +4,6 @@ dependencies { api project(':airbyte-cdk:bulk:toolkits:bulk-cdk-toolkit-load-aws') api project(':airbyte-cdk:bulk:toolkits:bulk-cdk-toolkit-load-object-storage') + testFixturesApi(testFixtures(project(":airbyte-cdk:bulk:toolkits:bulk-cdk-toolkit-load-object-storage"))) implementation("aws.sdk.kotlin:s3:1.0.0") } diff --git a/airbyte-cdk/bulk/toolkits/load-s3/src/main/kotlin/io/airbyte/cdk/load/command/s3/S3PathSpecification.kt b/airbyte-cdk/bulk/toolkits/load-s3/src/main/kotlin/io/airbyte/cdk/load/command/s3/S3PathSpecification.kt index 743cfe46d2b0..b7a76991a3d3 100644 --- a/airbyte-cdk/bulk/toolkits/load-s3/src/main/kotlin/io/airbyte/cdk/load/command/s3/S3PathSpecification.kt +++ b/airbyte-cdk/bulk/toolkits/load-s3/src/main/kotlin/io/airbyte/cdk/load/command/s3/S3PathSpecification.kt @@ -10,6 +10,15 @@ import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaInject import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle import io.airbyte.cdk.load.command.object_storage.ObjectStoragePathConfiguration +/** + * Mix-in to provide S3 path configuration fields as properties. + * + * NOTE: For legacy reasons, this is unnecessarily s3-specific. Future cloud storage solutions + * should create a single generic version of this in the `object-storage` toolkit and use that. + * + * See [io.airbyte.cdk.load.command.DestinationConfiguration] for more details on how to use this + * interface. + */ interface S3PathSpecification { @get:JsonSchemaTitle("S3 Path Format") @get:JsonPropertyDescription( diff --git a/airbyte-cdk/bulk/toolkits/load-s3/src/main/kotlin/io/airbyte/cdk/load/file/s3/S3Client.kt b/airbyte-cdk/bulk/toolkits/load-s3/src/main/kotlin/io/airbyte/cdk/load/file/s3/S3Client.kt index 41390bedd5fa..bd5235c0eba5 100644 --- a/airbyte-cdk/bulk/toolkits/load-s3/src/main/kotlin/io/airbyte/cdk/load/file/s3/S3Client.kt +++ b/airbyte-cdk/bulk/toolkits/load-s3/src/main/kotlin/io/airbyte/cdk/load/file/s3/S3Client.kt @@ -5,27 +5,25 @@ package io.airbyte.cdk.load.file.s3 import aws.sdk.kotlin.runtime.auth.credentials.StaticCredentialsProvider -import aws.sdk.kotlin.services.s3.model.CompleteMultipartUploadRequest -import aws.sdk.kotlin.services.s3.model.CompletedMultipartUpload -import aws.sdk.kotlin.services.s3.model.CompletedPart import aws.sdk.kotlin.services.s3.model.CopyObjectRequest import aws.sdk.kotlin.services.s3.model.CreateMultipartUploadRequest -import aws.sdk.kotlin.services.s3.model.CreateMultipartUploadResponse import aws.sdk.kotlin.services.s3.model.DeleteObjectRequest import aws.sdk.kotlin.services.s3.model.GetObjectRequest import aws.sdk.kotlin.services.s3.model.ListObjectsRequest import aws.sdk.kotlin.services.s3.model.PutObjectRequest -import aws.sdk.kotlin.services.s3.model.UploadPartRequest import aws.smithy.kotlin.runtime.content.ByteStream import aws.smithy.kotlin.runtime.content.toInputStream import edu.umd.cs.findbugs.annotations.SuppressFBWarnings import io.airbyte.cdk.load.command.aws.AWSAccessKeyConfigurationProvider +import io.airbyte.cdk.load.command.object_storage.ObjectStorageCompressionConfiguration +import io.airbyte.cdk.load.command.object_storage.ObjectStorageCompressionConfigurationProvider import io.airbyte.cdk.load.command.object_storage.ObjectStorageUploadConfiguration import io.airbyte.cdk.load.command.object_storage.ObjectStorageUploadConfigurationProvider import io.airbyte.cdk.load.command.s3.S3BucketConfiguration import io.airbyte.cdk.load.command.s3.S3BucketConfigurationProvider +import io.airbyte.cdk.load.file.NoopProcessor +import io.airbyte.cdk.load.file.StreamProcessor import io.airbyte.cdk.load.file.object_storage.ObjectStorageClient -import io.airbyte.cdk.load.file.object_storage.ObjectStorageStreamingUploadWriter import io.airbyte.cdk.load.file.object_storage.RemoteObject import io.github.oshai.kotlinlogging.KotlinLogging import io.micronaut.context.annotation.Factory @@ -33,6 +31,7 @@ import io.micronaut.context.annotation.Secondary import jakarta.inject.Singleton import java.io.ByteArrayOutputStream import java.io.InputStream +import java.io.OutputStream import kotlinx.coroutines.flow.flow data class S3Object(override val key: String, override val storageConfig: S3BucketConfiguration) : @@ -46,7 +45,8 @@ class S3Client( private val client: aws.sdk.kotlin.services.s3.S3Client, val bucketConfig: S3BucketConfiguration, private val uploadConfig: ObjectStorageUploadConfiguration?, -) : ObjectStorageClient { + private val compressionConfig: ObjectStorageCompressionConfiguration<*>? = null, +) : ObjectStorageClient.Writer> { private val log = KotlinLogging.logger {} override suspend fun list(prefix: String) = flow { @@ -114,75 +114,35 @@ class S3Client( override suspend fun streamingUpload( key: String, - collector: suspend S3MultipartUpload.Writer.() -> Unit + block: suspend (S3MultipartUpload<*>.Writer) -> Unit + ): S3Object { + return streamingUpload(key, compressionConfig?.compressor ?: NoopProcessor, block) + } + + override suspend fun streamingUpload( + key: String, + streamProcessor: StreamProcessor, + block: suspend (S3MultipartUpload<*>.Writer) -> Unit ): S3Object { val request = CreateMultipartUploadRequest { this.bucket = bucketConfig.s3BucketName this.key = key } val response = client.createMultipartUpload(request) - S3MultipartUpload(response).upload(collector) - return S3Object(key, bucketConfig) - } - - inner class S3MultipartUpload(private val response: CreateMultipartUploadResponse) { - private val uploadedParts = mutableListOf() - private var inputBuffer = ByteArrayOutputStream() - private val partSize = - uploadConfig?.streamingUploadPartSize - ?: throw IllegalStateException("Streaming upload part size is not configured") - - suspend fun upload(collector: suspend S3MultipartUpload.Writer.() -> Unit) { - log.info { - "Starting multipart upload to ${response.bucket}/${response.key} (${response.uploadId}" - } - collector.invoke(Writer()) - complete() - } - - inner class Writer : ObjectStorageStreamingUploadWriter { - override suspend fun write(bytes: ByteArray) { - inputBuffer.write(bytes) - if (inputBuffer.size() >= partSize) { - uploadPart() - } - } - } - - private suspend fun uploadPart() { - val partNumber = uploadedParts.size + 1 - val request = UploadPartRequest { - uploadId = response.uploadId - bucket = response.bucket - key = response.key - body = ByteStream.fromBytes(inputBuffer.toByteArray()) - this.partNumber = partNumber - } - val uploadResponse = client.uploadPart(request) - uploadedParts.add( - CompletedPart { - this.partNumber = partNumber - this.eTag = uploadResponse.eTag - } + val upload = + S3MultipartUpload( + client, + response, + ByteArrayOutputStream(), + streamProcessor, + uploadConfig ) - inputBuffer.reset() - } - - private suspend fun complete() { - if (inputBuffer.size() > 0) { - uploadPart() - } - log.info { - "Completing multipart upload to ${response.bucket}/${response.key} (${response.uploadId}" - } - val request = CompleteMultipartUploadRequest { - uploadId = response.uploadId - bucket = response.bucket - key = response.key - this.multipartUpload = CompletedMultipartUpload { parts = uploadedParts } - } - client.completeMultipartUpload(request) + log.info { + "Starting multipart upload to ${response.bucket}/${response.key} (${response.uploadId}" } + block(upload.Writer()) + upload.complete() + return S3Object(key, bucketConfig) } } @@ -191,6 +151,7 @@ class S3ClientFactory( private val keyConfig: AWSAccessKeyConfigurationProvider, private val bucketConfig: S3BucketConfigurationProvider, private val uploadConifg: ObjectStorageUploadConfigurationProvider? = null, + private val compressionConfig: ObjectStorageCompressionConfigurationProvider<*>? = null, ) { companion object { fun make(config: T) where @@ -217,7 +178,8 @@ class S3ClientFactory( return S3Client( client, bucketConfig.s3BucketConfiguration, - uploadConifg?.objectStorageUploadConfiguration + uploadConifg?.objectStorageUploadConfiguration, + compressionConfig?.objectStorageCompressionConfiguration, ) } } diff --git a/airbyte-cdk/bulk/toolkits/load-s3/src/main/kotlin/io/airbyte/cdk/load/file/s3/S3MultipartUpload.kt b/airbyte-cdk/bulk/toolkits/load-s3/src/main/kotlin/io/airbyte/cdk/load/file/s3/S3MultipartUpload.kt new file mode 100644 index 000000000000..87f62c29ab90 --- /dev/null +++ b/airbyte-cdk/bulk/toolkits/load-s3/src/main/kotlin/io/airbyte/cdk/load/file/s3/S3MultipartUpload.kt @@ -0,0 +1,83 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.load.file.s3 + +import aws.sdk.kotlin.services.s3.model.CompleteMultipartUploadRequest +import aws.sdk.kotlin.services.s3.model.CompletedMultipartUpload +import aws.sdk.kotlin.services.s3.model.CompletedPart +import aws.sdk.kotlin.services.s3.model.CreateMultipartUploadResponse +import aws.sdk.kotlin.services.s3.model.UploadPartRequest +import aws.smithy.kotlin.runtime.content.ByteStream +import io.airbyte.cdk.load.command.object_storage.ObjectStorageUploadConfiguration +import io.airbyte.cdk.load.file.StreamProcessor +import io.airbyte.cdk.load.file.object_storage.ObjectStorageStreamingUploadWriter +import io.github.oshai.kotlinlogging.KotlinLogging +import java.io.ByteArrayOutputStream +import java.io.OutputStream + +class S3MultipartUpload( + private val client: aws.sdk.kotlin.services.s3.S3Client, + private val response: CreateMultipartUploadResponse, + private val underlyingBuffer: ByteArrayOutputStream, + private val streamProcessor: StreamProcessor, + uploadConfig: ObjectStorageUploadConfiguration?, +) { + private val log = KotlinLogging.logger {} + + private val uploadedParts = mutableListOf() + private val partSize = + uploadConfig?.streamingUploadPartSize + ?: throw IllegalStateException("Streaming upload part size is not configured") + private val wrappingBuffer = streamProcessor.wrapper(underlyingBuffer) + + inner class Writer : ObjectStorageStreamingUploadWriter { + override suspend fun write(bytes: ByteArray) { + wrappingBuffer.write(bytes) + if (underlyingBuffer.size() >= partSize) { + uploadPart() + } + } + + override suspend fun write(string: String) { + write(string.toByteArray(Charsets.UTF_8)) + } + } + + private suspend fun uploadPart() { + streamProcessor.partFinisher.invoke(wrappingBuffer) + val partNumber = uploadedParts.size + 1 + val request = UploadPartRequest { + uploadId = response.uploadId + bucket = response.bucket + key = response.key + body = ByteStream.fromBytes(underlyingBuffer.toByteArray()) + this.partNumber = partNumber + } + val uploadResponse = client.uploadPart(request) + uploadedParts.add( + CompletedPart { + this.partNumber = partNumber + this.eTag = uploadResponse.eTag + } + ) + underlyingBuffer.reset() + } + + suspend fun complete() { + if (underlyingBuffer.size() > 0) { + uploadPart() + } + log.info { + "Completing multipart upload to ${response.bucket}/${response.key} (${response.uploadId}" + } + val request = CompleteMultipartUploadRequest { + uploadId = response.uploadId + bucket = response.bucket + key = response.key + this.multipartUpload = CompletedMultipartUpload { parts = uploadedParts } + } + client.completeMultipartUpload(request) + } +} diff --git a/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml b/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml index 744f472028ae..62211564e565 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml +++ b/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml @@ -2,7 +2,7 @@ data: connectorSubtype: file connectorType: destination definitionId: d6116991-e809-4c7c-ae09-c64712df5b66 - dockerImageTag: 0.1.4 + dockerImageTag: 0.1.5 dockerRepository: airbyte/destination-s3-v2 githubIssueLabel: destination-s3-v2 icon: s3.svg @@ -31,4 +31,9 @@ data: secretStore: type: GSM alias: airbyte-connector-testing-secret-store + - name: SECRET_DESTINATION-S3-V2-JSONL-GZIP + fileName: s3_dest_v2_jsonl_gzip_config.json + secretStore: + type: GSM + alias: airbyte-connector-testing-secret-store metadataSpecVersion: "1.0" diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Checker.kt b/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Checker.kt index 773184d10216..4bb161d6d5a9 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Checker.kt +++ b/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Checker.kt @@ -13,14 +13,16 @@ import io.airbyte.cdk.load.file.s3.S3Object import io.github.oshai.kotlinlogging.KotlinLogging import io.micronaut.context.exceptions.ConfigurationException import jakarta.inject.Singleton +import java.io.OutputStream import kotlinx.coroutines.flow.toList import kotlinx.coroutines.runBlocking @Singleton -class S3V2Checker(private val timeProvider: TimeProvider) : DestinationChecker { +class S3V2Checker(private val timeProvider: TimeProvider) : + DestinationChecker> { private val log = KotlinLogging.logger {} - override fun check(config: S3V2Configuration) { + override fun check(config: S3V2Configuration) { runBlocking { if (config.objectStorageFormatConfiguration !is JsonFormatConfiguration) { throw ConfigurationException("Currently only JSON format is supported") @@ -28,11 +30,12 @@ class S3V2Checker(private val timeProvider: TimeProvider) : DestinationChecker( // Client-facing configuration override val awsAccessKeyConfiguration: AWSAccessKeyConfiguration, override val s3BucketConfiguration: S3BucketConfiguration, override val objectStoragePathConfiguration: ObjectStoragePathConfiguration, override val objectStorageFormatConfiguration: ObjectStorageFormatConfiguration, + override val objectStorageCompressionConfiguration: ObjectStorageCompressionConfiguration, // Internal configuration override val objectStorageUploadConfiguration: ObjectStorageUploadConfiguration = ObjectStorageUploadConfiguration(5L * 1024 * 1024), - override val recordBatchSizeBytes: Long = 200L * 1024 * 1024 + override val recordBatchSizeBytes: Long = 200L * 1024 * 1024, ) : DestinationConfiguration(), AWSAccessKeyConfigurationProvider, S3BucketConfigurationProvider, ObjectStoragePathConfigurationProvider, ObjectStorageFormatConfigurationProvider, - ObjectStorageUploadConfigurationProvider + ObjectStorageUploadConfigurationProvider, + ObjectStorageCompressionConfigurationProvider @Singleton class S3V2ConfigurationFactory : - DestinationConfigurationFactory { - override fun makeWithoutExceptionHandling(pojo: S3V2Specification): S3V2Configuration { + DestinationConfigurationFactory> { + override fun makeWithoutExceptionHandling(pojo: S3V2Specification): S3V2Configuration<*> { return S3V2Configuration( awsAccessKeyConfiguration = pojo.toAWSAccessKeyConfiguration(), s3BucketConfiguration = pojo.toS3BucketConfiguration(), objectStoragePathConfiguration = pojo.toObjectStoragePathConfiguration(), - objectStorageFormatConfiguration = pojo.toObjectStorageFormatConfiguration() + objectStorageFormatConfiguration = pojo.toObjectStorageFormatConfiguration(), + objectStorageCompressionConfiguration = pojo.toCompressionConfiguration() ) } } +@Suppress("UNCHECKED_CAST") @Factory -class S3V2ConfigurationProvider(private val config: DestinationConfiguration) { +class S3V2ConfigurationProvider(private val config: DestinationConfiguration) { @Singleton - fun get(): S3V2Configuration { - return config as S3V2Configuration + fun get(): S3V2Configuration { + return config as S3V2Configuration } } diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Writer.kt b/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Writer.kt index 2c8df1b0af31..aa8da9da1a98 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Writer.kt +++ b/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Writer.kt @@ -23,7 +23,7 @@ import java.util.concurrent.atomic.AtomicLong class S3V2Writer( private val s3Client: S3Client, private val pathFactory: ObjectStoragePathFactory, - private val recordDecorator: DestinationRecordToAirbyteValueWithMeta + private val recordDecorator: DestinationRecordToAirbyteValueWithMeta, ) : DestinationWriter { sealed interface S3V2Batch : Batch data class StagedObject( @@ -51,11 +51,11 @@ class S3V2Writer( val partNumber = partNumber.getAndIncrement() val key = pathFactory.getPathToFile(stream, partNumber, isStaging = true).toString() val s3Object = - s3Client.streamingUpload(key) { + s3Client.streamingUpload(key) { writer -> records.forEach { val serialized = recordDecorator.decorate(it).toJson().serializeToString() - write(serialized) - write("\n") + writer.write(serialized) + writer.write("\n") } } return StagedObject(s3Object = s3Object, partNumber = partNumber) diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2CheckTest.kt b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2CheckTest.kt index 3bac6fb726f9..c322111f70ae 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2CheckTest.kt +++ b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2CheckTest.kt @@ -15,8 +15,12 @@ class S3V2CheckTest : successConfigFilenames = listOf( CheckTestConfig( - S3V2TestUtils.MINIMAL_CONFIG_PATH, + S3V2TestUtils.JSON_UNCOMPRESSED_CONFIG_PATH, setOf(FeatureFlag.AIRBYTE_CLOUD_DEPLOYMENT) + ), + CheckTestConfig( + S3V2TestUtils.JSON_GZIP_CONFIG_PATH, + setOf(FeatureFlag.AIRBYTE_CLOUD_DEPLOYMENT), ) ), failConfigFilenamesAndFailureReasons = emptyMap() diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2DataDumper.kt b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2DataDumper.kt new file mode 100644 index 000000000000..77a9809a882e --- /dev/null +++ b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2DataDumper.kt @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.s3_v2 + +import io.airbyte.cdk.command.ConfigurationSpecification +import io.airbyte.cdk.load.ObjectStorageDataDumper +import io.airbyte.cdk.load.command.DestinationStream +import io.airbyte.cdk.load.file.object_storage.ObjectStoragePathFactory +import io.airbyte.cdk.load.file.s3.S3ClientFactory +import io.airbyte.cdk.load.test.util.DestinationDataDumper +import io.airbyte.cdk.load.test.util.OutputRecord + +object S3V2DataDumper : DestinationDataDumper { + override fun dumpRecords( + spec: ConfigurationSpecification, + stream: DestinationStream + ): List { + val config = + S3V2ConfigurationFactory().makeWithoutExceptionHandling(spec as S3V2Specification) + val s3Client = S3ClientFactory.make(config) + val pathFactory = ObjectStoragePathFactory.from(config) + return ObjectStorageDataDumper( + stream, + s3Client, + pathFactory, + config.objectStorageCompressionConfiguration + ) + .dump() + } +} diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2TestUtils.kt b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2TestUtils.kt index 2d72b272c73f..98bd860a6b53 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2TestUtils.kt +++ b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2TestUtils.kt @@ -9,10 +9,11 @@ import java.nio.file.Files import java.nio.file.Path object S3V2TestUtils { - const val MINIMAL_CONFIG_PATH = "secrets/s3_dest_v2_minimal_required_config.json" - val minimalConfig: S3V2Specification = + const val JSON_UNCOMPRESSED_CONFIG_PATH = "secrets/s3_dest_v2_minimal_required_config.json" + const val JSON_GZIP_CONFIG_PATH = "secrets/s3_dest_v2_jsonl_gzip_config.json" + fun getConfig(configPath: String): S3V2Specification = ValidatedJsonUtils.parseOne( S3V2Specification::class.java, - Files.readString(Path.of(MINIMAL_CONFIG_PATH)), + Files.readString(Path.of(configPath)), ) } diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt index 666d5a12782a..cbf7e23747e8 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt +++ b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt @@ -4,30 +4,14 @@ package io.airbyte.integrations.destination.s3_v2 -import io.airbyte.cdk.command.ConfigurationSpecification -import io.airbyte.cdk.load.command.DestinationStream -import io.airbyte.cdk.load.data.toAirbyteValue -import io.airbyte.cdk.load.file.object_storage.ObjectStoragePathFactory -import io.airbyte.cdk.load.file.s3.S3ClientFactory -import io.airbyte.cdk.load.file.s3.S3Object -import io.airbyte.cdk.load.test.util.DestinationDataDumper import io.airbyte.cdk.load.test.util.NoopDestinationCleaner import io.airbyte.cdk.load.test.util.NoopExpectedRecordMapper -import io.airbyte.cdk.load.test.util.OutputRecord -import io.airbyte.cdk.load.test.util.toOutputRecord -import io.airbyte.cdk.load.util.deserializeToNode import io.airbyte.cdk.load.write.BasicFunctionalityIntegrationTest -import java.io.InputStream -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.flow.map -import kotlinx.coroutines.flow.toList -import kotlinx.coroutines.runBlocking -import kotlinx.coroutines.withContext import org.junit.jupiter.api.Test -class S3V2WriteTest : +class S3V2WriteTestJsonUncompressed : BasicFunctionalityIntegrationTest( - S3V2TestUtils.minimalConfig, + S3V2TestUtils.getConfig(S3V2TestUtils.JSON_UNCOMPRESSED_CONFIG_PATH), S3V2DataDumper, NoopDestinationCleaner, NoopExpectedRecordMapper, @@ -38,40 +22,15 @@ class S3V2WriteTest : } } -object S3V2DataDumper : DestinationDataDumper { - override fun dumpRecords( - spec: ConfigurationSpecification, - stream: DestinationStream - ): List { - val config = - S3V2ConfigurationFactory().makeWithoutExceptionHandling(spec as S3V2Specification) - val s3Client = S3ClientFactory.make(config) - // Note: because we cannot mock wall time in docker, this - // path code cannot contain time-based macros. - // TODO: add pattern matching to the path factory. - val pathFactory = ObjectStoragePathFactory.from(config) - val prefix = pathFactory.getFinalDirectory(stream).toString() - return runBlocking { - withContext(Dispatchers.IO) { - s3Client - .list(prefix) - .map { listedObject: S3Object -> - s3Client.get(listedObject.key) { objectData: InputStream -> - objectData - .bufferedReader() - .lineSequence() - .map { line -> - line - .deserializeToNode() - .toAirbyteValue(stream.schemaWithMeta) - .toOutputRecord() - } - .toList() - } - } - .toList() - .flatten() - } - } +class S3V2WriteTestJsonGzip : + BasicFunctionalityIntegrationTest( + S3V2TestUtils.getConfig(S3V2TestUtils.JSON_GZIP_CONFIG_PATH), + S3V2DataDumper, + NoopDestinationCleaner, + NoopExpectedRecordMapper, + ) { + @Test + override fun testBasicWrite() { + super.testBasicWrite() } } diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/resources/expected-spec-cloud.json b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/resources/expected-spec-cloud.json index 2b93394a99a8..f713da3d7210 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/resources/expected-spec-cloud.json +++ b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/resources/expected-spec-cloud.json @@ -6,6 +6,37 @@ "type" : "object", "additionalProperties" : true, "properties" : { + "access_key_id" : { + "type" : "string", + "description" : "The access key ID to access the S3 bucket. Airbyte requires Read and Write permissions to the given bucket. Read more here.", + "title" : "S3 Key ID", + "examples" : [ "A012345678910EXAMPLE" ] + }, + "secret_access_key" : { + "type" : "string", + "description" : "The corresponding secret to the access key ID. Read more here", + "title" : "S3 Access Key", + "examples" : [ "a012345678910ABCDEFGH/AbCdEfGhEXAMPLEKEY" ] + }, + "s3_bucket_name" : { + "type" : "string", + "description" : "The name of the S3 bucket. Read more here.", + "title" : "S3 Bucket Name", + "examples" : [ "airbyte_sync" ] + }, + "s3_bucket_path" : { + "type" : "string", + "description" : "Directory under the S3 bucket where data will be written. Read more here", + "title" : "S3 Bucket Path", + "examples" : [ "data_sync/test" ] + }, + "s3_bucket_region" : { + "type" : "string", + "enum" : [ "af-south-1", "ap-east-1", "ap-northeast-1", "ap-northeast-2", "ap-northeast-3", "ap-south-1", "ap-south-2", "ap-southeast-1", "ap-southeast-2", "ap-southeast-3", "ap-southeast-4", "ca-central-1", "ca-west-1", "cn-north-1", "cn-northwest-1", "eu-central-1", "eu-central-2", "eu-north-1", "eu-south-1", "eu-south-2", "eu-west-1", "eu-west-2", "eu-west-3", "il-central-1", "me-central-1", "me-south-1", "sa-east-1", "us-east-1", "us-east-2", "us-gov-east-1", "us-gov-west-1", "us-west-1", "us-west-2" ], + "description" : "The region of the S3 bucket. See here for all region codes.", + "title" : "S3 Bucket Region", + "examples" : [ "us-east-1" ] + }, "format" : { "oneOf" : [ { "title" : "JSON Lines: Newline-delimited JSON", @@ -16,9 +47,39 @@ "type" : "string", "enum" : [ "JSONL" ], "default" : "JSONL" + }, + "compression" : { + "oneOf" : [ { + "title" : "No Compression", + "type" : "object", + "additionalProperties" : true, + "properties" : { + "compression_type" : { + "type" : "string", + "enum" : [ "No Compression" ], + "default" : "No Compression" + } + }, + "required" : [ "compression_type" ] + }, { + "title" : "GZIP", + "type" : "object", + "additionalProperties" : true, + "properties" : { + "compression_type" : { + "type" : "string", + "enum" : [ "GZIP" ], + "default" : "GZIP" + } + }, + "required" : [ "compression_type" ] + } ], + "description" : "Whether the output files should be compressed. If compression is selected, the output filename will have an extra extension (GZIP: \".jsonl.gz\").", + "title" : "Compression", + "type" : "object" } }, - "required" : [ "format_type" ] + "required" : [ "format_type", "compression" ] }, { "title" : "CSV: Comma-Separated Values", "type" : "object", @@ -28,9 +89,39 @@ "type" : "string", "enum" : [ "CSV" ], "default" : "CSV" + }, + "compression" : { + "oneOf" : [ { + "title" : "No Compression", + "type" : "object", + "additionalProperties" : true, + "properties" : { + "compression_type" : { + "type" : "string", + "enum" : [ "No Compression" ], + "default" : "No Compression" + } + }, + "required" : [ "compression_type" ] + }, { + "title" : "GZIP", + "type" : "object", + "additionalProperties" : true, + "properties" : { + "compression_type" : { + "type" : "string", + "enum" : [ "GZIP" ], + "default" : "GZIP" + } + }, + "required" : [ "compression_type" ] + } ], + "description" : "Whether the output files should be compressed. If compression is selected, the output filename will have an extra extension (GZIP: \".jsonl.gz\").", + "title" : "Compression", + "type" : "object" } }, - "required" : [ "format_type" ] + "required" : [ "format_type", "compression" ] }, { "title" : "Avro: Apache Avro", "type" : "object", @@ -60,37 +151,6 @@ "title" : "Output Format", "type" : "object" }, - "access_key_id" : { - "type" : "string", - "description" : "The access key ID to access the S3 bucket. Airbyte requires Read and Write permissions to the given bucket. Read more here.", - "title" : "S3 Key ID", - "examples" : [ "A012345678910EXAMPLE" ] - }, - "secret_access_key" : { - "type" : "string", - "description" : "The corresponding secret to the access key ID. Read more here", - "title" : "S3 Access Key", - "examples" : [ "a012345678910ABCDEFGH/AbCdEfGhEXAMPLEKEY" ] - }, - "s3_bucket_name" : { - "type" : "string", - "description" : "The name of the S3 bucket. Read more here.", - "title" : "S3 Bucket Name", - "examples" : [ "airbyte_sync" ] - }, - "s3_bucket_path" : { - "type" : "string", - "description" : "Directory under the S3 bucket where data will be written. Read more here", - "title" : "S3 Bucket Path", - "examples" : [ "data_sync/test" ] - }, - "s3_bucket_region" : { - "type" : "string", - "enum" : [ "af-south-1", "ap-east-1", "ap-northeast-1", "ap-northeast-2", "ap-northeast-3", "ap-south-1", "ap-south-2", "ap-southeast-1", "ap-southeast-2", "ap-southeast-3", "ap-southeast-4", "ca-central-1", "ca-west-1", "cn-north-1", "cn-northwest-1", "eu-central-1", "eu-central-2", "eu-north-1", "eu-south-1", "eu-south-2", "eu-west-1", "eu-west-2", "eu-west-3", "il-central-1", "me-central-1", "me-south-1", "sa-east-1", "us-east-1", "us-east-2", "us-gov-east-1", "us-gov-west-1", "us-west-1", "us-west-2" ], - "description" : "The region of the S3 bucket. See here for all region codes.", - "title" : "S3 Bucket Region", - "examples" : [ "us-east-1" ] - }, "s3_endpoint" : { "type" : "string", "description" : "Your S3 endpoint url. Read more here", @@ -117,7 +177,7 @@ "examples" : [ "__staging/data_sync/test" ] } }, - "required" : [ "format", "access_key_id", "secret_access_key", "s3_bucket_name", "s3_bucket_path", "s3_bucket_region" ] + "required" : [ "access_key_id", "secret_access_key", "s3_bucket_name", "s3_bucket_path", "s3_bucket_region", "format" ] }, "supportsIncremental" : true, "supportsNormalization" : false, diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/resources/expected-spec-oss.json b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/resources/expected-spec-oss.json index 2b93394a99a8..f713da3d7210 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/resources/expected-spec-oss.json +++ b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/resources/expected-spec-oss.json @@ -6,6 +6,37 @@ "type" : "object", "additionalProperties" : true, "properties" : { + "access_key_id" : { + "type" : "string", + "description" : "The access key ID to access the S3 bucket. Airbyte requires Read and Write permissions to the given bucket. Read more here.", + "title" : "S3 Key ID", + "examples" : [ "A012345678910EXAMPLE" ] + }, + "secret_access_key" : { + "type" : "string", + "description" : "The corresponding secret to the access key ID. Read more here", + "title" : "S3 Access Key", + "examples" : [ "a012345678910ABCDEFGH/AbCdEfGhEXAMPLEKEY" ] + }, + "s3_bucket_name" : { + "type" : "string", + "description" : "The name of the S3 bucket. Read more here.", + "title" : "S3 Bucket Name", + "examples" : [ "airbyte_sync" ] + }, + "s3_bucket_path" : { + "type" : "string", + "description" : "Directory under the S3 bucket where data will be written. Read more here", + "title" : "S3 Bucket Path", + "examples" : [ "data_sync/test" ] + }, + "s3_bucket_region" : { + "type" : "string", + "enum" : [ "af-south-1", "ap-east-1", "ap-northeast-1", "ap-northeast-2", "ap-northeast-3", "ap-south-1", "ap-south-2", "ap-southeast-1", "ap-southeast-2", "ap-southeast-3", "ap-southeast-4", "ca-central-1", "ca-west-1", "cn-north-1", "cn-northwest-1", "eu-central-1", "eu-central-2", "eu-north-1", "eu-south-1", "eu-south-2", "eu-west-1", "eu-west-2", "eu-west-3", "il-central-1", "me-central-1", "me-south-1", "sa-east-1", "us-east-1", "us-east-2", "us-gov-east-1", "us-gov-west-1", "us-west-1", "us-west-2" ], + "description" : "The region of the S3 bucket. See here for all region codes.", + "title" : "S3 Bucket Region", + "examples" : [ "us-east-1" ] + }, "format" : { "oneOf" : [ { "title" : "JSON Lines: Newline-delimited JSON", @@ -16,9 +47,39 @@ "type" : "string", "enum" : [ "JSONL" ], "default" : "JSONL" + }, + "compression" : { + "oneOf" : [ { + "title" : "No Compression", + "type" : "object", + "additionalProperties" : true, + "properties" : { + "compression_type" : { + "type" : "string", + "enum" : [ "No Compression" ], + "default" : "No Compression" + } + }, + "required" : [ "compression_type" ] + }, { + "title" : "GZIP", + "type" : "object", + "additionalProperties" : true, + "properties" : { + "compression_type" : { + "type" : "string", + "enum" : [ "GZIP" ], + "default" : "GZIP" + } + }, + "required" : [ "compression_type" ] + } ], + "description" : "Whether the output files should be compressed. If compression is selected, the output filename will have an extra extension (GZIP: \".jsonl.gz\").", + "title" : "Compression", + "type" : "object" } }, - "required" : [ "format_type" ] + "required" : [ "format_type", "compression" ] }, { "title" : "CSV: Comma-Separated Values", "type" : "object", @@ -28,9 +89,39 @@ "type" : "string", "enum" : [ "CSV" ], "default" : "CSV" + }, + "compression" : { + "oneOf" : [ { + "title" : "No Compression", + "type" : "object", + "additionalProperties" : true, + "properties" : { + "compression_type" : { + "type" : "string", + "enum" : [ "No Compression" ], + "default" : "No Compression" + } + }, + "required" : [ "compression_type" ] + }, { + "title" : "GZIP", + "type" : "object", + "additionalProperties" : true, + "properties" : { + "compression_type" : { + "type" : "string", + "enum" : [ "GZIP" ], + "default" : "GZIP" + } + }, + "required" : [ "compression_type" ] + } ], + "description" : "Whether the output files should be compressed. If compression is selected, the output filename will have an extra extension (GZIP: \".jsonl.gz\").", + "title" : "Compression", + "type" : "object" } }, - "required" : [ "format_type" ] + "required" : [ "format_type", "compression" ] }, { "title" : "Avro: Apache Avro", "type" : "object", @@ -60,37 +151,6 @@ "title" : "Output Format", "type" : "object" }, - "access_key_id" : { - "type" : "string", - "description" : "The access key ID to access the S3 bucket. Airbyte requires Read and Write permissions to the given bucket. Read more here.", - "title" : "S3 Key ID", - "examples" : [ "A012345678910EXAMPLE" ] - }, - "secret_access_key" : { - "type" : "string", - "description" : "The corresponding secret to the access key ID. Read more here", - "title" : "S3 Access Key", - "examples" : [ "a012345678910ABCDEFGH/AbCdEfGhEXAMPLEKEY" ] - }, - "s3_bucket_name" : { - "type" : "string", - "description" : "The name of the S3 bucket. Read more here.", - "title" : "S3 Bucket Name", - "examples" : [ "airbyte_sync" ] - }, - "s3_bucket_path" : { - "type" : "string", - "description" : "Directory under the S3 bucket where data will be written. Read more here", - "title" : "S3 Bucket Path", - "examples" : [ "data_sync/test" ] - }, - "s3_bucket_region" : { - "type" : "string", - "enum" : [ "af-south-1", "ap-east-1", "ap-northeast-1", "ap-northeast-2", "ap-northeast-3", "ap-south-1", "ap-south-2", "ap-southeast-1", "ap-southeast-2", "ap-southeast-3", "ap-southeast-4", "ca-central-1", "ca-west-1", "cn-north-1", "cn-northwest-1", "eu-central-1", "eu-central-2", "eu-north-1", "eu-south-1", "eu-south-2", "eu-west-1", "eu-west-2", "eu-west-3", "il-central-1", "me-central-1", "me-south-1", "sa-east-1", "us-east-1", "us-east-2", "us-gov-east-1", "us-gov-west-1", "us-west-1", "us-west-2" ], - "description" : "The region of the S3 bucket. See here for all region codes.", - "title" : "S3 Bucket Region", - "examples" : [ "us-east-1" ] - }, "s3_endpoint" : { "type" : "string", "description" : "Your S3 endpoint url. Read more here", @@ -117,7 +177,7 @@ "examples" : [ "__staging/data_sync/test" ] } }, - "required" : [ "format", "access_key_id", "secret_access_key", "s3_bucket_name", "s3_bucket_path", "s3_bucket_region" ] + "required" : [ "access_key_id", "secret_access_key", "s3_bucket_name", "s3_bucket_path", "s3_bucket_region", "format" ] }, "supportsIncremental" : true, "supportsNormalization" : false,