(Incomplete) First Cut Load CDK with E2E Destination (#44822)
johnny-schmidt authored Aug 30, 2024
1 parent bf2295d commit 89f2db4
Showing 32 changed files with 2,154 additions and 0 deletions.
12 changes: 12 additions & 0 deletions airbyte-cdk/bulk/core/load/build.gradle
@@ -0,0 +1,12 @@
dependencies {
implementation project(':airbyte-cdk:bulk:core:bulk-cdk-core-base')
implementation 'org.apache.commons:commons-lang3:3.14.0'

// For ranges and rangesets
implementation("com.google.guava:guava:33.3.0-jre")

testFixturesApi testFixtures(project(':airbyte-cdk:bulk:core:bulk-cdk-core-base'))

testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.7.1")
implementation "org.jetbrains.kotlin:kotlin-reflect:2.0.0"
}
@@ -0,0 +1,37 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.command

import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
import io.micronaut.context.annotation.Factory
import jakarta.inject.Singleton

/**
* Internal representation of the destination catalog: the configured destination streams, indexed
* for lookup by descriptor. This is intended to be a case class specialized for usability.
*/
data class DestinationCatalog(
val streams: List<DestinationStream> = emptyList(),
) {
private val byDescriptor: Map<DestinationStream.Descriptor, DestinationStream> =
streams.associateBy { it.descriptor }

fun getStream(name: String, namespace: String): DestinationStream {
val descriptor = DestinationStream.Descriptor(namespace = namespace, name = name)
return byDescriptor[descriptor]
?: throw IllegalArgumentException("Stream not found: namespace=$namespace, name=$name")
}
}

@Factory
class DestinationCatalogFactory(
private val catalog: ConfiguredAirbyteCatalog,
private val streamFactory: DestinationStreamFactory
) {
@Singleton
fun make(): DestinationCatalog {
return DestinationCatalog(streams = catalog.streams.map { streamFactory.make(it) })
}
}
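
A minimal usage sketch of the catalog lookup. The stream names below are hypothetical, and the catalog is built by hand for illustration; in a connector it comes from `DestinationCatalogFactory.make()` via Micronaut injection:

```kotlin
import io.airbyte.cdk.command.DestinationCatalog
import io.airbyte.cdk.command.DestinationStream

fun main() {
    // Hypothetical catalog built directly; normally DestinationCatalogFactory.make()
    // maps the ConfiguredAirbyteCatalog into this internal representation.
    val catalog =
        DestinationCatalog(
            streams =
                listOf(
                    DestinationStream(
                        DestinationStream.Descriptor(namespace = "public", name = "users")
                    )
                )
        )

    // Lookup is by (name, namespace); an unknown stream throws IllegalArgumentException.
    val stream = catalog.getStream(name = "users", namespace = "public")
    println(stream) // DestinationStream(descriptor=Descriptor(namespace=public, name=users))
}
```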
@@ -0,0 +1,25 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.command

import io.micronaut.context.annotation.ConfigurationProperties
import io.micronaut.context.annotation.Factory
import jakarta.inject.Singleton

@ConfigurationProperties("destination.config")
interface DestinationConfiguration : Configuration {
/**
* Micronaut factory which glues [ConfigurationJsonObjectSupplier] and
* [DestinationConfigurationFactory] together to produce a [DestinationConfiguration] singleton.
*/
@Factory
private class MicronautFactory {
@Singleton
fun <I : ConfigurationJsonObjectBase> destinationConfig(
pojoSupplier: ConfigurationJsonObjectSupplier<I>,
factory: DestinationConfigurationFactory<I, out DestinationConfiguration>,
): DestinationConfiguration = factory.make(pojoSupplier.get())
}
}
@@ -0,0 +1,21 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.command

import io.airbyte.cdk.ConfigErrorException

interface DestinationConfigurationFactory<
I : ConfigurationJsonObjectBase, O : DestinationConfiguration> {
fun makeWithoutExceptionHandling(pojo: I): O

/** Wraps [makeWithoutExceptionHandling] exceptions in [ConfigErrorException]. */
fun make(pojo: I): O =
try {
makeWithoutExceptionHandling(pojo)
} catch (e: Exception) {
// Wrap NPEs (mostly) in ConfigErrorException.
throw ConfigErrorException("Failed to build ConnectorConfiguration.", e)
}
}
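
As a sketch of how a connector might plug into this factory, assuming `ConfigurationJsonObjectBase` can be subclassed directly; the class and field names below are hypothetical:

```kotlin
import io.airbyte.cdk.command.ConfigurationJsonObjectBase
import io.airbyte.cdk.command.DestinationConfiguration
import io.airbyte.cdk.command.DestinationConfigurationFactory
import jakarta.inject.Singleton

// Hypothetical spec POJO deserialized from the connector's JSON configuration.
class MyDestinationSpecification : ConfigurationJsonObjectBase() {
    var bucketName: String? = null
}

// Hypothetical typed configuration consumed by the rest of the connector.
data class MyDestinationConfiguration(val bucketName: String) : DestinationConfiguration

@Singleton
class MyDestinationConfigurationFactory :
    DestinationConfigurationFactory<MyDestinationSpecification, MyDestinationConfiguration> {
    // Any exception thrown here (e.g. the NPE from a missing bucketName) is wrapped
    // in ConfigErrorException by the default make() implementation above.
    override fun makeWithoutExceptionHandling(pojo: MyDestinationSpecification) =
        MyDestinationConfiguration(bucketName = pojo.bucketName!!)
}
```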
@@ -0,0 +1,45 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.command

import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream
import jakarta.inject.Singleton

/**
* Internal representation of destination streams. This is intended to be a case class specialized
* for usability.
*
* TODO: Add missing info like sync type, generation_id, etc.
*
* TODO: Add dedicated schema type, converted from json-schema.
*/
class DestinationStream(val descriptor: Descriptor) {
data class Descriptor(val namespace: String, val name: String)

override fun hashCode(): Int {
return descriptor.hashCode()
}

override fun equals(other: Any?): Boolean {
return other is DestinationStream && descriptor == other.descriptor
}

override fun toString(): String {
return "DestinationStream(descriptor=$descriptor)"
}
}

@Singleton
class DestinationStreamFactory {
fun make(stream: ConfiguredAirbyteStream): DestinationStream {
return DestinationStream(
descriptor =
DestinationStream.Descriptor(
namespace = stream.stream.namespace,
name = stream.stream.name
)
)
}
}
@@ -0,0 +1,32 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.command

import io.micronaut.context.annotation.Secondary
import jakarta.inject.Singleton

/**
* General configuration for the write operation. The implementor can override this to tweak runtime
* behavior.
*/
interface WriteConfiguration {
/** Batch accumulation settings. */
val recordBatchSizeBytes: Long
val firstStageTmpFilePrefix: String

/** Memory queue settings */
val maxMessageQueueMemoryUsageRatio: Double // as fraction of available memory
val estimatedRecordMemoryOverheadRatio: Double // 0 => No overhead, 1.0 => 2x overhead
}

@Singleton
@Secondary
open class DefaultWriteConfiguration : WriteConfiguration {
override val recordBatchSizeBytes: Long = 200L * 1024L * 1024L
override val firstStageTmpFilePrefix = "airbyte-cdk-load-staged-raw-records"

override val maxMessageQueueMemoryUsageRatio: Double = 0.2
override val estimatedRecordMemoryOverheadRatio: Double = 0.1
}
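
Because `DefaultWriteConfiguration` is marked `@Secondary`, a connector can supply its own `WriteConfiguration` bean to take precedence. A hypothetical override with purely illustrative values:

```kotlin
import io.airbyte.cdk.command.WriteConfiguration
import jakarta.inject.Singleton

// Hypothetical connector-specific tuning; the values are illustrative only.
@Singleton
class MyWriteConfiguration : WriteConfiguration {
    override val recordBatchSizeBytes: Long = 50L * 1024L * 1024L // stage 50 MiB batches
    override val firstStageTmpFilePrefix = "my-destination-staged-records"
    override val maxMessageQueueMemoryUsageRatio: Double = 0.25
    override val estimatedRecordMemoryOverheadRatio: Double = 0.1
}
```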
@@ -0,0 +1,75 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.message

import io.airbyte.protocol.models.v0.AirbyteGlobalState
import io.airbyte.protocol.models.v0.AirbyteStateMessage
import io.airbyte.protocol.models.v0.AirbyteStateStats
import io.airbyte.protocol.models.v0.AirbyteStreamState
import io.airbyte.protocol.models.v0.StreamDescriptor
import jakarta.inject.Singleton

/**
* Converts the internal @[DestinationStateMessage] case class to the Protocol state messages
* required by @[io.airbyte.cdk.output.OutputConsumer].
*/
interface AirbyteStateMessageFactory {
fun fromDestinationStateMessage(message: DestinationStateMessage): AirbyteStateMessage
}

@Singleton
class DefaultAirbyteStateMessageFactory : AirbyteStateMessageFactory {
override fun fromDestinationStateMessage(
message: DestinationStateMessage
): AirbyteStateMessage {
return when (message) {
is DestinationStreamState ->
AirbyteStateMessage()
.withSourceStats(
AirbyteStateStats()
.withRecordCount(message.sourceStats.recordCount.toDouble())
)
.withDestinationStats(
message.destinationStats?.let {
AirbyteStateStats().withRecordCount(it.recordCount.toDouble())
}
?: throw IllegalStateException(
"Destination stats must be provided for DestinationStreamState"
)
)
.withType(AirbyteStateMessage.AirbyteStateType.STREAM)
.withStream(fromStreamState(message.streamState))
is DestinationGlobalState ->
AirbyteStateMessage()
.withSourceStats(
AirbyteStateStats()
.withRecordCount(message.sourceStats.recordCount.toDouble())
)
.withDestinationStats(
message.destinationStats?.let {
AirbyteStateStats().withRecordCount(it.recordCount.toDouble())
}
)
.withType(AirbyteStateMessage.AirbyteStateType.GLOBAL)
.withGlobal(
AirbyteGlobalState()
.withSharedState(message.state)
.withStreamStates(message.streamStates.map { fromStreamState(it) })
)
}
}

private fun fromStreamState(
streamState: DestinationStateMessage.StreamState
): AirbyteStreamState {
return AirbyteStreamState()
.withStreamDescriptor(
StreamDescriptor()
.withNamespace(streamState.stream.descriptor.namespace)
.withName(streamState.stream.descriptor.name)
)
.withStreamState(streamState.state)
}
}
@@ -0,0 +1,101 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.message

import com.google.common.collect.Range
import com.google.common.collect.RangeSet
import com.google.common.collect.TreeRangeSet
import java.nio.file.Path

/**
* Represents an accumulated batch of records in some stage of processing.
*
* Emitted by @[io.airbyte.cdk.write.StreamLoader.processRecords] to describe the batch accumulated.
* Non-[State.COMPLETE] batches are routed to @[io.airbyte.cdk.write.StreamLoader.processBatch]
* re-entrantly until completion.
*
* The framework will track the association between the Batch and the range of records it
* represents, by [Batch.State]s. The [State.PERSISTED] state has special meaning: it indicates that
* the associated ranges have been persisted remotely, and that platform checkpoint messages can be
* emitted.
*
* [State.SPOOLED] is used internally to indicate that records have been spooled to disk for
* processing and should not be used by implementors.
*
* When a stream has been read to End-of-stream, and all ranges between 0 and End-of-stream are
* [State.COMPLETE], then all records are considered to have been processed.
*
* The intended usage for implementors is to implement the provided interfaces in case classes that
* contain the necessary metadata for processing, using them in @
* [io.airbyte.cdk.write.StreamLoader.processBatch] to route to the appropriate handler(s).
*
* For example:
*
* ```kotlin
* sealed class MyBatch: Batch
* data class MyLocalFile(
override val localPath: Path,
* override val totalSizeBytes: Long
* ): StagedLocalFile
* data class MyRemoteObject(
* override val key: String
* ): RemoteObject
* // etc...
* ```
*/
interface Batch {
enum class State {
SPOOLED,
LOCAL,
PERSISTED,
COMPLETE
}

val state: State
}

/** Simple batch: use if you need no other metadata for processing. */
data class SimpleBatch(override val state: Batch.State) : Batch

/** Represents a file of records locally staged. */
abstract class StagedLocalFile : Batch {
override val state: Batch.State = Batch.State.LOCAL
abstract val localPath: Path
abstract val totalSizeBytes: Long
}

/** Represents a remote object containing persisted records. */
abstract class RemoteObject : Batch {
override val state: Batch.State = Batch.State.PERSISTED
abstract val key: String
}

/**
* Represents a file of raw records staged to disk for pre-processing. Used internally by the
* framework.
*/
data class SpooledRawMessagesLocalFile(
override val localPath: Path,
override val totalSizeBytes: Long,
override val state: Batch.State = Batch.State.SPOOLED
) : StagedLocalFile()

/**
* Internally-used wrapper for tracking the association between a batch and the range of records it
* contains.
*/
data class BatchEnvelope<B : Batch>(
val batch: B,
val ranges: RangeSet<Long> = TreeRangeSet.create()
) {
constructor(
batch: B,
range: Range<Long>
) : this(batch = batch, ranges = TreeRangeSet.create(listOf(range)))

fun <C : Batch> withBatch(newBatch: C): BatchEnvelope<C> {
return BatchEnvelope(newBatch, ranges)
}
}
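
A small sketch of the envelope in use, assuming the framework-style flow in which a batch is promoted across states while keeping its record ranges:

```kotlin
import com.google.common.collect.Range
import io.airbyte.cdk.message.Batch
import io.airbyte.cdk.message.BatchEnvelope
import io.airbyte.cdk.message.SimpleBatch

fun main() {
    // A batch covering records 0..99 of a stream, currently staged locally.
    val local = BatchEnvelope(SimpleBatch(Batch.State.LOCAL), Range.closed(0L, 99L))

    // The same ranges are carried over when the batch is persisted remotely,
    // which is what allows checkpoint messages to be emitted for those records.
    val persisted = local.withBatch(SimpleBatch(Batch.State.PERSISTED))
    println(persisted.ranges) // [[0..99]]
}
```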