Bulk Load CDK: CSV Support, S3V2Usage (#47005)
johnny-schmidt authored Oct 19, 2024
1 parent d1d6e23 commit 4f534ef
Showing 17 changed files with 355 additions and 62 deletions.
@@ -0,0 +1,11 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.util

import java.io.OutputStream

fun OutputStream.write(string: String) {
    write(string.toByteArray(Charsets.UTF_8))
}
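
A quick usage sketch (illustrative, not part of the commit): with the extension in scope, any OutputStream accepts a String directly and encodes it as UTF-8.

import io.airbyte.cdk.load.util.write
import java.io.ByteArrayOutputStream

fun demo() {
    val out = ByteArrayOutputStream()
    out.write("id,name\n") // resolves to the String extension above
    check(out.toByteArray().decodeToString() == "id,name\n")
}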
2 changes: 2 additions & 0 deletions airbyte-cdk/bulk/toolkits/load-object-storage/build.gradle
@@ -2,5 +2,7 @@ dependencies {
    implementation project(':airbyte-cdk:bulk:core:bulk-cdk-core-base')
    implementation project(':airbyte-cdk:bulk:core:bulk-cdk-core-load')

+    api("org.apache.commons:commons-csv:1.10.0")
+
    testFixturesImplementation testFixtures(project(":airbyte-cdk:bulk:core:bulk-cdk-core-load"))
}
@@ -0,0 +1,25 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.data

import java.io.Writer
import org.apache.commons.csv.CSVFormat
import org.apache.commons.csv.CSVPrinter

class AirbyteTypeToCsvHeader {
    fun convert(schema: AirbyteType): Array<String> {
        if (schema !is ObjectType) {
            throw IllegalArgumentException("Only object types are supported")
        }
        return schema.properties.map { it.key }.toTypedArray()
    }
}

fun AirbyteType.toCsvHeader(): Array<String> {
    return AirbyteTypeToCsvHeader().convert(this)
}

fun AirbyteType.toCsvPrinterWithHeader(writer: Writer): CSVPrinter =
    CSVFormat.Builder.create().setHeader(*toCsvHeader()).build().print(writer)
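
A hedged usage sketch for the header printer, assuming the io.airbyte.cdk.load.data types are in scope and that FieldType has a FieldType(type, nullable) shape, as implied by the property.second.type access in the CSV reader below. CSVFormat.print(writer) writes the configured header row the moment the CSVPrinter is created.

import java.io.StringWriter

// Illustrative two-column schema (constructor shapes assumed, not taken from this diff).
val schema: AirbyteType =
    ObjectType(
        linkedMapOf(
            "id" to FieldType(IntegerType, nullable = false),
            "name" to FieldType(StringType, nullable = true),
        )
    )

val writer = StringWriter()
val printer = schema.toCsvPrinterWithHeader(writer)
// writer now begins with the header row: id,name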
@@ -0,0 +1,31 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.data

import io.airbyte.cdk.load.util.serializeToString

class AirbyteValueToCsvRow {
    fun convert(value: AirbyteValue): Array<String> {
        if (value !is ObjectValue) {
            throw IllegalArgumentException("Only object values are supported")
        }
        return value.values.map { convertInner(it.value) }.toTypedArray()
    }

    private fun convertInner(value: AirbyteValue): String {
        return when (value) {
            is ObjectValue -> value.toJson().serializeToString()
            is ArrayValue -> value.toJson().serializeToString()
            is StringValue -> value.value
            is IntegerValue -> value.value.toString()
            is NumberValue -> value.value.toString()
            else -> value.toString()
        }
    }
}

fun AirbyteValue.toCsvRecord(): Array<String> {
    return AirbyteValueToCsvRow().convert(this)
}
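
Continuing the sketch above: a converted row pairs with that printer. Nested objects and arrays become serialized JSON strings; scalars are stringified. IntegerValue is assumed to wrap a Long, matching the toLong() call in the CSV reader below.

val row =
    ObjectValue(
        linkedMapOf(
            "id" to IntegerValue(1L),
            "name" to StringValue("arthur"),
        )
    )

fun appendRow() {
    printer.printRecord(*row.toCsvRecord()) // writer now also contains: 1,arthur
}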
@@ -0,0 +1,65 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.data

import io.airbyte.cdk.load.util.deserializeToNode
import org.apache.commons.csv.CSVRecord

class CsvRowToAirbyteValue {
    fun convert(row: CSVRecord, schema: AirbyteType): AirbyteValue {
        if (schema !is ObjectType) {
            throw IllegalArgumentException("Only object types are supported")
        }
        val asList = row.toList()
        if (asList.size != schema.properties.size) {
            throw IllegalArgumentException("Row size does not match schema size")
        }
        val properties = linkedMapOf<String, AirbyteValue>()
        schema.properties
            .toList()
            .zip(asList)
            .map { (property, value) ->
                property.first to convertInner(value, property.second.type)
            }
            .toMap(properties)
        return ObjectValue(properties)
    }

    private fun convertInner(value: String, field: AirbyteType): AirbyteValue {
        return when (field) {
            is ArrayType ->
                value
                    .deserializeToNode()
                    .elements()
                    .asSequence()
                    .map { it.toAirbyteValue(field.items.type) }
                    .toList()
                    .let(::ArrayValue)
            is BooleanType -> BooleanValue(value.toBoolean())
            is IntegerType -> IntegerValue(value.toLong())
            is NumberType -> NumberValue(value.toBigDecimal())
            is ObjectType -> {
                val properties = linkedMapOf<String, AirbyteValue>()
                value
                    .deserializeToNode()
                    .fields()
                    .asSequence()
                    .map { entry ->
                        entry.key to entry.value.toAirbyteValue(field.properties[entry.key]!!.type)
                    }
                    .toMap(properties)
                ObjectValue(properties)
            }
            is ObjectTypeWithoutSchema ->
                value.deserializeToNode().toAirbyteValue(ObjectTypeWithoutSchema)
            is StringType -> StringValue(value)
            else -> throw IllegalArgumentException("Unsupported field type: $field")
        }
    }
}

fun CSVRecord.toAirbyteValue(schema: AirbyteType): AirbyteValue {
    return CsvRowToAirbyteValue().convert(this, schema)
}
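
A round-trip sketch: parse the CSV produced above back into an AirbyteValue with the same schema. CSVFormat.DEFAULT.withHeader() is deprecated but still present in commons-csv 1.10; it reads header names from the first record, which is also what the data dumper below relies on.

import java.io.StringReader

val parsed =
    CSVParser(StringReader("id,name\n1,arthur\n"), CSVFormat.DEFAULT.withHeader()).use { parser ->
        parser.records.single().toAirbyteValue(schema) // schema from the earlier sketch
    }
// parsed is an ObjectValue holding IntegerValue(1) and StringValue("arthur")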
@@ -10,22 +10,17 @@ import java.io.InputStream
import java.io.OutputStream
import kotlinx.coroutines.flow.Flow

-interface ObjectStorageClient<T : RemoteObject<*>, U : ObjectStorageStreamingUploadWriter> {
+interface ObjectStorageClient<T : RemoteObject<*>> {
    suspend fun list(prefix: String): Flow<T>
    suspend fun move(remoteObject: T, toKey: String): T
    suspend fun <U> get(key: String, block: (InputStream) -> U): U
    suspend fun put(key: String, bytes: ByteArray): T
    suspend fun delete(remoteObject: T)
-    suspend fun streamingUpload(key: String, block: suspend (U) -> Unit): T =
+    suspend fun streamingUpload(key: String, block: suspend (OutputStream) -> Unit): T =
        streamingUpload(key, NoopProcessor, block)
    suspend fun <V : OutputStream> streamingUpload(
        key: String,
        streamProcessor: StreamProcessor<V>,
-        block: suspend (U) -> Unit
+        block: suspend (OutputStream) -> Unit
    ): T
}
-
-interface ObjectStorageStreamingUploadWriter {
-    suspend fun write(bytes: ByteArray)
-    suspend fun write(string: String) = write(string.toByteArray(Charsets.UTF_8))
-}
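
With the bespoke writer interface gone, upload callers now receive a plain java.io.OutputStream, and string-friendly writes come from the new OutputStream.write(String) extension instead. A sketch of a call site (the key and payload are invented):

suspend fun <T : RemoteObject<*>> uploadCsv(client: ObjectStorageClient<T>): T =
    client.streamingUpload("demo/hello.csv") { stream ->
        stream.write("id,name\n") // String overload from io.airbyte.cdk.load.util
        stream.write("1,arthur\n")
    }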
@@ -5,7 +5,10 @@
package io.airbyte.cdk.load

import io.airbyte.cdk.load.command.DestinationStream
+import io.airbyte.cdk.load.command.object_storage.CSVFormatConfiguration
+import io.airbyte.cdk.load.command.object_storage.JsonFormatConfiguration
import io.airbyte.cdk.load.command.object_storage.ObjectStorageCompressionConfiguration
+import io.airbyte.cdk.load.command.object_storage.ObjectStorageFormatConfiguration
import io.airbyte.cdk.load.data.toAirbyteValue
import io.airbyte.cdk.load.file.GZIPProcessor
import io.airbyte.cdk.load.file.NoopProcessor
@@ -15,18 +18,22 @@ import io.airbyte.cdk.load.file.object_storage.RemoteObject
import io.airbyte.cdk.load.test.util.OutputRecord
import io.airbyte.cdk.load.test.util.toOutputRecord
import io.airbyte.cdk.load.util.deserializeToNode
+import java.io.BufferedReader
import java.io.InputStream
import java.util.zip.GZIPInputStream
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
+import org.apache.commons.csv.CSVFormat
+import org.apache.commons.csv.CSVParser

class ObjectStorageDataDumper(
    private val stream: DestinationStream,
-    private val client: ObjectStorageClient<*, *>,
+    private val client: ObjectStorageClient<*>,
    private val pathFactory: ObjectStoragePathFactory,
+    private val formatConfig: ObjectStorageFormatConfiguration,
    private val compressionConfig: ObjectStorageCompressionConfiguration<*>? = null
) {
    fun dump(): List<OutputRecord> {
@@ -37,26 +44,43 @@ class ObjectStorageDataDumper(
                .list(prefix)
                .map { listedObject: RemoteObject<*> ->
                    client.get(listedObject.key) { objectData: InputStream ->
-                       when (compressionConfig?.compressor) {
+                       val reader =
+                           when (compressionConfig?.compressor) {
                                is GZIPProcessor -> GZIPInputStream(objectData)
                                is NoopProcessor,
                                null -> objectData
                                else -> error("Unsupported compressor")
-                       }
-                           .bufferedReader()
-                           .lineSequence()
-                           .map { line ->
-                               line
-                                   .deserializeToNode()
-                                   .toAirbyteValue(stream.schemaWithMeta)
-                                   .toOutputRecord()
-                           }
-                           .toList()
+                           }.bufferedReader()
+                       readLines(reader)
                    }
                }
                .toList()
                .flatten()
            }
        }
    }

+    @Suppress("DEPRECATION")
+    private fun readLines(reader: BufferedReader): List<OutputRecord> =
+        when (formatConfig) {
+            is JsonFormatConfiguration -> {
+                reader
+                    .lineSequence()
+                    .map { line ->
+                        line
+                            .deserializeToNode()
+                            .toAirbyteValue(stream.schemaWithMeta)
+                            .toOutputRecord()
+                    }
+                    .toList()
+            }
+            is CSVFormatConfiguration -> {
+                CSVParser(reader, CSVFormat.DEFAULT.withHeader()).use {
+                    it.records.map { record ->
+                        record.toAirbyteValue(stream.schemaWithMeta).toOutputRecord()
+                    }
+                }
+            }
+            else -> error("Unsupported format")
+        }
}
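
The @Suppress("DEPRECATION") presumably targets CSVFormat.DEFAULT.withHeader(). On commons-csv 1.10 the non-deprecated builder form (a sketch, not part of this commit) would be:

val headerFromFirstRecord: CSVFormat =
    CSVFormat.Builder.create(CSVFormat.DEFAULT)
        .setHeader() // no arguments: infer header names from the first record
        .setSkipHeaderRecord(true)
        .build()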
@@ -0,0 +1,26 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load

import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.file.object_storage.ObjectStorageClient
import io.airbyte.cdk.load.file.object_storage.ObjectStoragePathFactory
import io.airbyte.cdk.load.file.object_storage.RemoteObject
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

class ObjectStorageDestinationCleaner<T : RemoteObject<*>> {
    fun cleanup(
        stream: DestinationStream,
        client: ObjectStorageClient<T>,
        pathFactory: ObjectStoragePathFactory,
    ) {
        val prefix = pathFactory.getFinalDirectory(stream).toString()
        runBlocking {
            withContext(Dispatchers.IO) { client.list(prefix).collect { client.delete(it) } }
        }
    }
}
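
A usage sketch (names are assumptions; this would typically run in integration-test teardown after the dumped data has been verified):

fun tearDown(
    stream: DestinationStream,
    client: ObjectStorageClient<S3Object>, // any RemoteObject implementation works
    pathFactory: ObjectStoragePathFactory,
) {
    ObjectStorageDestinationCleaner<S3Object>().cleanup(stream, client, pathFactory)
}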
@@ -46,7 +46,7 @@ class S3Client(
    val bucketConfig: S3BucketConfiguration,
    private val uploadConfig: ObjectStorageUploadConfiguration?,
    private val compressionConfig: ObjectStorageCompressionConfiguration<*>? = null,
-) : ObjectStorageClient<S3Object, S3MultipartUpload<*>.Writer> {
+) : ObjectStorageClient<S3Object> {
    private val log = KotlinLogging.logger {}

    override suspend fun list(prefix: String) = flow {
@@ -114,15 +114,15 @@

    override suspend fun streamingUpload(
        key: String,
-        block: suspend (S3MultipartUpload<*>.Writer) -> Unit
+        block: suspend (OutputStream) -> Unit
    ): S3Object {
        return streamingUpload(key, compressionConfig?.compressor ?: NoopProcessor, block)
    }

    override suspend fun <U : OutputStream> streamingUpload(
        key: String,
        streamProcessor: StreamProcessor<U>,
-        block: suspend (S3MultipartUpload<*>.Writer) -> Unit
+        block: suspend (OutputStream) -> Unit
    ): S3Object {
        val request = CreateMultipartUploadRequest {
            this.bucket = bucketConfig.s3BucketName
@@ -137,11 +137,7 @@
            streamProcessor,
            uploadConfig
        )
-        log.info {
-            "Starting multipart upload to ${response.bucket}/${response.key} (${response.uploadId}"
-        }
-        block(upload.Writer())
-        upload.complete()
+        upload.runUsing(block)
        return S3Object(key, bucketConfig)
    }
}
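
S3MultipartUpload.runUsing is not shown in this diff. Judging from the deleted call sequence (block(upload.Writer()); upload.complete()), it plausibly owns both the stream handoff and the upload completion, roughly:

// Hypothetical shape, inferred from the removed call sites; not the actual implementation.
suspend fun runUsing(block: suspend (OutputStream) -> Unit) {
    val stream = PartBufferingOutputStream() // assumed: buffers bytes into S3 part uploads
    block(stream)
    stream.close() // flush the final part
    complete()     // issue CompleteMultipartUpload
}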
