-
Notifications
You must be signed in to change notification settings - Fork 4.1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Bulk Load CDK: CSV Support, S3V2Usage
- Loading branch information
1 parent
1cc7f2c
commit 6546e39
Showing
16 changed files
with
285 additions
and
56 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
25 changes: 25 additions & 0 deletions
25
...ts/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteTypeToCsvHeader.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
/* | ||
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.cdk.load.data | ||
|
||
import java.io.Writer | ||
import org.apache.commons.csv.CSVFormat | ||
import org.apache.commons.csv.CSVPrinter | ||
|
||
/** Derives the CSV header columns from an object schema. */
class AirbyteTypeToCsvHeader {
    /**
     * Returns the property names of [schema], in the iteration order of its property map.
     *
     * @throws IllegalArgumentException if [schema] is not an [ObjectType].
     */
    fun convert(schema: AirbyteType): Array<String> =
        when (schema) {
            is ObjectType -> schema.properties.keys.toTypedArray()
            else -> throw IllegalArgumentException("Only object types are supported")
        }
}
|
||
/** Convenience wrapper: the CSV header for this schema, as built by [AirbyteTypeToCsvHeader]. */
fun AirbyteType.toCsvHeader(): Array<String> = AirbyteTypeToCsvHeader().convert(this)
|
||
/**
 * Creates a [CSVPrinter] over [writer] whose format is configured with this schema's header
 * columns (see [toCsvHeader]).
 */
fun AirbyteType.toCsvPrinterWithHeader(writer: Writer): CSVPrinter {
    val formatWithHeader = CSVFormat.Builder.create().setHeader(*toCsvHeader()).build()
    return formatWithHeader.print(writer)
}
31 changes: 31 additions & 0 deletions
31
...kits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteValueToCsvRow.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
/* | ||
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.cdk.load.data | ||
|
||
import io.airbyte.cdk.load.util.serializeToString | ||
|
||
/** Flattens an object value into the string cells of a single CSV row. */
class AirbyteValueToCsvRow {
    /**
     * Renders each field of [value] as one cell, in the iteration order of the object's fields.
     *
     * @throws IllegalArgumentException if [value] is not an [ObjectValue].
     */
    fun convert(value: AirbyteValue): Array<String> {
        val record =
            value as? ObjectValue
                ?: throw IllegalArgumentException("Only object values are supported")
        return record.values.entries.map { (_, fieldValue) -> convertInner(fieldValue) }.toTypedArray()
    }

    /**
     * Renders a single field: nested objects and arrays become serialized JSON, strings are passed
     * through unchanged, and integers/numbers use their `toString()` form.
     */
    private fun convertInner(value: AirbyteValue): String =
        when (value) {
            is ObjectValue -> value.toJson().serializeToString()
            is ArrayValue -> value.toJson().serializeToString()
            is StringValue -> value.value
            is IntegerValue -> value.value.toString()
            is NumberValue -> value.value.toString()
            // NOTE(review): every other value kind falls back to the value object's own
            // toString() — confirm that is the intended cell text for e.g. booleans and nulls.
            else -> value.toString()
        }
}
|
||
/** Convenience wrapper: this value rendered as CSV cells by [AirbyteValueToCsvRow]. */
fun AirbyteValue.toCsvRecord(): Array<String> = AirbyteValueToCsvRow().convert(this)
58 changes: 58 additions & 0 deletions
58
...kits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/data/CsvRowToAirbyteValue.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
/* | ||
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.cdk.load.data | ||
|
||
import io.airbyte.cdk.load.util.deserializeToNode | ||
import org.apache.commons.csv.CSVRecord | ||
|
||
/**
 * Converts a parsed CSV row back into an [AirbyteValue], using the schema to decide how the text
 * of each column is interpreted.
 */
