From 89f2db4793123d6a11ab73ace2b2668f8f3b0fd0 Mon Sep 17 00:00:00 2001 From: Johnny Schmidt Date: Fri, 30 Aug 2024 12:12:02 -0700 Subject: [PATCH] (Incomplete) First Cut Load CDK with E2E Destination (#44822) --- airbyte-cdk/bulk/core/load/build.gradle | 12 ++ .../airbyte/cdk/command/DestinationCatalog.kt | 37 +++++ .../cdk/command/DestinationConfiguration.kt | 25 +++ .../DestinationConfigurationFactory.kt | 21 +++ .../airbyte/cdk/command/DestinationStream.kt | 45 ++++++ .../airbyte/cdk/command/WriteConfiguration.kt | 32 ++++ .../cdk/message/AirbyteStateMessageFactory.kt | 75 +++++++++ .../kotlin/io/airbyte/cdk/message/Batch.kt | 101 ++++++++++++ .../airbyte/cdk/message/DestinationMessage.kt | 145 +++++++++++++++++ .../message/DestinationMessageDeserializer.kt | 32 ++++ .../cdk/message/DestinationMessageQueue.kt | 101 ++++++++++++ .../io/airbyte/cdk/message/MessageQueue.kt | 83 ++++++++++ .../airbyte/cdk/message/MessageQueueReader.kt | 55 +++++++ .../airbyte/cdk/message/MessageQueueWriter.kt | 105 ++++++++++++ .../io/airbyte/cdk/state/MemoryManager.kt | 47 ++++++ .../io/airbyte/cdk/state/StateManager.kt | 142 ++++++++++++++++ .../io/airbyte/cdk/state/StreamManager.kt | 151 ++++++++++++++++++ .../io/airbyte/cdk/task/CloseStreamTask.kt | 54 +++++++ .../cdk/task/DestinationTaskLauncher.kt | 108 +++++++++++++ .../io/airbyte/cdk/task/OpenStreamTask.kt | 36 +++++ .../io/airbyte/cdk/task/ProcessBatchTask.kt | 57 +++++++ .../io/airbyte/cdk/task/ProcessRecordsTask.kt | 92 +++++++++++ .../kotlin/io/airbyte/cdk/task/SetupTask.kt | 35 ++++ .../io/airbyte/cdk/task/SpillToDiskTask.kt | 129 +++++++++++++++ .../io/airbyte/cdk/task/TaskLauncher.kt | 31 ++++ .../kotlin/io/airbyte/cdk/task/TaskRunner.kt | 51 ++++++ .../io/airbyte/cdk/task/TeardownTask.kt | 53 ++++++ .../io/airbyte/cdk/write/Destination.kt | 34 ++++ .../io/airbyte/cdk/write/InputConsumer.kt | 74 +++++++++ .../io/airbyte/cdk/write/StreamLoader.kt | 68 ++++++++ .../io/airbyte/cdk/write/WriteOperation.kt | 36 +++++ 
.../io/airbyte/cdk/write/InputConsumerTest.kt | 87 ++++++++++ 32 files changed, 2154 insertions(+) create mode 100644 airbyte-cdk/bulk/core/load/build.gradle create mode 100644 airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/command/DestinationCatalog.kt create mode 100644 airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/command/DestinationConfiguration.kt create mode 100644 airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/command/DestinationConfigurationFactory.kt create mode 100644 airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/command/DestinationStream.kt create mode 100644 airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/command/WriteConfiguration.kt create mode 100644 airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/message/AirbyteStateMessageFactory.kt create mode 100644 airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/message/Batch.kt create mode 100644 airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/message/DestinationMessage.kt create mode 100644 airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/message/DestinationMessageDeserializer.kt create mode 100644 airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/message/DestinationMessageQueue.kt create mode 100644 airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/message/MessageQueue.kt create mode 100644 airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/message/MessageQueueReader.kt create mode 100644 airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/message/MessageQueueWriter.kt create mode 100644 airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/state/MemoryManager.kt create mode 100644 airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/state/StateManager.kt create mode 100644 airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/state/StreamManager.kt create mode 100644 airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/task/CloseStreamTask.kt create mode 100644 
airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/task/DestinationTaskLauncher.kt create mode 100644 airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/task/OpenStreamTask.kt create mode 100644 airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/task/ProcessBatchTask.kt create mode 100644 airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/task/ProcessRecordsTask.kt create mode 100644 airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/task/SetupTask.kt create mode 100644 airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/task/SpillToDiskTask.kt create mode 100644 airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/task/TaskLauncher.kt create mode 100644 airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/task/TaskRunner.kt create mode 100644 airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/task/TeardownTask.kt create mode 100644 airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/write/Destination.kt create mode 100644 airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/write/InputConsumer.kt create mode 100644 airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/write/StreamLoader.kt create mode 100644 airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/write/WriteOperation.kt create mode 100644 airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/write/InputConsumerTest.kt diff --git a/airbyte-cdk/bulk/core/load/build.gradle b/airbyte-cdk/bulk/core/load/build.gradle new file mode 100644 index 000000000000..6200c67acc44 --- /dev/null +++ b/airbyte-cdk/bulk/core/load/build.gradle @@ -0,0 +1,12 @@ +dependencies { + implementation project(':airbyte-cdk:bulk:core:bulk-cdk-core-base') + implementation 'org.apache.commons:commons-lang3:3.14.0' + + // For ranges and rangesets + implementation("com.google.guava:guava:33.3.0-jre") + + testFixturesApi testFixtures(project(':airbyte-cdk:bulk:core:bulk-cdk-core-base')) + + 
testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.7.1") + implementation "org.jetbrains.kotlin:kotlin-reflect:2.0.0" +} diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/command/DestinationCatalog.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/command/DestinationCatalog.kt new file mode 100644 index 000000000000..c476c049d840 --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/command/DestinationCatalog.kt @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.command + +import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog +import io.micronaut.context.annotation.Factory +import jakarta.inject.Singleton + +/** + * Internal representation of destination streams. This is intended to be a case class specialized + * for usability. + */ +data class DestinationCatalog( + val streams: List = emptyList(), +) { + private val byDescriptor: Map = + streams.associateBy { it.descriptor } + + fun getStream(name: String, namespace: String): DestinationStream { + val descriptor = DestinationStream.Descriptor(namespace = namespace, name = name) + return byDescriptor[descriptor] + ?: throw IllegalArgumentException("Stream not found: namespace=$namespace, name=$name") + } +} + +@Factory +class DestinationCatalogFactory( + private val catalog: ConfiguredAirbyteCatalog, + private val streamFactory: DestinationStreamFactory +) { + @Singleton + fun make(): DestinationCatalog { + return DestinationCatalog(streams = catalog.streams.map { streamFactory.make(it) }) + } +} diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/command/DestinationConfiguration.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/command/DestinationConfiguration.kt new file mode 100644 index 000000000000..12924d16f161 --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/command/DestinationConfiguration.kt @@ -0,0 +1,25 
@@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.command + +import io.micronaut.context.annotation.ConfigurationProperties +import io.micronaut.context.annotation.Factory +import jakarta.inject.Singleton + +@ConfigurationProperties("destination.config") +interface DestinationConfiguration : Configuration { + /** + * Micronaut factory which glues [ConfigurationJsonObjectSupplier] and + * [DestinationConfigurationFactory] together to produce a [DestinationConfiguration] singleton. + */ + @Factory + private class MicronautFactory { + @Singleton + fun sourceConfig( + pojoSupplier: ConfigurationJsonObjectSupplier, + factory: DestinationConfigurationFactory, + ): DestinationConfiguration = factory.make(pojoSupplier.get()) + } +} diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/command/DestinationConfigurationFactory.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/command/DestinationConfigurationFactory.kt new file mode 100644 index 000000000000..982d134cab19 --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/command/DestinationConfigurationFactory.kt @@ -0,0 +1,21 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.command + +import io.airbyte.cdk.ConfigErrorException + +interface DestinationConfigurationFactory< + I : ConfigurationJsonObjectBase, O : DestinationConfiguration> { + fun makeWithoutExceptionHandling(pojo: I): O + + /** Wraps [makeWithoutExceptionHandling] exceptions in [ConfigErrorException]. */ + fun make(pojo: I): O = + try { + makeWithoutExceptionHandling(pojo) + } catch (e: Exception) { + // Wrap NPEs (mostly) in ConfigErrorException. 
+ throw ConfigErrorException("Failed to build ConnectorConfiguration.", e) + } +} diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/command/DestinationStream.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/command/DestinationStream.kt new file mode 100644 index 000000000000..9d79d2bd948d --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/command/DestinationStream.kt @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.command + +import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream +import jakarta.inject.Singleton + +/** + * Internal representation of destination streams. This is intended to be a case class specialized + * for usability. + * + * TODO: Add missing info like sync type, generation_id, etc. + * + * TODO: Add dedicated schema type, converted from json-schema. + */ +class DestinationStream(val descriptor: Descriptor) { + data class Descriptor(val namespace: String, val name: String) + + override fun hashCode(): Int { + return descriptor.hashCode() + } + + override fun equals(other: Any?): Boolean { + return other is DestinationStream && descriptor == other.descriptor + } + + override fun toString(): String { + return "DestinationStream(descriptor=$descriptor)" + } +} + +@Singleton +class DestinationStreamFactory { + fun make(stream: ConfiguredAirbyteStream): DestinationStream { + return DestinationStream( + descriptor = + DestinationStream.Descriptor( + namespace = stream.stream.namespace, + name = stream.stream.name + ) + ) + } +} diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/command/WriteConfiguration.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/command/WriteConfiguration.kt new file mode 100644 index 000000000000..2351a6b9a530 --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/command/WriteConfiguration.kt @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2024 
Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.command + +import io.micronaut.context.annotation.Secondary +import jakarta.inject.Singleton + +/** + * General configuration for the write operation. The implementor can override this to tweak runtime + * behavior. + */ +interface WriteConfiguration { + /** Batch accumulation settings. */ + val recordBatchSizeBytes: Long + val firstStageTmpFilePrefix: String + + /** Memory queue settings */ + val maxMessageQueueMemoryUsageRatio: Double // as fraction of available memory + val estimatedRecordMemoryOverheadRatio: Double // 0 => No overhead, 1.0 => 2x overhead +} + +@Singleton +@Secondary +open class DefaultWriteConfiguration : WriteConfiguration { + override val recordBatchSizeBytes: Long = 200L * 1024L * 1024L + override val firstStageTmpFilePrefix = "airbyte-cdk-load-staged-raw-records" + + override val maxMessageQueueMemoryUsageRatio: Double = 0.2 + override val estimatedRecordMemoryOverheadRatio: Double = 0.1 +} diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/message/AirbyteStateMessageFactory.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/message/AirbyteStateMessageFactory.kt new file mode 100644 index 000000000000..ed88daa07553 --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/message/AirbyteStateMessageFactory.kt @@ -0,0 +1,75 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. 
+ */ + +package io.airbyte.cdk.message + +import io.airbyte.protocol.models.v0.AirbyteGlobalState +import io.airbyte.protocol.models.v0.AirbyteStateMessage +import io.airbyte.protocol.models.v0.AirbyteStateStats +import io.airbyte.protocol.models.v0.AirbyteStreamState +import io.airbyte.protocol.models.v0.StreamDescriptor +import jakarta.inject.Singleton + +/** + * Converts the internal @[DestinationStateMessage] case class to the Protocol state messages + * required by @[io.airbyte.cdk.output.OutputConsumer] + */ +interface AirbyteStateMessageFactory { + fun fromDestinationStateMessage(message: DestinationStateMessage): AirbyteStateMessage +} + +@Singleton +class DefaultAirbyteStateMessageFactory : AirbyteStateMessageFactory { + override fun fromDestinationStateMessage( + message: DestinationStateMessage + ): AirbyteStateMessage { + return when (message) { + is DestinationStreamState -> + AirbyteStateMessage() + .withSourceStats( + AirbyteStateStats() + .withRecordCount(message.sourceStats.recordCount.toDouble()) + ) + .withDestinationStats( + message.destinationStats?.let { + AirbyteStateStats().withRecordCount(it.recordCount.toDouble()) + } + ?: throw IllegalStateException( + "Destination stats must be provided for DestinationStreamState" + ) + ) + .withType(AirbyteStateMessage.AirbyteStateType.STREAM) + .withStream(fromStreamState(message.streamState)) + is DestinationGlobalState -> + AirbyteStateMessage() + .withSourceStats( + AirbyteStateStats() + .withRecordCount(message.sourceStats.recordCount.toDouble()) + ) + .withDestinationStats( + message.destinationStats?.let { + AirbyteStateStats().withRecordCount(it.recordCount.toDouble()) + } + ) + .withType(AirbyteStateMessage.AirbyteStateType.GLOBAL) + .withGlobal( + AirbyteGlobalState() + .withSharedState(message.state) + .withStreamStates(message.streamStates.map { fromStreamState(it) }) + ) + } + } + + private fun fromStreamState( + streamState: DestinationStateMessage.StreamState + ): AirbyteStreamState { + 
return AirbyteStreamState() + .withStreamDescriptor( + StreamDescriptor() + .withNamespace(streamState.stream.descriptor.namespace) + .withName(streamState.stream.descriptor.name) + ) + .withStreamState(streamState.state) + } +} diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/message/Batch.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/message/Batch.kt new file mode 100644 index 000000000000..7146d64877a0 --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/message/Batch.kt @@ -0,0 +1,101 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.message + +import com.google.common.collect.Range +import com.google.common.collect.RangeSet +import com.google.common.collect.TreeRangeSet +import java.nio.file.Path + +/** + * Represents an accumulated batch of records in some stage of processing. + * + * Emitted by @[io.airbyte.cdk.write.StreamLoader.processRecords] to describe the batch accumulated. + * Non-[State.COMPLETE] batches are routed to @[io.airbyte.cdk.write.StreamLoader.processBatch] + * re-entrantly until completion. + * + * The framework will track the association between the Batch and the range of records it + * represents, by [Batch.State]s. The [State.PERSISTED] state has special meaning: it indicates that + * the associated ranges have been persisted remotely, and that platform checkpoint messages can be + * emitted. + * + * [State.SPOOLED] is used internally to indicate that records have been spooled to disk for + * processing and should not be used by implementors. + * + * When a stream has been read to End-of-stream, and all ranges between 0 and End-of-stream are + * [State.COMPLETE], then all records are considered to have been processed. 
+ * + * The intended usage for implementors is to implement the provided interfaces in case classes that + * contain the necessary metadata for processing, using them in @ + * [io.airbyte.cdk.write.StreamLoader.processBatch] to route to the appropriate handler(s). + * + * For example: + * + * ```kotlin + * sealed class MyBatch: Batch + * data class MyLocalFile( + * override val localPath: Path, + * override val totalSizeBytes: Long + * ): StagedLocalFile() + * data class MyRemoteObject( + * override val key: String + * ): RemoteObject() + * // etc... + * ``` + */ +interface Batch { + enum class State { + SPOOLED, + LOCAL, + PERSISTED, + COMPLETE + } + + val state: State +} + +/** Simple batch: use if you need no other metadata for processing. */ +data class SimpleBatch(override val state: Batch.State) : Batch + +/** Represents a file of records locally staged. */ +abstract class StagedLocalFile() : Batch { + override val state: Batch.State = Batch.State.LOCAL + abstract val localPath: Path + abstract val totalSizeBytes: Long +} + +/** Represents a remote object containing persisted records. */ +abstract class RemoteObject() : Batch { + override val state: Batch.State = Batch.State.PERSISTED + abstract val key: String +} + +/** + * Represents a file of raw records staged to disk for pre-processing. Used internally by the + * framework. + */ +data class SpooledRawMessagesLocalFile( + override val localPath: Path, + override val totalSizeBytes: Long, + override val state: Batch.State = Batch.State.SPOOLED +) : StagedLocalFile() + +/** + * Internally-used wrapper for tracking the association between a batch and the range of records it + * contains.
+ */ +data class BatchEnvelope( + val batch: B, + val ranges: RangeSet = TreeRangeSet.create() +) { + constructor( + batch: B, + range: Range + ) : this(batch = batch, ranges = TreeRangeSet.create(listOf(range))) + + fun withBatch(newBatch: C): BatchEnvelope { + return BatchEnvelope(newBatch, ranges) + } +} diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/message/DestinationMessage.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/message/DestinationMessage.kt new file mode 100644 index 000000000000..1c7697579d05 --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/message/DestinationMessage.kt @@ -0,0 +1,145 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.message + +import com.fasterxml.jackson.databind.JsonNode +import io.airbyte.cdk.command.DestinationCatalog +import io.airbyte.cdk.command.DestinationStream +import io.airbyte.protocol.models.v0.AirbyteMessage +import io.airbyte.protocol.models.v0.AirbyteStateMessage +import io.airbyte.protocol.models.v0.AirbyteStreamState +import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage.AirbyteStreamStatus +import io.airbyte.protocol.models.v0.AirbyteTraceMessage +import jakarta.inject.Singleton + +/** + * Internal representation of destination messages. These are intended to be specialized for + * usability. Data should be marshalled to these from frontline deserialized objects. + */ +sealed class DestinationMessage + +/** Records. */ +sealed class DestinationRecordMessage : DestinationMessage() { + abstract val stream: DestinationStream +} + +data class DestinationRecord( + override val stream: DestinationStream, + val data: JsonNode? = null, + val emittedAtMs: Long, + val serialized: String +) : DestinationRecordMessage() + +data class DestinationStreamComplete( + override val stream: DestinationStream, + val emittedAtMs: Long +) : DestinationRecordMessage() + +/** State. 
*/ +sealed class DestinationStateMessage : DestinationMessage() { + data class Stats(val recordCount: Long) + data class StreamState( + val stream: DestinationStream, + val state: JsonNode, + ) + + abstract val sourceStats: Stats + abstract val destinationStats: Stats? + + abstract fun withDestinationStats(stats: Stats): DestinationStateMessage +} + +data class DestinationStreamState( + val streamState: StreamState, + override val sourceStats: Stats, + override val destinationStats: Stats? = null +) : DestinationStateMessage() { + override fun withDestinationStats(stats: Stats) = + DestinationStreamState(streamState, sourceStats, stats) +} + +data class DestinationGlobalState( + val state: JsonNode, + override val sourceStats: Stats, + override val destinationStats: Stats? = null, + val streamStates: List = emptyList() +) : DestinationStateMessage() { + override fun withDestinationStats(stats: Stats) = + DestinationGlobalState(state, sourceStats, stats, streamStates) +} + +/** Catchall for anything unimplemented. 
*/ +data object Undefined : DestinationMessage() + +@Singleton +class DestinationMessageFactory(private val catalog: DestinationCatalog) { + fun fromAirbyteMessage(message: AirbyteMessage, serialized: String): DestinationMessage { + return when (message.type) { + AirbyteMessage.Type.RECORD -> + DestinationRecord( + stream = + catalog.getStream( + namespace = message.record.namespace, + name = message.record.stream, + ), + // TODO: Map to AirbyteType + data = message.record.data, + emittedAtMs = message.record.emittedAt, + serialized = serialized + ) + AirbyteMessage.Type.TRACE -> { + // streamStatus may be absent on other trace types: check before dereferencing. + val status = message.trace.streamStatus + if ( + message.trace.type == AirbyteTraceMessage.Type.STREAM_STATUS && + status?.status == AirbyteStreamStatus.COMPLETE + ) { + val stream = catalog.getStream( + namespace = status.streamDescriptor.namespace, + name = status.streamDescriptor.name, + ) + DestinationStreamComplete(stream, message.trace.emittedAt.toLong()) + } else { + Undefined + } + } + AirbyteMessage.Type.STATE -> { + when (message.state.type) { + AirbyteStateMessage.AirbyteStateType.STREAM -> + DestinationStreamState( + streamState = fromAirbyteStreamState(message.state.stream), + sourceStats = + DestinationStateMessage.Stats( + recordCount = message.state.sourceStats.recordCount.toLong() + ) + ) + AirbyteStateMessage.AirbyteStateType.GLOBAL -> + DestinationGlobalState( + sourceStats = + DestinationStateMessage.Stats( + recordCount = message.state.sourceStats.recordCount.toLong() + ), + state = message.state.global.sharedState, + streamStates = + message.state.global.streamStates.map { fromAirbyteStreamState(it) } + ) + else -> // TODO: Do we still need to handle LEGACY?
+ Undefined + } + } + else -> Undefined + } + } + + private fun fromAirbyteStreamState( + streamState: AirbyteStreamState + ): DestinationStateMessage.StreamState { + val descriptor = streamState.streamDescriptor + return DestinationStateMessage.StreamState( + stream = catalog.getStream(namespace = descriptor.namespace, name = descriptor.name), + state = streamState.streamState + ) + } +} diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/message/DestinationMessageDeserializer.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/message/DestinationMessageDeserializer.kt new file mode 100644 index 000000000000..d9fd81667010 --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/message/DestinationMessageDeserializer.kt @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.message + +import io.airbyte.cdk.util.Jsons +import io.airbyte.protocol.models.v0.AirbyteMessage +import jakarta.inject.Singleton + +interface Deserializer { + fun deserialize(serialized: String): T +} + +/** + * Converts a protocol @[AirbyteMessage] to the internal @[DestinationMessage]. Ideally, this would + * not use protocol messages at all, but rather a specialized deserializer for routing.
+ */ +@Singleton +class DefaultDestinationMessageDeserializer(private val messageFactory: DestinationMessageFactory) : + Deserializer { + + override fun deserialize(serialized: String): DestinationMessage { + try { + val node = Jsons.readTree(serialized) + val airbyteMessage = Jsons.treeToValue(node, AirbyteMessage::class.java) + return messageFactory.fromAirbyteMessage(airbyteMessage, serialized) + } catch (t: Throwable) { + throw RuntimeException("Failed to deserialize AirbyteMessage", t) // keep the cause + } + } +} diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/message/DestinationMessageQueue.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/message/DestinationMessageQueue.kt new file mode 100644 index 000000000000..a8ed65668658 --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/message/DestinationMessageQueue.kt @@ -0,0 +1,101 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.message + +import io.airbyte.cdk.command.DestinationCatalog +import io.airbyte.cdk.command.DestinationStream +import io.airbyte.cdk.command.WriteConfiguration +import io.airbyte.cdk.state.MemoryManager +import io.github.oshai.kotlinlogging.KotlinLogging +import jakarta.inject.Singleton +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicLong +import java.util.concurrent.locks.ReentrantLock +import kotlin.concurrent.withLock +import kotlinx.coroutines.runBlocking + +/** + * Wrapper for record messages published to the message queue, containing metadata like index and + * size. + * + * In a future where we deserialize only the info necessary for routing, this could include a dumb + * container for the serialized, and deserialization could be deferred until the spooled records + * were recovered from disk.
+ */ +sealed class DestinationRecordWrapped : Sized + +data class StreamRecordWrapped( + val index: Long, + override val sizeBytes: Long, + val record: DestinationRecord +) : DestinationRecordWrapped() + +data class StreamCompleteWrapped( + val index: Long, +) : DestinationRecordWrapped() { + override val sizeBytes: Long = 0L +} + +/** + * Message queue to which @[DestinationRecordWrapped] messages can be published on a @ + * [DestinationStream] key. + * + * It maintains a map of @[QueueChannel]s by stream, and tracks the memory usage across all + * channels, blocking when the maximum is reached. + * + * This maximum is expected to be low, as the assumption is that data will be spooled to disk as + * quickly as possible. + */ +@Singleton +class DestinationMessageQueue( + catalog: DestinationCatalog, + config: WriteConfiguration, + private val memoryManager: MemoryManager, + private val queueChannelFactory: QueueChannelFactory +) : MessageQueue { + private val channels: + ConcurrentHashMap> = + ConcurrentHashMap() + + private val totalQueueSizeBytes = AtomicLong(0L) + private val maxQueueSizeBytes: Long + private val memoryLock = ReentrantLock() + private val memoryLockCondition = memoryLock.newCondition() + + init { + catalog.streams.forEach { channels[it.descriptor] = queueChannelFactory.make(this) } + val adjustedRatio = + config.maxMessageQueueMemoryUsageRatio / + (1.0 + config.estimatedRecordMemoryOverheadRatio) + maxQueueSizeBytes = runBlocking { memoryManager.reserveRatio(adjustedRatio) } + } + + override suspend fun acquireQueueBytesBlocking(bytes: Long) { + memoryLock.withLock { + while (totalQueueSizeBytes.get() + bytes > maxQueueSizeBytes) { + memoryLockCondition.await() + } + totalQueueSizeBytes.addAndGet(bytes) + } + } + + override suspend fun releaseQueueBytes(bytes: Long) { + memoryLock.withLock { + totalQueueSizeBytes.addAndGet(-bytes) + memoryLockCondition.signalAll() + } + } + + override suspend fun getChannel( + key: DestinationStream, + ): 
QueueChannel { + return channels[key.descriptor] + ?: throw IllegalArgumentException( + "Reading from non-existent QueueChannel: ${key.descriptor}" + ) + } + + private val log = KotlinLogging.logger {} +} diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/message/MessageQueue.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/message/MessageQueue.kt new file mode 100644 index 000000000000..8d392f8afa3d --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/message/MessageQueue.kt @@ -0,0 +1,83 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.message + +import jakarta.inject.Singleton +import java.util.concurrent.atomic.AtomicBoolean +import kotlinx.coroutines.channels.Channel + +interface Sized { + val sizeBytes: Long +} + +interface MessageQueue { + suspend fun acquireQueueBytesBlocking(bytes: Long) + suspend fun releaseQueueBytes(bytes: Long) + suspend fun getChannel(key: K): QueueChannel +} + +interface QueueChannel { + suspend fun close() + suspend fun isClosed(): Boolean + suspend fun send(message: T) + suspend fun receive(): T +} + +/** A channel that blocks when its parent queue has no available memory. 
*/ +interface BlockingQueueChannel : QueueChannel { + val messageQueue: MessageQueue<*, T> + val channel: Channel + + override suspend fun send(message: T) { + if (isClosed()) { + throw IllegalStateException("Send to closed QueueChannel") + } + val estimatedSize = message.sizeBytes + messageQueue.acquireQueueBytesBlocking(estimatedSize) + channel.send(message) + } + + override suspend fun receive(): T { + if (isClosed()) { + throw IllegalStateException("Receive from closed QueueChannel") + } + val message = channel.receive() + val estimatedSize = message.sizeBytes + messageQueue.releaseQueueBytes(estimatedSize) + return message + } +} + +interface QueueChannelFactory { + fun make(messageQueue: MessageQueue<*, T>): QueueChannel +} + +/** + * The default queue channel is just a dumb wrapper around an unlimited kotlin channel of wrapped + * records. + * + * Note: we wrap channel closedness in an atomic boolean because the @[Channel.isClosedForSend] and + * @[Channel.isClosedForReceive] apis are marked as delicate/experimental. 
+ */ +class DefaultQueueChannel(override val messageQueue: MessageQueue<*, DestinationRecordWrapped>) : + BlockingQueueChannel { + override val channel = Channel(Channel.UNLIMITED) + private val closed = AtomicBoolean(false) + + override suspend fun close() { + if (closed.compareAndSet(false, true)) { + channel.close() + } + } + + override suspend fun isClosed(): Boolean = closed.get() +} + +@Singleton +class DefaultQueueChannelFactory : QueueChannelFactory { + override fun make( + messageQueue: MessageQueue<*, DestinationRecordWrapped> + ): QueueChannel = DefaultQueueChannel(messageQueue) +} diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/message/MessageQueueReader.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/message/MessageQueueReader.kt new file mode 100644 index 000000000000..a9269a28f51b --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/message/MessageQueueReader.kt @@ -0,0 +1,55 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.message + +import io.airbyte.cdk.command.DestinationStream +import io.github.oshai.kotlinlogging.KotlinLogging +import jakarta.inject.Singleton +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.flow + +/** + * A reader should provide a byte-limited flow of messages of the underlying type. The flow should + * terminate when maxBytes has been read, or when the stream is complete. 
+ */
+interface MessageQueueReader<K, T> {
+    suspend fun readChunk(key: K, maxBytes: Long): Flow<T>
+}
+
+@Singleton
+class DestinationMessageQueueReader(
+    private val messageQueue: DestinationMessageQueue,
+) : MessageQueueReader<DestinationStream, DestinationRecordWrapped> {
+    private val log = KotlinLogging.logger {}
+
+    override suspend fun readChunk(
+        key: DestinationStream,
+        maxBytes: Long
+    ): Flow<DestinationRecordWrapped> = flow {
+        log.info { "Reading chunk of $maxBytes bytes from stream $key" }
+
+        var totalBytesRead = 0L
+        var recordsRead = 0L
+        while (totalBytesRead < maxBytes) { // limit is checked before each read, so a chunk can overshoot by one record
+            when (val wrapped = messageQueue.getChannel(key).receive()) {
+                is StreamRecordWrapped -> {
+                    totalBytesRead += wrapped.sizeBytes
+                    emit(wrapped)
+                }
+                is StreamCompleteWrapped -> {
+                    messageQueue.getChannel(key).close() // end-of-stream: close the channel before emitting the marker
+                    emit(wrapped)
+                    log.info { "Read end-of-stream for $key" }
+                    return@flow
+                }
+            }
+            recordsRead++
+        }
+
+        log.info { "Read $recordsRead records (${totalBytesRead}b) from stream $key" }
+
+        return@flow
+    }
+}
diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/message/MessageQueueWriter.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/message/MessageQueueWriter.kt
new file mode 100644
index 000000000000..898ce683ba2f
--- /dev/null
+++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/message/MessageQueueWriter.kt
@@ -0,0 +1,105 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.cdk.message
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
+import io.airbyte.cdk.command.DestinationCatalog
+import io.airbyte.cdk.command.DestinationStream
+import io.airbyte.cdk.state.StateManager
+import io.airbyte.cdk.state.StreamsManager
+import jakarta.inject.Singleton
+
+/** A publishing interface for writing messages to a message queue.
*/
+interface MessageQueueWriter<T> {
+    suspend fun publish(message: T, sizeBytes: Long)
+}
+
+/**
+ * Routes @[DestinationRecordMessage]s by stream to the appropriate channel and
+ * @[DestinationStateMessage]s to the state manager.
+ *
+ * TODO: Handle other message types.
+ */
+@Singleton
+@SuppressFBWarnings(
+    "NP_NONNULL_PARAM_VIOLATION",
+    justification = "message is guaranteed to be non-null by Kotlin's type system"
+)
+class DestinationMessageQueueWriter(
+    private val catalog: DestinationCatalog,
+    private val messageQueue: MessageQueue<DestinationStream, DestinationRecordWrapped>,
+    private val streamsManager: StreamsManager,
+    private val stateManager: StateManager
+) : MessageQueueWriter<DestinationMessage> {
+    /**
+     * Route the (already typed) message to its stream's channel or to the state manager.
+     *
+     * NOTE: Not thread-safe! Only a single writer should publish to the queue.
+     */
+    override suspend fun publish(message: DestinationMessage, sizeBytes: Long) {
+        when (message) {
+            /* If the input message represents a record. */
+            is DestinationRecordMessage -> {
+                val manager = streamsManager.getManager(message.stream)
+                val index = manager.countRecordIn(sizeBytes) // end-of-stream markers are counted and indexed too
+                when (message) {
+                    /* If a data record */
+                    is DestinationRecord -> {
+                        val wrapped =
+                            StreamRecordWrapped(
+                                index = index,
+                                sizeBytes = sizeBytes,
+                                record = message
+                            )
+                        messageQueue.getChannel(message.stream).send(wrapped)
+                    }
+
+                    /* If an end-of-stream marker. */
+                    is DestinationStreamComplete -> {
+                        val wrapped = StreamCompleteWrapped(index)
+                        messageQueue.getChannel(message.stream).send(wrapped)
+                    }
+                }
+            }
+            is DestinationStateMessage -> {
+                when (message) {
+                    /**
+                     * For a stream state message, mark the checkpoint and add the message with
+                     * index and count to the state manager. Also, add the count to the destination
+                     * stats.
+                     */
+                    is DestinationStreamState -> {
+                        val stream = message.streamState.stream
+                        val manager = streamsManager.getManager(stream)
+                        val (currentIndex, countSinceLast) = manager.markCheckpoint()
+                        val messageWithCount =
+                            message.withDestinationStats(
+                                DestinationStateMessage.Stats(countSinceLast)
+                            )
+                        stateManager.addStreamState(stream, currentIndex, messageWithCount)
+                    }
+                    /**
+                     * For a global state message, collect the index per stream, but add the total
+                     * count to the destination stats.
+                     */
+                    is DestinationGlobalState -> {
+                        val streamWithIndexAndCount =
+                            catalog.streams.map { stream ->
+                                val manager = streamsManager.getManager(stream)
+                                val (currentIndex, countSinceLast) = manager.markCheckpoint()
+                                Triple(stream, currentIndex, countSinceLast)
+                            }
+                        val totalCount = streamWithIndexAndCount.sumOf { it.third }
+                        val messageWithCount =
+                            message.withDestinationStats(DestinationStateMessage.Stats(totalCount))
+                        val streamIndexes = streamWithIndexAndCount.map { it.first to it.second }
+                        stateManager.addGlobalState(streamIndexes, messageWithCount)
+                    }
+                }
+            }
+            is Undefined -> {} // Do nothing
+        }
+    }
+}
diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/state/MemoryManager.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/state/MemoryManager.kt
new file mode 100644
index 000000000000..4e03a2ab9b23
--- /dev/null
+++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/state/MemoryManager.kt
@@ -0,0 +1,47 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.cdk.state
+
+import jakarta.inject.Singleton
+import java.util.concurrent.atomic.AtomicLong
+import java.util.concurrent.locks.ReentrantLock
+import kotlin.concurrent.withLock
+
+/**
+ * Manages memory usage for the destination.
+ *
+ * TODO: Better initialization of available runtime memory?
+ *
+ * TODO: Some degree of logging/monitoring around how accurate we're actually being?
+ */
+@Singleton
+class MemoryManager {
+    private val availableMemoryBytes: Long = Runtime.getRuntime().maxMemory() // total budget = JVM max heap
+    private val usedMemoryBytes = AtomicLong(0L) // the reference is never reassigned, so `val`
+    private val memoryLock = ReentrantLock()
+    private val memoryLockCondition = memoryLock.newCondition() // signalled by release()
+
+    suspend fun reserveBlocking(memoryBytes: Long) {
+        // Fail fast: a request above the total budget can never be satisfied and used to wait forever.
+        require(memoryBytes <= availableMemoryBytes) { "Cannot reserve $memoryBytes of $availableMemoryBytes bytes" }
+        memoryLock.withLock { // NOTE(review): await() blocks the calling thread even though this is a suspend fun
+            while (usedMemoryBytes.get() + memoryBytes > availableMemoryBytes) memoryLockCondition.await()
+            usedMemoryBytes.addAndGet(memoryBytes)
+        }
+    }
+
+    suspend fun reserveRatio(ratio: Double): Long { // reserves ratio * budget and returns the byte count reserved
+        val estimatedSize = (availableMemoryBytes.toDouble() * ratio).toLong()
+        reserveBlocking(estimatedSize)
+        return estimatedSize
+    }
+
+    fun release(memoryBytes: Long) {
+        memoryLock.withLock {
+            usedMemoryBytes.addAndGet(-memoryBytes)
+            memoryLockCondition.signalAll() // wake every waiter so each re-checks its own request
+        }
+    }
+}
diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/state/StateManager.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/state/StateManager.kt
new file mode 100644
index 000000000000..e6c47ecd6dd5
--- /dev/null
+++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/state/StateManager.kt
@@ -0,0 +1,142 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.cdk.state
+
+import io.airbyte.cdk.command.DestinationCatalog
+import io.airbyte.cdk.command.DestinationStream
+import io.airbyte.cdk.message.AirbyteStateMessageFactory
+import io.airbyte.cdk.message.DestinationStateMessage
+import io.airbyte.cdk.output.OutputConsumer
+import io.github.oshai.kotlinlogging.KotlinLogging
+import jakarta.inject.Singleton
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.atomic.AtomicReference
+
+/**
+ * Interface for state management. Should accept stream and global state, as well as requests to
+ * flush all data-sufficient states.
+ */
+interface StateManager {
+    fun addStreamState(
+        stream: DestinationStream,
+        index: Long,
+        stateMessage: DestinationStateMessage
+    )
+    fun addGlobalState(
+        streamIndexes: List<Pair<DestinationStream, Long>>,
+        stateMessage: DestinationStateMessage
+    )
+    fun flushStates()
+}
+
+/**
+ * Destination state manager.
+ *
+ * Accepts global and stream states, and enforces that stream and global state are not mixed.
+ * Determines ready states by querying the StreamsManager for the state of the record index range
+ * associated with each state message.
+ *
+ * TODO: Force flush on a configured schedule
+ *
+ * TODO: Ensure that state is flushed at the end, and require that all state be flushed before the
+ * destination can succeed.
+ */
+@Singleton
+class DefaultStateManager(
+    private val catalog: DestinationCatalog,
+    private val streamsManager: StreamsManager,
+    private val stateMessageFactory: AirbyteStateMessageFactory,
+    private val outputConsumer: OutputConsumer
+) : StateManager {
+    private val log = KotlinLogging.logger {}
+
+    data class GlobalState(
+        val streamIndexes: List<Pair<DestinationStream, Long>>,
+        val stateMessage: DestinationStateMessage
+    )
+
+    private val stateIsGlobal: AtomicReference<Boolean?> = AtomicReference(null) // null until the first state arrives
+    private val streamStates:
+        ConcurrentHashMap<DestinationStream, LinkedHashMap<Long, DestinationStateMessage>> =
+        ConcurrentHashMap()
+    private val globalStates: ConcurrentLinkedQueue<GlobalState> = ConcurrentLinkedQueue()
+
+    override fun addStreamState(
+        stream: DestinationStream,
+        index: Long,
+        stateMessage: DestinationStateMessage
+    ) {
+        if (stateIsGlobal.updateAndGet { it == true } != false) { // null -> false; only an existing `true` is a conflict
+            throw IllegalStateException("Global state cannot be mixed with non-global state")
+        }
+
+        val statesForStream = streamStates.getOrPut(stream) { LinkedHashMap() }
+        statesForStream[index] = stateMessage // insertion order doubles as index order for flushStreamStates()
+        log.info { "Added state for stream: $stream at index: $index" }
+    }
+
+    override fun addGlobalState(
+        streamIndexes: List<Pair<DestinationStream, Long>>,
+        stateMessage: DestinationStateMessage
+    ) {
+        if (stateIsGlobal.updateAndGet { it != false } != true) { // null -> true; only an existing `false` is a conflict
+            throw IllegalStateException("Global state cannot be mixed with non-global state")
+        }
+
+        globalStates.add(GlobalState(streamIndexes, stateMessage))
+        log.info { "Added global state with stream indexes: $streamIndexes" }
+    }
+
+    override fun flushStates() {
+        /*
+           Iterate over the states in order, evicting each that passes
+           the persistence check. If a state is not persisted, then
+           we can break the loop since the states are ordered. For global
+           states, all streams must be persisted up to the checkpoint.
+        */
+        when (stateIsGlobal.get()) {
+            null -> log.info { "No states to flush" }
+            true -> flushGlobalStates()
+            false -> flushStreamStates()
+        }
+    }
+
+    private fun flushGlobalStates() {
+        // Drain every leading checkpoint that is fully persisted, not just the first one.
+        while (true) {
+            val head = globalStates.peek() ?: return
+            val allStreamsPersisted =
+                head.streamIndexes.all { (stream, index) ->
+                    streamsManager.getManager(stream).areRecordsPersistedUntil(index)
+                }
+            if (!allStreamsPersisted) {
+                return
+            }
+            globalStates.poll()
+            val outMessage = stateMessageFactory.fromDestinationStateMessage(head.stateMessage)
+            outputConsumer.accept(outMessage)
+        }
+    }
+
+    private fun flushStreamStates() {
+        for (stream in catalog.streams) {
+            val manager = streamsManager.getManager(stream)
+            val statesForStream = streamStates[stream] ?: continue // a stream with no states must not stop the others
+            for (index in statesForStream.keys.toList()) { // snapshot: entries are removed inside the loop
+                if (manager.areRecordsPersistedUntil(index)) {
+                    val stateMessage =
+                        statesForStream.remove(index)
+                            ?: throw IllegalStateException("State not found for index: $index")
+                    log.info { "Flushing state for stream: $stream at index: $index" }
+                    val outMessage = stateMessageFactory.fromDestinationStateMessage(stateMessage)
+                    outputConsumer.accept(outMessage)
+                } else {
+                    break
+                }
+            }
+        }
+    }
+}
diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/state/StreamManager.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/state/StreamManager.kt
new file mode 100644
index 000000000000..b0faf0188d08
--- /dev/null
+++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/state/StreamManager.kt
@@ -0,0 +1,151 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.cdk.state
+
+import com.google.common.collect.Range
+import com.google.common.collect.RangeSet
+import com.google.common.collect.TreeRangeSet
+import io.airbyte.cdk.command.DestinationCatalog
+import io.airbyte.cdk.command.DestinationStream
+import io.airbyte.cdk.message.Batch
+import io.airbyte.cdk.message.BatchEnvelope
+import io.github.oshai.kotlinlogging.KotlinLogging
+import io.micronaut.context.annotation.Factory
+import jakarta.inject.Singleton
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.atomic.AtomicLong
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.withContext
+
+/** Manages the state of all streams in the destination. */
+interface StreamsManager {
+    fun getManager(stream: DestinationStream): StreamManager
+    suspend fun awaitAllStreamsComplete()
+}
+
+class DefaultStreamsManager(
+    private val streamManagers: ConcurrentHashMap<DestinationStream, StreamManager>
+) : StreamsManager {
+    override fun getManager(stream: DestinationStream): StreamManager { // throws IllegalArgumentException for an unknown stream
+        return streamManagers[stream] ?: throw IllegalArgumentException("Stream not found: $stream")
+    }
+
+    override suspend fun awaitAllStreamsComplete() { // waits on each manager in turn
+        streamManagers.forEach { (_, manager) -> manager.awaitStreamClosed() }
+    }
+}
+
+/** Manages the state of a single stream. */
+interface StreamManager {
+    fun countRecordIn(sizeBytes: Long): Long
+    fun markCheckpoint(): Pair<Long, Long>
+    fun updateBatchState(batch: BatchEnvelope<*>)
+    fun isBatchProcessingComplete(): Boolean
+    fun areRecordsPersistedUntil(index: Long): Boolean
+
+    fun markClosed()
+    fun streamIsClosed(): Boolean
+    suspend fun awaitStreamClosed()
+}
+
+/**
+ * Maintains a map of stream -> status metadata, and a map of batch state -> record ranges for which
+ * that state has been reached.
+ *
+ * TODO: Log a detailed report of the stream status on a regular cadence.
+ */
+class DefaultStreamManager(
+    val stream: DestinationStream,
+) : StreamManager {
+    private val log = KotlinLogging.logger {}
+
+    data class StreamStatus(
+        val recordCount: AtomicLong = AtomicLong(0),
+        val totalBytes: AtomicLong = AtomicLong(0),
+        val enqueuedSize: AtomicLong = AtomicLong(0),
+        val lastCheckpoint: AtomicLong = AtomicLong(0L),
+        val closedLatch: CountDownLatch = CountDownLatch(1),
+    )
+
+    private val streamStatus: StreamStatus = StreamStatus()
+    private val rangesState: ConcurrentHashMap<Batch.State, RangeSet<Long>> = ConcurrentHashMap()
+
+    init {
+        Batch.State.entries.forEach { rangesState[it] = TreeRangeSet.create() } // one empty range set per state
+    }
+
+    override fun countRecordIn(sizeBytes: Long): Long { // returns the zero-based index assigned to this record
+        val index = streamStatus.recordCount.getAndIncrement()
+        streamStatus.totalBytes.addAndGet(sizeBytes)
+        streamStatus.enqueuedSize.addAndGet(sizeBytes)
+        return index
+    }
+
+    /**
+     * Mark a checkpoint in the stream and return the current index and the number of records since
+     * the last one.
+     */
+    override fun markCheckpoint(): Pair<Long, Long> {
+        val index = streamStatus.recordCount.get()
+        val lastCheckpoint = streamStatus.lastCheckpoint.getAndSet(index)
+        return Pair(index, index - lastCheckpoint)
+    }
+
+    /** Record the batch's state for its range(s), stored as [a, b+1) so adjacent batches coalesce (closed Long ranges never do). */
+    override fun updateBatchState(batch: BatchEnvelope<*>) {
+        val stateRanges =
+            rangesState[batch.batch.state]
+                ?: throw IllegalArgumentException("Invalid batch state: ${batch.batch.state}")
+        val longs = com.google.common.collect.DiscreteDomain.longs()
+        TreeRangeSet.create(batch.ranges).asRanges().forEach { stateRanges.add(it.canonical(longs)) }
+        log.info { "Updated ranges for $stream[${batch.batch.state}]: $stateRanges" }
+    }
+
+    /** True if all records in [0, index) have reached the given state; vacuously true for index 0. */
+    private fun isProcessingCompleteForState(index: Long, state: Batch.State): Boolean {
+        if (index <= 0L) return true // Range.closed(0, -1) would throw IllegalArgumentException
+        val completeRanges = rangesState[state] ?: error("No ranges tracked for state $state")
+        return completeRanges.encloses(Range.closed(0L, index - 1))
+    }
+
+    /** True if all records have associated [Batch.State.COMPLETE] batches. */
+    override fun isBatchProcessingComplete(): Boolean {
+        return isProcessingCompleteForState(streamStatus.recordCount.get(), Batch.State.COMPLETE)
+    }
+
+    /**
+     * True if all records in [0, index) have at least reached [Batch.State.PERSISTED]. This is
+     * implicitly true if they have all reached [Batch.State.COMPLETE].
+     */
+    override fun areRecordsPersistedUntil(index: Long): Boolean {
+        return isProcessingCompleteForState(index, Batch.State.PERSISTED) ||
+            isProcessingCompleteForState(index, Batch.State.COMPLETE) // complete => persisted
+    }
+
+    override fun markClosed() {
+        streamStatus.closedLatch.countDown()
+    }
+
+    override fun streamIsClosed(): Boolean {
+        return streamStatus.closedLatch.count == 0L
+    }
+
+    override suspend fun awaitStreamClosed() {
+        withContext(Dispatchers.IO) { streamStatus.closedLatch.await() } // blocking wait, moved onto the IO dispatcher
+    }
+}
+
+@Factory
+class StreamsManagerFactory(
+    private val catalog: DestinationCatalog,
+) {
+    @Singleton
+    fun make(): StreamsManager {
+        val hashMap = ConcurrentHashMap<DestinationStream, StreamManager>()
+        catalog.streams.forEach { hashMap[it] = DefaultStreamManager(it) }
+        return DefaultStreamsManager(hashMap)
+    }
+}
diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/task/CloseStreamTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/task/CloseStreamTask.kt
new file mode 100644
index 000000000000..f0508bcba80f
--- /dev/null
+++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/task/CloseStreamTask.kt
@@ -0,0 +1,54 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.cdk.task
+
+import io.airbyte.cdk.command.DestinationStream
+import io.airbyte.cdk.state.StreamManager
+import io.airbyte.cdk.state.StreamsManager
+import io.airbyte.cdk.write.StreamLoader
+import io.micronaut.context.annotation.Secondary
+import jakarta.inject.Singleton
+import org.apache.mina.util.ConcurrentHashSet
+
+/**
+ * Wraps @[StreamLoader.close] and marks the stream as closed in the stream manager.
Also starts the
+ * teardown task.
+ */
+class CloseStreamTask(
+    private val streamLoader: StreamLoader,
+    private val streamManager: StreamManager,
+    private val taskLauncher: DestinationTaskLauncher
+) : Task {
+    companion object {
+        val oncePerStream: ConcurrentHashSet<DestinationStream> = ConcurrentHashSet()
+    }
+
+    override suspend fun execute() {
+        /** Guard against running this more than once per stream */
+        // add() doubles as the test-and-set, so two racing tasks cannot both pass a separate contains() check.
+        if (streamManager.streamIsClosed() || !oncePerStream.add(streamLoader.stream)) {
+            return
+        }
+        streamLoader.close()
+        streamManager.markClosed()
+        /* TODO: just signal to the launcher that the stream is closed
+        and let it decide what to do next */
+        taskLauncher.startTeardownTask()
+    }
+}
+
+@Singleton
+@Secondary
+class CloseStreamTaskFactory(
+    private val streamsManager: StreamsManager,
+) {
+    fun make(taskLauncher: DestinationTaskLauncher, streamLoader: StreamLoader): CloseStreamTask {
+        return CloseStreamTask(
+            streamLoader,
+            streamsManager.getManager(streamLoader.stream),
+            taskLauncher
+        )
+    }
+}
diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/task/DestinationTaskLauncher.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/task/DestinationTaskLauncher.kt
new file mode 100644
index 000000000000..c713aa654db2
--- /dev/null
+++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/task/DestinationTaskLauncher.kt
@@ -0,0 +1,108 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.cdk.task
+
+import io.airbyte.cdk.command.DestinationCatalog
+import io.airbyte.cdk.message.BatchEnvelope
+import io.airbyte.cdk.message.SpooledRawMessagesLocalFile
+import io.airbyte.cdk.write.StreamLoader
+import io.github.oshai.kotlinlogging.KotlinLogging
+import io.micronaut.context.annotation.Factory
+import io.micronaut.context.annotation.Secondary
+import jakarta.inject.Provider
+import jakarta.inject.Singleton
+
+/**
+ * Governs the task workflow for the entire destination life-cycle.
+ *
+ * The domain is "decide what to do next given the reported results of the individual task."
+ *
+ * TODO: Some of that logic still lives in the tasks. Migrate it here.
+ */
+class DestinationTaskLauncher(
+    private val catalog: DestinationCatalog,
+    override val taskRunner: TaskRunner,
+    private val setupTaskFactory: SetupTaskFactory,
+    private val openStreamTaskFactory: OpenStreamTaskFactory,
+    private val spillToDiskTaskFactory: SpillToDiskTaskFactory,
+    private val processRecordsTaskFactory: ProcessRecordsTaskFactory,
+    private val processBatchTaskFactory: ProcessBatchTaskFactory,
+    private val closeStreamTaskFactory: CloseStreamTaskFactory,
+    private val teardownTaskFactory: TeardownTaskFactory
+) : TaskLauncher {
+    private val log = KotlinLogging.logger {}
+
+    override suspend fun start() { // entry point: enqueues the setup task
+        log.info { "Starting startup task" }
+        taskRunner.enqueue(setupTaskFactory.make(this))
+    }
+
+    suspend fun startOpenStreamTasks() { // one open-stream task per catalog stream
+        catalog.streams.forEach {
+            log.info { "Starting open stream task for $it" }
+            taskRunner.enqueue(openStreamTaskFactory.make(this, it))
+        }
+    }
+
+    suspend fun startSpillToDiskTasks(streamLoader: StreamLoader) {
+        log.info { "Starting spill-to-disk task for ${streamLoader.stream}" }
+        val task = spillToDiskTaskFactory.make(this, streamLoader)
+        taskRunner.enqueue(task)
+    }
+
+    suspend fun startProcessRecordsTask(
+        streamLoader: StreamLoader,
+        fileEnvelope: BatchEnvelope<SpooledRawMessagesLocalFile>
+    ) {
+        log.info {
+            "Starting process records task for ${streamLoader.stream}, file ${fileEnvelope.batch}"
+        }
+        taskRunner.enqueue(processRecordsTaskFactory.make(this, streamLoader, fileEnvelope))
+    }
+
+    suspend fun startProcessBatchTask(streamLoader: StreamLoader, batch: BatchEnvelope<*>) {
+        log.info { "Starting process batch task for ${streamLoader.stream}, batch ${batch.batch}" }
+        taskRunner.enqueue(processBatchTaskFactory.make(this, streamLoader, batch))
+    }
+
+    suspend fun startCloseStreamTasks(streamLoader: StreamLoader) {
+        log.info { "Starting close stream task for ${streamLoader.stream}" }
+        taskRunner.enqueue(closeStreamTaskFactory.make(this, streamLoader))
+    }
+
+    suspend fun startTeardownTask() {
+        log.info { "Starting teardown task" }
+        taskRunner.enqueue(teardownTaskFactory.make(this))
+    }
+}
+
+@Factory
+class DestinationTaskLauncherFactory(
+    private val catalog: DestinationCatalog,
+    private val taskRunner: TaskRunner,
+    private val setupTaskFactory: SetupTaskFactory,
+    private val openStreamTaskFactory: OpenStreamTaskFactory,
+    private val spillToDiskTaskFactory: SpillToDiskTaskFactory,
+    private val processRecordsTaskFactory: ProcessRecordsTaskFactory,
+    private val processBatchTaskFactory: ProcessBatchTaskFactory,
+    private val closeStreamTaskFactory: CloseStreamTaskFactory,
+    private val teardownTaskFactory: TeardownTaskFactory
+) : Provider<DestinationTaskLauncher> {
+    @Singleton
+    @Secondary
+    override fun get(): DestinationTaskLauncher {
+        return DestinationTaskLauncher(
+            catalog,
+            taskRunner,
+            setupTaskFactory,
+            openStreamTaskFactory,
+            spillToDiskTaskFactory,
+            processRecordsTaskFactory,
+            processBatchTaskFactory,
+            closeStreamTaskFactory,
+            teardownTaskFactory,
+        )
+    }
+}
diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/task/OpenStreamTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/task/OpenStreamTask.kt
new file mode 100644
index 000000000000..b36f64c14a16
--- /dev/null
+++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/task/OpenStreamTask.kt
@@ -0,0 +1,36 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.cdk.task
+
+import io.airbyte.cdk.command.DestinationStream
+import io.airbyte.cdk.write.Destination
+import io.airbyte.cdk.write.StreamLoader
+import io.micronaut.context.annotation.Secondary
+import jakarta.inject.Singleton
+
+/**
+ * Wraps @[StreamLoader.open] and starts the spill-to-disk tasks.
+ *
+ * TODO: There's no reason to wait on initialization to start spilling to disk.
+ */
+class OpenStreamTask(
+    private val streamLoader: StreamLoader,
+    private val taskLauncher: DestinationTaskLauncher
+) : Task {
+    override suspend fun execute() {
+        streamLoader.open()
+        taskLauncher.startSpillToDiskTasks(streamLoader) // spilling starts only after open() has returned
+    }
+}
+
+@Singleton
+@Secondary
+class OpenStreamTaskFactory(
+    private val destination: Destination,
+) {
+    fun make(taskLauncher: DestinationTaskLauncher, stream: DestinationStream): OpenStreamTask {
+        return OpenStreamTask(destination.getStreamLoader(stream), taskLauncher)
+    }
+}
diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/task/ProcessBatchTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/task/ProcessBatchTask.kt
new file mode 100644
index 000000000000..43ba8c36f2a9
--- /dev/null
+++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/task/ProcessBatchTask.kt
@@ -0,0 +1,57 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.cdk.task
+
+import io.airbyte.cdk.message.Batch
+import io.airbyte.cdk.message.BatchEnvelope
+import io.airbyte.cdk.state.StreamManager
+import io.airbyte.cdk.state.StreamsManager
+import io.airbyte.cdk.write.StreamLoader
+import io.micronaut.context.annotation.Secondary
+import jakarta.inject.Singleton
+
+/**
+ * Wraps @[StreamLoader.processBatch] and handles the resulting batch, possibly calling back into
+ * the task or initiating close stream if processing is complete.
+ *
+ * TODO: Move handling batch results into the task launcher.
+ */
+class ProcessBatchTask(
+    private val batchEnvelope: BatchEnvelope<*>,
+    private val streamLoader: StreamLoader,
+    private val streamManager: StreamManager,
+    private val taskLauncher: DestinationTaskLauncher
+) : Task {
+    override suspend fun execute() {
+        val nextBatch = streamLoader.processBatch(batchEnvelope.batch) // one processing step; the result carries the new state
+        val nextWrapped = batchEnvelope.withBatch(nextBatch) // re-wrap the result in the existing envelope
+        streamManager.updateBatchState(nextWrapped) // report the new state before deciding what to launch
+
+        if (nextBatch.state != Batch.State.COMPLETE) { // not finished: schedule another pass over the returned batch
+            taskLauncher.startProcessBatchTask(streamLoader, nextWrapped)
+        } else if (streamManager.isBatchProcessingComplete()) { // this batch is COMPLETE and the manager reports all records complete
+            taskLauncher.startCloseStreamTasks(streamLoader)
+        }
+    }
+}
+
+@Singleton
+@Secondary
+class ProcessBatchTaskFactory(
+    private val streamsManager: StreamsManager,
+) {
+    fun make(
+        taskLauncher: DestinationTaskLauncher,
+        streamLoader: StreamLoader,
+        batchEnvelope: BatchEnvelope<*>
+    ): ProcessBatchTask {
+        return ProcessBatchTask(
+            batchEnvelope,
+            streamLoader,
+            streamsManager.getManager(streamLoader.stream), // manager is looked up by the loader's stream
+            taskLauncher
+        )
+    }
+}
diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/task/ProcessRecordsTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/task/ProcessRecordsTask.kt
new file mode 100644
index 000000000000..1e1d1f8417d6
--- /dev/null
+++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/task/ProcessRecordsTask.kt
@@ -0,0 +1,92 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.cdk.task
+
+import io.airbyte.cdk.message.Batch
+import io.airbyte.cdk.message.BatchEnvelope
+import io.airbyte.cdk.message.Deserializer
+import io.airbyte.cdk.message.DestinationMessage
+import io.airbyte.cdk.message.DestinationRecord
+import io.airbyte.cdk.message.DestinationRecordMessage
+import io.airbyte.cdk.message.DestinationStreamComplete
+import io.airbyte.cdk.message.SpooledRawMessagesLocalFile
+import io.airbyte.cdk.state.StreamManager
+import io.airbyte.cdk.state.StreamsManager
+import io.airbyte.cdk.write.StreamLoader
+import io.micronaut.context.annotation.Secondary
+import jakarta.inject.Singleton
+import kotlin.io.path.bufferedReader
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.withContext
+
+/**
+ * Wraps @[StreamLoader.processRecords] and feeds it a lazy iterator over the last batch of spooled
+ * records. On completion it rewraps the processed batch in the old envelope and kicks off batch
+ * handling and/or close stream tasks.
+ *
+ * TODO: The batch handling logic here is identical to that in @[ProcessBatchTask]. Both should be
+ * moved to the task launcher.
+ */
+class ProcessRecordsTask(
+    private val streamLoader: StreamLoader,
+    private val streamManager: StreamManager,
+    private val taskLauncher: DestinationTaskLauncher,
+    private val fileEnvelope: BatchEnvelope<SpooledRawMessagesLocalFile>,
+    private val deserializer: Deserializer<DestinationMessage>,
+) : Task {
+    override suspend fun execute() {
+        val nextBatch =
+            withContext(Dispatchers.IO) {
+                // use {} closes the spool file even if processRecords throws; the reader was never closed before.
+                fileEnvelope.batch.localPath.bufferedReader(Charsets.UTF_8).use { reader ->
+                    val records =
+                        reader
+                            .lineSequence()
+                            .map {
+                                when (val record = deserializer.deserialize(it)) {
+                                    is DestinationRecordMessage -> record
+                                    else ->
+                                        throw IllegalStateException("Expected record message, got ${record::class}")
+                                }
+                            }
+                            .takeWhile { it !is DestinationStreamComplete }
+                            .map { it as DestinationRecord }
+                            .iterator()
+                    streamLoader.processRecords(records, fileEnvelope.batch.totalSizeBytes)
+                }
+            }
+
+        val wrapped = fileEnvelope.withBatch(nextBatch)
+        streamManager.updateBatchState(wrapped)
+
+        // TODO: Move this logic into the task launcher
+        if (nextBatch.state != Batch.State.COMPLETE) {
+            taskLauncher.startProcessBatchTask(streamLoader, wrapped)
+        } else if (streamManager.isBatchProcessingComplete()) {
+            taskLauncher.startCloseStreamTasks(streamLoader)
+        }
+    }
+}
+
+@Singleton
+@Secondary
+class ProcessRecordsTaskFactory(
+    private val streamsManager: StreamsManager,
+    private val deserializer: Deserializer<DestinationMessage>,
+) {
+    fun make(
+        taskLauncher: DestinationTaskLauncher,
+        streamLoader: StreamLoader,
+        fileEnvelope: BatchEnvelope<SpooledRawMessagesLocalFile>,
+    ): ProcessRecordsTask {
+        return ProcessRecordsTask(
+            streamLoader,
+            streamsManager.getManager(streamLoader.stream),
+            taskLauncher,
+            fileEnvelope,
+            deserializer,
+        )
+    }
+}
diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/task/SetupTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/task/SetupTask.kt
new file mode 100644
index 000000000000..5b6f4e6dd6e5
--- /dev/null
+++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/task/SetupTask.kt
@@ -0,0 +1,35 @@
+/*
+ *
Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.cdk.task
+
+import io.airbyte.cdk.write.Destination
+import io.micronaut.context.annotation.Secondary
+import jakarta.inject.Singleton
+
+/**
+ * Wraps @[Destination.setup] and starts the open stream tasks.
+ *
+ * TODO: This should call something like "TaskLauncher.setupComplete" and let it decide what to do
+ * next.
+ */
+class SetupTask(
+    private val destination: Destination,
+    private val taskLauncher: DestinationTaskLauncher
+) : Task {
+    override suspend fun execute() {
+        destination.setup() // runs to completion before any open-stream task is enqueued
+        taskLauncher.startOpenStreamTasks()
+    }
+}
+
+@Singleton
+@Secondary
+class SetupTaskFactory(
+    private val destination: Destination,
+) {
+    fun make(taskLauncher: DestinationTaskLauncher): SetupTask { // the launcher is passed per call rather than injected
+        return SetupTask(destination, taskLauncher)
+    }
+}
diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/task/SpillToDiskTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/task/SpillToDiskTask.kt
new file mode 100644
index 000000000000..467d0a1b311c
--- /dev/null
+++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/task/SpillToDiskTask.kt
@@ -0,0 +1,129 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.cdk.task
+
+import com.google.common.collect.Range
+import io.airbyte.cdk.command.DestinationStream
+import io.airbyte.cdk.command.WriteConfiguration
+import io.airbyte.cdk.message.BatchEnvelope
+import io.airbyte.cdk.message.DestinationRecordWrapped
+import io.airbyte.cdk.message.MessageQueueReader
+import io.airbyte.cdk.message.SpooledRawMessagesLocalFile
+import io.airbyte.cdk.message.StreamCompleteWrapped
+import io.airbyte.cdk.message.StreamRecordWrapped
+import io.airbyte.cdk.write.StreamLoader
+import io.github.oshai.kotlinlogging.KotlinLogging
+import jakarta.inject.Singleton
+import java.nio.file.Files
+import kotlin.io.path.bufferedWriter
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.flow.flowOn
+import kotlinx.coroutines.flow.fold
+import kotlinx.coroutines.flow.runningFold
+import kotlinx.coroutines.flow.toList
+import kotlinx.coroutines.withContext
+import kotlinx.coroutines.yield
+
+/**
+ * Spools a stream's queued records to local temp files: each iteration reads one chunk (sized by
+ * [WriteConfiguration.recordBatchSizeBytes]), writes it to a fresh file as one record per line, and
+ * asks the launcher to process that file. Ends after end-of-stream or on a chunk with no messages.
+ *
+ * TODO: Use an injected interface for creating the filewriter (for testing, custom overrides).
+ *
+ * TODO: Allow for the record batch size to be supplied per-stream. (Needed?)
+ *
+ * TODO: Migrate the batch processing logic to the task launcher. Also, this batch should also be
+ * recorded, as it will allow the stream manager to report exactly how many records have been
+ * spilled.
+ */
+class SpillToDiskTask(
+    private val config: WriteConfiguration,
+    private val queueReader: MessageQueueReader,
+    private val streamLoader: StreamLoader,
+    private val launcher: DestinationTaskLauncher
+) : Task {
+    private val log = KotlinLogging.logger {}
+
+    /** Summary of one chunk: index range read (null if none), total size, end-of-stream flag. */
+    data class ReadResult(
+        val range: Range<Long>? = null,
+        val sizeBytes: Long = 0,
+        val hasReadEndOfStream: Boolean = false,
+    )
+
+    /**
+     * Extends [range] by [index]. Guava has no "empty" range, so a null [range] stands for "nothing
+     * read yet" and yields a singleton.
+     * @throws IllegalStateException if [index] does not directly follow the range's upper endpoint
+     */
+    private fun withIndex(range: Range<Long>?, index: Long): Range<Long> {
+        if (range == null) return Range.singleton(index)
+        val expected = range.upperEndpoint() + 1
+        check(index == expected) { "Expected index $expected, got $index" }
+        return range.span(Range.singleton(index))
+    }
+
+    override suspend fun execute() {
+        do {
+            val (path, result) =
+                withContext(Dispatchers.IO) {
+                    /** Create a temporary file to write the records to */
+                    val path = Files.createTempFile(config.firstStageTmpFilePrefix, ".jsonl")
+                    val result =
+                        path.bufferedWriter(Charsets.UTF_8).use { writer ->
+                            // fold keeps only the latest summary; no per-message list is built
+                            queueReader
+                                .readChunk(streamLoader.stream, config.recordBatchSizeBytes)
+                                .fold(ReadResult()) { (range, sizeBytes, _), wrapped ->
+                                    when (wrapped) {
+                                        is StreamRecordWrapped -> {
+                                            val nextRange = withIndex(range, wrapped.index)
+                                            writer.write(wrapped.record.serialized)
+                                            writer.write("\n")
+                                            ReadResult(nextRange, sizeBytes + wrapped.sizeBytes)
+                                        }
+                                        is StreamCompleteWrapped -> {
+                                            val nextRange = withIndex(range, wrapped.index)
+                                            ReadResult(nextRange, sizeBytes, true)
+                                        }
+                                    }
+                                }
+                        }
+                    Pair(path, result)
+                }
+
+            /** Handle the result */
+            val (range, sizeBytes, endOfStream) = result
+
+            log.info { "Finished writing $range records (${sizeBytes}b) to $path" }
+
+            // range is still null only if the chunk had no messages: drop the unused file and stop
+            if (range == null) {
+                withContext(Dispatchers.IO) { Files.deleteIfExists(path) }
+                return
+            }
+
+            val wrapped = BatchEnvelope(SpooledRawMessagesLocalFile(path, sizeBytes), range)
+            launcher.startProcessRecordsTask(streamLoader, wrapped)
+
+            yield()
+        } while (!endOfStream)
+    }
+}
+
+/** Injectable factory that binds the configuration and queue reader for [SpillToDiskTask]. */
+@Singleton
+class SpillToDiskTaskFactory(
+    private val config: WriteConfiguration,
+    private val queueReader: MessageQueueReader
+) {
+    fun make(
+        taskLauncher: DestinationTaskLauncher,
+        streamLoader: StreamLoader,
+    ): SpillToDiskTask {
+        return SpillToDiskTask(config, queueReader, streamLoader, taskLauncher)
+    }
+}
diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/task/TaskLauncher.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/task/TaskLauncher.kt
new file mode 100644
index 000000000000..c554721536df
--- /dev/null
+++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/task/TaskLauncher.kt
@@ -0,0 +1,35 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.cdk.task
+
+import jakarta.inject.Singleton
+
+/** A unit of work that a [TaskRunner] launches by calling [execute]. */
+interface Task {
+    suspend fun execute()
+}
+
+/**
+ * A TaskLauncher is responsible for starting and stopping the task workflow, and for managing
+ * transitions between tasks.
+ */
+interface TaskLauncher {
+    val taskRunner: TaskRunner
+
+    suspend fun start()
+
+    /** Stops the workflow by enqueueing the [Done] sentinel on [taskRunner]. */
+    suspend fun stop() {
+        taskRunner.enqueue(Done())
+    }
+}
+
+/** Sentinel task: [TaskRunner.run] exits when it dequeues one, so [execute] never runs. */
+@Singleton
+class Done : Task {
+    override suspend fun execute() {
+        error("The Done() task cannot be executed")
+    }
+}
diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/task/TaskRunner.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/task/TaskRunner.kt
new file mode 100644
index 000000000000..bc7928b5d4df
--- /dev/null
+++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/task/TaskRunner.kt
@@ -0,0 +1,51 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.cdk.task
+
+import io.github.oshai.kotlinlogging.KotlinLogging
+import jakarta.inject.Singleton
+import kotlinx.coroutines.channels.Channel
+import kotlinx.coroutines.coroutineScope
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.yield
+
+/**
+ * A Task is a unit of work that can be executed concurrently.
Even though we aren't scheduling
+ * threads or enforcing concurrency limits here, launching tasks from a queue in a dedicated scope
+ * frees the caller not to have to await completion.
+ *
+ * TODO: Extend this to collect and report task completion.
+ *
+ * TODO: Set concurrency for this scope from the configuration.
+ */
+@Singleton
+class TaskRunner {
+    private val log = KotlinLogging.logger {}
+    private val queue = Channel<Task>(Channel.UNLIMITED)
+
+    suspend fun enqueue(task: Task) {
+        queue.send(task)
+    }
+
+    /** Launches every dequeued task as a child coroutine; stops dequeueing once [Done] arrives. */
+    suspend fun run() = coroutineScope {
+        while (true) {
+            val task = queue.receive()
+
+            if (task is Done) {
+                log.info { "Task queue received Done() task, exiting" }
+                return@coroutineScope
+            }
+
+            /** Launch the task concurrently; its completion is not awaited here. */
+            launch {
+                log.info { "Executing task: $task" }
+                task.execute()
+            }
+
+            yield()
+        }
+    }
+}
diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/task/TeardownTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/task/TeardownTask.kt
new file mode 100644
index 000000000000..52fec0acaf3d
--- /dev/null
+++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/task/TeardownTask.kt
@@ -0,0 +1,53 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.cdk.task
+
+import io.airbyte.cdk.state.StreamsManager
+import io.airbyte.cdk.write.Destination
+import io.github.oshai.kotlinlogging.KotlinLogging
+import io.micronaut.context.annotation.Secondary
+import jakarta.inject.Singleton
+import java.util.concurrent.atomic.AtomicBoolean
+
+/**
+ * Wraps @[Destination.teardown] and stops the task launcher.
+ *
+ * TODO: Report teardown-complete and let the task launcher decide what to do next.
+ */
+class TeardownTask(
+    private val destination: Destination,
+    private val streamsManager: StreamsManager,
+    private val taskLauncher: DestinationTaskLauncher
+) : Task {
+    val log = KotlinLogging.logger {}
+
+    override suspend fun execute() {
+        /** Only the first execution (across all instances) proceeds; later ones are no-ops. */
+        if (!exactlyOnce.compareAndSet(false, true)) {
+            return
+        }
+
+        /** Ensure we don't run until all streams have completed */
+        streamsManager.awaitAllStreamsComplete()
+
+        destination.teardown()
+        taskLauncher.stop()
+    }
+
+    companion object {
+        val exactlyOnce = AtomicBoolean(false)
+    }
+}
+
+/** Injectable factory that binds the [Destination] and [StreamsManager] for [TeardownTask]. */
+@Singleton
+@Secondary
+class TeardownTaskFactory(
+    private val destination: Destination,
+    private val streamsManager: StreamsManager,
+) {
+    fun make(taskLauncher: DestinationTaskLauncher): TeardownTask =
+        TeardownTask(destination, streamsManager, taskLauncher)
+}
diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/write/Destination.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/write/Destination.kt
new file mode 100644
index 000000000000..db610ec8716a
--- /dev/null
+++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/write/Destination.kt
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.cdk.write
+
+import io.airbyte.cdk.command.DestinationStream
+import io.micronaut.context.annotation.Secondary
+import jakarta.inject.Singleton
+
+/**
+ * Implementor interface. Extend this only if you need to perform initialization and teardown
+ * *across all streams*, or if your per-stream operations need shared global state.
+ *
+ * If initialization can be done on a per-stream basis, implement @[StreamLoaderFactory] instead.
+ */
+interface Destination {
+    /** Called once before anything else */
+    suspend fun setup() {}
+
+    /** Return a StreamLoader for the given stream */
+    fun getStreamLoader(stream: DestinationStream): StreamLoader
+
+    /** Called once at the end of the job */
+    suspend fun teardown(succeeded: Boolean = true) {}
+}
+
+/** Fallback [Destination] that only delegates stream-loader creation to the injected factory. */
+@Singleton
+@Secondary
+class DefaultDestination(private val streamLoaderFactory: StreamLoaderFactory) : Destination {
+    override fun getStreamLoader(stream: DestinationStream): StreamLoader =
+        streamLoaderFactory.make(stream)
+}
diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/write/InputConsumer.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/write/InputConsumer.kt
new file mode 100644
index 000000000000..9adb2bc4ffc2
--- /dev/null
+++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/write/InputConsumer.kt
@@ -0,0 +1,74 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.cdk.write
+
+import io.airbyte.cdk.message.Deserializer
+import io.airbyte.cdk.message.DestinationMessage
+import io.airbyte.cdk.message.MessageQueueWriter
+import io.github.oshai.kotlinlogging.KLogger
+import io.github.oshai.kotlinlogging.KotlinLogging
+import io.micronaut.context.annotation.Factory
+import jakarta.inject.Singleton
+import java.io.InputStream
+import java.nio.charset.StandardCharsets
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.withContext
+
+/** Runnable input consumer. */
+interface InputConsumer {
+    suspend fun run()
+}
+
+/** Input consumer that deserializes and publishes to a queue.
*/
+interface DeserializingInputStreamConsumer : InputConsumer {
+    val log: KLogger
+    val inputStream: InputStream
+    val deserializer: Deserializer
+    val messageQueue: MessageQueueWriter
+
+    override suspend fun run() =
+        withContext(Dispatchers.IO) {
+            log.info { "Starting consuming messages from the input stream" }
+
+            var index = 0L
+            // Despite the name this counts UTF-16 chars of the decoded lines, not encoded bytes.
+            var bytes = 0L
+            inputStream.bufferedReader(StandardCharsets.UTF_8).lineSequence().forEach { line ->
+                val lineSize = line.length.toLong()
+                // Empty lines are skipped: neither deserialized nor counted.
+                if (lineSize > 0L) {
+                    val deserialized = deserializer.deserialize(line)
+                    messageQueue.publish(deserialized, lineSize)
+
+                    bytes += lineSize
+                    if (++index % 10_000L == 0L) {
+                        val megabytes = bytes / (1024L * 1024L)
+                        log.info {
+                            "Consumed $index messages (${megabytes}mb) from the input stream"
+                        }
+                    }
+                }
+            }
+
+            log.info { "Finished consuming $index messages (${bytes}b) from the input stream" }
+        }
+}
+
+/** Injectable default implementation; all collaborators are constructor-injected. */
+@Singleton
+class DefaultInputConsumer(
+    override val inputStream: InputStream,
+    override val deserializer: Deserializer,
+    override val messageQueue: MessageQueueWriter
+) : DeserializingInputStreamConsumer {
+    override val log = KotlinLogging.logger {}
+}
+
+/** Override to provide a custom input stream. */
+@Factory
+class InputStreamFactory {
+    @Singleton
+    fun make(): InputStream = System.`in`
+}
diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/write/StreamLoader.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/write/StreamLoader.kt
new file mode 100644
index 000000000000..d038eeea985e
--- /dev/null
+++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/write/StreamLoader.kt
@@ -0,0 +1,68 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.cdk.write
+
+import io.airbyte.cdk.command.DestinationStream
+import io.airbyte.cdk.message.Batch
+import io.airbyte.cdk.message.DestinationRecord
+import io.airbyte.cdk.message.SimpleBatch
+import io.github.oshai.kotlinlogging.KotlinLogging
+import io.micronaut.context.annotation.Secondary
+import jakarta.inject.Singleton
+
+/**
+ * Implementor interface. The framework calls open and close once per stream at the beginning and
+ * end of processing. The framework calls processRecords once per batch of records as batches of the
+ * configured size become available (specified in
+ * [io.airbyte.cdk.command.WriteConfiguration.recordBatchSizeBytes]).
+ *
+ * processBatch is called once per incomplete batch returned by either processRecords or
+ * processBatch itself. See [io.airbyte.cdk.message.Batch] for more details.
+ */
+interface StreamLoader {
+    val stream: DestinationStream
+
+    suspend fun open() {}
+    suspend fun processRecords(records: Iterator<DestinationRecord>, totalSizeBytes: Long): Batch
+    suspend fun processBatch(batch: Batch): Batch = SimpleBatch(state = Batch.State.COMPLETE)
+    suspend fun close() {}
+}
+
+/**
+ * Default stream loader (Not yet implemented) will process the records into a locally staged file
+ * of a format specified in the configuration.
+ */
+class DefaultStreamLoader(
+    override val stream: DestinationStream,
+) : StreamLoader {
+    val log = KotlinLogging.logger {}
+
+    override suspend fun processRecords(
+        records: Iterator<DestinationRecord>,
+        totalSizeBytes: Long
+    ): Batch {
+        TODO(
+            "Default implementation adds airbyte metadata, maybe flattens, no-op maps, and converts to destination format"
+        )
+    }
+}
+
+/**
+ * If you do not need to perform initialization and teardown across all streams, or if your
+ * per-stream operations do not need shared global state, implement this interface instead of
+ * [Destination].
The framework will call it exactly once per stream to create instances that will
+ * be used for the life cycle of the stream.
+ */
+interface StreamLoaderFactory {
+    /** Creates the loader that will serve [stream] for its whole life cycle. */
+    fun make(stream: DestinationStream): StreamLoader
+}
+
+/** Placeholder default: [make] is not implemented yet and always throws via [TODO]. */
+@Singleton
+@Secondary
+class DefaultStreamLoaderFactory : StreamLoaderFactory {
+    override fun make(stream: DestinationStream): StreamLoader = TODO("See above")
+}
diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/write/WriteOperation.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/write/WriteOperation.kt
new file mode 100644
index 000000000000..69fa3dfacc43
--- /dev/null
+++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/write/WriteOperation.kt
@@ -0,0 +1,36 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.cdk.write
+
+import io.airbyte.cdk.Operation
+import io.airbyte.cdk.message.DestinationMessage
+import io.airbyte.cdk.task.TaskLauncher
+import io.airbyte.cdk.task.TaskRunner
+import io.micronaut.context.annotation.Requires
+import jakarta.inject.Singleton
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.runBlocking
+
+/**
+ * Write operation. Executed by the core framework when the operation is "write". Launches the core
+ * services and awaits completion.
+ */
+@Singleton
+@Requires(property = Operation.PROPERTY, value = "write")
+class WriteOperation(
+    private val inputConsumer: InputConsumer,
+    private val taskLauncher: TaskLauncher,
+    private val taskRunner: TaskRunner
+) : Operation {
+    override fun execute() {
+        // The consumer, the launcher and the runner are started concurrently as children;
+        // runBlocking returns only once all three have completed.
+        runBlocking {
+            launch { inputConsumer.run() }
+            launch { taskLauncher.start() }
+            launch { taskRunner.run() }
+        }
+    }
+}
diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/write/InputConsumerTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/write/InputConsumerTest.kt
new file mode 100644
index 000000000000..2fa3113a1d61
--- /dev/null
+++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/write/InputConsumerTest.kt
@@ -0,0 +1,87 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.cdk.write
+
+import io.airbyte.cdk.message.Deserializer
+import io.airbyte.cdk.message.MessageQueueWriter
+import io.github.oshai.kotlinlogging.KotlinLogging
+import io.micronaut.context.annotation.Prototype
+import io.micronaut.test.extensions.junit5.annotation.MicronautTest
+import jakarta.inject.Inject
+import jakarta.inject.Singleton
+import java.util.stream.Stream
+import kotlinx.coroutines.test.runTest
+import org.junit.jupiter.api.Assertions
+import org.junit.jupiter.api.extension.ExtensionContext
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.Arguments
+import org.junit.jupiter.params.provider.ArgumentsProvider
+import org.junit.jupiter.params.provider.ArgumentsSource
+
+@MicronautTest
+class InputConsumerTest {
+    @Inject lateinit var consumerFactory: MockInputConsumerFactory
+
+    /** "Deserializes" a line by reversing it and appending "!", so the output is recognizable. */
+    @Singleton
+    class MockDeserializer : Deserializer {
+        override fun deserialize(serialized: String): String {
+            return serialized.reversed() + "!"
+        }
+    }
+
+    /** Records every published message and its reported size, in publication order. */
+    @Prototype
+    class MockMessageQueueWriter : MessageQueueWriter {
+        val collectedStrings = mutableListOf<String>()
+        val collectedSizes = mutableListOf<Long>()
+
+        override suspend fun publish(message: String, sizeBytes: Long) {
+            collectedStrings.add(message)
+            collectedSizes.add(sizeBytes)
+        }
+    }
+
+    /** Builds consumers that read the given lines and publish to the injected mock queue. */
+    @Prototype
+    class MockInputConsumerFactory(
+        val testDeserializer: Deserializer,
+        val testMessageQueue: MessageQueueWriter
+    ) {
+        fun make(testInput: List<String>): InputConsumer {
+            return object : DeserializingInputStreamConsumer {
+                override val log = KotlinLogging.logger {}
+                override val inputStream = testInput.joinToString("\n").byteInputStream()
+                override val deserializer = testDeserializer
+                override val messageQueue = testMessageQueue
+            }
+        }
+
+        fun getOutputCollector(): MockMessageQueueWriter {
+            return testMessageQueue as MockMessageQueueWriter
+        }
+    }
+
+    class InputConsumerTestArgumentsProvider : ArgumentsProvider {
+        override fun provideArguments(context: ExtensionContext): Stream<out Arguments> {
+            return Stream.of(
+                Arguments.of(listOf("cat", "dog", "turtle")),
+                Arguments.of(listOf("", "109j321dcDASD", "2023", "1", "2", "3"))
+            )
+        }
+    }
+
+    @ParameterizedTest
+    @ArgumentsSource(InputConsumerTestArgumentsProvider::class)
+    fun testInputConsumer(testInput: List<String>) = runTest {
+        consumerFactory.make(testInput).run()
+
+        // Empty lines must be skipped; every other line is deserialized and sized by its length.
+        val expectedLines = testInput.filter { it.isNotEmpty() }
+        val collector = consumerFactory.getOutputCollector()
+        Assertions.assertEquals(expectedLines.map { it.reversed() + "!" }, collector.collectedStrings)
+        Assertions.assertEquals(expectedLines.map { it.length.toLong() }, collector.collectedSizes)
+    }
+}