Connector operations by responsibility
gisripa committed May 16, 2024
1 parent ffc613e commit 2f3380f
Showing 17 changed files with 1,634 additions and 292 deletions.
@@ -0,0 +1,18 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.integrations.destination.operation

import io.airbyte.cdk.integrations.destination.StreamSyncSummary
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
import io.airbyte.protocol.models.v0.StreamDescriptor
import java.util.stream.Stream

/** Connector sync operations. */
interface SyncOperation {

    /**
     * Called by the destination's flush function to write a batch of records for the stream
     * identified by [descriptor].
     */
    fun flushStream(descriptor: StreamDescriptor, stream: Stream<PartialAirbyteMessage>)

    /** Called once at the end of the sync, with a per-stream summary of what was written. */
    fun finalizeStreams(streamSyncSummaries: Map<StreamDescriptor, StreamSyncSummary>)
}
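
For orientation, a minimal sketch of an implementation of this contract. LoggingSyncOperation is hypothetical and only counts records rather than writing them anywhere; it reuses the imports above:

class LoggingSyncOperation : SyncOperation {
    // Purely illustrative: track a per-stream record count instead of persisting data.
    private val counts = java.util.concurrent.ConcurrentHashMap<StreamDescriptor, Long>()

    override fun flushStream(descriptor: StreamDescriptor, stream: Stream<PartialAirbyteMessage>) {
        // Drain the batch handed over by the flush function.
        counts.merge(descriptor, stream.count()) { old, new -> old + new }
    }

    override fun finalizeStreams(streamSyncSummaries: Map<StreamDescriptor, StreamSyncSummary>) {
        // Invoked once at end of sync with per-stream summaries.
        streamSyncSummaries.forEach { (descriptor, _) ->
            println("stream ${descriptor.name}: ${counts[descriptor] ?: 0L} records seen")
        }
    }
}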
@@ -0,0 +1,60 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.integrations.destination.staging.operation

import io.airbyte.cdk.integrations.base.JavaBaseConstants
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
import io.airbyte.cdk.integrations.destination.record_buffer.FileBuffer
import io.airbyte.cdk.integrations.destination.record_buffer.SerializableBuffer
import io.airbyte.cdk.integrations.destination.s3.csv.CsvSerializedBuffer
import io.airbyte.cdk.integrations.destination.s3.csv.StagingDatabaseCsvSheetGenerator
import io.airbyte.commons.json.Jsons
import io.airbyte.integrations.base.destination.operation.AbstractStreamOperation
import io.airbyte.integrations.base.destination.operation.StorageOperations
import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState
import io.github.oshai.kotlinlogging.KotlinLogging
import java.util.stream.Stream
import org.apache.commons.io.FileUtils

class StagingStreamOperations<DestinationState : MinimumDestinationState>(
    private val storageOperations: StorageOperations<SerializableBuffer>,
    destinationInitialStatus: DestinationInitialStatus<DestinationState>,
    disableTypeDedupe: Boolean = false
) :
    AbstractStreamOperation<DestinationState, SerializableBuffer>(
        storageOperations,
        destinationInitialStatus,
        disableTypeDedupe
    ) {

    private val log = KotlinLogging.logger {}

    override fun writeRecords(streamConfig: StreamConfig, stream: Stream<PartialAirbyteMessage>) {
        val writeBuffer =
            CsvSerializedBuffer(
                FileBuffer(CsvSerializedBuffer.CSV_GZ_SUFFIX),
                StagingDatabaseCsvSheetGenerator(
                    JavaBaseConstants.DestinationColumns.V2_WITHOUT_META
                ),
                true
            )

        writeBuffer.use {
            stream.forEach { record: PartialAirbyteMessage ->
                it.accept(
                    record.serialized!!,
                    Jsons.serialize(record.record!!.meta),
                    record.record!!.emittedAt
                )
            }
            it.flush()
            log.info {
                "Buffer flush complete for stream ${streamConfig.id.originalName} (${FileUtils.byteCountToDisplaySize(it.byteCount)}) to staging"
            }
            storageOperations.writeToStage(streamConfig.id, writeBuffer)
        }
    }
}
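
The writeBuffer.use { ... } block above is Kotlin's standard Closeable.use, which guarantees the file-backed buffer is closed even if serialization throws partway through. A self-contained illustration of that pattern (TempResource is a stand-in, not an Airbyte class):

import java.io.Closeable

class TempResource(val name: String) : Closeable {
    // Stand-in for a file-backed buffer that must always be cleaned up.
    override fun close() = println("closed $name")
}

fun main() {
    TempResource("staging-buffer.csv.gz").use { res ->
        println("writing records into ${res.name}")
        // Even if an exception were thrown here, close() would still run.
    }
}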
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/typing-deduping/build.gradle
@@ -22,4 +22,5 @@ dependencies {
    testFixturesApi testFixtures(project(':airbyte-cdk:java:airbyte-cdk:airbyte-cdk-core'))
    testFixturesImplementation 'org.mockito.kotlin:mockito-kotlin:5.2.1'
    testImplementation 'org.mockito.kotlin:mockito-kotlin:5.2.1'
    testImplementation "io.mockk:mockk:1.13.10"
}
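
The new io.mockk dependency brings Kotlin-first mocking to the typing-deduping tests. A minimal sketch of the style it enables (Greeter is illustrative, not a class from this module):

import io.mockk.every
import io.mockk.mockk
import io.mockk.verify

class Greeter {
    fun greet(name: String): String = "hello, $name"
}

fun main() {
    val greeter = mockk<Greeter>()
    every { greeter.greet("airbyte") } returns "mocked"
    check(greeter.greet("airbyte") == "mocked")
    verify { greeter.greet("airbyte") }
}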
@@ -0,0 +1,164 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.base.destination.operation

import io.airbyte.cdk.integrations.destination.StreamSyncSummary
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
import io.airbyte.cdk.integrations.destination.record_buffer.SerializableBuffer
import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus
import io.airbyte.integrations.base.destination.typing_deduping.InitialRawTableStatus
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState
import io.airbyte.protocol.models.v0.DestinationSyncMode
import io.github.oshai.kotlinlogging.KotlinLogging
import java.util.Optional
import java.util.stream.Stream

abstract class AbstractStreamOperation<DestinationState : MinimumDestinationState, Data>(
    private val storageOperations: StorageOperations<Data>,
    destinationInitialStatus: DestinationInitialStatus<DestinationState>,
    private val disableTypeDedupe: Boolean = false
) : StreamOperation<DestinationState> {
    private val log = KotlinLogging.logger {}

    // State maintained to make decisions between async calls.
    private val finalTmpTableSuffix: String
    private val initialRawTableStatus: InitialRawTableStatus =
        destinationInitialStatus.initialRawTableStatus

    /**
     * After running any sync setup code, we may update the destination state. This field holds
     * that updated destination state.
     */
    final override val updatedDestinationState: DestinationState

    init {
        val stream = destinationInitialStatus.streamConfig
        storageOperations.prepareStage(stream.id, stream.destinationSyncMode)
        if (!disableTypeDedupe) {
            storageOperations.createFinalSchema(stream.id)
            // Prepare final tables based on sync mode.
            finalTmpTableSuffix = prepareFinalTable(destinationInitialStatus)
        } else {
            log.info { "Typing and deduping disabled, skipping final table initialization" }
            finalTmpTableSuffix = NO_SUFFIX
        }
        updatedDestinationState = destinationInitialStatus.destinationState.withSoftReset(false)
    }

    companion object {
        private const val NO_SUFFIX = ""
        private const val TMP_OVERWRITE_TABLE_SUFFIX = "_airbyte_tmp"
    }

    private fun prepareFinalTable(
        initialStatus: DestinationInitialStatus<DestinationState>
    ): String {
        val stream = initialStatus.streamConfig
        // No special handling if the final table doesn't exist; just create it and return.
        if (!initialStatus.isFinalTablePresent) {
            log.info {
                "Final table does not exist for stream ${initialStatus.streamConfig.id.finalName}, creating."
            }
            storageOperations.createFinalTable(stream, NO_SUFFIX, false)
            return NO_SUFFIX
        }

        log.info { "Final table exists for stream ${stream.id.finalName}" }
        // The table already exists. Decide whether we're writing to it directly, or
        // using a tmp table.
        when (stream.destinationSyncMode) {
            DestinationSyncMode.OVERWRITE -> return prepareFinalTableForOverwrite(initialStatus)
            DestinationSyncMode.APPEND,
            DestinationSyncMode.APPEND_DEDUP -> {
                if (
                    initialStatus.isSchemaMismatch ||
                        initialStatus.destinationState.needsSoftReset()
                ) {
                    // We're loading data directly into the existing table.
                    // Make sure it has the right schema.
                    // Also, if a raw table migration wants us to do a soft reset, do that here.
                    log.info { "Executing soft-reset on final table of stream $stream" }
                    storageOperations.softResetFinalTable(stream)
                }
                return NO_SUFFIX
            }
        }
    }

    private fun prepareFinalTableForOverwrite(
        initialStatus: DestinationInitialStatus<DestinationState>
    ): String {
        val stream = initialStatus.streamConfig
        if (!initialStatus.isFinalTableEmpty || initialStatus.isSchemaMismatch) {
            // We want to overwrite an existing table, so write into a tmp table
            // (replacing any existing tmp table) and swap it in at the end of the sync.
            storageOperations.createFinalTable(stream, TMP_OVERWRITE_TABLE_SUFFIX, true)
            log.info {
                "Using temp final table for table ${stream.id.finalName}, this will be overwritten at end of sync"
            }
            return TMP_OVERWRITE_TABLE_SUFFIX
        }

        log.info {
            "Final table for stream ${stream.id.finalName} is empty and matches the expected v2 format, writing to table directly"
        }
        return NO_SUFFIX
    }

    /** Writing records is destination-specific: direct insert vs. staging, depending on format. */
    abstract override fun writeRecords(
        streamConfig: StreamConfig,
        stream: Stream<PartialAirbyteMessage>
    )

    override fun finalizeTable(streamConfig: StreamConfig, syncSummary: StreamSyncSummary) {
        // Delete the staging directory; each implementation decides whether this is a no-op.
        storageOperations.cleanupStage(streamConfig.id)

        val isNotOverwriteSync = streamConfig.destinationSyncMode != DestinationSyncMode.OVERWRITE
        // Legacy logic: if recordsWritten isn't tracked, assume it could be non-zero.
        // But for OVERWRITE syncs, we don't need to look at old records.
        val shouldRunTypingDeduping =
            syncSummary.recordsWritten.map { it > 0 }.orElse(true) ||
                (initialRawTableStatus.hasUnprocessedRecords && isNotOverwriteSync)
        if (disableTypeDedupe) {
            log.info {
                "Typing and deduping disabled, skipping final table finalization. " +
                    "Raw records can be found at ${streamConfig.id.rawNamespace}.${streamConfig.id.rawName}"
            }
        } else if (!shouldRunTypingDeduping) {
            log.info {
                "Skipping typing and deduping for stream ${streamConfig.id.originalNamespace}.${streamConfig.id.originalName} " +
                    "because it had no records during this sync and no unprocessed records from a previous sync."
            }
        } else {
            // In overwrite mode, we want to read all the raw records. Typically, this is
            // equivalent to filtering on timestamp, but might as well be explicit.
            val timestampFilter =
                if (isNotOverwriteSync) {
                    initialRawTableStatus.maxProcessedTimestamp
                } else {
                    Optional.empty()
                }
            storageOperations.typeAndDedupe(streamConfig, timestampFilter, finalTmpTableSuffix)
        }

        // For overwrite, running T+D during prepare would be wasteful, so we skip the soft-reset
        // there. Instead we type-and-dedupe into a suffixed table and, when there was a schema
        // mismatch, swap it in here.
        if (
            streamConfig.destinationSyncMode == DestinationSyncMode.OVERWRITE &&
                finalTmpTableSuffix.isNotBlank()
        ) {
            storageOperations.overwriteFinalTable(streamConfig, finalTmpTableSuffix)
        }
    }
}
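
The Data type parameter is the extension point here: StagingStreamOperations picks SerializableBuffer, but a destination that inserts records directly could parameterize on the record stream itself. A hedged sketch (InsertStreamOperations is hypothetical; only the writeToStage hook is taken from the contract above):

class InsertStreamOperations<DestinationState : MinimumDestinationState>(
    private val storageOperations: StorageOperations<Stream<PartialAirbyteMessage>>,
    destinationInitialStatus: DestinationInitialStatus<DestinationState>,
    disableTypeDedupe: Boolean = false
) :
    AbstractStreamOperation<DestinationState, Stream<PartialAirbyteMessage>>(
        storageOperations,
        destinationInitialStatus,
        disableTypeDedupe
    ) {
    override fun writeRecords(streamConfig: StreamConfig, stream: Stream<PartialAirbyteMessage>) {
        // Hypothetical: the storage layer issues INSERTs against the raw table directly,
        // so "writing to the stage" is just handing over the record stream.
        storageOperations.writeToStage(streamConfig.id, stream)
    }
}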
@@ -0,0 +1,20 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.base.destination.operation

import io.airbyte.cdk.integrations.destination.async.function.DestinationFlushFunction
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
import io.airbyte.cdk.integrations.destination.operation.SyncOperation
import io.airbyte.protocol.models.v0.StreamDescriptor
import java.util.stream.Stream

class DefaultFlush(
    override val optimalBatchSizeBytes: Long,
    private val syncOperation: SyncOperation
) : DestinationFlushFunction {
    override fun flush(streamDescriptor: StreamDescriptor, stream: Stream<PartialAirbyteMessage>) {
        syncOperation.flushStream(streamDescriptor, stream)
    }
}
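
DefaultFlush is the thin adapter between the async framework's DestinationFlushFunction and a SyncOperation. A usage sketch (the batch size and syncOperation value are placeholders):

val flushFunction: DestinationFlushFunction =
    DefaultFlush(
        optimalBatchSizeBytes = 200L * 1024 * 1024, // placeholder: ~200 MB batches
        syncOperation = syncOperation, // any SyncOperation, e.g. DefaultSyncOperation below
    )
// The async consumer then invokes flushFunction.flush(descriptor, records) per batch,
// which delegates straight to syncOperation.flushStream(descriptor, records).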
@@ -0,0 +1,123 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.base.destination.operation

import io.airbyte.cdk.integrations.destination.StreamSyncSummary
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
import io.airbyte.cdk.integrations.destination.operation.SyncOperation
import io.airbyte.cdk.integrations.util.ConnectorExceptionUtil as exceptions
import io.airbyte.commons.concurrency.CompletableFutures.allOf
import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler
import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog
import io.airbyte.integrations.base.destination.typing_deduping.StreamId
import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduperUtil as tdutils
import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState
import io.airbyte.protocol.models.v0.StreamDescriptor
import io.github.oshai.kotlinlogging.KotlinLogging
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.stream.Stream
import org.apache.commons.lang3.concurrent.BasicThreadFactory

class DefaultSyncOperation<DestinationState : MinimumDestinationState>(
    private val parsedCatalog: ParsedCatalog,
    private val destinationHandler: DestinationHandler<DestinationState>,
    private val defaultNamespace: String,
    private val streamOperationsFactory: StreamOperationsFactory<DestinationState>,
    private val migrations: List<Migration<DestinationState>>,
    private val executorService: ExecutorService =
        Executors.newFixedThreadPool(
            10,
            BasicThreadFactory.Builder().namingPattern("sync-operations-%d").build(),
        )
) : SyncOperation {
    companion object {
        // Lives in the companion object so it's usable while the instance is still initializing.
        private val log = KotlinLogging.logger {}
    }

    private val streamOpsMap: Map<StreamId, StreamOperation<DestinationState>>
    init {
        streamOpsMap = createPerStreamOpClients()
    }

    private fun createPerStreamOpClients(): Map<StreamId, StreamOperation<DestinationState>> {
        log.info { "Preparing required schemas and tables for all streams" }
        val streamsInitialStates = destinationHandler.gatherInitialState(parsedCatalog.streams)

        val postMigrationInitialStates =
            tdutils.executeRawTableMigrations(
                executorService,
                destinationHandler,
                migrations,
                streamsInitialStates
            )
        destinationHandler.commitDestinationStates(
            postMigrationInitialStates.associate { it.streamConfig.id to it.destinationState }
        )

        val initializationFutures =
            postMigrationInitialStates
                .map {
                    CompletableFuture.supplyAsync(
                        { Pair(it.streamConfig.id, streamOperationsFactory.createInstance(it)) },
                        executorService,
                    )
                }
                .toList()
        val futuresResult = allOf(initializationFutures).toCompletableFuture().get()
        val result =
            exceptions.getResultsOrLogAndThrowFirst(
                "Following exceptions occurred during sync initialization",
                futuresResult,
            )
        destinationHandler.commitDestinationStates(
            futuresResult
                // If we're here, then all the futures were successful, so we're in the Right case
                // of every Either.
                .map { it.right!! }
                .associate { (id, streamOps) -> id to streamOps.updatedDestinationState }
        )
        return result.toMap()
    }

    override fun flushStream(descriptor: StreamDescriptor, stream: Stream<PartialAirbyteMessage>) {
        val streamConfig =
            parsedCatalog.getStream(descriptor.namespace ?: defaultNamespace, descriptor.name)
        streamOpsMap[streamConfig.id]?.writeRecords(streamConfig, stream)
    }

    override fun finalizeStreams(streamSyncSummaries: Map<StreamDescriptor, StreamSyncSummary>) {
        try {
            // Only finalize streams that have a sync summary; all others are skipped.
            val finalizeFutures =
                streamSyncSummaries.entries
                    .map {
                        CompletableFuture.supplyAsync(
                            {
                                val streamConfig =
                                    parsedCatalog.getStream(
                                        it.key.namespace ?: defaultNamespace,
                                        it.key.name,
                                    )
                                streamOpsMap[streamConfig.id]?.finalizeTable(streamConfig, it.value)
                            },
                            executorService,
                        )
                    }
                    .toList()
            val futuresResult = allOf(finalizeFutures).toCompletableFuture().join()
            exceptions.getResultsOrLogAndThrowFirst(
                "Following exceptions occurred while finalizing the sync",
                futuresResult,
            )
        } finally {
            log.info { "Cleaning up sync operation thread pools" }
            executorService.shutdown()
        }
    }
}
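
Putting it together, a hedged sketch of how a destination might wire these pieces up end to end (all values are placeholders; streamOperationsFactory is assumed to produce e.g. StagingStreamOperations per stream):

val syncOperation: SyncOperation =
    DefaultSyncOperation(
        parsedCatalog = parsedCatalog,
        destinationHandler = destinationHandler,
        defaultNamespace = "public", // placeholder default namespace
        streamOperationsFactory = streamOperationsFactory,
        migrations = emptyList(), // no raw-table migrations in this sketch
    )
val flushFunction = DefaultFlush(optimalBatchSizeBytes = 200L * 1024 * 1024, syncOperation = syncOperation)
// ... async consumer runs, calling flushFunction.flush(...) as record batches fill ...
syncOperation.finalizeStreams(streamSyncSummaries) // once, at end of sync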