Bulk Load CDK: CSV Support, S3V2 Usage #47005
Conversation
wasn't sure about the multipart upload stuff, everything else makes sense
private fun convertInner(value: String, field: AirbyteType): AirbyteValue {
    return when (field) {
        is ArrayType -> ArrayValue(value.split(",").map { convertInner(it, field.items.type) })
Isn't the serialized string `[1,2,3]`? i.e. shouldn't we also strip the square brackets (or maybe just parse it as JSON)?
Should be:

    is ArrayType -> value.deserializeToNode().elements().asSequence()
        .map { it.toAirbyteValue(field.items.type) }.toList().let(::ArrayValue)
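A quick stdlib-only illustration of the concern (hypothetical helper names, not the connector's code): splitting the serialized form `[1,2,3]` on commas leaves the brackets attached to the first and last elements, so either the brackets must be stripped or the value parsed as real JSON (which also handles nesting and embedded commas).

```kotlin
// Naive split: the brackets contaminate the first and last elements.
fun naiveSplit(value: String): List<String> = value.split(",")

// Bracket-stripping variant; still fragile for nested arrays or
// strings containing commas, which is why real JSON parsing (e.g.
// deserializeToNode() in the CDK) is the safer fix.
fun strippedSplit(value: String): List<String> =
    value.removePrefix("[").removeSuffix("]").split(",").map { it.trim() }

fun main() {
    check(naiveSplit("[1,2,3]") == listOf("[1", "2", "3]"))
    check(strippedSplit("[1,2,3]") == listOf("1", "2", "3"))
}
```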
suspend fun start(): Job =
    CoroutineScope(Dispatchers.IO).launch {
        for (unit in work) {
            uploadPart()
Is this supposed to be `unit()`? It doesn't seem like we ever actually invoke the function.
I'd partially converted it to a `Unit` channel, since all the work items were `uploadPart`. Fixed.
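A minimal sketch of the bug being discussed, using a plain list in place of a coroutine `Channel` so it runs without kotlinx.coroutines: iterating over work items typed as functions does nothing on its own; each item has to be invoked.

```kotlin
fun main() {
    var parts = 0
    // Stand-in for uploadPart(); the counter records actual invocations.
    val uploadPart: () -> Unit = { parts++ }
    val work: List<() -> Unit> = listOf(uploadPart, uploadPart)

    // Naming the loop variable `unit` runs nothing by itself...
    for (unit in work) { /* `unit` is just a function reference */ }
    check(parts == 0)

    // ...only invoking each item actually does the work.
    for (unit in work) { unit() }
    check(parts == 2)
}
```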
override fun write(str: String) {
    wrappingBuffer.write(str.toByteArray(Charsets.UTF_8))
    if (underlyingBuffer.size() >= partSize) {
        runBlocking { work.send { uploadPart() } }
Not sure I follow here - don't we need to call `uploadPart()` directly to avoid a race condition (i.e. someone calls `write()` while `uploadPart` is still in-flight)? Or should this be `work.send { uploadPart(underlyingBuffer); underlyingBuffer = newBuffer() }`?
You're right, we need to sequence the work items. The most straightforward thing to do is just to push whatever would have been in the body of the function into the channel.
suspend fun run(block: suspend (OutputStream) -> Unit) = coroutineScope {
    log.info {
        "Starting multipart upload to ${response.bucket}/${response.key} (${response.uploadId})"
    }
    launch {
        for (item in workQueue) {
            item()
        }
        complete()
    }
    UploadStream().use {
        block(it)
    }
}

inner class UploadStream : OutputStream() {
    override fun close() = runBlocking {
        workQueue.send { workQueue.close() }
    }

    override fun flush() = runBlocking {
        workQueue.send { wrappingBuffer.flush() }
    }

    override fun write(b: Int) = runBlocking {
        workQueue.send {
            wrappingBuffer.write(b)
            if (underlyingBuffer.size() >= partSize) {
                uploadPart()
            }
        }
    }

    override fun write(b: ByteArray) = runBlocking {
        workQueue.send {
            wrappingBuffer.write(b)
            if (underlyingBuffer.size() >= partSize) {
                uploadPart()
            }
        }
    }
}
This also simplifies the caller to:

val upload =
    S3MultipartUpload(
        client,
        response,
        ByteArrayOutputStream(),
        streamProcessor,
        uploadConfig
    )
upload.run(block)
Also note I converted the writer into an output stream. This will be necessary for Avro.
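To see the part-boundary logic in isolation, here's a hedged single-threaded analogue (hypothetical names, no channel or S3 client; the real class enqueues these actions on a channel so writes and uploads are strictly sequenced on one consumer coroutine): an `OutputStream` that hands a completed part to a callback whenever the buffer reaches `partSize`, plus the remainder on close.

```kotlin
import java.io.ByteArrayOutputStream
import java.io.OutputStream

// Hypothetical sketch, not the actual CDK class.
class PartingStream(
    private val partSize: Int,
    private val onPart: (ByteArray) -> Unit // stands in for uploadPart()
) : OutputStream() {
    private var buffer = ByteArrayOutputStream()

    override fun write(b: Int) {
        buffer.write(b)
        if (buffer.size() >= partSize) rotate()
    }

    override fun close() {
        if (buffer.size() > 0) rotate() // flush the final, short part
    }

    private fun rotate() {
        onPart(buffer.toByteArray())
        buffer = ByteArrayOutputStream() // fresh buffer per part
    }
}

fun main() {
    val partSizes = mutableListOf<Int>()
    PartingStream(4) { partSizes.add(it.size) }.use {
        it.write("abcdefghij".toByteArray()) // 10 bytes, 4-byte parts
    }
    check(partSizes == listOf(4, 4, 2))
}
```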
(copying from DM - let's add some comments about why we need the channel -> why we need to be an OutputStream, but otherwise lgtm)
What
CSV. Uses the old printer with AirbyteValue/Schema directly.
Conversions aren't complete, and there are no tests. I will come back and shore all that up after I've proved out Avro and Parquet.
Bonuses

- Adapted `java.io.Writer` to work w/o resorting to `runBlocking`, so `streamingUpload` now accepts a block that works on `Writer`, allowing the apache `CSVPrinter` to wrap it; some callouts:
  - the caller must `start()` and await the returned job (wonky, but limited to the client so the implementor still gets a clean interface)
  - `Writer::close` is a noop (the contract is that the writer is closed when the block exits)
  - `flush` throws, since there really isn't a need for it, but maybe later I'll wire it to `uploadPart` if there's a need
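The `Writer` contract described in these callouts can be sketched as follows (a hypothetical adapter, not the actual CDK class): `close()` is a no-op because the surrounding block owns the stream's lifetime, and `flush()` throws because flushing a partial multipart part has no clear meaning.

```kotlin
import java.io.ByteArrayOutputStream
import java.io.OutputStream
import java.io.Writer

// Hypothetical adapter illustrating the contract, not the CDK class.
class NonClosingWriter(private val out: OutputStream) : Writer() {
    override fun write(cbuf: CharArray, off: Int, len: Int) {
        out.write(String(cbuf, off, len).toByteArray(Charsets.UTF_8))
    }

    // No-op by contract: the stream is closed when the block exits.
    override fun close() {}

    // Unsupported for now; could be wired to uploadPart later.
    override fun flush() {
        throw UnsupportedOperationException("flush is unsupported")
    }
}

fun main() {
    val sink = ByteArrayOutputStream()
    val writer = NonClosingWriter(sink)
    writer.write("hello")
    writer.close() // leaves the underlying stream open and untouched
    check(sink.toByteArray().toString(Charsets.UTF_8) == "hello")
}
```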