Skip to content

Commit

Permalink
switched to output stream
Browse files Browse the repository at this point in the history
  • Loading branch information
johnny-schmidt committed Oct 18, 2024
1 parent e58b65d commit 3f2b360
Show file tree
Hide file tree
Showing 7 changed files with 43 additions and 23 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.util

import java.io.OutputStream

/**
 * Writes [string] to this stream encoded as UTF-8.
 *
 * Convenience overload so call sites can emit text directly instead of
 * converting to a byte array themselves. Does not flush or close the stream.
 */
fun OutputStream.write(string: String) {
    // encodeToByteArray() encodes as UTF-8 by default.
    this.write(string.encodeToByteArray())
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import io.airbyte.cdk.load.file.NoopProcessor
import io.airbyte.cdk.load.file.StreamProcessor
import java.io.InputStream
import java.io.OutputStream
import java.io.Writer
import kotlinx.coroutines.flow.Flow

interface ObjectStorageClient<T : RemoteObject<*>> {
Expand All @@ -17,11 +16,11 @@ interface ObjectStorageClient<T : RemoteObject<*>> {
suspend fun <U> get(key: String, block: (InputStream) -> U): U
suspend fun put(key: String, bytes: ByteArray): T
suspend fun delete(remoteObject: T)
suspend fun streamingUpload(key: String, block: suspend (Writer) -> Unit): T =
suspend fun streamingUpload(key: String, block: suspend (OutputStream) -> Unit): T =
streamingUpload(key, NoopProcessor, block)
suspend fun <V : OutputStream> streamingUpload(
key: String,
streamProcessor: StreamProcessor<V>,
block: suspend (Writer) -> Unit
block: suspend (OutputStream) -> Unit
): T
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import jakarta.inject.Singleton
import java.io.ByteArrayOutputStream
import java.io.InputStream
import java.io.OutputStream
import java.io.Writer
import kotlinx.coroutines.flow.flow

data class S3Object(override val key: String, override val storageConfig: S3BucketConfiguration) :
Expand Down Expand Up @@ -113,14 +112,17 @@ class S3Client(
client.deleteObject(request)
}

override suspend fun streamingUpload(key: String, block: suspend (Writer) -> Unit): S3Object {
override suspend fun streamingUpload(
key: String,
block: suspend (OutputStream) -> Unit
): S3Object {
return streamingUpload(key, compressionConfig?.compressor ?: NoopProcessor, block)
}

override suspend fun <U : OutputStream> streamingUpload(
key: String,
streamProcessor: StreamProcessor<U>,
block: suspend (Writer) -> Unit
block: suspend (OutputStream) -> Unit
): S3Object {
val request = CreateMultipartUploadRequest {
this.bucket = bucketConfig.s3BucketName
Expand All @@ -139,7 +141,7 @@ class S3Client(
"Starting multipart upload to ${response.bucket}/${response.key} (${response.uploadId}"
}
val uploadJob = upload.start()
block(upload.UploadWriter())
block(upload.UploadStream())
upload.complete()
uploadJob.join()
return S3Object(key, bucketConfig)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import io.airbyte.cdk.load.file.StreamProcessor
import io.github.oshai.kotlinlogging.KotlinLogging
import java.io.ByteArrayOutputStream
import java.io.OutputStream
import java.io.Writer
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
Expand Down Expand Up @@ -48,24 +47,27 @@ class S3MultipartUpload<T : OutputStream>(
completeInner()
}

inner class UploadWriter : Writer() {
inner class UploadStream : OutputStream() {
override fun close() {
log.warn { "Close called on UploadWriter, ignoring." }
}

override fun flush() {
throw NotImplementedError("flush() is not supported on S3MultipartUpload.UploadWriter")
wrappingBuffer.flush()
}

override fun write(str: String) {
wrappingBuffer.write(str.toByteArray(Charsets.UTF_8))
override fun write(b: Int) {
wrappingBuffer.write(b)
if (underlyingBuffer.size() >= partSize) {
runBlocking { work.send { uploadPart() } }
}
}

override fun write(cbuf: CharArray, off: Int, len: Int) {
write(String(cbuf, off, len))
override fun write(b: ByteArray) {
wrappingBuffer.write(b)
if (underlyingBuffer.size() >= partSize) {
runBlocking { work.send { uploadPart() } }
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ data:
secretStore:
type: GSM
alias: airbyte-connector-testing-secret-store
- name: SECRET_DESTINATION-S3-V2-CSV-CONFIG
- name: SECRET_DESTINATION-S3-V2-CSV
fileName: s3_dest_v2_csv_config.json
secretStore:
type: GSM
alias: airbyte-connector-testing-secret-store
- name: SECRET_DESTINATION-S3-V2-CSV-GZIP-CONFIG
- name: SECRET_DESTINATION-S3-V2-CSV-GZIP
fileName: s3_dest_v2_csv_gzip_config.json
secretStore:
type: GSM
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import io.airbyte.cdk.load.file.TimeProvider
import io.airbyte.cdk.load.file.object_storage.ObjectStoragePathFactory
import io.airbyte.cdk.load.file.s3.S3ClientFactory
import io.airbyte.cdk.load.file.s3.S3Object
import io.airbyte.cdk.load.util.write
import io.github.oshai.kotlinlogging.KotlinLogging
import io.micronaut.context.exceptions.ConfigurationException
import jakarta.inject.Singleton
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import io.airbyte.cdk.load.file.s3.S3Object
import io.airbyte.cdk.load.message.Batch
import io.airbyte.cdk.load.message.DestinationRecord
import io.airbyte.cdk.load.util.serializeToString
import io.airbyte.cdk.load.util.write
import io.airbyte.cdk.load.write.DestinationWriter
import io.airbyte.cdk.load.write.StreamLoader
import jakarta.inject.Singleton
Expand Down Expand Up @@ -57,22 +58,26 @@ class S3V2Writer(
val partNumber = partNumber.getAndIncrement()
val key = pathFactory.getPathToFile(stream, partNumber, isStaging = true).toString()
val s3Object =
s3Client.streamingUpload(key) { writer ->
s3Client.streamingUpload(key) { outputStream ->
when (formatConfig.objectStorageFormatConfiguration) {
is JsonFormatConfiguration -> {
records.forEach {
val serialized =
recordDecorator.decorate(it).toJson().serializeToString()
writer.write(serialized)
writer.write("\n")
outputStream.write(serialized)
outputStream.write("\n")
}
}
is CSVFormatConfiguration -> {
stream.schemaWithMeta.toCsvPrinterWithHeader(writer).use { printer ->
records.forEach {
printer.printRecord(*recordDecorator.decorate(it).toCsvRecord())
stream.schemaWithMeta
.toCsvPrinterWithHeader(outputStream.writer())
.use { printer ->
records.forEach {
printer.printRecord(
*recordDecorator.decorate(it).toCsvRecord()
)
}
}
}
}
else -> throw IllegalStateException("Unsupported format")
}
Expand Down

0 comments on commit 3f2b360

Please sign in to comment.