Skip to content

Commit

Permalink
Merge branch 'master' into pnilan/p1-fix-backoff-retry-with-auth
Browse files Browse the repository at this point in the history
  • Loading branch information
pnilan committed Oct 21, 2024
2 parents cd3e214 + 8134828 commit e13fae1
Show file tree
Hide file tree
Showing 113 changed files with 14,877 additions and 3,526 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,6 @@ import java.nio.file.Path
abstract class DestinationConfiguration : Configuration {
open val recordBatchSizeBytes: Long = 200L * 1024L * 1024L
open val tmpFileDirectory: Path = Path.of("airbyte-cdk-load")
open val firstStageTmpFilePrefix: String = "staged-raw-records"
open val firstStageTmpFileSuffix: String = ".jsonl"

/** Memory queue settings */
open val maxMessageQueueMemoryUsageRatio: Double = 0.2 // 0 => No limit, 1.0 => 100% of JVM heap
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ data class DestinationStream(
* what actually exists, as many destinations have legacy data from before this schema was
* adopted.
*/
val schemaWithMeta: AirbyteType
val schemaWithMeta: ObjectType
get() =
ObjectType(
linkedMapOf(
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.file

import io.airbyte.cdk.load.command.DestinationConfiguration
import io.micronaut.context.annotation.Secondary
import jakarta.inject.Singleton
import java.nio.file.Files
import java.nio.file.Path

interface SpillFileProvider {
fun createTempFile(): Path
}

@Singleton
@Secondary
class DefaultSpillFileProvider(val config: DestinationConfiguration) : SpillFileProvider {
override fun createTempFile(): Path {
val directory = config.tmpFileDirectory
Files.createDirectories(directory)
return Files.createTempFile(directory, "staged-raw-records", "jsonl")
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ T : ScopedTask {
log.warn { "Sync task $innerTask was cancelled." }
throw e
} catch (e: Exception) {
log.error { "Caught exception in sync task $innerTask: $e" }
handleSyncFailure(e)
}
}
Expand Down Expand Up @@ -127,6 +128,7 @@ T : ScopedTask {
log.warn { "Stream task $innerTask was cancelled." }
throw e
} catch (e: Exception) {
log.error { "Caught exception in sync task $innerTask: $e" }
handleStreamFailure(stream, e)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ import io.airbyte.cdk.load.task.DestinationTaskLauncher
import io.airbyte.cdk.load.task.ImplementorScope
import io.airbyte.cdk.load.task.StreamLevel
import io.airbyte.cdk.load.task.internal.SpilledRawMessagesLocalFile
import io.airbyte.cdk.load.util.lineSequence
import io.airbyte.cdk.load.write.StreamLoader
import io.github.oshai.kotlinlogging.KotlinLogging
import io.micronaut.context.annotation.Secondary
import jakarta.inject.Singleton
import kotlin.io.path.inputStream

interface ProcessRecordsTask : StreamLevel, ImplementorScope

Expand Down Expand Up @@ -48,10 +50,10 @@ class DefaultProcessRecordsTask(
log.info { "Processing records from $file" }
val batch =
try {
file.localFile.toFileReader().use { reader ->
file.localFile.inputStream().use { inputStream ->
val records =
reader
.lines()
inputStream
.lineSequence()
.map {
when (val message = deserializer.deserialize(it)) {
is DestinationStreamAffinedMessage -> message
Expand All @@ -71,7 +73,7 @@ class DefaultProcessRecordsTask(
}
} finally {
log.info { "Processing completed, deleting $file" }
file.localFile.delete()
file.localFile.toFile().delete()
}

val wrapped = BatchEnvelope(batch, file.indexRange)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,8 @@
package io.airbyte.cdk.load.task.internal

import com.google.common.collect.Range
import io.airbyte.cdk.load.command.DestinationConfiguration
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.file.LocalFile
import io.airbyte.cdk.load.file.TempFileProvider
import io.airbyte.cdk.load.file.SpillFileProvider
import io.airbyte.cdk.load.message.DestinationRecordWrapped
import io.airbyte.cdk.load.message.MessageQueueSupplier
import io.airbyte.cdk.load.message.QueueReader
Expand All @@ -22,8 +20,11 @@ import io.airbyte.cdk.load.task.StreamLevel
import io.airbyte.cdk.load.util.takeUntilInclusive
import io.airbyte.cdk.load.util.use
import io.airbyte.cdk.load.util.withNextAdjacentValue
import io.airbyte.cdk.load.util.write
import io.github.oshai.kotlinlogging.KotlinLogging
import jakarta.inject.Singleton
import java.nio.file.Path
import kotlin.io.path.outputStream
import kotlinx.coroutines.flow.last
import kotlinx.coroutines.flow.runningFold

Expand All @@ -36,8 +37,7 @@ interface SpillToDiskTask : StreamLevel, InternalScope
* TODO: Allow for the record batch size to be supplied per-stream. (Needed?)
*/
class DefaultSpillToDiskTask(
private val config: DestinationConfiguration,
private val tmpFileProvider: TempFileProvider,
private val spillFileProvider: SpillFileProvider,
private val queue: QueueReader<Reserved<DestinationRecordWrapped>>,
private val flushStrategy: FlushStrategy,
override val stream: DestinationStream,
Expand All @@ -53,22 +53,17 @@ class DefaultSpillToDiskTask(
)

override suspend fun execute() {
val tmpFile =
tmpFileProvider.createTempFile(
config.tmpFileDirectory,
config.firstStageTmpFilePrefix,
config.firstStageTmpFileSuffix
)
val tmpFile = spillFileProvider.createTempFile()
val result =
tmpFile.toFileWriter().use { writer ->
tmpFile.outputStream().use { outputStream ->
queue
.consume()
.runningFold(ReadResult()) { (range, sizeBytes, _), reserved ->
reserved.use {
when (val wrapped = it.value) {
is StreamRecordWrapped -> {
writer.write(wrapped.record.serialized)
writer.write("\n")
outputStream.write(wrapped.record.serialized)
outputStream.write("\n")
val nextRange = range.withNextAdjacentValue(wrapped.index)
val nextSize = sizeBytes + wrapped.sizeBytes
val forceFlush =
Expand Down Expand Up @@ -108,8 +103,7 @@ interface SpillToDiskTaskFactory {

@Singleton
class DefaultSpillToDiskTaskFactory(
private val config: DestinationConfiguration,
private val tmpFileProvider: TempFileProvider,
private val spillFileProvider: SpillFileProvider,
private val queueSupplier:
MessageQueueSupplier<DestinationStream.Descriptor, Reserved<DestinationRecordWrapped>>,
private val flushStrategy: FlushStrategy,
Expand All @@ -119,8 +113,7 @@ class DefaultSpillToDiskTaskFactory(
stream: DestinationStream
): SpillToDiskTask {
return DefaultSpillToDiskTask(
config,
tmpFileProvider,
spillFileProvider,
queueSupplier.get(stream.descriptor),
flushStrategy,
stream,
Expand All @@ -130,7 +123,7 @@ class DefaultSpillToDiskTaskFactory(
}

data class SpilledRawMessagesLocalFile(
val localFile: LocalFile,
val localFile: Path,
val totalSizeBytes: Long,
val indexRange: Range<Long>,
val endOfStream: Boolean = false
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.util

import java.io.InputStream

fun InputStream.lineSequence(): Sequence<String> = bufferedReader(Charsets.UTF_8).lineSequence()
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@ import java.nio.file.Path
@Requires(env = ["MockDestinationConfiguration"])
class MockDestinationConfiguration : DestinationConfiguration() {
override val recordBatchSizeBytes: Long = 1024L
override val tmpFileDirectory: Path = Path.of("/tmp-test")
override val firstStageTmpFilePrefix: String = "spilled"
override val firstStageTmpFileSuffix: String = ".jsonl"
override val tmpFileDirectory: Path = Path.of("tmp-test")

override val maxCheckpointFlushTimeMs: Long = 1000L
override val maxMessageQueueMemoryUsageRatio: Double = 0.1
Expand Down

This file was deleted.

Loading

0 comments on commit e13fae1

Please sign in to comment.