diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/build.gradle b/airbyte-cdk/bulk/toolkits/load-object-storage/build.gradle index 889fd7a5852f..e9a2f683d569 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/build.gradle +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/build.gradle @@ -2,5 +2,7 @@ dependencies { implementation project(':airbyte-cdk:bulk:core:bulk-cdk-core-base') implementation project(':airbyte-cdk:bulk:core:bulk-cdk-core-load') + api("org.apache.commons:commons-csv:1.10.0") + testFixturesImplementation testFixtures(project(":airbyte-cdk:bulk:core:bulk-cdk-core-load")) } diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteTypeToCsvHeader.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteTypeToCsvHeader.kt new file mode 100644 index 000000000000..6edb3a6af91c --- /dev/null +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteTypeToCsvHeader.kt @@ -0,0 +1,25 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. 
+ */
+
+package io.airbyte.cdk.load.data
+
+import java.io.Writer
+import org.apache.commons.csv.CSVFormat
+import org.apache.commons.csv.CSVPrinter
+
+/** Derives CSV column names from an object schema. */
+class AirbyteTypeToCsvHeader {
+    /**
+     * Returns the property names of [schema], in the schema's own iteration order, for use as
+     * CSV column names.
+     *
+     * @throws IllegalArgumentException if [schema] is not an [ObjectType].
+     */
+    fun convert(schema: AirbyteType): Array<String> {
+        // require() smart-casts schema to ObjectType for the line below.
+        require(schema is ObjectType) { "Only object types are supported" }
+        return schema.properties.keys.toTypedArray()
+    }
+}
+
+/** Shorthand for [AirbyteTypeToCsvHeader.convert] on the receiver. */
+fun AirbyteType.toCsvHeader(): Array<String> = AirbyteTypeToCsvHeader().convert(this)
+
+/**
+ * Builds a [CSVPrinter] over [writer] whose format is configured with the receiver's property
+ * names as its header.
+ *
+ * @throws IllegalArgumentException if the receiver is not an [ObjectType].
+ */
+fun AirbyteType.toCsvPrinterWithHeader(writer: Writer): CSVPrinter =
+    CSVFormat.Builder.create().setHeader(*toCsvHeader()).build().print(writer)
diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteValueToCsvRow.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteValueToCsvRow.kt
new file mode 100644
index 000000000000..cdac400e4b3b
--- /dev/null
+++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteValueToCsvRow.kt
@@ -0,0 +1,31 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.cdk.load.data
+
+import io.airbyte.cdk.load.util.serializeToString
+
+/** Flattens an object value into the string cells of one CSV row. */
+class AirbyteValueToCsvRow {
+    /**
+     * Returns one cell per entry of [value], in the value's own iteration order.
+     *
+     * @throws IllegalArgumentException if [value] is not an [ObjectValue].
+     */
+    fun convert(value: AirbyteValue): Array<String> {
+        // require() smart-casts value to ObjectValue for the line below.
+        require(value is ObjectValue) { "Only object values are supported" }
+        return value.values.map { convertInner(it.value) }.toTypedArray()
+    }
+
+    /** Renders a single field value as the text of one CSV cell. */
+    private fun convertInner(value: AirbyteValue): String =
+        when (value) {
+            // Nested structures are embedded in the cell as JSON text.
+            is ObjectValue -> value.toJson().serializeToString()
+            is ArrayValue -> value.toJson().serializeToString()
+            is StringValue -> value.value
+            is IntegerValue -> value.value.toString()
+            is NumberValue -> value.value.toString()
+            // Emit the bare boolean so CsvRowToAirbyteValue can read it back with
+            // String.toBoolean(); falling through to toString() would emit the wrapper's
+            // representation instead, which toBoolean() reads as false.
+            is BooleanValue -> value.value.toString()
+            // NOTE(review): every other value kind is still written via toString(); confirm
+            // that representation is what CSV consumers expect (nulls in particular).
+            else -> value.toString()
+        }
+}
+
+/** Shorthand for [AirbyteValueToCsvRow.convert] on the receiver. */
+fun AirbyteValue.toCsvRecord(): Array<String> = AirbyteValueToCsvRow().convert(this)
diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/data/CsvRowToAirbyteValue.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/data/CsvRowToAirbyteValue.kt
new file mode 100644
index 000000000000..07290159e949
--- /dev/null
+++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/data/CsvRowToAirbyteValue.kt
@@ -0,0 +1,58 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.cdk.load.data
+
+import io.airbyte.cdk.load.util.deserializeToNode
+import org.apache.commons.csv.CSVRecord
+
+/** Rebuilds a typed object value from one parsed CSV record and the schema describing it. */
+class CsvRowToAirbyteValue {
+    /**
+     * Pairs the cells of [row] positionally with the properties of [schema] and converts each
+     * cell according to its property type.
+     *
+     * @throws IllegalArgumentException if [schema] is not an [ObjectType], if the number of
+     * cells differs from the number of schema properties, or if a property type is unsupported.
+     */
+    fun convert(row: CSVRecord, schema: AirbyteType): AirbyteValue {
+        require(schema is ObjectType) { "Only object types are supported" }
+        val cells = row.toList()
+        require(cells.size == schema.properties.size) { "Row size does not match schema size" }
+        // Insertion-ordered map so the result keeps the schema's property order.
+        val properties = linkedMapOf<String, AirbyteValue>()
+        schema.properties.toList().zip(cells).forEach { (property, cell) ->
+            properties[property.first] = convertInner(cell, property.second.type)
+        }
+        return ObjectValue(properties)
+    }
+
+    /** Converts the text of one cell into a value of type [field]. */
+    private fun convertInner(value: String, field: AirbyteType): AirbyteValue {
+        return when (field) {
+            // AirbyteValueToCsvRow writes arrays as JSON text, so parse the cell as JSON.
+            // Splitting the text on commas would break on the surrounding brackets, on quoted
+            // strings containing commas, and on nested arrays or objects.
+            is ArrayType ->
+                ArrayValue(
+                    value
+                        .deserializeToNode()
+                        .elements()
+                        .asSequence()
+                        .map { it.toAirbyteValue(field.items.type) }
+                        .toList()
+                )
+            is BooleanType -> BooleanValue(value.toBoolean())
+            is IntegerType -> IntegerValue(value.toLong())
+            is NumberType -> NumberValue(value.toBigDecimal())
+            is ObjectType -> {
+                val properties = linkedMapOf<String, AirbyteValue>()
+                value.deserializeToNode().fields().forEach { entry ->
+                    // Fail with a descriptive message instead of a bare NullPointerException
+                    // when the JSON carries a key the schema does not declare.
+                    val property =
+                        requireNotNull(field.properties[entry.key]) {
+                            "Property ${entry.key} is not declared in the object schema"
+                        }
+                    properties[entry.key] = entry.value.toAirbyteValue(property.type)
+                }
+                ObjectValue(properties)
+            }
+            is ObjectTypeWithoutSchema ->
+                value.deserializeToNode().toAirbyteValue(ObjectTypeWithoutSchema)
+            is StringType -> StringValue(value)
+            else -> throw IllegalArgumentException("Unsupported field type: $field")
+        }
+    }
+}
+
+/** Shorthand for [CsvRowToAirbyteValue.convert] on the receiver. */
+fun CSVRecord.toAirbyteValue(schema: AirbyteType): AirbyteValue =
+    CsvRowToAirbyteValue().convert(this, schema)
diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStorageClient.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStorageClient.kt
index 1d2e7faa66de..dfa4c84bb27b 100644
--- 
a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStorageClient.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStorageClient.kt @@ -8,24 +8,20 @@ import io.airbyte.cdk.load.file.NoopProcessor import io.airbyte.cdk.load.file.StreamProcessor import java.io.InputStream import java.io.OutputStream +import java.io.Writer import kotlinx.coroutines.flow.Flow -interface ObjectStorageClient, U : ObjectStorageStreamingUploadWriter> { +interface ObjectStorageClient> { suspend fun list(prefix: String): Flow suspend fun move(remoteObject: T, toKey: String): T suspend fun get(key: String, block: (InputStream) -> U): U suspend fun put(key: String, bytes: ByteArray): T suspend fun delete(remoteObject: T) - suspend fun streamingUpload(key: String, block: suspend (U) -> Unit): T = + suspend fun streamingUpload(key: String, block: suspend (Writer) -> Unit): T = streamingUpload(key, NoopProcessor, block) suspend fun streamingUpload( key: String, streamProcessor: StreamProcessor, - block: suspend (U) -> Unit + block: suspend (Writer) -> Unit ): T } - -interface ObjectStorageStreamingUploadWriter { - suspend fun write(bytes: ByteArray) - suspend fun write(string: String) = write(string.toByteArray(Charsets.UTF_8)) -} diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/testFixtures/kotlin/io/airbyte/cdk/load/ObjectStorageDataDumper.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/testFixtures/kotlin/io/airbyte/cdk/load/ObjectStorageDataDumper.kt index 82396476119f..6c3ded6d6a33 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/testFixtures/kotlin/io/airbyte/cdk/load/ObjectStorageDataDumper.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/testFixtures/kotlin/io/airbyte/cdk/load/ObjectStorageDataDumper.kt @@ -5,7 +5,10 @@ package io.airbyte.cdk.load import io.airbyte.cdk.load.command.DestinationStream +import 
io.airbyte.cdk.load.command.object_storage.CSVFormatConfiguration +import io.airbyte.cdk.load.command.object_storage.JsonFormatConfiguration import io.airbyte.cdk.load.command.object_storage.ObjectStorageCompressionConfiguration +import io.airbyte.cdk.load.command.object_storage.ObjectStorageFormatConfiguration import io.airbyte.cdk.load.data.toAirbyteValue import io.airbyte.cdk.load.file.GZIPProcessor import io.airbyte.cdk.load.file.NoopProcessor @@ -15,6 +18,7 @@ import io.airbyte.cdk.load.file.object_storage.RemoteObject import io.airbyte.cdk.load.test.util.OutputRecord import io.airbyte.cdk.load.test.util.toOutputRecord import io.airbyte.cdk.load.util.deserializeToNode +import java.io.BufferedReader import java.io.InputStream import java.util.zip.GZIPInputStream import kotlinx.coroutines.Dispatchers @@ -22,11 +26,14 @@ import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.toList import kotlinx.coroutines.runBlocking import kotlinx.coroutines.withContext +import org.apache.commons.csv.CSVFormat +import org.apache.commons.csv.CSVParser class ObjectStorageDataDumper( private val stream: DestinationStream, - private val client: ObjectStorageClient<*, *>, + private val client: ObjectStorageClient<*>, private val pathFactory: ObjectStoragePathFactory, + private val formatConfig: ObjectStorageFormatConfiguration, private val compressionConfig: ObjectStorageCompressionConfiguration<*>? 
= null ) { fun dump(): List { @@ -37,21 +44,14 @@ class ObjectStorageDataDumper( .list(prefix) .map { listedObject: RemoteObject<*> -> client.get(listedObject.key) { objectData: InputStream -> - when (compressionConfig?.compressor) { + val reader = + when (compressionConfig?.compressor) { is GZIPProcessor -> GZIPInputStream(objectData) is NoopProcessor, null -> objectData else -> error("Unsupported compressor") - } - .bufferedReader() - .lineSequence() - .map { line -> - line - .deserializeToNode() - .toAirbyteValue(stream.schemaWithMeta) - .toOutputRecord() - } - .toList() + }.bufferedReader() + readLines(reader) } } .toList() @@ -59,4 +59,28 @@ class ObjectStorageDataDumper( } } } + + @Suppress("DEPRECATION") + private fun readLines(reader: BufferedReader): List = + when (formatConfig) { + is JsonFormatConfiguration -> { + reader + .lineSequence() + .map { line -> + line + .deserializeToNode() + .toAirbyteValue(stream.schemaWithMeta) + .toOutputRecord() + } + .toList() + } + is CSVFormatConfiguration -> { + CSVParser(reader, CSVFormat.DEFAULT.withHeader()).use { + it.records.map { record -> + record.toAirbyteValue(stream.schemaWithMeta).toOutputRecord() + } + } + } + else -> error("Unsupported format") + } } diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/testFixtures/kotlin/io/airbyte/cdk/load/ObjectStorageDestinationCleaner.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/testFixtures/kotlin/io/airbyte/cdk/load/ObjectStorageDestinationCleaner.kt new file mode 100644 index 000000000000..276309474284 --- /dev/null +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/testFixtures/kotlin/io/airbyte/cdk/load/ObjectStorageDestinationCleaner.kt @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. 
+ */
+
+package io.airbyte.cdk.load
+
+import io.airbyte.cdk.load.command.DestinationStream
+import io.airbyte.cdk.load.file.object_storage.ObjectStorageClient
+import io.airbyte.cdk.load.file.object_storage.ObjectStoragePathFactory
+import io.airbyte.cdk.load.file.object_storage.RemoteObject
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.runBlocking
+import kotlinx.coroutines.withContext
+
+/** Test fixture that removes the objects a stream has written to object storage. */
+class ObjectStorageDestinationCleaner<T : RemoteObject<*>> {
+    /**
+     * Deletes every object that [client] lists under the final directory [pathFactory] reports
+     * for [stream]. Blocks the calling thread until all deletions have finished; the listing and
+     * deletion run on [Dispatchers.IO].
+     */
+    fun cleanup(
+        stream: DestinationStream,
+        client: ObjectStorageClient<T>,
+        pathFactory: ObjectStoragePathFactory,
+    ) {
+        val prefix = pathFactory.getFinalDirectory(stream).toString()
+        runBlocking {
+            withContext(Dispatchers.IO) {
+                client.list(prefix).collect { remoteObject -> client.delete(remoteObject) }
+            }
+        }
+    }
+}
diff --git a/airbyte-cdk/bulk/toolkits/load-s3/src/main/kotlin/io/airbyte/cdk/load/file/s3/S3Client.kt b/airbyte-cdk/bulk/toolkits/load-s3/src/main/kotlin/io/airbyte/cdk/load/file/s3/S3Client.kt
index bd5235c0eba5..119ee8ec758e 100644
--- a/airbyte-cdk/bulk/toolkits/load-s3/src/main/kotlin/io/airbyte/cdk/load/file/s3/S3Client.kt
+++ b/airbyte-cdk/bulk/toolkits/load-s3/src/main/kotlin/io/airbyte/cdk/load/file/s3/S3Client.kt
@@ -32,6 +32,7 @@ import jakarta.inject.Singleton
 import java.io.ByteArrayOutputStream
 import java.io.InputStream
 import java.io.OutputStream
+import java.io.Writer
 import kotlinx.coroutines.flow.flow
 
 data class S3Object(override val key: String, override val storageConfig: S3BucketConfiguration) :
@@ -46,7 +47,7 @@ class S3Client(
     val bucketConfig: S3BucketConfiguration,
     private val uploadConfig: ObjectStorageUploadConfiguration?,
     private val compressionConfig: ObjectStorageCompressionConfiguration<*>? 
= null, -) : ObjectStorageClient.Writer> { +) : ObjectStorageClient { private val log = KotlinLogging.logger {} override suspend fun list(prefix: String) = flow { @@ -112,17 +113,14 @@ class S3Client( client.deleteObject(request) } - override suspend fun streamingUpload( - key: String, - block: suspend (S3MultipartUpload<*>.Writer) -> Unit - ): S3Object { + override suspend fun streamingUpload(key: String, block: suspend (Writer) -> Unit): S3Object { return streamingUpload(key, compressionConfig?.compressor ?: NoopProcessor, block) } override suspend fun streamingUpload( key: String, streamProcessor: StreamProcessor, - block: suspend (S3MultipartUpload<*>.Writer) -> Unit + block: suspend (Writer) -> Unit ): S3Object { val request = CreateMultipartUploadRequest { this.bucket = bucketConfig.s3BucketName @@ -140,8 +138,10 @@ class S3Client( log.info { "Starting multipart upload to ${response.bucket}/${response.key} (${response.uploadId}" } - block(upload.Writer()) + val uploadJob = upload.start() + block(upload.UploadWriter()) upload.complete() + uploadJob.join() return S3Object(key, bucketConfig) } } diff --git a/airbyte-cdk/bulk/toolkits/load-s3/src/main/kotlin/io/airbyte/cdk/load/file/s3/S3MultipartUpload.kt b/airbyte-cdk/bulk/toolkits/load-s3/src/main/kotlin/io/airbyte/cdk/load/file/s3/S3MultipartUpload.kt index 87f62c29ab90..e020169929ae 100644 --- a/airbyte-cdk/bulk/toolkits/load-s3/src/main/kotlin/io/airbyte/cdk/load/file/s3/S3MultipartUpload.kt +++ b/airbyte-cdk/bulk/toolkits/load-s3/src/main/kotlin/io/airbyte/cdk/load/file/s3/S3MultipartUpload.kt @@ -12,10 +12,16 @@ import aws.sdk.kotlin.services.s3.model.UploadPartRequest import aws.smithy.kotlin.runtime.content.ByteStream import io.airbyte.cdk.load.command.object_storage.ObjectStorageUploadConfiguration import io.airbyte.cdk.load.file.StreamProcessor -import io.airbyte.cdk.load.file.object_storage.ObjectStorageStreamingUploadWriter import io.github.oshai.kotlinlogging.KotlinLogging import 
java.io.ByteArrayOutputStream import java.io.OutputStream +import java.io.Writer +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.Job +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking class S3MultipartUpload( private val client: aws.sdk.kotlin.services.s3.S3Client, @@ -32,16 +38,34 @@ class S3MultipartUpload( ?: throw IllegalStateException("Streaming upload part size is not configured") private val wrappingBuffer = streamProcessor.wrapper(underlyingBuffer) - inner class Writer : ObjectStorageStreamingUploadWriter { - override suspend fun write(bytes: ByteArray) { - wrappingBuffer.write(bytes) - if (underlyingBuffer.size() >= partSize) { + private val work = Channel Unit>(Channel.UNLIMITED) + + suspend fun start(): Job = + CoroutineScope(Dispatchers.IO).launch { + for (unit in work) { uploadPart() } + completeInner() + } + + inner class UploadWriter : Writer() { + override fun close() { + log.warn { "Close called on UploadWriter, ignoring." 
} + } + + override fun flush() { + throw NotImplementedError("flush() is not supported on S3MultipartUpload.UploadWriter") + } + + override fun write(str: String) { + wrappingBuffer.write(str.toByteArray(Charsets.UTF_8)) + if (underlyingBuffer.size() >= partSize) { + runBlocking { work.send { uploadPart() } } + } } - override suspend fun write(string: String) { - write(string.toByteArray(Charsets.UTF_8)) + override fun write(cbuf: CharArray, off: Int, len: Int) { + write(String(cbuf, off, len)) } } @@ -66,6 +90,10 @@ class S3MultipartUpload( } suspend fun complete() { + work.close() + } + + private suspend fun completeInner() { if (underlyingBuffer.size() > 0) { uploadPart() } diff --git a/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml b/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml index 62211564e565..0e5eb2c2599a 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml +++ b/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml @@ -36,4 +36,14 @@ data: secretStore: type: GSM alias: airbyte-connector-testing-secret-store + - name: SECRET_DESTINATION-S3-V2-CSV-CONFIG + fileName: s3_dest_v2_csv_config.json + secretStore: + type: GSM + alias: airbyte-connector-testing-secret-store + - name: SECRET_DESTINATION-S3-V2-CSV-GZIP-CONFIG + fileName: s3_dest_v2_csv_gzip_config.json + secretStore: + type: GSM + alias: airbyte-connector-testing-secret-store metadataSpecVersion: "1.0" diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Checker.kt b/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Checker.kt index 4bb161d6d5a9..9dad4be13749 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Checker.kt +++ b/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Checker.kt @@ -5,6 +5,7 @@ package io.airbyte.integrations.destination.s3_v2 import io.airbyte.cdk.load.check.DestinationChecker +import 
io.airbyte.cdk.load.command.object_storage.CSVFormatConfiguration import io.airbyte.cdk.load.command.object_storage.JsonFormatConfiguration import io.airbyte.cdk.load.file.TimeProvider import io.airbyte.cdk.load.file.object_storage.ObjectStoragePathFactory @@ -24,8 +25,11 @@ class S3V2Checker(private val timeProvider: TimeProvider) : override fun check(config: S3V2Configuration) { runBlocking { - if (config.objectStorageFormatConfiguration !is JsonFormatConfiguration) { - throw ConfigurationException("Currently only JSON format is supported") + if ( + config.objectStorageFormatConfiguration !is JsonFormatConfiguration && + config.objectStorageFormatConfiguration !is CSVFormatConfiguration + ) { + throw ConfigurationException("Currently only JSON and CSV format is supported") } val s3Client = S3ClientFactory.make(config) val pathFactory = ObjectStoragePathFactory.from(config, timeProvider) diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Writer.kt b/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Writer.kt index aa8da9da1a98..5dc2e8c98cc2 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Writer.kt +++ b/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Writer.kt @@ -6,7 +6,12 @@ package io.airbyte.integrations.destination.s3_v2 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings import io.airbyte.cdk.load.command.DestinationStream +import io.airbyte.cdk.load.command.object_storage.CSVFormatConfiguration +import io.airbyte.cdk.load.command.object_storage.JsonFormatConfiguration +import io.airbyte.cdk.load.command.object_storage.ObjectStorageFormatConfigurationProvider import io.airbyte.cdk.load.data.DestinationRecordToAirbyteValueWithMeta +import io.airbyte.cdk.load.data.toCsvPrinterWithHeader +import io.airbyte.cdk.load.data.toCsvRecord import io.airbyte.cdk.load.data.toJson import io.airbyte.cdk.load.file.object_storage.ObjectStoragePathFactory import 
io.airbyte.cdk.load.file.s3.S3Client @@ -24,6 +29,7 @@ class S3V2Writer( private val s3Client: S3Client, private val pathFactory: ObjectStoragePathFactory, private val recordDecorator: DestinationRecordToAirbyteValueWithMeta, + private val formatConfig: ObjectStorageFormatConfigurationProvider ) : DestinationWriter { sealed interface S3V2Batch : Batch data class StagedObject( @@ -52,10 +58,23 @@ class S3V2Writer( val key = pathFactory.getPathToFile(stream, partNumber, isStaging = true).toString() val s3Object = s3Client.streamingUpload(key) { writer -> - records.forEach { - val serialized = recordDecorator.decorate(it).toJson().serializeToString() - writer.write(serialized) - writer.write("\n") + when (formatConfig.objectStorageFormatConfiguration) { + is JsonFormatConfiguration -> { + records.forEach { + val serialized = + recordDecorator.decorate(it).toJson().serializeToString() + writer.write(serialized) + writer.write("\n") + } + } + is CSVFormatConfiguration -> { + stream.schemaWithMeta.toCsvPrinterWithHeader(writer).use { printer -> + records.forEach { + printer.printRecord(*recordDecorator.decorate(it).toCsvRecord()) + } + } + } + else -> throw IllegalStateException("Unsupported format") } } return StagedObject(s3Object = s3Object, partNumber = partNumber) diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2CheckTest.kt b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2CheckTest.kt index c322111f70ae..aeb05ce9f245 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2CheckTest.kt +++ b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2CheckTest.kt @@ -21,7 +21,15 @@ class S3V2CheckTest : CheckTestConfig( S3V2TestUtils.JSON_GZIP_CONFIG_PATH, 
setOf(FeatureFlag.AIRBYTE_CLOUD_DEPLOYMENT), - ) + ), + CheckTestConfig( + S3V2TestUtils.CSV_UNCOMPRESSED_CONFIG_PATH, + setOf(FeatureFlag.AIRBYTE_CLOUD_DEPLOYMENT), + ), + CheckTestConfig( + S3V2TestUtils.CSV_GZIP_CONFIG_PATH, + setOf(FeatureFlag.AIRBYTE_CLOUD_DEPLOYMENT), + ), ), failConfigFilenamesAndFailureReasons = emptyMap() ) { diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2DataDumper.kt b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2DataDumper.kt index 77a9809a882e..7d010dcc70bb 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2DataDumper.kt +++ b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2DataDumper.kt @@ -25,7 +25,8 @@ object S3V2DataDumper : DestinationDataDumper { stream, s3Client, pathFactory, - config.objectStorageCompressionConfiguration + config.objectStorageFormatConfiguration, + config.objectStorageCompressionConfiguration, ) .dump() } diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2TestUtils.kt b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2TestUtils.kt index 98bd860a6b53..0a19dadce18e 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2TestUtils.kt +++ b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2TestUtils.kt @@ -11,6 +11,8 @@ import java.nio.file.Path object S3V2TestUtils { const val JSON_UNCOMPRESSED_CONFIG_PATH = "secrets/s3_dest_v2_minimal_required_config.json" const val 
JSON_GZIP_CONFIG_PATH = "secrets/s3_dest_v2_jsonl_gzip_config.json" + const val CSV_UNCOMPRESSED_CONFIG_PATH = "secrets/s3_dest_v2_csv_config.json" + const val CSV_GZIP_CONFIG_PATH = "secrets/s3_dest_v2_csv_gzip_config.json" fun getConfig(configPath: String): S3V2Specification = ValidatedJsonUtils.parseOne( S3V2Specification::class.java, diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt index cbf7e23747e8..8da9d9add8d9 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt +++ b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt @@ -9,9 +9,9 @@ import io.airbyte.cdk.load.test.util.NoopExpectedRecordMapper import io.airbyte.cdk.load.write.BasicFunctionalityIntegrationTest import org.junit.jupiter.api.Test -class S3V2WriteTestJsonUncompressed : +abstract class S3V2WriteTest(path: String) : BasicFunctionalityIntegrationTest( - S3V2TestUtils.getConfig(S3V2TestUtils.JSON_UNCOMPRESSED_CONFIG_PATH), + S3V2TestUtils.getConfig(path), S3V2DataDumper, NoopDestinationCleaner, NoopExpectedRecordMapper, @@ -22,15 +22,10 @@ class S3V2WriteTestJsonUncompressed : } } -class S3V2WriteTestJsonGzip : - BasicFunctionalityIntegrationTest( - S3V2TestUtils.getConfig(S3V2TestUtils.JSON_GZIP_CONFIG_PATH), - S3V2DataDumper, - NoopDestinationCleaner, - NoopExpectedRecordMapper, - ) { - @Test - override fun testBasicWrite() { - super.testBasicWrite() - } -} +class S3V2WriteTestJsonUncompressed : S3V2WriteTest(S3V2TestUtils.JSON_UNCOMPRESSED_CONFIG_PATH) + +class S3V2WriteTestJsonGzip : S3V2WriteTest(S3V2TestUtils.JSON_GZIP_CONFIG_PATH) + +class S3V2WriteTestCsvUncompressed : 
S3V2WriteTest(S3V2TestUtils.CSV_UNCOMPRESSED_CONFIG_PATH) + +class S3V2WriteTestCsvGzip : S3V2WriteTest(S3V2TestUtils.CSV_GZIP_CONFIG_PATH)