From 266f11c32569ad787b78d4e57f00b70e76047505 Mon Sep 17 00:00:00 2001 From: Johnny Schmidt Date: Fri, 18 Oct 2024 14:50:25 -0700 Subject: [PATCH] switched to output stream --- .../airbyte/cdk/load/util/OutputStreamUtil.kt | 11 ++++ .../cdk/load/data/CsvRowToAirbyteValue.kt | 9 ++- .../object_storage/ObjectStorageClient.kt | 5 +- .../io/airbyte/cdk/load/file/s3/S3Client.kt | 16 ++--- .../cdk/load/file/s3/S3MultipartUpload.kt | 65 +++++++++++-------- .../destination-s3-v2/metadata.yaml | 4 +- .../src/main/kotlin/S3V2Checker.kt | 1 + .../src/main/kotlin/S3V2Writer.kt | 19 ++++-- 8 files changed, 79 insertions(+), 51 deletions(-) create mode 100644 airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/util/OutputStreamUtil.kt diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/util/OutputStreamUtil.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/util/OutputStreamUtil.kt new file mode 100644 index 000000000000..f1728c2fff20 --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/util/OutputStreamUtil.kt @@ -0,0 +1,11 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.load.util + +import java.io.OutputStream + +fun OutputStream.write(string: String) { + write(string.toByteArray(Charsets.UTF_8)) +} diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/data/CsvRowToAirbyteValue.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/data/CsvRowToAirbyteValue.kt index 07290159e949..58d85ee13b65 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/data/CsvRowToAirbyteValue.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/data/CsvRowToAirbyteValue.kt @@ -29,7 +29,14 @@ class CsvRowToAirbyteValue { private fun convertInner(value: String, field: AirbyteType): AirbyteValue { return when (field) { - is ArrayType -> ArrayValue(value.split(",").map { convertInner(it, field.items.type) }) + is ArrayType -> + value + .deserializeToNode() + .elements() + .asSequence() + .map { it.toAirbyteValue(field.items.type) } + .toList() + .let(::ArrayValue) is BooleanType -> BooleanValue(value.toBoolean()) is IntegerType -> IntegerValue(value.toLong()) is NumberType -> NumberValue(value.toBigDecimal()) diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStorageClient.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStorageClient.kt index dfa4c84bb27b..de5af60f96f0 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStorageClient.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStorageClient.kt @@ -8,7 +8,6 @@ import io.airbyte.cdk.load.file.NoopProcessor import io.airbyte.cdk.load.file.StreamProcessor import java.io.InputStream import java.io.OutputStream -import java.io.Writer import kotlinx.coroutines.flow.Flow interface ObjectStorageClient> { @@ -17,11 +16,11 @@ interface ObjectStorageClient> { suspend fun get(key: String, block: (InputStream) -> U): U suspend fun put(key: String, bytes: ByteArray): T suspend fun delete(remoteObject: T) - suspend fun streamingUpload(key: String, block: suspend (Writer) -> Unit): T = + suspend fun streamingUpload(key: String, block: suspend (OutputStream) -> Unit): T = streamingUpload(key, NoopProcessor, block) suspend fun streamingUpload( key: String, streamProcessor: StreamProcessor, - block: suspend (Writer) -> Unit + block: suspend (OutputStream) -> Unit ): T } diff --git a/airbyte-cdk/bulk/toolkits/load-s3/src/main/kotlin/io/airbyte/cdk/load/file/s3/S3Client.kt b/airbyte-cdk/bulk/toolkits/load-s3/src/main/kotlin/io/airbyte/cdk/load/file/s3/S3Client.kt index 119ee8ec758e..5f6101e03d03 100644 --- a/airbyte-cdk/bulk/toolkits/load-s3/src/main/kotlin/io/airbyte/cdk/load/file/s3/S3Client.kt +++ b/airbyte-cdk/bulk/toolkits/load-s3/src/main/kotlin/io/airbyte/cdk/load/file/s3/S3Client.kt @@ -32,7 +32,6 @@ import jakarta.inject.Singleton import java.io.ByteArrayOutputStream import java.io.InputStream import java.io.OutputStream -import java.io.Writer import kotlinx.coroutines.flow.flow data class S3Object(override val key: String, override val storageConfig: S3BucketConfiguration) : @@ -113,14 +112,17 @@ class S3Client( client.deleteObject(request) } - override suspend fun streamingUpload(key: String, block: suspend (Writer) -> Unit): S3Object { + override suspend fun streamingUpload( + key: String, + block: suspend (OutputStream) -> Unit + ): S3Object { return streamingUpload(key, compressionConfig?.compressor ?: NoopProcessor, block) } override suspend fun streamingUpload( key: String, streamProcessor: StreamProcessor, - block: suspend (Writer) -> Unit + block: suspend (OutputStream) -> Unit ): S3Object { val request = CreateMultipartUploadRequest { this.bucket = bucketConfig.s3BucketName @@ -135,13 +137,7 @@ class S3Client( streamProcessor, uploadConfig ) - log.info { - "Starting multipart upload to ${response.bucket}/${response.key} (${response.uploadId}" - } - val uploadJob = upload.start() - block(upload.UploadWriter()) - upload.complete() - uploadJob.join() + upload.run(block) return S3Object(key, bucketConfig) } } diff --git a/airbyte-cdk/bulk/toolkits/load-s3/src/main/kotlin/io/airbyte/cdk/load/file/s3/S3MultipartUpload.kt b/airbyte-cdk/bulk/toolkits/load-s3/src/main/kotlin/io/airbyte/cdk/load/file/s3/S3MultipartUpload.kt index e020169929ae..4b5c7ad92941 100644 --- a/airbyte-cdk/bulk/toolkits/load-s3/src/main/kotlin/io/airbyte/cdk/load/file/s3/S3MultipartUpload.kt +++ b/airbyte-cdk/bulk/toolkits/load-s3/src/main/kotlin/io/airbyte/cdk/load/file/s3/S3MultipartUpload.kt @@ -12,14 +12,13 @@ import aws.sdk.kotlin.services.s3.model.UploadPartRequest import aws.smithy.kotlin.runtime.content.ByteStream import io.airbyte.cdk.load.command.object_storage.ObjectStorageUploadConfiguration import io.airbyte.cdk.load.file.StreamProcessor +import io.airbyte.cdk.load.util.setOnce import io.github.oshai.kotlinlogging.KotlinLogging import java.io.ByteArrayOutputStream import java.io.OutputStream -import java.io.Writer -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.Job +import java.util.concurrent.atomic.AtomicBoolean import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking @@ -37,35 +36,49 @@ class S3MultipartUpload( uploadConfig?.streamingUploadPartSize ?: throw IllegalStateException("Streaming upload part size is not configured") private val wrappingBuffer = streamProcessor.wrapper(underlyingBuffer) + private val workQueue = Channel Unit>(Channel.UNLIMITED) + private val closeOnce = AtomicBoolean(false) - private val work = Channel Unit>(Channel.UNLIMITED) - - suspend fun start(): Job = - CoroutineScope(Dispatchers.IO).launch { - for (unit in work) { - uploadPart() + suspend fun run(block: suspend (OutputStream) -> Unit) = coroutineScope { + log.info { + "Starting multipart upload to ${response.bucket}/${response.key} (${response.uploadId}" + } + launch { + for (item in workQueue) { + item() } - completeInner() + complete() } + UploadStream().use { block(it) } + } - inner class UploadWriter : Writer() { - override fun close() { - log.warn { "Close called on UploadWriter, ignoring." } + inner class UploadStream : OutputStream() { + override fun close() = runBlocking { + workQueue.send { + if (closeOnce.setOnce()) { + workQueue.close() + } + } } - override fun flush() { - throw NotImplementedError("flush() is not supported on S3MultipartUpload.UploadWriter") - } + override fun flush() = runBlocking { workQueue.send { wrappingBuffer.flush() } } - override fun write(str: String) { - wrappingBuffer.write(str.toByteArray(Charsets.UTF_8)) - if (underlyingBuffer.size() >= partSize) { - runBlocking { work.send { uploadPart() } } + override fun write(b: Int) = runBlocking { + workQueue.send { + wrappingBuffer.write(b) + if (underlyingBuffer.size() >= partSize) { + uploadPart() + } } } - override fun write(cbuf: CharArray, off: Int, len: Int) { - write(String(cbuf, off, len)) + override fun write(b: ByteArray) = runBlocking { + workQueue.send { + wrappingBuffer.write(b) + if (underlyingBuffer.size() >= partSize) { + uploadPart() + } + } } } @@ -89,11 +102,7 @@ class S3MultipartUpload( underlyingBuffer.reset() } - suspend fun complete() { - work.close() - } - - private suspend fun completeInner() { + private suspend fun complete() { if (underlyingBuffer.size() > 0) { uploadPart() } diff --git a/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml b/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml index 0e5eb2c2599a..f928f57e2c09 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml +++ b/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml @@ -36,12 +36,12 @@ data: secretStore: type: GSM alias: airbyte-connector-testing-secret-store - - name: SECRET_DESTINATION-S3-V2-CSV-CONFIG + - name: SECRET_DESTINATION-S3-V2-CSV fileName: s3_dest_v2_csv_config.json secretStore: type: GSM alias: airbyte-connector-testing-secret-store - - name: SECRET_DESTINATION-S3-V2-CSV-GZIP-CONFIG + - name: SECRET_DESTINATION-S3-V2-CSV-GZIP fileName: s3_dest_v2_csv_gzip_config.json secretStore: type: GSM diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Checker.kt b/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Checker.kt index 9dad4be13749..311d9ae374a4 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Checker.kt +++ b/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Checker.kt @@ -11,6 +11,7 @@ import io.airbyte.cdk.load.file.TimeProvider import io.airbyte.cdk.load.file.object_storage.ObjectStoragePathFactory import io.airbyte.cdk.load.file.s3.S3ClientFactory import io.airbyte.cdk.load.file.s3.S3Object +import io.airbyte.cdk.load.util.write import io.github.oshai.kotlinlogging.KotlinLogging import io.micronaut.context.exceptions.ConfigurationException import jakarta.inject.Singleton diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Writer.kt b/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Writer.kt index 5dc2e8c98cc2..df62a84aec69 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Writer.kt +++ b/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Writer.kt @@ -19,6 +19,7 @@ import io.airbyte.cdk.load.file.s3.S3Object import io.airbyte.cdk.load.message.Batch import io.airbyte.cdk.load.message.DestinationRecord import io.airbyte.cdk.load.util.serializeToString +import io.airbyte.cdk.load.util.write import io.airbyte.cdk.load.write.DestinationWriter import io.airbyte.cdk.load.write.StreamLoader import jakarta.inject.Singleton @@ -57,22 +58,26 @@ class S3V2Writer( val partNumber = partNumber.getAndIncrement() val key = pathFactory.getPathToFile(stream, partNumber, isStaging = true).toString() val s3Object = - s3Client.streamingUpload(key) { writer -> + s3Client.streamingUpload(key) { outputStream -> when (formatConfig.objectStorageFormatConfiguration) { is JsonFormatConfiguration -> { records.forEach { val serialized = recordDecorator.decorate(it).toJson().serializeToString() - writer.write(serialized) - writer.write("\n") + outputStream.write(serialized) + outputStream.write("\n") } } is CSVFormatConfiguration -> { - stream.schemaWithMeta.toCsvPrinterWithHeader(writer).use { printer -> - records.forEach { - printer.printRecord(*recordDecorator.decorate(it).toCsvRecord()) + stream.schemaWithMeta + .toCsvPrinterWithHeader(outputStream.writer()) + .use { printer -> + records.forEach { + printer.printRecord( + *recordDecorator.decorate(it).toCsvRecord() + ) + } } - } } else -> throw IllegalStateException("Unsupported format") }