class CsvRowToAirbyteValue {
    /**
     * Pairs the columns of [row] with the properties of [schema] by position and converts each
     * column according to its property type.
     *
     * @param row the CSV record to convert.
     * @param schema the schema describing the row; must be an [ObjectType].
     * @return an [ObjectValue] keyed by property name, in schema order.
     * @throws IllegalArgumentException if [schema] is not an [ObjectType], if the number of columns
     * differs from the number of schema properties, or if a column cannot be converted.
     */
    fun convert(row: CSVRecord, schema: AirbyteType): AirbyteValue {
        if (schema !is ObjectType) {
            throw IllegalArgumentException("Only object types are supported")
        }
        val columns = row.toList()
        require(columns.size == schema.properties.size) { "Row size does not match schema size" }
        val properties = linkedMapOf<String, AirbyteValue>()
        // Columns carry no names of their own here: they are matched to properties by position.
        schema.properties.entries.zip(columns).forEach { (property, column) ->
            properties[property.key] = convertInner(column, property.value.type)
        }
        return ObjectValue(properties)
    }

    /**
     * Converts the text of one column into the value dictated by [field].
     *
     * Arrays and objects are parsed as JSON; scalars are parsed from their plain string form.
     *
     * @throws IllegalArgumentException if [field] is an unsupported type, if an array column does
     * not hold a JSON array, or if an object column has a key the schema does not declare.
     */
    private fun convertInner(value: String, field: AirbyteType): AirbyteValue {
        return when (field) {
            is ArrayType -> {
                // Parse the column as JSON rather than splitting on commas: splitting leaves the
                // brackets attached to the first/last item and breaks on nested or quoted commas.
                val node = value.deserializeToNode()
                require(node.isArray) { "Expected a JSON array for array-typed column, got: $value" }
                ArrayValue(
                    node.elements().asSequence().map { it.toAirbyteValue(field.items.type) }.toList()
                )
            }
            is BooleanType -> BooleanValue(value.toBoolean())
            is IntegerType -> IntegerValue(value.toLong())
            is NumberType -> NumberValue(value.toBigDecimal())
            is ObjectType -> {
                val properties = linkedMapOf<String, AirbyteValue>()
                value
                    .deserializeToNode()
                    .fields()
                    .asSequence()
                    .map { entry ->
                        val declared =
                            requireNotNull(field.properties[entry.key]) {
                                "Field '${entry.key}' is not declared in the object schema"
                            }
                        entry.key to entry.value.toAirbyteValue(declared.type)
                    }
                    .toMap(properties)
                ObjectValue(properties)
            }
            is ObjectTypeWithoutSchema ->
                value.deserializeToNode().toAirbyteValue(ObjectTypeWithoutSchema)
            is StringType -> StringValue(value)
            else -> throw IllegalArgumentException("Unsupported field type: $field")
        }
    }
}
|
||
/** Convenience wrapper: converts this record to an [AirbyteValue] via [CsvRowToAirbyteValue]. */
fun CSVRecord.toAirbyteValue(schema: AirbyteType): AirbyteValue =
    CsvRowToAirbyteValue().convert(this, schema)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
26 changes: 26 additions & 0 deletions
26
...ct-storage/src/testFixtures/kotlin/io/airbyte/cdk/load/ObjectStorageDestinationCleaner.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
/* | ||
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.cdk.load | ||
|
||
import io.airbyte.cdk.load.command.DestinationStream | ||
import io.airbyte.cdk.load.file.object_storage.ObjectStorageClient | ||
import io.airbyte.cdk.load.file.object_storage.ObjectStoragePathFactory | ||
import io.airbyte.cdk.load.file.object_storage.RemoteObject | ||
import kotlinx.coroutines.Dispatchers | ||
import kotlinx.coroutines.runBlocking | ||
import kotlinx.coroutines.withContext | ||
|
||
/** Removes the objects a stream has written to object storage. */
class ObjectStorageDestinationCleaner<T : RemoteObject<*>> {
    /**
     * Deletes every object that [client] lists under the final directory that [pathFactory]
     * reports for [stream]. Blocks the calling thread until all deletions have completed.
     */
    fun cleanup(
        stream: DestinationStream,
        client: ObjectStorageClient<T>,
        pathFactory: ObjectStoragePathFactory,
    ) {
        val finalDirectoryPrefix = pathFactory.getFinalDirectory(stream).toString()
        runBlocking {
            withContext(Dispatchers.IO) { deleteAllUnder(client, finalDirectoryPrefix) }
        }
    }

    /** Lists the objects under [prefix] and deletes them one at a time, in listing order. */
    private suspend fun deleteAllUnder(client: ObjectStorageClient<T>, prefix: String) {
        client.list(prefix).collect { remoteObject -> client.delete(remoteObject) }
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.