From 2f3380f1744d603f965fa901d4cadfa133ba3807 Mon Sep 17 00:00:00 2001 From: Gireesh Sreepathi Date: Thu, 9 May 2024 16:42:21 -0700 Subject: [PATCH] Connector operations by responsibility --- .../destination/operation/SyncOperation.kt | 18 + .../operation/StagingStreamOperations.kt | 60 ++ .../airbyte-cdk/typing-deduping/build.gradle | 1 + .../operation/AbstractStreamOperation.kt | 164 ++++++ .../destination/operation/DefaultFlush.kt | 20 + .../operation/DefaultSyncOperation.kt | 123 ++++ .../operation/StorageOperations.kt | 57 ++ .../destination/operation/StreamOperation.kt | 20 + .../operation/StreamOperationsFactory.kt | 18 + .../typing_deduping/DefaultTyperDeduper.kt | 10 +- .../NoOpTyperDeduperWithV1V2Migrations.kt | 6 +- .../typing_deduping/SqlGenerator.kt | 2 +- .../TypeAndDedupeTransaction.kt | 103 ---- .../typing_deduping/TyperDeduperUtil.kt | 427 ++++++++------ .../operation/AbstractStreamOperationTest.kt | 549 ++++++++++++++++++ .../operation/DefaultSyncOperationTest.kt | 334 +++++++++++ .../BaseSqlGeneratorIntegrationTest.kt | 14 +- 17 files changed, 1634 insertions(+), 292 deletions(-) create mode 100644 airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/operation/SyncOperation.kt create mode 100644 airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/operation/StagingStreamOperations.kt create mode 100644 airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/AbstractStreamOperation.kt create mode 100644 airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/DefaultFlush.kt create mode 100644 airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/DefaultSyncOperation.kt create mode 100644 airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/StorageOperations.kt create mode 100644 airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/StreamOperation.kt create mode 100644 airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/StreamOperationsFactory.kt delete mode 100644 airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/TypeAndDedupeTransaction.kt create mode 100644 airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/operation/AbstractStreamOperationTest.kt create mode 100644 airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/operation/DefaultSyncOperationTest.kt diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/operation/SyncOperation.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/operation/SyncOperation.kt new file mode 100644 index 000000000000..bbe695a3cb80 --- /dev/null +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/operation/SyncOperation.kt @@ -0,0 +1,18 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. 
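+ *
+ * This PR splits connector operations by responsibility: [SyncOperation] owns whole-sync
+ * setup, per-stream flush routing, and finalization; StreamOperation owns per-stream table
+ * preparation and finalization; StorageOperations owns the destination-specific staging and
+ * final-table primitives (all introduced below).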
+ */
+
+package io.airbyte.cdk.integrations.destination.operation
+
+import io.airbyte.cdk.integrations.destination.StreamSyncSummary
+import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
+import io.airbyte.protocol.models.v0.StreamDescriptor
+import java.util.stream.Stream
+
+/** Connector sync operations */
+interface SyncOperation {
+
+    /** Flush function called by the async framework with a batch of records for one stream. */
+    fun flushStream(descriptor: StreamDescriptor, stream: Stream<PartialAirbyteMessage>)
+    fun finalizeStreams(streamSyncSummaries: Map<StreamDescriptor, StreamSyncSummary>)
+}
diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/operation/StagingStreamOperations.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/operation/StagingStreamOperations.kt
new file mode 100644
index 000000000000..1b0a5018e29d
--- /dev/null
+++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/operation/StagingStreamOperations.kt
@@ -0,0 +1,60 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.cdk.integrations.destination.staging.operation
+
+import io.airbyte.cdk.integrations.base.JavaBaseConstants
+import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
+import io.airbyte.cdk.integrations.destination.record_buffer.FileBuffer
+import io.airbyte.cdk.integrations.destination.record_buffer.SerializableBuffer
+import io.airbyte.cdk.integrations.destination.s3.csv.CsvSerializedBuffer
+import io.airbyte.cdk.integrations.destination.s3.csv.StagingDatabaseCsvSheetGenerator
+import io.airbyte.commons.json.Jsons
+import io.airbyte.integrations.base.destination.operation.AbstractStreamOperation
+import io.airbyte.integrations.base.destination.operation.StorageOperations
+import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus
+import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
+import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState
+import io.github.oshai.kotlinlogging.KotlinLogging
+import java.util.stream.Stream
+import org.apache.commons.io.FileUtils
+
+class StagingStreamOperations<DestinationState : MinimumDestinationState>(
+    private val storageOperations: StorageOperations<SerializableBuffer>,
+    destinationInitialStatus: DestinationInitialStatus<DestinationState>,
+    disableTypeDedupe: Boolean = false
+) :
+    AbstractStreamOperation<DestinationState, SerializableBuffer>(
+        storageOperations,
+        destinationInitialStatus,
+        disableTypeDedupe
+    ) {
+
+    private val log = KotlinLogging.logger {}
+    override fun writeRecords(streamConfig: StreamConfig, stream: Stream<PartialAirbyteMessage>) {
+        val writeBuffer =
+            CsvSerializedBuffer(
+                FileBuffer(CsvSerializedBuffer.CSV_GZ_SUFFIX),
+                StagingDatabaseCsvSheetGenerator(
+                    JavaBaseConstants.DestinationColumns.V2_WITHOUT_META
+                ),
+                true
+            )
+
+        writeBuffer.use {
+            stream.forEach { record: PartialAirbyteMessage ->
+                it.accept(
+                    record.serialized!!,
+                    Jsons.serialize(record.record!!.meta),
+                    record.record!!.emittedAt
+                )
+            }
+            it.flush()
+            log.info {
+                "Buffer flush complete for stream ${streamConfig.id.originalName} (${FileUtils.byteCountToDisplaySize(it.byteCount)}) to staging"
+            }
+            storageOperations.writeToStage(streamConfig.id, writeBuffer)
+        }
+    }
+}
diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/build.gradle b/airbyte-cdk/java/airbyte-cdk/typing-deduping/build.gradle
index bb8f72624672..f77661724321 100644
--- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/build.gradle
+++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/build.gradle
@@ -22,4 +22,5 @@ dependencies {
     testFixturesApi testFixtures(project(':airbyte-cdk:java:airbyte-cdk:airbyte-cdk-core'))
     testFixturesImplementation 'org.mockito.kotlin:mockito-kotlin:5.2.1'
     testImplementation 'org.mockito.kotlin:mockito-kotlin:5.2.1'
+    testImplementation "io.mockk:mockk:1.13.10"
 }
diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/AbstractStreamOperation.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/AbstractStreamOperation.kt
new file mode 100644
index 000000000000..60808f9abc06
--- /dev/null
+++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/AbstractStreamOperation.kt
@@ -0,0 +1,164 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.base.destination.operation
+
+import io.airbyte.cdk.integrations.destination.StreamSyncSummary
+import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
+import io.airbyte.cdk.integrations.destination.record_buffer.SerializableBuffer
+import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus
+import io.airbyte.integrations.base.destination.typing_deduping.InitialRawTableStatus
+import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
+import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState
+import io.airbyte.protocol.models.v0.DestinationSyncMode
+import io.github.oshai.kotlinlogging.KotlinLogging
+import java.util.Optional
+import java.util.stream.Stream
+
+abstract class AbstractStreamOperation<DestinationState : MinimumDestinationState, Data>(
+    private val storageOperations: StorageOperations<Data>,
+    destinationInitialStatus: DestinationInitialStatus<DestinationState>,
+    private val disableTypeDedupe: Boolean = false
+) : StreamOperation<DestinationState> {
+    private val log = KotlinLogging.logger {}
+
+    // State maintained to make decisions between async calls
+    private val finalTmpTableSuffix: String
+    private val initialRawTableStatus: InitialRawTableStatus =
+        destinationInitialStatus.initialRawTableStatus
+
+    /**
+     * After running any sync setup code, we may update the destination state. This field holds
+     * that updated destination state.
+     */
+    final override val updatedDestinationState: DestinationState
+
+    init {
+        val stream = destinationInitialStatus.streamConfig
+        storageOperations.prepareStage(stream.id, stream.destinationSyncMode)
+        if (!disableTypeDedupe) {
+            storageOperations.createFinalSchema(stream.id)
+            // Prepare final tables based on sync mode.
+            finalTmpTableSuffix = prepareFinalTable(destinationInitialStatus)
+        } else {
+            log.info { "Typing and deduping disabled, skipping final table initialization" }
+            finalTmpTableSuffix = NO_SUFFIX
+        }
+        updatedDestinationState = destinationInitialStatus.destinationState.withSoftReset(false)
+    }
+
+    companion object {
+        private const val NO_SUFFIX = ""
+        private const val TMP_OVERWRITE_TABLE_SUFFIX = "_airbyte_tmp"
+    }
+
+    private fun prepareFinalTable(
+        initialStatus: DestinationInitialStatus<DestinationState>
+    ): String {
+        val stream = initialStatus.streamConfig
+        // No special handling if the final table doesn't exist; just create it and return.
+        if (!initialStatus.isFinalTablePresent) {
+            log.info {
+                "Final table does not exist for stream ${initialStatus.streamConfig.id.finalName}, creating."
+            }
+            storageOperations.createFinalTable(stream, NO_SUFFIX, false)
+            return NO_SUFFIX
+        }
+
+        log.info { "Final table exists for stream ${stream.id.finalName}" }
+        // The table already exists. Decide whether we're writing to it directly, or
+        // using a tmp table.
+        when (stream.destinationSyncMode) {
+            DestinationSyncMode.OVERWRITE -> return prepareFinalTableForOverwrite(initialStatus)
+            DestinationSyncMode.APPEND,
+            DestinationSyncMode.APPEND_DEDUP -> {
+                if (
+                    initialStatus.isSchemaMismatch ||
+                        initialStatus.destinationState.needsSoftReset()
+                ) {
+                    // We're loading data directly into the existing table.
+                    // Make sure it has the right schema.
+                    // Also, if a raw table migration wants us to do a soft reset, do that
+                    // here.
+                    log.info { "Executing soft-reset on final table of stream $stream" }
+                    storageOperations.softResetFinalTable(stream)
+                }
+                return NO_SUFFIX
+            }
+        }
+    }
+
+    private fun prepareFinalTableForOverwrite(
+        initialStatus: DestinationInitialStatus<DestinationState>
+    ): String {
+        val stream = initialStatus.streamConfig
+        if (!initialStatus.isFinalTableEmpty || initialStatus.isSchemaMismatch) {
+            // We want to overwrite an existing, non-empty (or mismatched) table, so write into a
+            // tmp table and overwrite the real table at the end of the sync.
+            storageOperations.createFinalTable(stream, TMP_OVERWRITE_TABLE_SUFFIX, true)
+            log.info {
+                "Using temp final table for table ${stream.id.finalName}, this will be overwritten at end of sync"
+            }
+            return TMP_OVERWRITE_TABLE_SUFFIX
+        }
+
+        log.info {
+            "Final table for stream ${stream.id.finalName} is empty and matches the expected v2 format, writing to table directly"
+        }
+        return NO_SUFFIX
+    }
+
+    /** Writing records is destination-specific: direct inserts vs. staging, depending on format. */
+    abstract override fun writeRecords(
+        streamConfig: StreamConfig,
+        stream: Stream<PartialAirbyteMessage>
+    )
+
+    override fun finalizeTable(streamConfig: StreamConfig, syncSummary: StreamSyncSummary) {
+        // Delete the staging directory; implementations decide whether this is needed or a no-op.
+        storageOperations.cleanupStage(streamConfig.id)
+
+        val isNotOverwriteSync = streamConfig.destinationSyncMode != DestinationSyncMode.OVERWRITE
+        // Legacy logic: if recordsWritten is not tracked, assume it could be non-zero.
+        // For OVERWRITE syncs, we don't need to look at unprocessed records from previous syncs.
+        val shouldRunTypingDeduping =
+            syncSummary.recordsWritten.map { it > 0 }.orElse(true) ||
+                (initialRawTableStatus.hasUnprocessedRecords && isNotOverwriteSync)
+        if (disableTypeDedupe) {
+            log.info {
+                "Typing and deduping disabled, skipping final table finalization. " +
+                    "Raw records can be found at ${streamConfig.id.rawNamespace}.${streamConfig.id.rawName}"
+            }
+        } else if (!shouldRunTypingDeduping) {
+            log.info {
+                "Skipping typing and deduping for stream ${streamConfig.id.originalNamespace}.${streamConfig.id.originalName} " +
+                    "because it had no records during this sync and no unprocessed records from a previous sync."
+            }
+        } else {
+            // In overwrite mode, we want to read all the raw records. Typically, this is equivalent
+            // to filtering on timestamp, but might as well be explicit.
+            val timestampFilter =
+                if (isNotOverwriteSync) {
+                    initialRawTableStatus.maxProcessedTimestamp
+                } else {
+                    Optional.empty()
+                }
+            storageOperations.typeAndDedupe(streamConfig, timestampFilter, finalTmpTableSuffix)
+        }
+
+        // For overwrite syncs, it's wasteful to run T+D against the old data, so we don't do a
+        // soft reset in prepare. Instead, we type-and-dedupe into a suffixed table and swap it in
+        // here when we had to handle a schema mismatch (or a non-empty final table).
+        if (
+            streamConfig.destinationSyncMode == DestinationSyncMode.OVERWRITE &&
+                finalTmpTableSuffix.isNotBlank()
+        ) {
+            storageOperations.overwriteFinalTable(streamConfig, finalTmpTableSuffix)
+        }
+    }
+}
diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/DefaultFlush.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/DefaultFlush.kt
new file mode 100644
index 000000000000..b4516a0a331f
--- /dev/null
+++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/DefaultFlush.kt
@@ -0,0 +1,20 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.base.destination.operation
+
+import io.airbyte.cdk.integrations.destination.async.function.DestinationFlushFunction
+import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
+import io.airbyte.cdk.integrations.destination.operation.SyncOperation
+import io.airbyte.protocol.models.v0.StreamDescriptor
+import java.util.stream.Stream
+
+class DefaultFlush(
+    override val optimalBatchSizeBytes: Long,
+    private val syncOperation: SyncOperation
+) : DestinationFlushFunction {
+    override fun flush(streamDescriptor: StreamDescriptor, stream: Stream<PartialAirbyteMessage>) {
+        syncOperation.flushStream(streamDescriptor, stream)
+    }
+}
diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/DefaultSyncOperation.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/DefaultSyncOperation.kt
new file mode 100644
index 000000000000..0e20ebcfaee2
--- /dev/null
+++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/DefaultSyncOperation.kt
@@ -0,0 +1,123 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
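// Illustrative sketch, not part of this patch: a destination hands DefaultFlush to the async
// consumer so that each buffered batch is routed through SyncOperation.flushStream. The batch
// size and the `syncOperation` parameter are assumptions for the example.
fun exampleFlushFunction(syncOperation: SyncOperation): DestinationFlushFunction =
    DefaultFlush(optimalBatchSizeBytes = 1024L * 1024, syncOperation = syncOperation)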
+
+package io.airbyte.integrations.base.destination.operation
+
+import io.airbyte.cdk.integrations.destination.StreamSyncSummary
+import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
+import io.airbyte.cdk.integrations.destination.operation.SyncOperation
+import io.airbyte.cdk.integrations.util.ConnectorExceptionUtil as exceptions
+import io.airbyte.commons.concurrency.CompletableFutures.allOf
+import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler
+import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog
+import io.airbyte.integrations.base.destination.typing_deduping.StreamId
+import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduperUtil as tdutils
+import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration
+import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState
+import io.airbyte.protocol.models.v0.StreamDescriptor
+import io.github.oshai.kotlinlogging.KotlinLogging
+import java.util.concurrent.CompletableFuture
+import java.util.concurrent.ExecutorService
+import java.util.concurrent.Executors
+import java.util.stream.Stream
+import org.apache.commons.lang3.concurrent.BasicThreadFactory
+
+class DefaultSyncOperation<DestinationState : MinimumDestinationState>(
+    private val parsedCatalog: ParsedCatalog,
+    private val destinationHandler: DestinationHandler<DestinationState>,
+    private val defaultNamespace: String,
+    private val streamOperationsFactory: StreamOperationsFactory<DestinationState>,
+    private val migrations: List<Migration<DestinationState>>,
+    private val executorService: ExecutorService =
+        Executors.newFixedThreadPool(
+            10,
+            BasicThreadFactory.Builder().namingPattern("sync-operations-%d").build(),
+        )
+) : SyncOperation {
+    companion object {
+        // Declared in the companion object so the logger is accessible while init runs.
+        private val log = KotlinLogging.logger {}
+    }
+
+    private val streamOpsMap: Map<StreamId, StreamOperation<DestinationState>>
+    init {
+        streamOpsMap = createPerStreamOpClients()
+    }
+
+    private fun createPerStreamOpClients(): Map<StreamId, StreamOperation<DestinationState>> {
+        log.info { "Preparing required schemas and tables for all streams" }
+        val streamsInitialStates = destinationHandler.gatherInitialState(parsedCatalog.streams)
+
+        val postMigrationInitialStates =
+            tdutils.executeRawTableMigrations(
+                executorService,
+                destinationHandler,
+                migrations,
+                streamsInitialStates
+            )
+        destinationHandler.commitDestinationStates(
+            postMigrationInitialStates.associate { it.streamConfig.id to it.destinationState }
+        )
+
+        val initializationFutures =
+            postMigrationInitialStates
+                .map {
+                    CompletableFuture.supplyAsync(
+                        { Pair(it.streamConfig.id, streamOperationsFactory.createInstance(it)) },
+                        executorService,
+                    )
+                }
+                .toList()
+        val futuresResult = allOf(initializationFutures).toCompletableFuture().get()
+        val result =
+            exceptions.getResultsOrLogAndThrowFirst(
+                "Following exceptions occurred during sync initialization",
+                futuresResult,
+            )
+        destinationHandler.commitDestinationStates(
+            futuresResult
+                // If we're here, then all the futures were successful, so we're in the Right case
+                // of every Either
+                .map { it.right!! }
+                .associate { (id, streamOps) -> id to streamOps.updatedDestinationState }
+        )
+        return result.toMap()
+    }
+
+    override fun flushStream(descriptor: StreamDescriptor, stream: Stream<PartialAirbyteMessage>) {
+        val streamConfig =
+            parsedCatalog.getStream(descriptor.namespace ?: defaultNamespace, descriptor.name)
+        streamOpsMap[streamConfig.id]?.writeRecords(streamConfig, stream)
+    }
+
+    override fun finalizeStreams(streamSyncSummaries: Map<StreamDescriptor, StreamSyncSummary>) {
+        try {
+            // Only finalize tables for streams that have a sync summary; the rest are skipped.
+            val finalizeFutures =
+                streamSyncSummaries.entries
+                    .map {
+                        CompletableFuture.supplyAsync(
+                            {
+                                val streamConfig =
+                                    parsedCatalog.getStream(
+                                        it.key.namespace ?: defaultNamespace,
+                                        it.key.name,
+                                    )
+                                streamOpsMap[streamConfig.id]?.finalizeTable(streamConfig, it.value)
+                            },
+                            executorService,
+                        )
+                    }
+                    .toList()
+            val futuresResult = allOf(finalizeFutures).toCompletableFuture().join()
+            exceptions.getResultsOrLogAndThrowFirst(
+                "Following exceptions occurred while finalizing the sync",
+                futuresResult,
+            )
+        } finally {
+            log.info { "Cleaning up sync operation thread pools" }
+            executorService.shutdown()
+        }
+    }
+}
diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/StorageOperations.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/StorageOperations.kt
new file mode 100644
index 000000000000..10048d96aa1a
--- /dev/null
+++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/StorageOperations.kt
@@ -0,0 +1,57 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.base.destination.operation
+
+import io.airbyte.cdk.integrations.destination.record_buffer.SerializableBuffer
+import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
+import io.airbyte.integrations.base.destination.typing_deduping.StreamId
+import io.airbyte.protocol.models.v0.DestinationSyncMode
+import java.time.Instant
+import java.util.Optional
+
+interface StorageOperations<Data> {
+    /*
+     * ==================== Staging Operations ================================
+     */
+
+    /**
+     * Prepare the staging area, which could mean creating object storage locations, temp tables,
+     * or file storage.
+     */
+    fun prepareStage(streamId: StreamId, destinationSyncMode: DestinationSyncMode)
+
+    /** Delete previously staged data, using deterministic information from streamId. */
+    fun cleanupStage(streamId: StreamId)
+
+    /** Write data to stage. */
+    fun writeToStage(streamId: StreamId, data: Data)
+
+    /*
+     * ==================== Final Table Operations ================================
+     */
+
+    /** Create the final schema extracted from [StreamId]. */
+    fun createFinalSchema(streamId: StreamId)
+
+    /** Create the final table extracted from [StreamId]. */
+    fun createFinalTable(streamConfig: StreamConfig, suffix: String, replace: Boolean)
+
+    /** Reset the final table, using a temp table or by ALTERing the existing table's columns. */
+    fun softResetFinalTable(streamConfig: StreamConfig)
+
+    /**
+     * Attempt to atomically swap the final table (name and namespace extracted from [StreamId]).
+     * The mechanism is destination-specific, e.g. INSERT INTO ... SELECT * followed by DROP
+     * TABLE, or CREATE OR REPLACE ... AS SELECT * followed by DROP TABLE.
+     */
+    fun overwriteFinalTable(streamConfig: StreamConfig, tmpTableSuffix: String)
+
+    /**
+     * Copy raw records into the (possibly suffixed) final table, typing and deduping them, only
+     * considering records extracted after [maxProcessedTimestamp] when it is present.
+     */
+    fun typeAndDedupe(
+        streamConfig: StreamConfig,
+        maxProcessedTimestamp: Optional<Instant>,
+        finalTableSuffix: String
+    )
+}
diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/StreamOperation.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/StreamOperation.kt
new file mode 100644
index 000000000000..fea5df12bb58
--- /dev/null
+++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/StreamOperation.kt
@@ -0,0 +1,20 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.base.destination.operation
+
+import io.airbyte.cdk.integrations.destination.StreamSyncSummary
+import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
+import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
+import java.util.stream.Stream
+
+/** Operations on individual streams. */
+interface StreamOperation<T> {
+
+    val updatedDestinationState: T
+
+    fun writeRecords(streamConfig: StreamConfig, stream: Stream<PartialAirbyteMessage>)
+
+    fun finalizeTable(streamConfig: StreamConfig, syncSummary: StreamSyncSummary)
+}
diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/StreamOperationsFactory.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/StreamOperationsFactory.kt
new file mode 100644
index 000000000000..95f3eccf5196
--- /dev/null
+++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/StreamOperationsFactory.kt
@@ -0,0 +1,18 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.base.destination.operation
+
+import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus
+
+fun interface StreamOperationsFactory<DestinationState> {
+
+    /**
+     * Create an instance with the required dependencies injected, using a concrete factory
+     * implementation.
+     */
+    fun createInstance(
+        destinationInitialStatus: DestinationInitialStatus<DestinationState>
+    ): StreamOperation<DestinationState>
+}
diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduper.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduper.kt
index 903ab16df20e..79585dca47c5 100644
--- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduper.kt
+++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduper.kt
@@ -7,9 +7,9 @@ import io.airbyte.cdk.integrations.base.IntegrationRunner
 import io.airbyte.cdk.integrations.destination.StreamSyncSummary
 import io.airbyte.cdk.integrations.util.ConnectorExceptionUtil.getResultsOrLogAndThrowFirst
 import io.airbyte.commons.concurrency.CompletableFutures
-import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduperUtil.Companion.executeRawTableMigrations
-import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduperUtil.Companion.executeWeirdMigrations
-import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduperUtil.Companion.prepareSchemas
+import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduperUtil.executeRawTableMigrations
+import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduperUtil.executeWeirdMigrations
+import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduperUtil.prepareSchemas
 import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration
 import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState
 import io.airbyte.protocol.models.v0.DestinationSyncMode
@@ -186,7 +186,7 @@ class DefaultTyperDeduper(
                         // Make sure it has the right schema.
                         // Also, if a raw table migration wants us to do a soft reset, do that
                        // here.
- TypeAndDedupeTransaction.executeSoftReset( + TyperDeduperUtil.executeSoftReset( sqlGenerator, destinationHandler, stream @@ -267,7 +267,7 @@ class DefaultTyperDeduper( val initialRawTableStatus = initialRawTableStateByStream.getValue(streamConfig.id) - TypeAndDedupeTransaction.executeTypeAndDedupe( + TyperDeduperUtil.executeTypeAndDedupe( sqlGenerator, destinationHandler, streamConfig, diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/NoOpTyperDeduperWithV1V2Migrations.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/NoOpTyperDeduperWithV1V2Migrations.kt index 6a55c364852b..7d9206f8810c 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/NoOpTyperDeduperWithV1V2Migrations.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/NoOpTyperDeduperWithV1V2Migrations.kt @@ -5,9 +5,9 @@ package io.airbyte.integrations.base.destination.typing_deduping import io.airbyte.cdk.integrations.base.IntegrationRunner import io.airbyte.cdk.integrations.destination.StreamSyncSummary -import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduperUtil.Companion.executeRawTableMigrations -import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduperUtil.Companion.executeWeirdMigrations -import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduperUtil.Companion.prepareSchemas +import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduperUtil.executeRawTableMigrations +import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduperUtil.executeWeirdMigrations +import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduperUtil.prepareSchemas import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState import io.airbyte.protocol.models.v0.StreamDescriptor diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/SqlGenerator.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/SqlGenerator.kt index c37b25926467..50d318b5933c 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/SqlGenerator.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/SqlGenerator.kt @@ -95,7 +95,7 @@ interface SqlGenerator { * @return */ fun prepareTablesForSoftReset(stream: StreamConfig): Sql { - val createTempTable = createTable(stream, TypeAndDedupeTransaction.SOFT_RESET_SUFFIX, true) + val createTempTable = createTable(stream, TyperDeduperUtil.SOFT_RESET_SUFFIX, true) val clearLoadedAt = clearLoadedAt(stream.id) return Sql.Companion.concat(createTempTable, clearLoadedAt) } diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/TypeAndDedupeTransaction.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/TypeAndDedupeTransaction.kt deleted file mode 100644 index ca36a51f9233..000000000000 --- 
a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/TypeAndDedupeTransaction.kt +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ -package io.airbyte.integrations.base.destination.typing_deduping - -import java.time.Instant -import java.util.* -import org.slf4j.Logger -import org.slf4j.LoggerFactory - -object TypeAndDedupeTransaction { - const val SOFT_RESET_SUFFIX: String = "_ab_soft_reset" - private val LOGGER: Logger = LoggerFactory.getLogger(TypeAndDedupeTransaction::class.java) - - /** - * It can be expensive to build the errors array in the airbyte_meta column, so we first attempt - * an 'unsafe' transaction which assumes everything is typed correctly. If that fails, we will - * run a more expensive query which handles casting errors - * - * @param sqlGenerator for generating sql for the destination - * @param destinationHandler for executing sql created - * @param streamConfig which stream to operate on - * @param minExtractedAt to reduce the amount of data in the query - * @param suffix table suffix for temporary tables - * @throws Exception if the safe query fails - */ - @JvmStatic - @Throws(Exception::class) - fun executeTypeAndDedupe( - sqlGenerator: SqlGenerator, - destinationHandler: DestinationHandler<*>, - streamConfig: StreamConfig?, - minExtractedAt: Optional, - suffix: String - ) { - try { - LOGGER.info( - "Attempting typing and deduping for {}.{} with suffix {}", - streamConfig!!.id.originalNamespace, - streamConfig.id.originalName, - suffix - ) - val unsafeSql = sqlGenerator.updateTable(streamConfig, suffix, minExtractedAt, false) - destinationHandler.execute(unsafeSql) - } catch (e: Exception) { - if (sqlGenerator.shouldRetry(e)) { - // TODO Destination specific non-retryable exceptions should be added. 
- LOGGER.error( - "Encountered Exception on unsafe SQL for stream {} {} with suffix {}, attempting with error handling", - streamConfig!!.id.originalNamespace, - streamConfig.id.originalName, - suffix, - e - ) - val saferSql = sqlGenerator.updateTable(streamConfig, suffix, minExtractedAt, true) - destinationHandler.execute(saferSql) - } else { - LOGGER.error( - "Encountered Exception on unsafe SQL for stream {} {} with suffix {}, Retry is skipped", - streamConfig!!.id.originalNamespace, - streamConfig.id.originalName, - suffix, - e - ) - throw e - } - } - } - - /** - * Everything in [TypeAndDedupeTransaction.executeTypeAndDedupe] but with a little extra prep - * work for the soft reset temp tables - * - * @param sqlGenerator for generating sql for the destination - * @param destinationHandler for executing sql created - * @param streamConfig which stream to operate on - * @throws Exception if the safe query fails - */ - @JvmStatic - @Throws(Exception::class) - fun executeSoftReset( - sqlGenerator: SqlGenerator, - destinationHandler: DestinationHandler<*>, - streamConfig: StreamConfig - ) { - LOGGER.info( - "Attempting soft reset for stream {} {}", - streamConfig.id.originalNamespace, - streamConfig.id.originalName - ) - destinationHandler.execute(sqlGenerator.prepareTablesForSoftReset(streamConfig)) - executeTypeAndDedupe( - sqlGenerator, - destinationHandler, - streamConfig, - Optional.empty(), - SOFT_RESET_SUFFIX - ) - destinationHandler.execute( - sqlGenerator.overwriteFinalTable(streamConfig.id, SOFT_RESET_SUFFIX) - ) - } -} diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/TyperDeduperUtil.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/TyperDeduperUtil.kt index 961e1ff5b9e6..cfcb1f92106f 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/TyperDeduperUtil.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/TyperDeduperUtil.kt @@ -8,6 +8,7 @@ import io.airbyte.cdk.integrations.util.ConnectorExceptionUtil.getResultsOrLogAn import io.airbyte.commons.concurrency.CompletableFutures import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState +import java.time.Instant import java.util.* import java.util.concurrent.CompletableFuture import java.util.concurrent.CompletionStage @@ -16,190 +17,274 @@ import java.util.stream.Collectors.toMap import org.slf4j.Logger import org.slf4j.LoggerFactory -class TyperDeduperUtil { - companion object { - private val LOGGER: Logger = LoggerFactory.getLogger(TyperDeduperUtil::class.java) +object TyperDeduperUtil { + const val SOFT_RESET_SUFFIX: String = "_ab_soft_reset" + private val LOGGER: Logger = LoggerFactory.getLogger(TyperDeduperUtil::class.java) - @JvmStatic - fun executeRawTableMigrations( - executorService: ExecutorService, - destinationHandler: DestinationHandler, - migrations: List>, - initialStates: List> - ): List> { - // TODO: Either the migrations run the soft reset and create v2 tables or the actual - // prepare tables. - // unify the logic - // with current state of raw tables & final tables. This is done first before gather - // initial state - // to avoid recreating - // final tables later again. 
+ @JvmStatic + fun executeRawTableMigrations( + executorService: ExecutorService, + destinationHandler: DestinationHandler, + migrations: List>, + initialStates: List> + ): List> { + // TODO: Either the migrations run the soft reset and create v2 tables or the actual + // prepare tables. + // unify the logic + // with current state of raw tables & final tables. This is done first before gather + // initial state + // to avoid recreating + // final tables later again. - // Run migrations in lockstep. Some migrations may require us to refetch the initial - // state. - // We want to be able to batch those calls together across streams. - // If a migration runs on one stream, it's likely to also run on other streams. - // So we can bundle the gatherInitialState calls together. - var currentStates = initialStates - for (migration in migrations) { - // Execute the migration on all streams in parallel - val futures: - Map>> = - currentStates - .stream() - .collect( - toMap( - { it.streamConfig.id }, - { initialState -> - runMigrationsAsync( - executorService, - destinationHandler, - migration, - initialState - ) - } - ) + // Run migrations in lockstep. Some migrations may require us to refetch the initial + // state. + // We want to be able to batch those calls together across streams. + // If a migration runs on one stream, it's likely to also run on other streams. + // So we can bundle the gatherInitialState calls together. + var currentStates = initialStates + for (migration in migrations) { + // Execute the migration on all streams in parallel + val futures: + Map>> = + currentStates + .stream() + .collect( + toMap( + { it.streamConfig.id }, + { initialState -> + runMigrationsAsync( + executorService, + destinationHandler, + migration, + initialState + ) + } ) - val migrationResultFutures = - CompletableFutures.allOf(futures.values.toList()).toCompletableFuture().join() - getResultsOrLogAndThrowFirst( - "The following exceptions were thrown attempting to run migrations:\n", - migrationResultFutures - ) - val migrationResults: Map> = - futures.mapValues { it.value.toCompletableFuture().join() } - - // Check if we need to refetch DestinationInitialState - val invalidatedStreams: Set = - migrationResults.filter { it.value.invalidateInitialState }.keys - val updatedStates: List> - if (invalidatedStreams.isNotEmpty()) { - LOGGER.info("Refetching initial state for streams: $invalidatedStreams") - updatedStates = - destinationHandler.gatherInitialState( - currentStates - .filter { invalidatedStreams.contains(it.streamConfig.id) } - .map { it.streamConfig } - ) - LOGGER.info("Updated states: $updatedStates") - } else { - updatedStates = emptyList() - } - - // Update the DestinationInitialStates with the new DestinationStates, - // and also update initialStates with the refetched states. - currentStates = - currentStates.map { initialState -> - // migrationResults will always contain an entry for each stream, so we can - // safely use !! - val updatedDestinationState = - migrationResults[initialState.streamConfig.id]!!.updatedDestinationState - if (invalidatedStreams.contains(initialState.streamConfig.id)) { - // We invalidated this stream's DestinationInitialState. 
- // Find the updated DestinationInitialState, and update it with our new - // DestinationState - return@map updatedStates - .filter { updatedState -> - updatedState.streamConfig.id.equals( - initialState.streamConfig.id - ) - } - .first() - .copy(destinationState = updatedDestinationState) - } else { - // Just update the original DestinationInitialState with the new - // DestinationState. - return@map initialState.copy(destinationState = updatedDestinationState) - } - } - } - return currentStates - } - - /** - * The legacy-style migrations (V1V2Migrator, V2TableMigrator) need to run before we gather - * initial state, because they're dumb and weird. (specifically: SnowflakeV2TableMigrator - * inspects the final tables and triggers a soft reset directly within the migration). TODO: - * Migrate these migrations to the new migration system. This will also reduce the number of - * times we need to query DB metadata, since (a) we can rely on the gatherInitialState - * values, and (b) we can add a DestinationState field for these migrations. It also enables - * us to not trigger multiple soft resets in a single sync. - */ - @JvmStatic - fun executeWeirdMigrations( - executorService: ExecutorService, - sqlGenerator: SqlGenerator, - destinationHandler: DestinationHandler, - v1V2Migrator: DestinationV1V2Migrator, - v2TableMigrator: V2TableMigrator, - parsedCatalog: ParsedCatalog - ) { - val futures = - parsedCatalog.streams.map { - CompletableFuture.supplyAsync( - { - v1V2Migrator.migrateIfNecessary(sqlGenerator, destinationHandler, it) - v2TableMigrator.migrateIfNecessary(it) - }, - executorService ) - } + val migrationResultFutures = + CompletableFutures.allOf(futures.values.toList()).toCompletableFuture().join() getResultsOrLogAndThrowFirst( "The following exceptions were thrown attempting to run migrations:\n", - CompletableFutures.allOf(futures.toList()).toCompletableFuture().join() + migrationResultFutures ) - } + val migrationResults: Map> = + futures.mapValues { it.value.toCompletableFuture().join() } + + // Check if we need to refetch DestinationInitialState + val invalidatedStreams: Set = + migrationResults.filter { it.value.invalidateInitialState }.keys + val updatedStates: List> + if (invalidatedStreams.isNotEmpty()) { + LOGGER.info("Refetching initial state for streams: $invalidatedStreams") + updatedStates = + destinationHandler.gatherInitialState( + currentStates + .filter { invalidatedStreams.contains(it.streamConfig.id) } + .map { it.streamConfig } + ) + LOGGER.info("Updated states: $updatedStates") + } else { + updatedStates = emptyList() + } - /** - * Extracts all the "raw" and "final" schemas identified in the [parsedCatalog] and ensures - * they exist in the Destination Database. - */ - @JvmStatic - fun prepareSchemas( - sqlGenerator: SqlGenerator, - destinationHandler: DestinationHandler, - parsedCatalog: ParsedCatalog - ) { - val rawSchema = parsedCatalog.streams.map { it.id.rawNamespace } - val finalSchema = parsedCatalog.streams.map { it.id.finalNamespace } - val createAllSchemasSql = - (rawSchema + finalSchema).distinct().map { sqlGenerator.createSchema(it) } - destinationHandler.execute(Sql.concat(createAllSchemasSql)) + // Update the DestinationInitialStates with the new DestinationStates, + // and also update initialStates with the refetched states. + currentStates = + currentStates.map { initialState -> + // migrationResults will always contain an entry for each stream, so we can + // safely use !! 
+ val updatedDestinationState = + migrationResults[initialState.streamConfig.id]!!.updatedDestinationState + if (invalidatedStreams.contains(initialState.streamConfig.id)) { + // We invalidated this stream's DestinationInitialState. + // Find the updated DestinationInitialState, and update it with our new + // DestinationState + return@map updatedStates + .filter { updatedState -> + updatedState.streamConfig.id.equals(initialState.streamConfig.id) + } + .first() + .copy(destinationState = updatedDestinationState) + } else { + // Just update the original DestinationInitialState with the new + // DestinationState. + return@map initialState.copy(destinationState = updatedDestinationState) + } + } } + return currentStates + } - private fun runMigrationsAsync( - executorService: ExecutorService, - destinationHandler: DestinationHandler, - migration: Migration, - initialStatus: DestinationInitialStatus - ): CompletionStage> { - return CompletableFuture.supplyAsync( - { - LOGGER.info( - "Maybe executing ${migration.javaClass.simpleName} migration for stream ${initialStatus.streamConfig.id.originalNamespace}.${initialStatus.streamConfig.id.originalName}." - ) + /** + * The legacy-style migrations (V1V2Migrator, V2TableMigrator) need to run before we gather + * initial state, because they're dumb and weird. (specifically: SnowflakeV2TableMigrator + * inspects the final tables and triggers a soft reset directly within the migration). TODO: + * Migrate these migrations to the new migration system. This will also reduce the number of + * times we need to query DB metadata, since (a) we can rely on the gatherInitialState values, + * and (b) we can add a DestinationState field for these migrations. It also enables us to not + * trigger multiple soft resets in a single sync. + */ + @JvmStatic + fun executeWeirdMigrations( + executorService: ExecutorService, + sqlGenerator: SqlGenerator, + destinationHandler: DestinationHandler, + v1V2Migrator: DestinationV1V2Migrator, + v2TableMigrator: V2TableMigrator, + parsedCatalog: ParsedCatalog + ) { + val futures = + parsedCatalog.streams.map { + CompletableFuture.supplyAsync( + { + v1V2Migrator.migrateIfNecessary(sqlGenerator, destinationHandler, it) + v2TableMigrator.migrateIfNecessary(it) + }, + executorService + ) + } + getResultsOrLogAndThrowFirst( + "The following exceptions were thrown attempting to run migrations:\n", + CompletableFutures.allOf(futures.toList()).toCompletableFuture().join() + ) + } - // We technically don't need to track this, but might as well hedge against - // migrations - // accidentally setting softReset=false - val softReset = initialStatus.destinationState.needsSoftReset() - val migrationResult = - migration.migrateIfNecessary( - destinationHandler, - initialStatus.streamConfig, - initialStatus - ) - val updatedNeedsSoftReset = - softReset || migrationResult.updatedDestinationState.needsSoftReset() - return@supplyAsync migrationResult.copy( - updatedDestinationState = - migrationResult.updatedDestinationState.withSoftReset( - updatedNeedsSoftReset - ) + /** + * Extracts all the "raw" and "final" schemas identified in the [parsedCatalog] and ensures they + * exist in the Destination Database. 
+ */ + @JvmStatic + fun prepareSchemas( + sqlGenerator: SqlGenerator, + destinationHandler: DestinationHandler, + parsedCatalog: ParsedCatalog + ) { + val rawSchema = parsedCatalog.streams.map { it.id.rawNamespace } + val finalSchema = parsedCatalog.streams.map { it.id.finalNamespace } + val createAllSchemasSql = + (rawSchema + finalSchema).distinct().map { sqlGenerator.createSchema(it) } + destinationHandler.execute(Sql.concat(createAllSchemasSql)) + } + + private fun runMigrationsAsync( + executorService: ExecutorService, + destinationHandler: DestinationHandler, + migration: Migration, + initialStatus: DestinationInitialStatus + ): CompletionStage> { + return CompletableFuture.supplyAsync( + { + LOGGER.info( + "Maybe executing ${migration.javaClass.simpleName} migration for stream ${initialStatus.streamConfig.id.originalNamespace}.${initialStatus.streamConfig.id.originalName}." + ) + + // We technically don't need to track this, but might as well hedge against + // migrations + // accidentally setting softReset=false + val softReset = initialStatus.destinationState.needsSoftReset() + val migrationResult = + migration.migrateIfNecessary( + destinationHandler, + initialStatus.streamConfig, + initialStatus ) - }, - executorService + val updatedNeedsSoftReset = + softReset || migrationResult.updatedDestinationState.needsSoftReset() + return@supplyAsync migrationResult.copy( + updatedDestinationState = + migrationResult.updatedDestinationState.withSoftReset(updatedNeedsSoftReset) + ) + }, + executorService + ) + } + + /** + * It can be expensive to build the errors array in the airbyte_meta column, so we first attempt + * an 'unsafe' transaction which assumes everything is typed correctly. If that fails, we will + * run a more expensive query which handles casting errors + * + * @param sqlGenerator for generating sql for the destination + * @param destinationHandler for executing sql created + * @param streamConfig which stream to operate on + * @param minExtractedAt to reduce the amount of data in the query + * @param suffix table suffix for temporary tables + * @throws Exception if the safe query fails + */ + @JvmStatic + @Throws(Exception::class) + fun executeTypeAndDedupe( + sqlGenerator: SqlGenerator, + destinationHandler: DestinationHandler<*>, + streamConfig: StreamConfig?, + minExtractedAt: Optional, + suffix: String + ) { + try { + LOGGER.info( + "Attempting typing and deduping for {}.{} with suffix {}", + streamConfig!!.id.originalNamespace, + streamConfig.id.originalName, + suffix ) + val unsafeSql = sqlGenerator.updateTable(streamConfig, suffix, minExtractedAt, false) + destinationHandler.execute(unsafeSql) + } catch (e: Exception) { + if (sqlGenerator.shouldRetry(e)) { + // TODO Destination specific non-retryable exceptions should be added. 
+ LOGGER.error( + "Encountered Exception on unsafe SQL for stream {} {} with suffix {}, attempting with error handling", + streamConfig!!.id.originalNamespace, + streamConfig.id.originalName, + suffix, + e + ) + val saferSql = sqlGenerator.updateTable(streamConfig, suffix, minExtractedAt, true) + destinationHandler.execute(saferSql) + } else { + LOGGER.error( + "Encountered Exception on unsafe SQL for stream {} {} with suffix {}, Retry is skipped", + streamConfig!!.id.originalNamespace, + streamConfig.id.originalName, + suffix, + e + ) + throw e + } } } + + /** + * Everything in [TyperDeduperUtil.executeTypeAndDedupe] but with a little extra prep work for + * the soft reset temp tables + * + * @param sqlGenerator for generating sql for the destination + * @param destinationHandler for executing sql created + * @param streamConfig which stream to operate on + * @throws Exception if the safe query fails + */ + @JvmStatic + @Throws(Exception::class) + fun executeSoftReset( + sqlGenerator: SqlGenerator, + destinationHandler: DestinationHandler<*>, + streamConfig: StreamConfig + ) { + LOGGER.info( + "Attempting soft reset for stream {} {}", + streamConfig.id.originalNamespace, + streamConfig.id.originalName + ) + destinationHandler.execute(sqlGenerator.prepareTablesForSoftReset(streamConfig)) + executeTypeAndDedupe( + sqlGenerator, + destinationHandler, + streamConfig, + Optional.empty(), + SOFT_RESET_SUFFIX + ) + destinationHandler.execute( + sqlGenerator.overwriteFinalTable(streamConfig.id, SOFT_RESET_SUFFIX) + ) + } } diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/operation/AbstractStreamOperationTest.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/operation/AbstractStreamOperationTest.kt new file mode 100644 index 000000000000..1a0b7971ab98 --- /dev/null +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/operation/AbstractStreamOperationTest.kt @@ -0,0 +1,549 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. 
+ */ + +package io.airbyte.integrations.base.destination.operation + +import io.airbyte.cdk.integrations.destination.StreamSyncSummary +import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage +import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType +import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType +import io.airbyte.integrations.base.destination.typing_deduping.ColumnId +import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus +import io.airbyte.integrations.base.destination.typing_deduping.InitialRawTableStatus +import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig +import io.airbyte.integrations.base.destination.typing_deduping.StreamId +import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState +import io.airbyte.protocol.models.v0.DestinationSyncMode +import io.mockk.checkUnnecessaryStub +import io.mockk.clearMocks +import io.mockk.confirmVerified +import io.mockk.every +import io.mockk.mockk +import io.mockk.verifySequence +import java.time.Instant +import java.util.Optional +import java.util.stream.Stream +import org.junit.jupiter.api.Nested +import org.junit.jupiter.api.Test +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.Arguments +import org.junit.jupiter.params.provider.MethodSource +import org.junit.jupiter.params.provider.ValueSource + +/** + * Verify that [AbstractStreamOperation] behaves correctly, given various initial states. We + * intentionally mock the [DestinationInitialStatus]. This allows us to verify that the stream ops + * only looks at specific fields - the mocked initial statuses will throw exceptions for unstubbed + * methods. + * + * For example, we don't need to write separate test cases for "final table does not exist and + * destination state has softReset=true/false" - instead we have a single test case for "final table + * does not exist", and it doesn't stub the `needsSoftReset` method. If we introduce a bug in stream + * ops and it starts checking needsSoftReset even though the final table doesn't exist, then these + * tests will start failing. + */ +class AbstractStreamOperationTest { + class TestStreamOperation( + storageOperations: StorageOperations, + destinationInitialStatus: DestinationInitialStatus + ) : + AbstractStreamOperation( + storageOperations, + destinationInitialStatus, + ) { + override fun writeRecords( + streamConfig: StreamConfig, + stream: Stream + ) { + // noop + } + } + + // This mock is purely for verification. Set relaxed=true so we don't need to stub every call. + // Our tests use confirmVerified() to check that we didn't miss any actions. + private val storageOperations = mockk(relaxed = true) + + @Nested + inner class Overwrite { + private val streamConfig = + StreamConfig( + streamId, + DestinationSyncMode.OVERWRITE, + listOf(), + Optional.empty(), + columns, + // TODO currently these values are unused. Eventually we should restructure this + // class + // to test based on generation ID instead of sync mode. 
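+                // (The three zeros below are presumably generationId, minimumGenerationId, and
+                // syncId, the unused values referenced by the TODO above.)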
+ 0, + 0, + 0 + ) + + @Test + fun emptyDestination() { + val initialState = mockk>() + every { initialState.streamConfig } returns streamConfig + every { initialState.initialRawTableStatus } returns mockk() + every { initialState.isFinalTablePresent } returns false + every { + initialState.destinationState.withSoftReset(any()) + } returns initialState.destinationState + + val streamOperations = TestStreamOperation(storageOperations, initialState) + + verifySequence { + storageOperations.prepareStage(streamId, streamConfig.destinationSyncMode) + storageOperations.createFinalSchema(streamId) + storageOperations.createFinalTable(streamConfig, "", false) + } + confirmVerified(storageOperations) + + clearMocks(storageOperations) + streamOperations.finalizeTable(streamConfig, StreamSyncSummary(Optional.of(42))) + + verifySequence { + storageOperations.cleanupStage(streamId) + storageOperations.typeAndDedupe( + streamConfig, + Optional.empty(), + "", + ) + } + confirmVerified(storageOperations) + checkUnnecessaryStub(initialState, initialState.initialRawTableStatus) + } + + @Test + fun existingEmptyTable() { + val initialState = mockk>() + every { initialState.streamConfig } returns streamConfig + every { initialState.initialRawTableStatus } returns mockk() + every { initialState.isFinalTablePresent } returns true + every { initialState.isFinalTableEmpty } returns true + // Even though there's a schema mismatch, we're running in overwrite mode, + // so we shouldn't execute a soft reset. + // We do need to use a temp final table though. + every { initialState.isSchemaMismatch } returns true + every { + initialState.destinationState.withSoftReset(any()) + } returns initialState.destinationState + + val streamOperations = TestStreamOperation(storageOperations, initialState) + + verifySequence { + storageOperations.prepareStage(streamId, streamConfig.destinationSyncMode) + storageOperations.createFinalSchema(streamId) + storageOperations.createFinalTable(streamConfig, EXPECTED_OVERWRITE_SUFFIX, true) + } + confirmVerified(storageOperations) + + clearMocks(storageOperations) + streamOperations.finalizeTable(streamConfig, StreamSyncSummary(Optional.of(42))) + + verifySequence { + storageOperations.cleanupStage(streamId) + storageOperations.typeAndDedupe( + streamConfig, + Optional.empty(), + EXPECTED_OVERWRITE_SUFFIX, + ) + storageOperations.overwriteFinalTable(streamConfig, EXPECTED_OVERWRITE_SUFFIX) + } + confirmVerified(storageOperations) + checkUnnecessaryStub(initialState, initialState.initialRawTableStatus) + } + + @Test + fun existingEmptyTableMatchingSchema() { + val initialState = mockk>() + every { initialState.streamConfig } returns streamConfig + every { initialState.initialRawTableStatus } returns mockk() + every { initialState.isFinalTablePresent } returns true + every { initialState.isFinalTableEmpty } returns true + every { initialState.isSchemaMismatch } returns false + every { + initialState.destinationState.withSoftReset(any()) + } returns initialState.destinationState + + val streamOperations = TestStreamOperation(storageOperations, initialState) + + verifySequence { + storageOperations.prepareStage(streamId, streamConfig.destinationSyncMode) + storageOperations.createFinalSchema(streamId) + // No table creation - we can just reuse the existing table. 
+            }
+            confirmVerified(storageOperations)
+
+            clearMocks(storageOperations)
+            streamOperations.finalizeTable(streamConfig, StreamSyncSummary(Optional.of(42)))
+
+            verifySequence {
+                storageOperations.cleanupStage(streamId)
+                storageOperations.typeAndDedupe(
+                    streamConfig,
+                    Optional.empty(),
+                    "",
+                )
+            }
+            confirmVerified(storageOperations)
+            checkUnnecessaryStub(initialState, initialState.initialRawTableStatus)
+        }
+
+        @Test
+        fun existingNonEmptyTable() {
+            val initialState = mockk<DestinationInitialStatus<MinimumDestinationState.Impl>>()
+            every { initialState.streamConfig } returns streamConfig
+            every { initialState.initialRawTableStatus } returns mockk()
+            every { initialState.isFinalTablePresent } returns true
+            every { initialState.isFinalTableEmpty } returns false
+            every {
+                initialState.destinationState.withSoftReset(any())
+            } returns initialState.destinationState
+
+            val streamOperations = TestStreamOperation(storageOperations, initialState)
+
+            verifySequence {
+                storageOperations.prepareStage(streamId, streamConfig.destinationSyncMode)
+                storageOperations.createFinalSchema(streamId)
+                storageOperations.createFinalTable(streamConfig, EXPECTED_OVERWRITE_SUFFIX, true)
+            }
+            confirmVerified(storageOperations)
+
+            clearMocks(storageOperations)
+            streamOperations.finalizeTable(streamConfig, StreamSyncSummary(Optional.of(42)))
+
+            verifySequence {
+                storageOperations.cleanupStage(streamId)
+                storageOperations.typeAndDedupe(
+                    streamConfig,
+                    Optional.empty(),
+                    EXPECTED_OVERWRITE_SUFFIX,
+                )
+                storageOperations.overwriteFinalTable(streamConfig, EXPECTED_OVERWRITE_SUFFIX)
+            }
+            confirmVerified(storageOperations)
+            checkUnnecessaryStub(initialState, initialState.initialRawTableStatus)
+        }
+
+        @ParameterizedTest
+        @ValueSource(booleans = [true, false])
+        fun existingNonEmptyTableNoNewRecords(hasUnprocessedRecords: Boolean) {
+            val initialState = mockk<DestinationInitialStatus<MinimumDestinationState.Impl>>()
+            every { initialState.streamConfig } returns streamConfig
+            every { initialState.initialRawTableStatus } returns mockk()
+            // This is an overwrite sync, so we can ignore the old raw records.
+            // We should skip T+D if the current sync emitted 0 records.
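+            // (The stub below is exercised with both parameterized values, which confirms that
+            // hasUnprocessedRecords has no effect on overwrite behavior.)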
+            every { initialState.initialRawTableStatus.hasUnprocessedRecords } returns
+                hasUnprocessedRecords
+            every { initialState.isFinalTablePresent } returns true
+            every { initialState.isFinalTableEmpty } returns false
+            every {
+                initialState.destinationState.withSoftReset(any())
+            } returns initialState.destinationState
+
+            val streamOperations = TestStreamOperation(storageOperations, initialState)
+
+            verifySequence {
+                storageOperations.prepareStage(streamId, streamConfig.destinationSyncMode)
+                storageOperations.createFinalSchema(streamId)
+                storageOperations.createFinalTable(streamConfig, EXPECTED_OVERWRITE_SUFFIX, true)
+            }
+            confirmVerified(storageOperations)
+
+            clearMocks(storageOperations)
+            streamOperations.finalizeTable(streamConfig, StreamSyncSummary(Optional.of(0)))
+
+            verifySequence {
+                storageOperations.cleanupStage(streamId)
+                storageOperations.overwriteFinalTable(streamConfig, EXPECTED_OVERWRITE_SUFFIX)
+            }
+            confirmVerified(storageOperations)
+            checkUnnecessaryStub(initialState, initialState.initialRawTableStatus)
+        }
+    }
+
+    @Nested
+    inner class NonOverwrite {
+        @ParameterizedTest
+        @MethodSource(
+            "io.airbyte.integrations.base.destination.operation.AbstractStreamOperationTest#nonOverwriteStreamConfigs"
+        )
+        fun emptyDestination(streamConfig: StreamConfig) {
+            val initialState = mockk<DestinationInitialStatus<MinimumDestinationState.Impl>>()
+            every { initialState.streamConfig } returns streamConfig
+            every { initialState.initialRawTableStatus.maxProcessedTimestamp } returns
+                Optional.empty()
+            every { initialState.isFinalTablePresent } returns false
+            every {
+                initialState.destinationState.withSoftReset(any())
+            } returns initialState.destinationState
+
+            val streamOperations = TestStreamOperation(storageOperations, initialState)
+
+            verifySequence {
+                storageOperations.prepareStage(streamId, streamConfig.destinationSyncMode)
+                storageOperations.createFinalSchema(streamId)
+                storageOperations.createFinalTable(streamConfig, "", false)
+            }
+            confirmVerified(storageOperations)
+
+            clearMocks(storageOperations)
+            streamOperations.finalizeTable(streamConfig, StreamSyncSummary(Optional.of(42)))
+
+            verifySequence {
+                storageOperations.cleanupStage(streamId)
+                storageOperations.typeAndDedupe(
+                    streamConfig,
+                    Optional.empty(),
+                    "",
+                )
+            }
+            confirmVerified(storageOperations)
+            checkUnnecessaryStub(initialState, initialState.initialRawTableStatus)
+        }
+
+        @ParameterizedTest
+        @MethodSource(
+            "io.airbyte.integrations.base.destination.operation.AbstractStreamOperationTest#nonOverwriteStreamConfigs"
+        )
+        fun existingTableSchemaMismatch(streamConfig: StreamConfig) {
+            val initialState = mockk<DestinationInitialStatus<MinimumDestinationState.Impl>>()
+            every { initialState.streamConfig } returns streamConfig
+            every { initialState.initialRawTableStatus.maxProcessedTimestamp } returns
+                Optional.empty()
+            every { initialState.isFinalTablePresent } returns true
+            every { initialState.isSchemaMismatch } returns true
+            every {
+                initialState.destinationState.withSoftReset(any())
+            } returns initialState.destinationState
+
+            val streamOperations = TestStreamOperation(storageOperations, initialState)
+
+            verifySequence {
+                storageOperations.prepareStage(streamId, streamConfig.destinationSyncMode)
+                storageOperations.createFinalSchema(streamId)
+                storageOperations.softResetFinalTable(streamConfig)
+            }
+            confirmVerified(storageOperations)
+
+            clearMocks(storageOperations)
+            streamOperations.finalizeTable(streamConfig, StreamSyncSummary(Optional.of(42)))
+
+            verifySequence {
+                storageOperations.cleanupStage(streamId)
+                storageOperations.typeAndDedupe(
+                    streamConfig,
+                    Optional.empty(),
"", + ) + } + confirmVerified(storageOperations) + checkUnnecessaryStub(initialState, initialState.initialRawTableStatus) + } + + @ParameterizedTest + @MethodSource( + "io.airbyte.integrations.base.destination.operation.AbstractStreamOperationTest#nonOverwriteStreamConfigs" + ) + fun existingTableSchemaMatch(streamConfig: StreamConfig) { + val initialState = mockk>() + every { initialState.streamConfig } returns streamConfig + every { initialState.initialRawTableStatus.maxProcessedTimestamp } returns + Optional.empty() + every { initialState.isFinalTablePresent } returns true + every { initialState.isSchemaMismatch } returns false + every { initialState.destinationState } returns MinimumDestinationState.Impl(false) + + val streamOperations = TestStreamOperation(storageOperations, initialState) + + verifySequence { + storageOperations.prepareStage(streamId, streamConfig.destinationSyncMode) + storageOperations.createFinalSchema(streamId) + // No soft reset - we can just reuse the existing table. + } + confirmVerified(storageOperations) + + clearMocks(storageOperations) + streamOperations.finalizeTable(streamConfig, StreamSyncSummary(Optional.of(42))) + + verifySequence { + storageOperations.cleanupStage(streamId) + storageOperations.typeAndDedupe( + streamConfig, + Optional.empty(), + "", + ) + } + confirmVerified(storageOperations) + checkUnnecessaryStub(initialState, initialState.initialRawTableStatus) + } + + @ParameterizedTest + @MethodSource( + "io.airbyte.integrations.base.destination.operation.AbstractStreamOperationTest#nonOverwriteStreamConfigs" + ) + fun existingTableAndStateRequiresSoftReset(streamConfig: StreamConfig) { + val initialState = mockk>() + every { initialState.streamConfig } returns streamConfig + every { initialState.initialRawTableStatus.maxProcessedTimestamp } returns + Optional.empty() + every { initialState.isFinalTablePresent } returns true + every { initialState.isSchemaMismatch } returns false + every { initialState.destinationState } returns MinimumDestinationState.Impl(true) + + val streamOperations = TestStreamOperation(storageOperations, initialState) + + verifySequence { + storageOperations.prepareStage(streamId, streamConfig.destinationSyncMode) + storageOperations.createFinalSchema(streamId) + storageOperations.softResetFinalTable(streamConfig) + } + confirmVerified(storageOperations) + + clearMocks(storageOperations) + streamOperations.finalizeTable(streamConfig, StreamSyncSummary(Optional.of(42))) + + verifySequence { + storageOperations.cleanupStage(streamId) + storageOperations.typeAndDedupe( + streamConfig, + Optional.empty(), + "", + ) + } + confirmVerified(storageOperations) + checkUnnecessaryStub(initialState, initialState.initialRawTableStatus) + } + + @ParameterizedTest + @MethodSource( + "io.airbyte.integrations.base.destination.operation.AbstractStreamOperationTest#nonOverwriteStreamConfigsAndBoolean" + ) + fun existingNonEmptyTableNoNewRecords( + streamConfig: StreamConfig, + hasUnprocessedRecords: Boolean + ) { + val initialState = mockk>() + every { initialState.streamConfig } returns streamConfig + // This is an overwrite sync, so we can ignore the old raw records. + // We should skip T+D if the current sync emitted 0 records. + every { initialState.initialRawTableStatus.hasUnprocessedRecords } returns + hasUnprocessedRecords + if (hasUnprocessedRecords) { + // We only care about this value if we're executing T+D. 
+                // If there are no unprocessed records from a previous sync, and no new records
+                // from this sync, we don't need to set it.
+                every { initialState.initialRawTableStatus.maxProcessedTimestamp } returns
+                    maxProcessedTimestamp
+            }
+            every { initialState.isFinalTablePresent } returns true
+            every { initialState.isSchemaMismatch } returns false
+            every { initialState.destinationState } returns MinimumDestinationState.Impl(false)
+
+            val streamOperations = TestStreamOperation(storageOperations, initialState)
+
+            verifySequence {
+                storageOperations.prepareStage(streamId, streamConfig.destinationSyncMode)
+                storageOperations.createFinalSchema(streamId)
+            }
+            confirmVerified(storageOperations)
+
+            clearMocks(storageOperations)
+            streamOperations.finalizeTable(streamConfig, StreamSyncSummary(Optional.of(0)))
+
+            verifySequence {
+                storageOperations.cleanupStage(streamId)
+                // If this sync emitted no records, we only need to run T+D if a previous sync
+                // emitted some records but failed to run T+D.
+                if (hasUnprocessedRecords) {
+                    storageOperations.typeAndDedupe(streamConfig, maxProcessedTimestamp, "")
+                }
+            }
+            confirmVerified(storageOperations)
+            checkUnnecessaryStub(initialState, initialState.initialRawTableStatus)
+        }
+    }
+
+    companion object {
+        val streamId =
+            StreamId(
+                "final_namespace",
+                "final_name",
+                "raw_namespace",
+                "raw_name",
+                "original_namespace",
+                "original_name",
+            )
+        private val pk1 = ColumnId("pk1", "pk1_original_name", "pk1_canonical_name")
+        private val pk2 = ColumnId("pk2", "pk2_original_name", "pk2_canonical_name")
+        private val cursor = ColumnId("cursor", "cursor_original_name", "cursor_canonical_name")
+        val columns: LinkedHashMap<ColumnId, AirbyteType> =
+            linkedMapOf(
+                pk1 to AirbyteProtocolType.INTEGER,
+                pk2 to AirbyteProtocolType.STRING,
+                cursor to AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE,
+                ColumnId(
+                    "username",
+                    "username_original_name",
+                    "username_canonical_name",
+                ) to AirbyteProtocolType.STRING,
+            )
+
+        const val EXPECTED_OVERWRITE_SUFFIX = "_airbyte_tmp"
+        val maxProcessedTimestamp = Optional.of(Instant.parse("2024-01-23T12:34:56Z"))
+
+        private val appendStreamConfig =
+            StreamConfig(
+                streamId,
+                DestinationSyncMode.APPEND,
+                listOf(),
+                Optional.empty(),
+                columns,
+                // TODO currently these values are unused. Eventually we should restructure this
+                // class to test based on generation ID instead of sync mode.
+                0,
+                0,
+                0
+            )
+        private val dedupStreamConfig =
+            StreamConfig(
+                streamId,
+                DestinationSyncMode.APPEND_DEDUP,
+                listOf(pk1, pk2),
+                Optional.of(cursor),
+                columns,
+                // TODO currently these values are unused. Eventually we should restructure this
+                // class to test based on generation ID instead of sync mode.
+                0,
+                0,
+                0
+            )
+
+        // junit 5 doesn't support class-level parameterization...
+        // so we have to hack this in a somewhat dumb way.
+        // append and dedup should behave identically from StreamOperations' POV,
+        // so just shove them together.
+        @JvmStatic
+        fun nonOverwriteStreamConfigs(): Stream<Arguments> =
+            Stream.of(
+                Arguments.of(appendStreamConfig),
+                Arguments.of(dedupStreamConfig),
+            )
+
+        // Some tests are further parameterized, which this method supports.
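+        // It crosses each non-overwrite stream config with both boolean values, yielding four
+        // combinations.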
+        @JvmStatic
+        fun nonOverwriteStreamConfigsAndBoolean(): Stream<Arguments> =
+            Stream.of(
+                Arguments.of(appendStreamConfig, true),
+                Arguments.of(appendStreamConfig, false),
+                Arguments.of(dedupStreamConfig, true),
+                Arguments.of(dedupStreamConfig, false),
+            )
+    }
+}
diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/operation/DefaultSyncOperationTest.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/operation/DefaultSyncOperationTest.kt
new file mode 100644
index 000000000000..021c28c6c734
--- /dev/null
+++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/operation/DefaultSyncOperationTest.kt
@@ -0,0 +1,334 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.base.destination.operation
+
+import io.airbyte.cdk.integrations.destination.StreamSyncSummary
+import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
+import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler
+import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus
+import io.airbyte.integrations.base.destination.typing_deduping.InitialRawTableStatus
+import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog
+import io.airbyte.integrations.base.destination.typing_deduping.Sql
+import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
+import io.airbyte.integrations.base.destination.typing_deduping.StreamId
+import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration
+import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState
+import io.airbyte.protocol.models.v0.DestinationSyncMode
+import io.mockk.clearMocks
+import io.mockk.confirmVerified
+import io.mockk.every
+import io.mockk.mockk
+import io.mockk.slot
+import io.mockk.spyk
+import io.mockk.verify
+import java.util.Optional
+import java.util.stream.Stream
+import org.junit.jupiter.api.Test
+
+class DefaultSyncOperationTest {
+    private data class MockState(
+        val needsSoftReset: Boolean,
+        val softResetMigrationCompleted: Boolean,
+        val nonSoftResetMigrationCompleted: Boolean
+    ) : MinimumDestinationState {
+        override fun needsSoftReset(): Boolean = needsSoftReset
+
+        override fun <T : MinimumDestinationState> withSoftReset(needsSoftReset: Boolean): T {
+            @Suppress("UNCHECKED_CAST") return copy(needsSoftReset = needsSoftReset) as T
+        }
+    }
+    private class TestStreamOperation(destinationState: MockState) : StreamOperation<MockState> {
+        override val updatedDestinationState: MockState = destinationState.withSoftReset(false)
+        override fun writeRecords(
+            streamConfig: StreamConfig,
+            stream: Stream<PartialAirbyteMessage>
+        ) {}
+        override fun finalizeTable(streamConfig: StreamConfig, syncSummary: StreamSyncSummary) {}
+    }
+
+    private val destinationHandler = mockk<DestinationHandler<MockState>>(relaxed = true)
+
+    @Test
+    fun multipleSoftResets() {
+        val overwriteInitialStatus =
+            DestinationInitialStatus(
+                overwriteStreamConfig,
+                isFinalTablePresent = true,
+                initialRawTableStatus =
+                    InitialRawTableStatus(
+                        rawTableExists = true,
+                        hasUnprocessedRecords = false,
+                        maxProcessedTimestamp = Optional.empty()
+                    ),
+                isSchemaMismatch = true,
+                isFinalTableEmpty = false,
+                destinationState =
+                    MockState(
+                        needsSoftReset = false,
+                        softResetMigrationCompleted = false,
+                        nonSoftResetMigrationCompleted = true
+                    )
+            )
+        val appendInitialStatus =
+            DestinationInitialStatus(
+                appendStreamConfig,
+                isFinalTablePresent = true,
+                initialRawTableStatus =
+                    InitialRawTableStatus(
+                        rawTableExists = true,
+                        hasUnprocessedRecords = false,
+                        maxProcessedTimestamp = Optional.empty()
+                    ),
+                isSchemaMismatch = true,
+                isFinalTableEmpty = false,
+                destinationState =
+                    MockState(
+                        needsSoftReset = false,
+                        softResetMigrationCompleted = false,
+                        nonSoftResetMigrationCompleted = true
+                    )
+            )
+        val dedupInitialStatus =
+            DestinationInitialStatus(
+                dedupStreamConfig,
+                isFinalTablePresent = true,
+                initialRawTableStatus =
+                    InitialRawTableStatus(
+                        rawTableExists = true,
+                        hasUnprocessedRecords = false,
+                        maxProcessedTimestamp = Optional.empty()
+                    ),
+                isSchemaMismatch = true,
+                isFinalTableEmpty = false,
+                destinationState =
+                    MockState(
+                        needsSoftReset = false,
+                        softResetMigrationCompleted = false,
+                        nonSoftResetMigrationCompleted = true
+                    )
+            )
+        every { destinationHandler.gatherInitialState(any()) } returns
+            listOf(
+                overwriteInitialStatus,
+                appendInitialStatus,
+                dedupInitialStatus,
+            )
+
+        val streamOperations = HashMap<StreamConfig, StreamOperation<MockState>>()
+        val streamOperationsFactory =
+            mockk<StreamOperationsFactory<MockState>> {
+                val initialStatusSlot = slot<DestinationInitialStatus<MockState>>()
+                every { createInstance(capture(initialStatusSlot)) } answers
+                    {
+                        val streamOps =
+                            spyk(TestStreamOperation(initialStatusSlot.captured.destinationState))
+                        streamOperations[initialStatusSlot.captured.streamConfig] = streamOps
+                        streamOps
+                    }
+            }
+
+        val syncOperation =
+            DefaultSyncOperation(
+                parsedCatalog,
+                destinationHandler,
+                "default_ns",
+                streamOperationsFactory,
+                listOf(migrationWithSoftReset, migrationWithoutSoftReset),
+            )
+
+        // Not verifying ordering, simply because we multithread some stuff.
+        verify(exactly = 1) {
+            destinationHandler.gatherInitialState(any())
+            destinationHandler.execute(
+                Sql.of("MIGRATE WITH SOFT_RESET airbyte_internal.overwrite_stream;")
+            )
+            destinationHandler.execute(
+                Sql.of("MIGRATE WITH SOFT_RESET airbyte_internal.append_stream;")
+            )
+            destinationHandler.execute(
+                Sql.of("MIGRATE WITH SOFT_RESET airbyte_internal.dedup_stream;")
+            )
+            destinationHandler.commitDestinationStates(
+                mapOf(
+                    overwriteStreamConfig.id to
+                        MockState(
+                            needsSoftReset = true,
+                            softResetMigrationCompleted = true,
+                            nonSoftResetMigrationCompleted = true,
+                        ),
+                    appendStreamConfig.id to
+                        MockState(
+                            needsSoftReset = true,
+                            softResetMigrationCompleted = true,
+                            nonSoftResetMigrationCompleted = true,
+                        ),
+                    dedupStreamConfig.id to
+                        MockState(
+                            needsSoftReset = true,
+                            softResetMigrationCompleted = true,
+                            nonSoftResetMigrationCompleted = true,
+                        ),
+                ),
+            )
+            streamOperationsFactory.createInstance(
+                overwriteInitialStatus.copy(
+                    destinationState =
+                        overwriteInitialStatus.destinationState.copy(
+                            needsSoftReset = true,
+                            softResetMigrationCompleted = true,
+                        ),
+                ),
+            )
+            streamOperationsFactory.createInstance(
+                appendInitialStatus.copy(
+                    destinationState =
+                        appendInitialStatus.destinationState.copy(
+                            needsSoftReset = true,
+                            softResetMigrationCompleted = true,
+                        ),
+                ),
+            )
+            streamOperationsFactory.createInstance(
+                dedupInitialStatus.copy(
+                    destinationState =
+                        dedupInitialStatus.destinationState.copy(
+                            needsSoftReset = true,
+                            softResetMigrationCompleted = true,
+                        ),
+                ),
+            )
+            streamOperations.values.onEach { it.updatedDestinationState }
+            destinationHandler.commitDestinationStates(
+                mapOf(
+                    overwriteStreamConfig.id to
+                        MockState(
+                            needsSoftReset = false,
+                            softResetMigrationCompleted = true,
+                            nonSoftResetMigrationCompleted = true,
+                        ),
+                    appendStreamConfig.id to
+                        MockState(
+                            needsSoftReset = false,
+                            softResetMigrationCompleted = true,
+                            nonSoftResetMigrationCompleted = true,
+                        ),
+                    dedupStreamConfig.id to
+                        MockState(
+                            needsSoftReset = false,
+                            softResetMigrationCompleted = true,
+                            nonSoftResetMigrationCompleted = true,
+                        ),
+                ),
+            )
+        }
+        confirmVerified(destinationHandler)
+        confirmVerified(streamOperationsFactory)
+        streamOperations.values.onEach { confirmVerified(it) }
+
+        clearMocks(destinationHandler)
+        clearMocks(streamOperationsFactory)
+        streamOperations.values.onEach { clearMocks(it) }
+    }
+
+    companion object {
+        private val migrationWithSoftReset: Migration<MockState> =
+            object : Migration<MockState> {
+                override fun migrateIfNecessary(
+                    destinationHandler: DestinationHandler<MockState>,
+                    stream: StreamConfig,
+                    state: DestinationInitialStatus<MockState>
+                ): Migration.MigrationResult<MockState> {
+                    if (!state.destinationState.softResetMigrationCompleted) {
+                        destinationHandler.execute(
+                            Sql.of("MIGRATE WITH SOFT_RESET ${stream.id.rawTableId("")}"),
+                        )
+                    }
+                    return Migration.MigrationResult(
+                        state.destinationState.copy(
+                            needsSoftReset = true,
+                            softResetMigrationCompleted = true,
+                        ),
+                        false,
+                    )
+                }
+            }
+
+        private val migrationWithoutSoftReset: Migration<MockState> =
+            object : Migration<MockState> {
+                override fun migrateIfNecessary(
+                    destinationHandler: DestinationHandler<MockState>,
+                    stream: StreamConfig,
+                    state: DestinationInitialStatus<MockState>
+                ): Migration.MigrationResult<MockState> {
+                    if (!state.destinationState.nonSoftResetMigrationCompleted) {
+                        destinationHandler.execute(
+                            Sql.of("MIGRATE WITHOUT SOFT_RESET ${stream.id.rawTableId("")}"),
+                        )
+                    }
+                    return Migration.MigrationResult(
+                        state.destinationState.copy(nonSoftResetMigrationCompleted = true),
+                        false
+                    )
+                }
+            }
+
+        private val overwriteStreamConfig =
+            StreamConfig(
+                StreamId(
+                    "overwrite_ns",
+                    "overwrite_stream",
+                    "airbyte_internal",
+                    "overwrite_stream",
+                    "overwrite_ns",
+                    "overwrite_stream"
+                ),
+                DestinationSyncMode.OVERWRITE,
+                mockk(),
+                mockk(),
+                mockk(),
+                0,
+                0,
+                0,
+            )
+        private val appendStreamConfig =
+            StreamConfig(
+                StreamId(
+                    "append_ns",
+                    "append_stream",
+                    "airbyte_internal",
+                    "append_stream",
+                    "append_ns",
+                    "append_stream"
+                ),
+                DestinationSyncMode.APPEND,
+                mockk(),
+                mockk(),
+                mockk(),
+                0,
+                0,
+                0,
+            )
+        private val dedupStreamConfig =
+            StreamConfig(
+                StreamId(
+                    "dedup_ns",
+                    "dedup_stream",
+                    "airbyte_internal",
+                    "dedup_stream",
+                    "dedup_ns",
+                    "dedup_stream"
+                ),
+                DestinationSyncMode.APPEND_DEDUP,
+                mockk(),
+                mockk(),
+                mockk(),
+                0,
+                0,
+                0,
+            )
+        private val parsedCatalog =
+            ParsedCatalog(listOf(overwriteStreamConfig, appendStreamConfig, dedupStreamConfig))
+    }
+}
diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt
index ac9e81c8d9be..08c2ec8ef980 100644
--- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt
+++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt
@@ -7,8 +7,8 @@ import com.fasterxml.jackson.databind.JsonNode
 import com.google.common.collect.Streams
 import io.airbyte.commons.json.Jsons
 import io.airbyte.commons.string.Strings
-import io.airbyte.integrations.base.destination.typing_deduping.TypeAndDedupeTransaction.executeSoftReset
-import io.airbyte.integrations.base.destination.typing_deduping.TypeAndDedupeTransaction.executeTypeAndDedupe
+import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduperUtil.executeSoftReset
+import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduperUtil.executeTypeAndDedupe
 import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState
 import io.airbyte.protocol.models.v0.AirbyteStream
 import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
@@ -1628,11 +1628,7 @@ abstract class BaseSqlGeneratorIntegrationTest<DialectTableDefinition>