diff --git a/airbyte-cdk/java/airbyte-cdk/README.md b/airbyte-cdk/java/airbyte-cdk/README.md index 1a3756836748..0e889da660f1 100644 --- a/airbyte-cdk/java/airbyte-cdk/README.md +++ b/airbyte-cdk/java/airbyte-cdk/README.md @@ -174,6 +174,7 @@ corresponds to that version. | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 0.37.1 | 2024-06-10 | [\#38075](https://github.com/airbytehq/airbyte/pull/38075) | Destinations: Track stream statuses in async framework | | 0.37.0 | 2024-06-10 | [\#38121](https://github.com/airbytehq/airbyte/pull/38121) | Destinations: Set default namespace via CatalogParser | | 0.36.8 | 2024-06-07 | [\#38763](https://github.com/airbytehq/airbyte/pull/38763) | Increase Jackson message length limit | | 0.36.7 | 2024-06-06 | [\#39220](https://github.com/airbytehq/airbyte/pull/39220) | Handle null messages in ConnectorExceptionUtil | diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/StreamSyncSummary.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/StreamSyncSummary.kt index fd2bf0fdc785..6f15e8e1ed48 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/StreamSyncSummary.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/StreamSyncSummary.kt @@ -3,17 +3,9 @@ */ package io.airbyte.cdk.integrations.destination -import java.util.* +import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage.AirbyteStreamStatus -/** - * @param recordsWritten The number of records written to the stream, or empty if the caller does - * not track this information. (this is primarily for backwards-compatibility with the legacy - * destinations framework; new implementations should always provide this information). If this - * value is empty, consumers should assume that the sync wrote nonzero records for this stream. - */ -data class StreamSyncSummary(val recordsWritten: Optional) { - - companion object { - @JvmField val DEFAULT: StreamSyncSummary = StreamSyncSummary(Optional.empty()) - } -} +data class StreamSyncSummary( + val recordsWritten: Long, + val terminalStatus: AirbyteStreamStatus, +) diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/AsyncStreamConsumer.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/AsyncStreamConsumer.kt index a8852e7b93e9..aa0172c56272 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/AsyncStreamConsumer.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/AsyncStreamConsumer.kt @@ -17,10 +17,10 @@ import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnCloseF import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnStartFunction import io.airbyte.commons.json.Jsons import io.airbyte.protocol.models.v0.AirbyteMessage +import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage.AirbyteStreamStatus import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog import io.airbyte.protocol.models.v0.StreamDescriptor import io.github.oshai.kotlinlogging.KotlinLogging -import java.util.Optional import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentMap import java.util.concurrent.ExecutorService @@ -71,6 +71,8 @@ constructor( // Note that this map will only be populated for streams with nonzero records. private val recordCounts: ConcurrentMap = ConcurrentHashMap() + private val terminalStatusesFromSource: ConcurrentMap = + ConcurrentHashMap() private var hasStarted = false private var hasClosed = false @@ -103,12 +105,43 @@ constructor( airbyteMessageDeserializer.deserializeAirbyteMessage( message, ) - if (AirbyteMessage.Type.RECORD == partialAirbyteMessage.type) { - validateRecord(partialAirbyteMessage) - - partialAirbyteMessage.record?.streamDescriptor?.let { - getRecordCounter(it).incrementAndGet() + when (partialAirbyteMessage.type) { + AirbyteMessage.Type.RECORD -> { + validateRecord(partialAirbyteMessage) + + partialAirbyteMessage.record?.streamDescriptor?.let { + getRecordCounter(it).incrementAndGet() + + if (terminalStatusesFromSource.containsKey(it)) { + throw IllegalStateException( + "Received a record message after a terminal stream status for stream ${it.namespace}.${it.name}" + ) + } + } + } + AirbyteMessage.Type.TRACE -> { + // There are many types of trace messages, but we only care about stream status + // messages with status=COMPLETE or INCOMPLETE. + // INCOMPLETE is a slightly misleading name - it actually means "Stream has stopped + // due to an interruption or error", i.e. failure + partialAirbyteMessage.trace?.streamStatus?.let { + val isTerminalStatus = + it.status == AirbyteStreamStatus.COMPLETE || + it.status == AirbyteStreamStatus.INCOMPLETE + if (isTerminalStatus) { + val conflictsWithExistingStatus = + terminalStatusesFromSource.containsKey(it.streamDescriptor) && + terminalStatusesFromSource[it.streamDescriptor] != it.status + if (conflictsWithExistingStatus) { + throw IllegalStateException( + "Received conflicting stream statuses for stream ${it.streamDescriptor.namespace}.${it.streamDescriptor.name}" + ) + } + terminalStatusesFromSource[it.streamDescriptor] = it.status + } + } } + else -> {} } bufferEnqueue.addRecord( partialAirbyteMessage, @@ -131,12 +164,18 @@ constructor( val streamSyncSummaries = streamNames.associate { streamDescriptor -> + // If we didn't receive a stream status message, assume success. + // Platform won't send us any stream status messages yet (since we're not declaring + // supportsRefresh in metadata), so we will always hit this case. + val terminalStatusFromSource = + terminalStatusesFromSource[streamDescriptor] ?: AirbyteStreamStatus.COMPLETE StreamDescriptorUtils.withDefaultNamespace( streamDescriptor, bufferManager.defaultNamespace, ) to StreamSyncSummary( - Optional.of(getRecordCounter(streamDescriptor).get()), + getRecordCounter(streamDescriptor).get(), + terminalStatusFromSource, ) } onClose.accept(hasFailed, streamSyncSummaries) diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/buffers/BufferEnqueue.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/buffers/BufferEnqueue.kt index 0e292df46331..b82cba4fbb42 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/buffers/BufferEnqueue.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/buffers/BufferEnqueue.kt @@ -33,10 +33,14 @@ class BufferEnqueue( message: PartialAirbyteMessage, sizeInBytes: Int, ) { - if (message.type == AirbyteMessage.Type.RECORD) { - handleRecord(message, sizeInBytes) - } else if (message.type == AirbyteMessage.Type.STATE) { - stateManager.trackState(message, sizeInBytes.toLong()) + when (message.type) { + AirbyteMessage.Type.RECORD -> { + handleRecord(message, sizeInBytes) + } + AirbyteMessage.Type.STATE -> { + stateManager.trackState(message, sizeInBytes.toLong()) + } + else -> {} } } @@ -44,7 +48,7 @@ class BufferEnqueue( message: PartialAirbyteMessage, sizeInBytes: Int, ) { - val streamDescriptor = extractStateFromRecord(message) + val streamDescriptor = extractStreamDescriptorFromRecord(message) val queue = buffers.computeIfAbsent( streamDescriptor, @@ -87,7 +91,9 @@ class BufferEnqueue( } companion object { - private fun extractStateFromRecord(message: PartialAirbyteMessage): StreamDescriptor { + private fun extractStreamDescriptorFromRecord( + message: PartialAirbyteMessage + ): StreamDescriptor { return StreamDescriptor() .withNamespace(message.record?.namespace) .withName(message.record?.stream) diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/deser/AirbyteMessageDeserializer.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/deser/AirbyteMessageDeserializer.kt index ffca265970cc..0c4918bb1ff2 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/deser/AirbyteMessageDeserializer.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/deser/AirbyteMessageDeserializer.kt @@ -6,6 +6,9 @@ package io.airbyte.cdk.integrations.destination.async.deser import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage import io.airbyte.commons.json.Jsons import io.airbyte.protocol.models.v0.AirbyteMessage +import io.github.oshai.kotlinlogging.KotlinLogging + +private val logger = KotlinLogging.logger {} class AirbyteMessageDeserializer( private val dataTransformer: StreamAwareDataTransformer = IdentityDataTransformer(), @@ -53,8 +56,8 @@ class AirbyteMessageDeserializer( partial.record?.data = null } else if (AirbyteMessage.Type.STATE == msgType) { partial.withSerialized(message) - } else { - throw RuntimeException(String.format("Unsupported message type: %s", msgType)) + } else if (AirbyteMessage.Type.TRACE != msgType) { + logger.warn { "Unsupported message type: $msgType" } } return partial diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/model/PartialAirbyteMessage.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/model/PartialAirbyteMessage.kt index 8ae021c6ca8c..16d5871709fd 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/model/PartialAirbyteMessage.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/model/PartialAirbyteMessage.kt @@ -7,6 +7,7 @@ package io.airbyte.cdk.integrations.destination.async.model import com.fasterxml.jackson.annotation.JsonProperty import com.fasterxml.jackson.annotation.JsonPropertyDescription import io.airbyte.protocol.models.v0.AirbyteMessage +import io.airbyte.protocol.models.v0.AirbyteTraceMessage import java.util.Objects class PartialAirbyteMessage { @@ -26,6 +27,12 @@ class PartialAirbyteMessage { @JsonProperty("state") var state: PartialAirbyteStateMessage? = null + @get:JsonProperty("trace") + @set:JsonProperty("trace") + @JsonProperty("trace") + // These messages don't contain arbitrary blobs, so just directly reference the protocol struct. + var trace: AirbyteTraceMessage? = null + /** * For record messages, this stores the serialized data blob (i.e. * `Jsons.serialize(message.getRecord().getData())`). For state messages, this stores the diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.kt index ccf15b902710..5bf1922d47ce 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.kt @@ -286,7 +286,8 @@ internal constructor( * hasFailed=false, then it could be full success. if hasFailed=true, then going for partial * success. */ - onClose.accept(false, null) + // TODO what to do here? + onClose.accept(false, HashMap()) } stateManager.listCommitted()!!.forEach(outputRecordCollector) diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/buffered_stream_consumer/OnCloseFunction.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/buffered_stream_consumer/OnCloseFunction.kt index ff3b23b2352b..7a3c5bd30712 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/buffered_stream_consumer/OnCloseFunction.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/buffered_stream_consumer/OnCloseFunction.kt @@ -5,20 +5,21 @@ package io.airbyte.cdk.integrations.destination.buffered_stream_consumer import io.airbyte.cdk.integrations.destination.StreamSyncSummary -import io.airbyte.commons.functional.CheckedBiConsumer import io.airbyte.protocol.models.v0.StreamDescriptor /** * Interface allowing destination to specify clean up logic that must be executed after all * record-related logic has finished. * - * The map of StreamSyncSummaries MUST be non-null, but MAY be empty. Streams not present in the map - * will be treated as equivalent to [StreamSyncSummary.DEFAULT]. - * * The @JvmSuppressWildcards is here so that the 2nd parameter of accept stays a java * Map rather than becoming a Map */ -fun interface OnCloseFunction : - CheckedBiConsumer< - Boolean, @JvmSuppressWildcards Map, Exception> +fun interface OnCloseFunction { + @JvmSuppressWildcards + @Throws(Exception::class) + fun accept( + hasFailed: Boolean, + streamSyncSummaries: Map, + ) +} diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties index b0b8584ae871..503969f8df66 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties @@ -1 +1 @@ -version=0.37.0 +version=0.37.1 diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/AsyncStreamConsumerTest.kt b/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/AsyncStreamConsumerTest.kt index e2e235d350aa..f21d0ca6115a 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/AsyncStreamConsumerTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/AsyncStreamConsumerTest.kt @@ -6,6 +6,7 @@ package io.airbyte.cdk.integrations.destination.async import com.fasterxml.jackson.databind.JsonNode import edu.umd.cs.findbugs.annotations.SuppressFBWarnings +import io.airbyte.cdk.integrations.destination.StreamSyncSummary import io.airbyte.cdk.integrations.destination.async.buffers.BufferManager import io.airbyte.cdk.integrations.destination.async.deser.AirbyteMessageDeserializer import io.airbyte.cdk.integrations.destination.async.deser.StreamAwareDataTransformer @@ -25,6 +26,8 @@ import io.airbyte.protocol.models.v0.AirbyteRecordMessage import io.airbyte.protocol.models.v0.AirbyteStateMessage import io.airbyte.protocol.models.v0.AirbyteStateStats import io.airbyte.protocol.models.v0.AirbyteStreamState +import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage +import io.airbyte.protocol.models.v0.AirbyteTraceMessage import io.airbyte.protocol.models.v0.CatalogHelpers import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog import io.airbyte.protocol.models.v0.StreamDescriptor @@ -44,15 +47,20 @@ import org.junit.jupiter.api.Assertions.assertTrue import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Nested import org.junit.jupiter.api.Test -import org.mockito.ArgumentMatchers +import org.junit.jupiter.api.assertDoesNotThrow +import org.mockito.ArgumentCaptor import org.mockito.Mockito +import org.mockito.kotlin.any +import org.mockito.kotlin.capture class AsyncStreamConsumerTest { companion object { private const val RECORD_SIZE_20_BYTES = 20 private const val SCHEMA_NAME = "public" + private const val DEFAULT_NAMESPACE = "default_ns" private const val STREAM_NAME = "id_and_name" private const val STREAM_NAME2 = STREAM_NAME + 2 + private const val STREAM_NAME3 = "stream_with_no_namespace" private val STREAM1_DESC: StreamDescriptor = StreamDescriptor().withNamespace(SCHEMA_NAME).withName(STREAM_NAME) @@ -72,6 +80,12 @@ class AsyncStreamConsumerTest { Field.of("id", JsonSchemaType.NUMBER), Field.of("name", JsonSchemaType.STRING), ), + CatalogHelpers.createConfiguredAirbyteStream( + STREAM_NAME3, + null, + Field.of("id", JsonSchemaType.NUMBER), + Field.of("name", JsonSchemaType.STRING), + ), ), ) @@ -113,6 +127,68 @@ class AsyncStreamConsumerTest { .withStreamState(Jsons.jsonNode(2)), ), ) + + private val STREAM1_SUCCESS_MESSAGE = + Jsons.serialize( + AirbyteMessage() + .withType(AirbyteMessage.Type.TRACE) + .withTrace( + AirbyteTraceMessage() + .withType(AirbyteTraceMessage.Type.STREAM_STATUS) + .withStreamStatus( + AirbyteStreamStatusTraceMessage() + .withStreamDescriptor( + StreamDescriptor() + .withName(STREAM_NAME) + .withNamespace(SCHEMA_NAME), + ) + .withStatus( + AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE + ), + ), + ), + ) + private val STREAM2_FAILURE_MESSAGE = + Jsons.serialize( + AirbyteMessage() + .withType(AirbyteMessage.Type.TRACE) + .withTrace( + AirbyteTraceMessage() + .withType(AirbyteTraceMessage.Type.STREAM_STATUS) + .withStreamStatus( + AirbyteStreamStatusTraceMessage() + .withStreamDescriptor( + StreamDescriptor() + .withName(STREAM_NAME2) + .withNamespace(SCHEMA_NAME), + ) + .withStatus( + AirbyteStreamStatusTraceMessage.AirbyteStreamStatus + .INCOMPLETE + ), + ), + ), + ) + private val STREAM3_SUCCESS_MESSAGE = + Jsons.serialize( + AirbyteMessage() + .withType(AirbyteMessage.Type.TRACE) + .withTrace( + AirbyteTraceMessage() + .withType(AirbyteTraceMessage.Type.STREAM_STATUS) + .withStreamStatus( + AirbyteStreamStatusTraceMessage() + .withStreamDescriptor( + StreamDescriptor() + // Note: no namespace. + .withName(STREAM_NAME3), + ) + .withStatus( + AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE + ), + ), + ), + ) } private lateinit var consumer: AsyncStreamConsumer @@ -144,7 +220,7 @@ class AsyncStreamConsumerTest { onClose = onClose, onFlush = flushFunction, catalog = CATALOG, - bufferManager = BufferManager("default_ns"), + bufferManager = BufferManager(DEFAULT_NAMESPACE), flushFailure = flushFailure, airbyteMessageDeserializer = airbyteMessageDeserializer, workerPool = Executors.newFixedThreadPool(5), @@ -153,6 +229,31 @@ class AsyncStreamConsumerTest { Mockito.`when`(flushFunction.optimalBatchSizeBytes).thenReturn(10000L) } + /** + * This test verifies that we don't throw on unusual message types. + * + * TODO Add a test for completely unrecognized message type (i.e. not present in + * [AirbyteMessage.Type]). Currently Jackson fails to deser that. + */ + @Test + @Throws(Exception::class) + internal fun testAcceptUnexpectedMessage() { + val weirdMessage = + Jsons.serialize( + AirbyteMessage() + .withType(AirbyteMessage.Type.LOG) + .withLog( + AirbyteLogMessage() + .withLevel(AirbyteLogMessage.Level.INFO) + .withMessage("foo") + .withStackTrace("bar"), + ), + ) + + consumer.start() + assertDoesNotThrow { consumer.accept(weirdMessage, weirdMessage.length) } + } + @Test @Throws(Exception::class) internal fun test1StreamWith1State() { @@ -375,20 +476,6 @@ class AsyncStreamConsumerTest { assertEquals(emptyMap.toString(), partial.serialized) } - @Test - internal fun deserializeAirbyteMessageWithNoStateOrRecord() { - val airbyteMessage = - AirbyteMessage().withType(AirbyteMessage.Type.LOG).withLog(AirbyteLogMessage()) - val serializedAirbyteMessage = Jsons.serialize(airbyteMessage) - assertThrows( - RuntimeException::class.java, - ) { - airbyteMessageDeserializer.deserializeAirbyteMessage( - serializedAirbyteMessage, - ) - } - } - @Test internal fun deserializeAirbyteMessageWithAirbyteState() { val serializedAirbyteMessage = Jsons.serialize(STATE_MESSAGE1) @@ -400,28 +487,78 @@ class AsyncStreamConsumerTest { } @Test - internal fun deserializeAirbyteMessageWithBadAirbyteState() { - val badState = - AirbyteMessage() - .withState( - AirbyteStateMessage() - .withType(AirbyteStateMessage.AirbyteStateType.STREAM) - .withStream( - AirbyteStreamState() - .withStreamDescriptor( - STREAM1_DESC, - ) - .withStreamState(Jsons.jsonNode(1)), - ), - ) - val serializedAirbyteMessage = Jsons.serialize(badState) - assertThrows( - RuntimeException::class.java, - ) { - airbyteMessageDeserializer.deserializeAirbyteMessage( - serializedAirbyteMessage, - ) - } + @Throws(Exception::class) + internal fun testStreamStatusHandling() { + val expectedRecords = generateRecords(1000) + + consumer.start() + consumeRecords(consumer, expectedRecords) + consumer.accept(STREAM1_SUCCESS_MESSAGE, STREAM1_SUCCESS_MESSAGE.length) + consumer.accept(STREAM2_FAILURE_MESSAGE, STREAM2_FAILURE_MESSAGE.length) + consumer.accept(STREAM3_SUCCESS_MESSAGE, STREAM3_SUCCESS_MESSAGE.length) + consumer.close() + + val captor: ArgumentCaptor> = + ArgumentCaptor.captor() + Mockito.verify(onClose).accept(any(), capture(captor)) + assertEquals( + mapOf( + StreamDescriptor().withNamespace(SCHEMA_NAME).withName(STREAM_NAME) to + StreamSyncSummary( + expectedRecords.size.toLong(), + AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE, + ), + StreamDescriptor().withNamespace(SCHEMA_NAME).withName(STREAM_NAME2) to + StreamSyncSummary( + 0, + AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.INCOMPLETE, + ), + // Note that we set the default namespace, since the original namespace was null. + StreamDescriptor().withNamespace(DEFAULT_NAMESPACE).withName(STREAM_NAME3) to + StreamSyncSummary( + 0, + AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE, + ), + ), + captor.value, + ) + } + + @Test + @Throws(Exception::class) + internal fun testDefaultStreamStatusHandling() { + val expectedRecords = generateRecords(1000) + + consumer.start() + consumeRecords(consumer, expectedRecords) + // Note: no stream status messages + consumer.close() + + val captor: ArgumentCaptor> = + ArgumentCaptor.captor() + Mockito.verify(onClose).accept(any(), capture(captor)) + assertEquals( + // All streams have a COMPLETE status. + // TODO: change this to INCOMPLETE after we switch the default behavior. + mapOf( + StreamDescriptor().withNamespace(SCHEMA_NAME).withName(STREAM_NAME) to + StreamSyncSummary( + expectedRecords.size.toLong(), + AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE, + ), + StreamDescriptor().withNamespace(SCHEMA_NAME).withName(STREAM_NAME2) to + StreamSyncSummary( + 0, + AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE, + ), + StreamDescriptor().withNamespace(DEFAULT_NAMESPACE).withName(STREAM_NAME3) to + StreamSyncSummary( + 0, + AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE, + ), + ), + captor.value, + ) } @Nested @@ -526,7 +663,7 @@ class AsyncStreamConsumerTest { @Throws(Exception::class) private fun verifyStartAndClose() { Mockito.verify(onStart).call() - Mockito.verify(onClose).accept(ArgumentMatchers.any(), ArgumentMatchers.any()) + Mockito.verify(onClose).accept(any(), any()) } @Throws(Exception::class) diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/AbstractStreamOperation.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/AbstractStreamOperation.kt index b92f52611175..d3533a194bfe 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/AbstractStreamOperation.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/AbstractStreamOperation.kt @@ -10,6 +10,7 @@ import io.airbyte.integrations.base.destination.typing_deduping.DestinationIniti import io.airbyte.integrations.base.destination.typing_deduping.InitialRawTableStatus import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState +import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage.AirbyteStreamStatus import io.airbyte.protocol.models.v0.DestinationSyncMode import io.github.oshai.kotlinlogging.KotlinLogging import java.util.Optional @@ -68,6 +69,10 @@ abstract class AbstractStreamOperation return prepareFinalTableForOverwrite(initialStatus) DestinationSyncMode.APPEND, DestinationSyncMode.APPEND_DEDUP -> { @@ -128,11 +133,23 @@ abstract class AbstractStreamOperation 0 }.orElse(true) || - (initialRawTableStatus.hasUnprocessedRecords && isNotOverwriteSync) + // Non-overwrite syncs should T+D regardless of status, + // so the user sees progress after every attempt. + // But overwrite syncs should only run T+D if the stream was successful + // (since we're T+Ding into a temp final table anyway). + val streamStatusRequiresTd = + isNotOverwriteSync || syncSummary.terminalStatus == AirbyteStreamStatus.COMPLETE + val shouldRunTypingDeduping: Boolean = + if (streamStatusRequiresTd) { + // Legacy logic that if recordsWritten or not tracked then it could be non-zero. + // But for OVERWRITE syncs, we don't need to look at old records. + val hasRecordsNeedingTd = + syncSummary.recordsWritten > 0 || + (isNotOverwriteSync && initialRawTableStatus.hasUnprocessedRecords) + hasRecordsNeedingTd + } else { + false + } if (!shouldRunTypingDeduping) { log.info { "Skipping typing and deduping for stream ${streamConfig.id.originalNamespace}.${streamConfig.id.originalName} " + @@ -150,13 +167,13 @@ abstract class AbstractStreamOperation( return@filter false } // Skip if we don't have any records for this stream. - val streamSyncSummary = - streamSyncSummaries.getOrDefault( - streamConfig.id.asStreamDescriptor(), - StreamSyncSummary.DEFAULT - ) - val nonzeroRecords = - streamSyncSummary.recordsWritten - .map { r: Long -> - r > 0 - } // If we didn't track record counts during the sync, assume we had nonzero - // records for this stream - .orElse(true) + val streamSyncSummary = streamSyncSummaries[streamConfig.id.asStreamDescriptor()]!! + val nonzeroRecords = streamSyncSummary.recordsWritten > 0 val unprocessedRecordsPreexist = initialRawTableStateByStream[streamConfig.id]!!.hasUnprocessedRecords // If this sync emitted records, or the previous sync left behind some unprocessed diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/TyperDeduper.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/TyperDeduper.kt index 305ecdb51181..20dc8ea541d3 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/TyperDeduper.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/TyperDeduper.kt @@ -70,9 +70,7 @@ interface TyperDeduper { * * @param streamSyncSummaries Information about what happened during the sync. Implementations * SHOULD use this information to skip T+D when possible (this is not a requirement for - * correctness, but does allow us to save time/money). This parameter MUST NOT be null. Streams - * MAY be omitted, which will be treated as though they were mapped to - * [StreamSyncSummary.DEFAULT]. + * correctness, but does allow us to save time/money). This parameter MUST NOT be null. */ @Throws(Exception::class) fun typeAndDedupe(streamSyncSummaries: Map) diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/operation/AbstractStreamOperationTest.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/operation/AbstractStreamOperationTest.kt index 08eefecb96d6..dd0c3922be04 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/operation/AbstractStreamOperationTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/operation/AbstractStreamOperationTest.kt @@ -14,6 +14,7 @@ import io.airbyte.integrations.base.destination.typing_deduping.InitialRawTableS import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig import io.airbyte.integrations.base.destination.typing_deduping.StreamId import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState +import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage.AirbyteStreamStatus import io.airbyte.protocol.models.v0.DestinationSyncMode import io.mockk.checkUnnecessaryStub import io.mockk.clearMocks @@ -29,7 +30,6 @@ import org.junit.jupiter.api.Test import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.Arguments import org.junit.jupiter.params.provider.MethodSource -import org.junit.jupiter.params.provider.ValueSource /** * Verify that [AbstractStreamOperation] behaves correctly, given various initial states. We @@ -103,7 +103,10 @@ class AbstractStreamOperationTest { confirmVerified(storageOperation) clearMocks(storageOperation) - streamOperations.finalizeTable(streamConfig, StreamSyncSummary(Optional.of(42))) + streamOperations.finalizeTable( + streamConfig, + StreamSyncSummary(42, AirbyteStreamStatus.COMPLETE) + ) verifySequence { storageOperation.cleanupStage(streamId) @@ -147,7 +150,10 @@ class AbstractStreamOperationTest { confirmVerified(storageOperation) clearMocks(storageOperation) - streamOperations.finalizeTable(streamConfig, StreamSyncSummary(Optional.of(42))) + streamOperations.finalizeTable( + streamConfig, + StreamSyncSummary(42, AirbyteStreamStatus.COMPLETE) + ) verifySequence { storageOperation.cleanupStage(streamId) @@ -189,7 +195,10 @@ class AbstractStreamOperationTest { confirmVerified(storageOperation) clearMocks(storageOperation) - streamOperations.finalizeTable(streamConfig, StreamSyncSummary(Optional.of(42))) + streamOperations.finalizeTable( + streamConfig, + StreamSyncSummary(42, AirbyteStreamStatus.COMPLETE) + ) verifySequence { storageOperation.cleanupStage(streamId) @@ -229,7 +238,10 @@ class AbstractStreamOperationTest { confirmVerified(storageOperation) clearMocks(storageOperation) - streamOperations.finalizeTable(streamConfig, StreamSyncSummary(Optional.of(42))) + streamOperations.finalizeTable( + streamConfig, + StreamSyncSummary(42, AirbyteStreamStatus.COMPLETE) + ) verifySequence { storageOperation.cleanupStage(streamId) @@ -248,17 +260,45 @@ class AbstractStreamOperationTest { ) } - @ParameterizedTest - @ValueSource(booleans = [true, false]) - fun existingNonEmptyTableNoNewRecords(hasUnprocessedRecords: Boolean) { + @Test + fun existingNonEmptyTableStatusIncomplete() { + val initialState = + mockk> { + every { streamConfig } returns this@Overwrite.streamConfig + every { initialRawTableStatus } returns mockk() + every { isFinalTablePresent } returns true + every { isFinalTableEmpty } returns false + every { + destinationState.withSoftReset(any()) + } returns destinationState + } + + val streamOperations = TestStreamOperation(storageOperation, initialState) + // No point in verifying setup, completely identical to existingNonEmptyTable + clearMocks(storageOperation) + streamOperations.finalizeTable( + streamConfig, + StreamSyncSummary(42, AirbyteStreamStatus.INCOMPLETE) + ) + + verifySequence { + storageOperation.cleanupStage(streamId) + // Don't run T+D, and don't overwrite the final table. + } + confirmVerified(storageOperation) + checkUnnecessaryStub( + initialState, + initialState.initialRawTableStatus, + initialState.destinationState + ) + } + + @Test + fun existingNonEmptyTableNoNewRecords() { val initialState = mockk> { every { streamConfig } returns this@Overwrite.streamConfig every { initialRawTableStatus } returns mockk() - // This is an overwrite sync, so we can ignore the old raw records. - // We should skip T+D if the current sync emitted 0 records. - every { initialRawTableStatus.hasUnprocessedRecords } returns - hasUnprocessedRecords every { isFinalTablePresent } returns true every { isFinalTableEmpty } returns false every { @@ -275,7 +315,10 @@ class AbstractStreamOperationTest { confirmVerified(storageOperation) clearMocks(storageOperation) - streamOperations.finalizeTable(streamConfig, StreamSyncSummary(Optional.of(0))) + streamOperations.finalizeTable( + streamConfig, + StreamSyncSummary(0, AirbyteStreamStatus.COMPLETE) + ) verifySequence { storageOperation.cleanupStage(streamId) @@ -316,7 +359,10 @@ class AbstractStreamOperationTest { confirmVerified(storageOperation) clearMocks(storageOperation) - streamOperations.finalizeTable(streamConfig, StreamSyncSummary(Optional.of(42))) + streamOperations.finalizeTable( + streamConfig, + StreamSyncSummary(42, AirbyteStreamStatus.COMPLETE) + ) verifySequence { storageOperation.cleanupStage(streamId) @@ -359,7 +405,10 @@ class AbstractStreamOperationTest { confirmVerified(storageOperation) clearMocks(storageOperation) - streamOperations.finalizeTable(streamConfig, StreamSyncSummary(Optional.of(42))) + streamOperations.finalizeTable( + streamConfig, + StreamSyncSummary(42, AirbyteStreamStatus.COMPLETE) + ) verifySequence { storageOperation.cleanupStage(streamId) @@ -400,7 +449,10 @@ class AbstractStreamOperationTest { confirmVerified(storageOperation) clearMocks(storageOperation) - streamOperations.finalizeTable(streamConfig, StreamSyncSummary(Optional.of(42))) + streamOperations.finalizeTable( + streamConfig, + StreamSyncSummary(42, AirbyteStreamStatus.COMPLETE) + ) verifySequence { storageOperation.cleanupStage(streamId) @@ -437,7 +489,10 @@ class AbstractStreamOperationTest { confirmVerified(storageOperation) clearMocks(storageOperation) - streamOperations.finalizeTable(streamConfig, StreamSyncSummary(Optional.of(42))) + streamOperations.finalizeTable( + streamConfig, + StreamSyncSummary(42, AirbyteStreamStatus.COMPLETE) + ) verifySequence { storageOperation.cleanupStage(streamId) @@ -488,7 +543,10 @@ class AbstractStreamOperationTest { confirmVerified(storageOperation) clearMocks(storageOperation) - streamOperations.finalizeTable(streamConfig, StreamSyncSummary(Optional.of(0))) + streamOperations.finalizeTable( + streamConfig, + StreamSyncSummary(0, AirbyteStreamStatus.COMPLETE) + ) verifySequence { storageOperation.cleanupStage(streamId) diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/operation/DefaultSyncOperationTest.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/operation/DefaultSyncOperationTest.kt index d9c3a48f80eb..762d246babd1 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/operation/DefaultSyncOperationTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/operation/DefaultSyncOperationTest.kt @@ -15,6 +15,7 @@ import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig import io.airbyte.integrations.base.destination.typing_deduping.StreamId import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState +import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage.AirbyteStreamStatus import io.airbyte.protocol.models.v0.DestinationSyncMode import io.mockk.clearMocks import io.mockk.confirmVerified @@ -140,12 +141,18 @@ class DefaultSyncOperationTest { streamOperations.values.onEach { clearMocks(it) } syncOperation.finalizeStreams( - mapOf(appendStreamConfig.id.asStreamDescriptor() to StreamSyncSummary(Optional.of(42))) + mapOf( + appendStreamConfig.id.asStreamDescriptor() to + StreamSyncSummary(42, AirbyteStreamStatus.COMPLETE) + ) ) verify(exactly = 1) { streamOperations.values.onEach { - it.finalizeTable(appendStreamConfig, StreamSyncSummary(Optional.of(42))) + it.finalizeTable( + appendStreamConfig, + StreamSyncSummary(42, AirbyteStreamStatus.COMPLETE) + ) } } confirmVerified(destinationHandler) diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduperTest.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduperTest.kt index f681d9be3be3..a9e38fb18d9b 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduperTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduperTest.kt @@ -9,11 +9,11 @@ import io.airbyte.integrations.base.destination.typing_deduping.Sql.Companion.of import io.airbyte.integrations.base.destination.typing_deduping.Sql.Companion.separately import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState +import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage.AirbyteStreamStatus import io.airbyte.protocol.models.v0.DestinationSyncMode import io.airbyte.protocol.models.v0.StreamDescriptor import java.time.Instant import java.util.* -import java.util.Map import java.util.function.Consumer import kotlin.collections.HashMap import kotlin.collections.List @@ -481,18 +481,27 @@ class DefaultTyperDeduperTest { Mockito.clearInvocations(destinationHandler) typerDeduper.typeAndDedupe( - Map.of( - StreamDescriptor().withName("overwrite_stream").withNamespace("overwrite_ns"), - StreamSyncSummary(Optional.of(0L)), - StreamDescriptor().withName("append_stream").withNamespace("append_ns"), - StreamSyncSummary(Optional.of(1L)) - ) + mapOf( + StreamDescriptor().withName("overwrite_stream").withNamespace("overwrite_ns") to + StreamSyncSummary( + 0, + AirbyteStreamStatus.COMPLETE, + ), + StreamDescriptor().withName("append_stream").withNamespace("append_ns") to + StreamSyncSummary( + 1, + AirbyteStreamStatus.COMPLETE, + ), + StreamDescriptor().withName("dedup_stream").withNamespace("dedup_ns") to + StreamSyncSummary( + 1, + AirbyteStreamStatus.COMPLETE, + ), + ), ) // append_stream and dedup_stream should be T+D-ed. overwrite_stream has explicitly 0 - // records, but - // dedup_stream - // is missing from the map, so implicitly has nonzero records. + // records, so skip it in T+D. Mockito.verify(destinationHandler) .execute(of("UPDATE TABLE append_ns.append_stream WITHOUT SAFER CASTING")) Mockito.verify(destinationHandler) @@ -526,12 +535,23 @@ class DefaultTyperDeduperTest { Mockito.clearInvocations(destinationHandler) typerDeduper.typeAndDedupe( - Map.of( - StreamDescriptor().withName("overwrite_stream").withNamespace("overwrite_ns"), - StreamSyncSummary(Optional.of(0L)), - StreamDescriptor().withName("append_stream").withNamespace("append_ns"), - StreamSyncSummary(Optional.of(1L)) - ) + mapOf( + StreamDescriptor().withName("overwrite_stream").withNamespace("overwrite_ns") to + StreamSyncSummary( + 0, + AirbyteStreamStatus.COMPLETE, + ), + StreamDescriptor().withName("append_stream").withNamespace("append_ns") to + StreamSyncSummary( + 0, + AirbyteStreamStatus.COMPLETE, + ), + StreamDescriptor().withName("dedup_stream").withNamespace("dedup_ns") to + StreamSyncSummary( + 0, + AirbyteStreamStatus.COMPLETE, + ), + ), ) Mockito.verify(destinationHandler) @@ -610,14 +630,11 @@ class DefaultTyperDeduperTest { Mockito.verify(destinationHandler).execute(of("MIGRATE airbyte_internal.dedup_stream")) Mockito.verify(destinationHandler) .commitDestinationStates( - Map.of( - OVERWRITE_STREAM_CONFIG.id, - MockState(true, true, true), - APPEND_STREAM_CONFIG.id, - MockState(true, true, true), - DEDUPE_STREAM_CONFIG.id, - MockState(true, true, true) - ) + mapOf( + OVERWRITE_STREAM_CONFIG.id to MockState(true, true, true), + APPEND_STREAM_CONFIG.id to MockState(true, true, true), + DEDUPE_STREAM_CONFIG.id to MockState(true, true, true), + ), ) Mockito.verify(destinationHandler).gatherInitialState(any()) Mockito.verify(destinationHandler) @@ -662,14 +679,11 @@ class DefaultTyperDeduperTest { // And we should commit the states. Note that we now set needsSoftReset=false. Mockito.verify(destinationHandler) .commitDestinationStates( - Map.of( - OVERWRITE_STREAM_CONFIG.id, - MockState(false, true, true), - APPEND_STREAM_CONFIG.id, - MockState(false, true, true), - DEDUPE_STREAM_CONFIG.id, - MockState(false, true, true) - ) + mapOf( + OVERWRITE_STREAM_CONFIG.id to MockState(false, true, true), + APPEND_STREAM_CONFIG.id to MockState(false, true, true), + DEDUPE_STREAM_CONFIG.id to MockState(false, true, true), + ), ) Mockito.verifyNoMoreInteractions(destinationHandler) @@ -728,14 +742,11 @@ class DefaultTyperDeduperTest { Mockito.verify(destinationHandler).execute(of("MIGRATE airbyte_internal.dedup_stream")) Mockito.verify(destinationHandler) .commitDestinationStates( - Map.of( - OVERWRITE_STREAM_CONFIG.id, - MockState(true, true, true), - APPEND_STREAM_CONFIG.id, - MockState(true, true, true), - DEDUPE_STREAM_CONFIG.id, - MockState(true, true, true) - ) + mapOf( + OVERWRITE_STREAM_CONFIG.id to MockState(true, true, true), + APPEND_STREAM_CONFIG.id to MockState(true, true, true), + DEDUPE_STREAM_CONFIG.id to MockState(true, true, true), + ), ) Mockito.verify(destinationHandler).gatherInitialState(any()) Mockito.verify(destinationHandler) @@ -780,14 +791,11 @@ class DefaultTyperDeduperTest { // And we should commit the states. Mockito.verify(destinationHandler) .commitDestinationStates( - Map.of( - OVERWRITE_STREAM_CONFIG.id, - MockState(false, true, true), - APPEND_STREAM_CONFIG.id, - MockState(false, true, true), - DEDUPE_STREAM_CONFIG.id, - MockState(false, true, true) - ) + mapOf( + OVERWRITE_STREAM_CONFIG.id to MockState(false, true, true), + APPEND_STREAM_CONFIG.id to MockState(false, true, true), + DEDUPE_STREAM_CONFIG.id to MockState(false, true, true), + ), ) Mockito.verifyNoMoreInteractions(destinationHandler) @@ -837,14 +845,11 @@ class DefaultTyperDeduperTest { // it. Mockito.verify(destinationHandler) .commitDestinationStates( - Map.of( - OVERWRITE_STREAM_CONFIG.id, - MockState(true, false, false), - APPEND_STREAM_CONFIG.id, - MockState(true, false, false), - DEDUPE_STREAM_CONFIG.id, - MockState(true, false, false) - ) + mapOf( + OVERWRITE_STREAM_CONFIG.id to MockState(true, false, false), + APPEND_STREAM_CONFIG.id to MockState(true, false, false), + DEDUPE_STREAM_CONFIG.id to MockState(true, false, false), + ), ) Mockito.verify(destinationHandler).gatherInitialState(any()) Mockito.verify(destinationHandler) @@ -889,14 +894,11 @@ class DefaultTyperDeduperTest { // And we should commit the states. Note that we now set needsSoftReset=false. Mockito.verify(destinationHandler) .commitDestinationStates( - Map.of( - OVERWRITE_STREAM_CONFIG.id, - MockState(false, false, false), - APPEND_STREAM_CONFIG.id, - MockState(false, false, false), - DEDUPE_STREAM_CONFIG.id, - MockState(false, false, false) - ) + mapOf( + OVERWRITE_STREAM_CONFIG.id to MockState(false, false, false), + APPEND_STREAM_CONFIG.id to MockState(false, false, false), + DEDUPE_STREAM_CONFIG.id to MockState(false, false, false), + ), ) Mockito.verifyNoMoreInteractions(destinationHandler)