From a78647eb595aa5435f033afc38cd487ca8644fe5 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Mon, 10 Jun 2024 09:40:30 -0700 Subject: [PATCH] Destinations CDK: CatalogParser sets default namespace (#38121) --- airbyte-cdk/java/airbyte-cdk/README.md | 1 + .../destination/async/AsyncStreamConsumer.kt | 41 ++++--------------- .../async/StreamDescriptorUtils.kt | 7 ++++ .../async/buffers/BufferEnqueue.kt | 23 ++++++++--- .../async/buffers/BufferManager.kt | 7 +++- .../async/state/GlobalAsyncStateManager.kt | 31 +------------- .../util/ConfiguredCatalogUtil.kt | 24 ----------- .../src/main/resources/version.properties | 2 +- .../async/AsyncStreamConsumerTest.kt | 9 ++-- .../async/buffers/BufferDequeueTest.kt | 35 ++++++---------- .../async/buffers/BufferEnqueueTest.kt | 9 ++-- .../state/GlobalAsyncStateManagerTest.kt | 40 +++++++++--------- .../jdbc/AbstractJdbcDestination.kt | 20 +-------- .../jdbc/JdbcBufferedConsumerFactory.kt | 5 +-- .../staging/StagingConsumerFactory.kt | 9 ++-- .../typing_deduping/CatalogParser.kt | 16 +++++++- .../typing_deduping/CatalogParserTest.kt | 17 +++++++- .../BaseSqlGeneratorIntegrationTest.kt | 4 +- 18 files changed, 123 insertions(+), 177 deletions(-) delete mode 100644 airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/util/ConfiguredCatalogUtil.kt diff --git a/airbyte-cdk/java/airbyte-cdk/README.md b/airbyte-cdk/java/airbyte-cdk/README.md index 7a358625a339..1a3756836748 100644 --- a/airbyte-cdk/java/airbyte-cdk/README.md +++ b/airbyte-cdk/java/airbyte-cdk/README.md @@ -174,6 +174,7 @@ corresponds to that version. | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 0.37.0 | 2024-06-10 | [\#38121](https://github.com/airbytehq/airbyte/pull/38121) | Destinations: Set default namespace via CatalogParser | | 0.36.8 | 2024-06-07 | [\#38763](https://github.com/airbytehq/airbyte/pull/38763) | Increase Jackson message length limit | | 0.36.7 | 2024-06-06 | [\#39220](https://github.com/airbytehq/airbyte/pull/39220) | Handle null messages in ConnectorExceptionUtil | | 0.36.6 | 2024-06-05 | [\#39106](https://github.com/airbytehq/airbyte/pull/39106) | Skip write to storage with 0 byte file | diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/AsyncStreamConsumer.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/AsyncStreamConsumer.kt index 9d1d37386c14..a8852e7b93e9 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/AsyncStreamConsumer.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/AsyncStreamConsumer.kt @@ -5,7 +5,6 @@ package io.airbyte.cdk.integrations.destination.async import com.google.common.base.Preconditions -import com.google.common.base.Strings import io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer import io.airbyte.cdk.integrations.destination.StreamSyncSummary import io.airbyte.cdk.integrations.destination.async.buffers.BufferEnqueue @@ -28,7 +27,6 @@ import java.util.concurrent.ExecutorService import java.util.concurrent.Executors import java.util.concurrent.atomic.AtomicLong import java.util.function.Consumer -import kotlin.jvm.optionals.getOrNull import org.jetbrains.annotations.VisibleForTesting private val logger = KotlinLogging.logger {} @@ -51,7 +49,6 @@ constructor( onFlush: DestinationFlushFunction, private val catalog: ConfiguredAirbyteCatalog, private val bufferManager: BufferManager, - private val defaultNamespace: Optional, private val flushFailure: FlushFailure = FlushFailure(), workerPool: ExecutorService = Executors.newFixedThreadPool(5), private val airbyteMessageDeserializer: AirbyteMessageDeserializer = @@ -79,28 +76,6 @@ constructor( private var hasClosed = false private var hasFailed = false - internal constructor( - outputRecordCollector: Consumer, - onStart: OnStartFunction, - onClose: OnCloseFunction, - flusher: DestinationFlushFunction, - catalog: ConfiguredAirbyteCatalog, - bufferManager: BufferManager, - flushFailure: FlushFailure, - defaultNamespace: Optional, - ) : this( - outputRecordCollector, - onStart, - onClose, - flusher, - catalog, - bufferManager, - defaultNamespace, - flushFailure, - Executors.newFixedThreadPool(5), - AirbyteMessageDeserializer(), - ) - @Throws(Exception::class) override fun start() { Preconditions.checkState(!hasStarted, "Consumer has already been started.") @@ -129,9 +104,6 @@ constructor( message, ) if (AirbyteMessage.Type.RECORD == partialAirbyteMessage.type) { - if (Strings.isNullOrEmpty(partialAirbyteMessage.record?.namespace)) { - partialAirbyteMessage.record?.namespace = defaultNamespace.getOrNull() - } validateRecord(partialAirbyteMessage) partialAirbyteMessage.record?.streamDescriptor?.let { @@ -141,7 +113,6 @@ constructor( bufferEnqueue.addRecord( partialAirbyteMessage, sizeInBytes + PARTIAL_DESERIALIZE_REF_BYTES, - defaultNamespace, ) } @@ -159,10 +130,14 @@ constructor( bufferManager.close() val streamSyncSummaries = - streamNames.associateWith { streamDescriptor: StreamDescriptor -> - StreamSyncSummary( - Optional.of(getRecordCounter(streamDescriptor).get()), - ) + streamNames.associate { streamDescriptor -> + StreamDescriptorUtils.withDefaultNamespace( + streamDescriptor, + bufferManager.defaultNamespace, + ) to + StreamSyncSummary( + Optional.of(getRecordCounter(streamDescriptor).get()), + ) } onClose.accept(hasFailed, streamSyncSummaries) diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/StreamDescriptorUtils.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/StreamDescriptorUtils.kt index c1861b5bc21e..cd77fbcfb01a 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/StreamDescriptorUtils.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/StreamDescriptorUtils.kt @@ -34,4 +34,11 @@ object StreamDescriptorUtils { return pairs } + + fun withDefaultNamespace(sd: StreamDescriptor, defaultNamespace: String) = + if (sd.namespace.isNullOrEmpty()) { + StreamDescriptor().withName(sd.name).withNamespace(defaultNamespace) + } else { + sd + } } diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/buffers/BufferEnqueue.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/buffers/BufferEnqueue.kt index 79b70e9da3fe..0e292df46331 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/buffers/BufferEnqueue.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/buffers/BufferEnqueue.kt @@ -7,9 +7,9 @@ package io.airbyte.cdk.integrations.destination.async.buffers import io.airbyte.cdk.integrations.destination.async.GlobalMemoryManager import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage import io.airbyte.cdk.integrations.destination.async.state.GlobalAsyncStateManager +import io.airbyte.commons.json.Jsons import io.airbyte.protocol.models.v0.AirbyteMessage import io.airbyte.protocol.models.v0.StreamDescriptor -import java.util.Optional import java.util.concurrent.ConcurrentMap /** @@ -20,6 +20,7 @@ class BufferEnqueue( private val memoryManager: GlobalMemoryManager, private val buffers: ConcurrentMap, private val stateManager: GlobalAsyncStateManager, + private val defaultNamespace: String, ) { /** * Buffer a record. Contains memory management logic to dynamically adjust queue size based via @@ -31,12 +32,11 @@ class BufferEnqueue( fun addRecord( message: PartialAirbyteMessage, sizeInBytes: Int, - defaultNamespace: Optional, ) { if (message.type == AirbyteMessage.Type.RECORD) { handleRecord(message, sizeInBytes) } else if (message.type == AirbyteMessage.Type.STATE) { - stateManager.trackState(message, sizeInBytes.toLong(), defaultNamespace.orElse("")) + stateManager.trackState(message, sizeInBytes.toLong()) } } @@ -53,7 +53,20 @@ class BufferEnqueue( } val stateId = stateManager.getStateIdAndIncrementCounter(streamDescriptor) - var addedToQueue = queue.offer(message, sizeInBytes.toLong(), stateId) + // We don't set the default namespace until after putting this message into the state + // manager/etc. + // All our internal handling is on the true (null) namespace, + // we just set the default namespace when handing off to destination-specific code. + val mangledMessage = + if (message.record!!.namespace.isNullOrEmpty()) { + val clone = Jsons.clone(message) + clone.record!!.namespace = defaultNamespace + clone + } else { + message + } + + var addedToQueue = queue.offer(mangledMessage, sizeInBytes.toLong(), stateId) var i = 0 while (!addedToQueue) { @@ -61,7 +74,7 @@ class BufferEnqueue( if (newlyAllocatedMemory > 0) { queue.addMaxMemory(newlyAllocatedMemory) } - addedToQueue = queue.offer(message, sizeInBytes.toLong(), stateId) + addedToQueue = queue.offer(mangledMessage, sizeInBytes.toLong(), stateId) i++ if (i > 5) { try { diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/buffers/BufferManager.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/buffers/BufferManager.kt index f762d90b4a2a..96cdedd0bf9d 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/buffers/BufferManager.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/buffers/BufferManager.kt @@ -22,6 +22,11 @@ private val logger = KotlinLogging.logger {} class BufferManager @JvmOverloads constructor( + /** + * This probably doesn't belong here, but it's the easiest place where both [BufferEnqueue] and + * [io.airbyte.cdk.integrations.destination.async.AsyncStreamConsumer] can both get to it. + */ + public val defaultNamespace: String, maxMemory: Long = (Runtime.getRuntime().maxMemory() * MEMORY_LIMIT_RATIO).toLong(), ) { @get:VisibleForTesting val buffers: ConcurrentMap @@ -46,7 +51,7 @@ constructor( memoryManager = GlobalMemoryManager(maxMemory) this.stateManager = GlobalAsyncStateManager(memoryManager) buffers = ConcurrentHashMap() - bufferEnqueue = BufferEnqueue(memoryManager, buffers, stateManager) + bufferEnqueue = BufferEnqueue(memoryManager, buffers, stateManager, defaultNamespace) bufferDequeue = BufferDequeue(memoryManager, buffers, stateManager) debugLoop = Executors.newSingleThreadScheduledExecutor() debugLoop.scheduleAtFixedRate( diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/state/GlobalAsyncStateManager.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/state/GlobalAsyncStateManager.kt index 97c8f6fbdb66..c959e1ce5247 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/state/GlobalAsyncStateManager.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/state/GlobalAsyncStateManager.kt @@ -5,7 +5,6 @@ package io.airbyte.cdk.integrations.destination.async.state import com.google.common.base.Preconditions -import com.google.common.base.Strings import io.airbyte.cdk.integrations.destination.async.GlobalMemoryManager import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage import io.airbyte.commons.json.Jsons @@ -104,7 +103,6 @@ class GlobalAsyncStateManager(private val memoryManager: GlobalMemoryManager) { fun trackState( message: PartialAirbyteMessage, sizeInBytes: Long, - defaultNamespace: String, ) { if (preState) { convertToGlobalIfNeeded(message) @@ -113,7 +111,7 @@ class GlobalAsyncStateManager(private val memoryManager: GlobalMemoryManager) { // stateType should not change after a conversion. Preconditions.checkArgument(stateType == extractStateType(message)) - closeState(message, sizeInBytes, defaultNamespace) + closeState(message, sizeInBytes) } /** @@ -323,10 +321,9 @@ class GlobalAsyncStateManager(private val memoryManager: GlobalMemoryManager) { private fun closeState( message: PartialAirbyteMessage, sizeInBytes: Long, - defaultNamespace: String, ) { val resolvedDescriptor: StreamDescriptor = - extractStream(message, defaultNamespace) + extractStream(message) .orElse( SENTINEL_GLOBAL_DESC, ) @@ -424,38 +421,14 @@ class GlobalAsyncStateManager(private val memoryManager: GlobalMemoryManager) { UUID.randomUUID().toString(), ) - /** - * If the user has selected the Destination Namespace as the Destination default while - * setting up the connector, the platform sets the namespace as null in the StreamDescriptor - * in the AirbyteMessages (both record and state messages). The destination checks that if - * the namespace is empty or null, if yes then re-populates it with the defaultNamespace. - * See [io.airbyte.cdk.integrations.destination.async.AsyncStreamConsumer.accept] But - * destination only does this for the record messages. So when state messages arrive without - * a namespace and since the destination doesn't repopulate it with the default namespace, - * there is a mismatch between the StreamDescriptor from record messages and state messages. - * That breaks the logic of the state management class as [descToStateIdQ] needs to have - * consistent StreamDescriptor. This is why while trying to extract the StreamDescriptor - * from state messages, we check if the namespace is null, if yes then replace it with - * defaultNamespace to keep it consistent with the record messages. - */ private fun extractStream( message: PartialAirbyteMessage, - defaultNamespace: String, ): Optional { if ( message.state?.type != null && message.state?.type == AirbyteStateMessage.AirbyteStateType.STREAM ) { val streamDescriptor: StreamDescriptor? = message.state?.stream?.streamDescriptor - if (Strings.isNullOrEmpty(streamDescriptor?.namespace)) { - return Optional.of( - StreamDescriptor() - .withName( - streamDescriptor?.name, - ) - .withNamespace(defaultNamespace), - ) - } return streamDescriptor?.let { Optional.of(it) } ?: Optional.empty() } return Optional.empty() diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/util/ConfiguredCatalogUtil.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/util/ConfiguredCatalogUtil.kt deleted file mode 100644 index d33bbabb6280..000000000000 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/util/ConfiguredCatalogUtil.kt +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Copyright (c) 2024 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.cdk.integrations.util - -import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog - -/** - * For streams in [catalog] which do not have a namespace specified, explicitly set their namespace - * to the [defaultNamespace] - */ -fun addDefaultNamespaceToStreams(catalog: ConfiguredAirbyteCatalog, defaultNamespace: String?) { - if (defaultNamespace == null) { - return - } - // TODO: This logic exists in all V2 destinations. - // This is sad that if we forget to add this, there will be a null pointer during parseCatalog - for (catalogStream in catalog.streams) { - if (catalogStream.stream.namespace.isNullOrEmpty()) { - catalogStream.stream.namespace = defaultNamespace - } - } -} diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties index 2432e9d94401..b0b8584ae871 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties @@ -1 +1 @@ -version=0.36.8 +version=0.37.0 diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/AsyncStreamConsumerTest.kt b/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/AsyncStreamConsumerTest.kt index 4d9e03cb450b..e2e235d350aa 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/AsyncStreamConsumerTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/AsyncStreamConsumerTest.kt @@ -31,7 +31,6 @@ import io.airbyte.protocol.models.v0.StreamDescriptor import java.io.IOException import java.math.BigDecimal import java.time.Instant -import java.util.Optional import java.util.concurrent.Executors import java.util.concurrent.TimeUnit import java.util.concurrent.TimeoutException @@ -60,7 +59,7 @@ class AsyncStreamConsumerTest { private val CATALOG: ConfiguredAirbyteCatalog = ConfiguredAirbyteCatalog() .withStreams( - java.util.List.of( + listOf( CatalogHelpers.createConfiguredAirbyteStream( STREAM_NAME, SCHEMA_NAME, @@ -145,9 +144,8 @@ class AsyncStreamConsumerTest { onClose = onClose, onFlush = flushFunction, catalog = CATALOG, - bufferManager = BufferManager(), + bufferManager = BufferManager("default_ns"), flushFailure = flushFailure, - defaultNamespace = Optional.of("default_ns"), airbyteMessageDeserializer = airbyteMessageDeserializer, workerPool = Executors.newFixedThreadPool(5), ) @@ -264,9 +262,8 @@ class AsyncStreamConsumerTest { Mockito.mock(OnCloseFunction::class.java), flushFunction, CATALOG, - BufferManager((1024 * 10).toLong()), + BufferManager("default_ns", (1024 * 10).toLong()), flushFailure, - Optional.of("default_ns"), ) Mockito.`when`(flushFunction.optimalBatchSizeBytes).thenReturn(0L) diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/buffers/BufferDequeueTest.kt b/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/buffers/BufferDequeueTest.kt index 209676f5c102..2e5bdc85ebec 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/buffers/BufferDequeueTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/buffers/BufferDequeueTest.kt @@ -12,7 +12,6 @@ import io.airbyte.protocol.models.v0.AirbyteMessage import io.airbyte.protocol.models.v0.StreamDescriptor import java.time.Instant import java.time.temporal.ChronoUnit -import java.util.Optional import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Nested import org.junit.jupiter.api.Test @@ -33,29 +32,25 @@ class BufferDequeueTest { internal inner class Take { @Test internal fun testTakeShouldBestEffortRead() { - val bufferManager = BufferManager() + val bufferManager = BufferManager(DEFAULT_NAMESPACE) val enqueue = bufferManager.bufferEnqueue val dequeue = bufferManager.bufferDequeue enqueue.addRecord( RECORD_MSG_20_BYTES, RECORD_SIZE_20_BYTES, - Optional.of(DEFAULT_NAMESPACE) ) enqueue.addRecord( RECORD_MSG_20_BYTES, RECORD_SIZE_20_BYTES, - Optional.of(DEFAULT_NAMESPACE) ) enqueue.addRecord( RECORD_MSG_20_BYTES, RECORD_SIZE_20_BYTES, - Optional.of(DEFAULT_NAMESPACE) ) enqueue.addRecord( RECORD_MSG_20_BYTES, RECORD_SIZE_20_BYTES, - Optional.of(DEFAULT_NAMESPACE) ) // total size of records is 80, so we expect 50 to get us 2 records (prefer to @@ -77,24 +72,21 @@ class BufferDequeueTest { @Test internal fun testTakeShouldReturnAllIfPossible() { - val bufferManager = BufferManager() + val bufferManager = BufferManager(DEFAULT_NAMESPACE) val enqueue = bufferManager.bufferEnqueue val dequeue = bufferManager.bufferDequeue enqueue.addRecord( RECORD_MSG_20_BYTES, RECORD_SIZE_20_BYTES, - Optional.of(DEFAULT_NAMESPACE) ) enqueue.addRecord( RECORD_MSG_20_BYTES, RECORD_SIZE_20_BYTES, - Optional.of(DEFAULT_NAMESPACE) ) enqueue.addRecord( RECORD_MSG_20_BYTES, RECORD_SIZE_20_BYTES, - Optional.of(DEFAULT_NAMESPACE) ) try { @@ -108,19 +100,17 @@ class BufferDequeueTest { @Test internal fun testTakeFewerRecordsThanSizeLimitShouldNotError() { - val bufferManager = BufferManager() + val bufferManager = BufferManager(DEFAULT_NAMESPACE) val enqueue = bufferManager.bufferEnqueue val dequeue = bufferManager.bufferDequeue enqueue.addRecord( RECORD_MSG_20_BYTES, RECORD_SIZE_20_BYTES, - Optional.of(DEFAULT_NAMESPACE) ) enqueue.addRecord( RECORD_MSG_20_BYTES, RECORD_SIZE_20_BYTES, - Optional.of(DEFAULT_NAMESPACE) ) try { @@ -135,12 +125,12 @@ class BufferDequeueTest { @Test internal fun testMetadataOperationsCorrect() { - val bufferManager = BufferManager() + val bufferManager = BufferManager(DEFAULT_NAMESPACE) val enqueue = bufferManager.bufferEnqueue val dequeue = bufferManager.bufferDequeue - enqueue.addRecord(RECORD_MSG_20_BYTES, RECORD_SIZE_20_BYTES, Optional.of(DEFAULT_NAMESPACE)) - enqueue.addRecord(RECORD_MSG_20_BYTES, RECORD_SIZE_20_BYTES, Optional.of(DEFAULT_NAMESPACE)) + enqueue.addRecord(RECORD_MSG_20_BYTES, RECORD_SIZE_20_BYTES) + enqueue.addRecord(RECORD_MSG_20_BYTES, RECORD_SIZE_20_BYTES) val secondStream = StreamDescriptor().withName("stream_2") val recordFromSecondStream = Jsons.clone(RECORD_MSG_20_BYTES) @@ -148,7 +138,6 @@ class BufferDequeueTest { enqueue.addRecord( recordFromSecondStream, RECORD_SIZE_20_BYTES, - Optional.of(DEFAULT_NAMESPACE) ) Assertions.assertEquals(60, dequeue.totalGlobalQueueSizeBytes) @@ -169,7 +158,7 @@ class BufferDequeueTest { @Test internal fun testMetadataOperationsError() { - val bufferManager = BufferManager() + val bufferManager = BufferManager(DEFAULT_NAMESPACE) val dequeue = bufferManager.bufferDequeue val ghostStream = StreamDescriptor().withName("ghost stream") @@ -186,7 +175,7 @@ class BufferDequeueTest { @Test @Throws(Exception::class) internal fun cleansUpMemoryForEmptyQueues() { - val bufferManager = BufferManager() + val bufferManager = BufferManager(DEFAULT_NAMESPACE) val enqueue = bufferManager.bufferEnqueue val dequeue = bufferManager.bufferDequeue val memoryManager = bufferManager.memoryManager @@ -198,15 +187,15 @@ class BufferDequeueTest { ) // allocate a block for new stream - enqueue.addRecord(RECORD_MSG_20_BYTES, RECORD_SIZE_20_BYTES, Optional.of(DEFAULT_NAMESPACE)) + enqueue.addRecord(RECORD_MSG_20_BYTES, RECORD_SIZE_20_BYTES) Assertions.assertEquals( 2 * GlobalMemoryManager.BLOCK_SIZE_BYTES, memoryManager.getCurrentMemoryBytes(), ) - enqueue.addRecord(RECORD_MSG_20_BYTES, RECORD_SIZE_20_BYTES, Optional.of(DEFAULT_NAMESPACE)) - enqueue.addRecord(RECORD_MSG_20_BYTES, RECORD_SIZE_20_BYTES, Optional.of(DEFAULT_NAMESPACE)) - enqueue.addRecord(RECORD_MSG_20_BYTES, RECORD_SIZE_20_BYTES, Optional.of(DEFAULT_NAMESPACE)) + enqueue.addRecord(RECORD_MSG_20_BYTES, RECORD_SIZE_20_BYTES) + enqueue.addRecord(RECORD_MSG_20_BYTES, RECORD_SIZE_20_BYTES) + enqueue.addRecord(RECORD_MSG_20_BYTES, RECORD_SIZE_20_BYTES) // no re-allocates as we haven't breached block size Assertions.assertEquals( diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/buffers/BufferEnqueueTest.kt b/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/buffers/BufferEnqueueTest.kt index 01c859d31b3f..caf1f9fa6445 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/buffers/BufferEnqueueTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/buffers/BufferEnqueueTest.kt @@ -10,7 +10,6 @@ import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteRecordM import io.airbyte.cdk.integrations.destination.async.state.GlobalAsyncStateManager import io.airbyte.protocol.models.v0.AirbyteMessage import io.airbyte.protocol.models.v0.StreamDescriptor -import java.util.Optional import java.util.concurrent.ConcurrentHashMap import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Test @@ -31,6 +30,7 @@ class BufferEnqueueTest { Mockito.mock( GlobalAsyncStateManager::class.java, ), + DEFAULT_NAMESPACE, ) val streamName = "stream" @@ -42,7 +42,7 @@ class BufferEnqueueTest { PartialAirbyteRecordMessage().withStream(streamName), ) - enqueue.addRecord(record, RECORD_SIZE_20_BYTES, Optional.of(DEFAULT_NAMESPACE)) + enqueue.addRecord(record, RECORD_SIZE_20_BYTES) Assertions.assertEquals(1, streamToBuffer[stream]!!.size()) Assertions.assertEquals(20L, streamToBuffer[stream]!!.currentMemoryUsage) } @@ -58,6 +58,7 @@ class BufferEnqueueTest { Mockito.mock( GlobalAsyncStateManager::class.java, ), + DEFAULT_NAMESPACE, ) val streamName = "stream" @@ -69,8 +70,8 @@ class BufferEnqueueTest { PartialAirbyteRecordMessage().withStream(streamName), ) - enqueue.addRecord(record, RECORD_SIZE_20_BYTES, Optional.of(DEFAULT_NAMESPACE)) - enqueue.addRecord(record, RECORD_SIZE_20_BYTES, Optional.of(DEFAULT_NAMESPACE)) + enqueue.addRecord(record, RECORD_SIZE_20_BYTES) + enqueue.addRecord(record, RECORD_SIZE_20_BYTES) Assertions.assertEquals(2, streamToBuffer[stream]!!.size()) Assertions.assertEquals(40, streamToBuffer[stream]!!.currentMemoryUsage) } diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/state/GlobalAsyncStateManagerTest.kt b/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/state/GlobalAsyncStateManagerTest.kt index 896e6c04b408..3079d2bc2d06 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/state/GlobalAsyncStateManagerTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/state/GlobalAsyncStateManagerTest.kt @@ -214,7 +214,7 @@ class GlobalAsyncStateManagerTest { emittedStatesFromDestination.associateWith { it.state?.destinationStats } assertEquals(0, stateWithStats.size) - stateManager.trackState(STREAM1_STATE_MESSAGE1, STATE_MSG_SIZE, DEFAULT_NAMESPACE) + stateManager.trackState(STREAM1_STATE_MESSAGE1, STATE_MSG_SIZE) stateManager.flushStates { e: AirbyteMessage -> emittedStatesFromDestination.add( e, @@ -256,7 +256,7 @@ class GlobalAsyncStateManagerTest { GlobalAsyncStateManager(GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES)) // GLOBAL - stateManager.trackState(GLOBAL_STATE_MESSAGE1, STATE_MSG_SIZE, DEFAULT_NAMESPACE) + stateManager.trackState(GLOBAL_STATE_MESSAGE1, STATE_MSG_SIZE) stateManager.flushStates { e: AirbyteMessage -> emittedStatesFromDestination.add( e, @@ -286,7 +286,6 @@ class GlobalAsyncStateManagerTest { stateManager.trackState( STREAM1_STATE_MESSAGE1, STATE_MSG_SIZE, - DEFAULT_NAMESPACE, ) } } @@ -302,7 +301,7 @@ class GlobalAsyncStateManagerTest { val preConvertId2: Long = simulateIncomingRecords(STREAM3_DESC, 10, stateManager) assertEquals(3, setOf(preConvertId0, preConvertId1, preConvertId2).size) - stateManager.trackState(GLOBAL_STATE_MESSAGE1, STATE_MSG_SIZE, DEFAULT_NAMESPACE) + stateManager.trackState(GLOBAL_STATE_MESSAGE1, STATE_MSG_SIZE) // Since this is actually a global state, we can only flush after all streams are done. stateManager.decrement(preConvertId0, 10) @@ -350,7 +349,7 @@ class GlobalAsyncStateManagerTest { GlobalAsyncStateManager(GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES)) val preConvertId0: Long = simulateIncomingRecords(STREAM1_DESC, 10, stateManager) - stateManager.trackState(GLOBAL_STATE_MESSAGE1, STATE_MSG_SIZE, DEFAULT_NAMESPACE) + stateManager.trackState(GLOBAL_STATE_MESSAGE1, STATE_MSG_SIZE) stateManager.decrement(preConvertId0, 10) stateManager.flushStates { e: AirbyteMessage -> emittedStatesFromDestination.add( @@ -377,7 +376,7 @@ class GlobalAsyncStateManagerTest { emittedStatesFromDestination.clear() val afterConvertId1: Long = simulateIncomingRecords(STREAM1_DESC, 10, stateManager) - stateManager.trackState(GLOBAL_STATE_MESSAGE2, STATE_MSG_SIZE, DEFAULT_NAMESPACE) + stateManager.trackState(GLOBAL_STATE_MESSAGE2, STATE_MSG_SIZE) stateManager.decrement(afterConvertId1, 10) stateManager.flushStates { e: AirbyteMessage -> emittedStatesFromDestination.add( @@ -408,7 +407,7 @@ class GlobalAsyncStateManagerTest { GlobalAsyncStateManager(GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES)) val preConvertId0: Long = simulateIncomingRecords(STREAM1_DESC, 10, stateManager) - stateManager.trackState(GLOBAL_STATE_MESSAGE1, STATE_MSG_SIZE, DEFAULT_NAMESPACE) + stateManager.trackState(GLOBAL_STATE_MESSAGE1, STATE_MSG_SIZE) stateManager.decrement(preConvertId0, 10) stateManager.flushStates { e: AirbyteMessage -> emittedStatesFromDestination.add( @@ -433,7 +432,7 @@ class GlobalAsyncStateManagerTest { assertEquals(listOf(expectedDestinationStats), stateWithStats.values.toList()) emittedStatesFromDestination.clear() - stateManager.trackState(GLOBAL_STATE_MESSAGE2, STATE_MSG_SIZE, DEFAULT_NAMESPACE) + stateManager.trackState(GLOBAL_STATE_MESSAGE2, STATE_MSG_SIZE) stateManager.flushStates { e: AirbyteMessage -> emittedStatesFromDestination.add( e, @@ -461,7 +460,7 @@ class GlobalAsyncStateManagerTest { emittedStatesFromDestination.clear() val afterConvertId2: Long = simulateIncomingRecords(STREAM1_DESC, 10, stateManager) - stateManager.trackState(GLOBAL_STATE_MESSAGE3, STATE_MSG_SIZE, DEFAULT_NAMESPACE) + stateManager.trackState(GLOBAL_STATE_MESSAGE3, STATE_MSG_SIZE) stateManager.decrement(afterConvertId2, 10) stateManager.flushStates { e: AirbyteMessage -> emittedStatesFromDestination.add( @@ -494,7 +493,7 @@ class GlobalAsyncStateManagerTest { val preConvertId0: Long = simulateIncomingRecords(STREAM1_DESC, 10, stateManager) val preConvertId1: Long = simulateIncomingRecords(STREAM2_DESC, 10, stateManager) assertNotEquals(preConvertId0, preConvertId1) - stateManager.trackState(GLOBAL_STATE_MESSAGE1, STATE_MSG_SIZE, DEFAULT_NAMESPACE) + stateManager.trackState(GLOBAL_STATE_MESSAGE1, STATE_MSG_SIZE) stateManager.decrement(preConvertId0, 10) stateManager.decrement(preConvertId1, 10) stateManager.flushStates { e: AirbyteMessage -> @@ -523,7 +522,7 @@ class GlobalAsyncStateManagerTest { val afterConvertId0: Long = simulateIncomingRecords(STREAM1_DESC, 10, stateManager) val afterConvertId1: Long = simulateIncomingRecords(STREAM2_DESC, 10, stateManager) assertEquals(afterConvertId0, afterConvertId1) - stateManager.trackState(GLOBAL_STATE_MESSAGE2, STATE_MSG_SIZE, DEFAULT_NAMESPACE) + stateManager.trackState(GLOBAL_STATE_MESSAGE2, STATE_MSG_SIZE) stateManager.decrement(afterConvertId0, 20) stateManager.flushStates { e: AirbyteMessage -> emittedStatesFromDestination.add( @@ -557,7 +556,7 @@ class GlobalAsyncStateManagerTest { GlobalAsyncStateManager(GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES)) // GLOBAL - stateManager.trackState(STREAM1_STATE_MESSAGE1, STATE_MSG_SIZE, DEFAULT_NAMESPACE) + stateManager.trackState(STREAM1_STATE_MESSAGE1, STATE_MSG_SIZE) stateManager.flushStates { e: AirbyteMessage -> emittedStatesFromDestination.add( e, @@ -586,7 +585,6 @@ class GlobalAsyncStateManagerTest { stateManager.trackState( GLOBAL_STATE_MESSAGE1, STATE_MSG_SIZE, - DEFAULT_NAMESPACE, ) } } @@ -598,7 +596,7 @@ class GlobalAsyncStateManagerTest { GlobalAsyncStateManager(GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES)) var stateId: Long = simulateIncomingRecords(STREAM1_DESC, 3, stateManager) - stateManager.trackState(STREAM1_STATE_MESSAGE1, STATE_MSG_SIZE, DEFAULT_NAMESPACE) + stateManager.trackState(STREAM1_STATE_MESSAGE1, STATE_MSG_SIZE) stateManager.decrement(stateId, 3) stateManager.flushStates { e: AirbyteMessage -> emittedStatesFromDestination.add( @@ -625,7 +623,7 @@ class GlobalAsyncStateManagerTest { emittedStatesFromDestination.clear() stateId = simulateIncomingRecords(STREAM1_DESC, 10, stateManager) - stateManager.trackState(STREAM1_STATE_MESSAGE2, STATE_MSG_SIZE, DEFAULT_NAMESPACE) + stateManager.trackState(STREAM1_STATE_MESSAGE2, STATE_MSG_SIZE) stateManager.decrement(stateId, 10) stateManager.flushStates { e: AirbyteMessage -> emittedStatesFromDestination.add( @@ -660,7 +658,7 @@ class GlobalAsyncStateManagerTest { GlobalAsyncStateManager(GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES)) var stateId: Long = simulateIncomingRecords(STREAM1_DESC, 3, stateManager) - stateManager.trackState(STREAM1_STATE_MESSAGE1, STATE_MSG_SIZE, DEFAULT_NAMESPACE) + stateManager.trackState(STREAM1_STATE_MESSAGE1, STATE_MSG_SIZE) stateManager.decrement(stateId, 3) stateManager.flushStates { e: AirbyteMessage -> emittedStatesFromDestination.add( @@ -685,7 +683,7 @@ class GlobalAsyncStateManagerTest { assertEquals(listOf(expectedDestinationStats), stateWithStats.values.toList()) emittedStatesFromDestination.clear() - stateManager.trackState(STREAM1_STATE_MESSAGE2, STATE_MSG_SIZE, DEFAULT_NAMESPACE) + stateManager.trackState(STREAM1_STATE_MESSAGE2, STATE_MSG_SIZE) stateManager.flushStates { e: AirbyteMessage -> emittedStatesFromDestination.add( e, @@ -713,7 +711,7 @@ class GlobalAsyncStateManagerTest { emittedStatesFromDestination.clear() stateId = simulateIncomingRecords(STREAM1_DESC, 10, stateManager) - stateManager.trackState(STREAM1_STATE_MESSAGE3, STATE_MSG_SIZE, DEFAULT_NAMESPACE) + stateManager.trackState(STREAM1_STATE_MESSAGE3, STATE_MSG_SIZE) stateManager.decrement(stateId, 10) stateManager.flushStates { e: AirbyteMessage -> emittedStatesFromDestination.add( @@ -750,7 +748,7 @@ class GlobalAsyncStateManagerTest { val stream1StateId: Long = simulateIncomingRecords(STREAM1_DESC, 3, stateManager) val stream2StateId: Long = simulateIncomingRecords(STREAM2_DESC, 7, stateManager) - stateManager.trackState(STREAM1_STATE_MESSAGE1, STATE_MSG_SIZE, DEFAULT_NAMESPACE) + stateManager.trackState(STREAM1_STATE_MESSAGE1, STATE_MSG_SIZE) stateManager.decrement(stream1StateId, 3) stateManager.flushStates { e: AirbyteMessage -> emittedStatesFromDestination.add( @@ -782,7 +780,7 @@ class GlobalAsyncStateManagerTest { ) } assertEquals(listOf(), emittedStatesFromDestination) - stateManager.trackState(STREAM2_STATE_MESSAGE, STATE_MSG_SIZE, DEFAULT_NAMESPACE) + stateManager.trackState(STREAM2_STATE_MESSAGE, STATE_MSG_SIZE) stateManager.decrement(stream2StateId, 3) // only flush state if counter is 0. stateManager.flushStates { e: AirbyteMessage -> @@ -831,7 +829,7 @@ class GlobalAsyncStateManagerTest { GlobalAsyncStateManager(GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES)) val stateId = simulateIncomingRecords(STREAM1_DESC, 6, stateManager) stateManager.decrement(stateId, 4) - stateManager.trackState(GLOBAL_STATE_MESSAGE1, 1, STREAM1_DESC.namespace) + stateManager.trackState(GLOBAL_STATE_MESSAGE1, 1) stateManager.flushStates { e: AirbyteMessage -> emittedStatesFromDestination.add( e, diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.kt index 9e3b1dc174d9..dde56527f700 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.kt @@ -28,7 +28,6 @@ import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteRecordM import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcV1V2Migrator -import io.airbyte.cdk.integrations.util.addDefaultNamespaceToStreams import io.airbyte.commons.exceptions.ConnectionErrorException import io.airbyte.commons.json.Jsons import io.airbyte.commons.map.MoreMaps @@ -246,22 +245,7 @@ abstract class AbstractJdbcDestination ): SerializedAirbyteMessageConsumer? { val database = getDatabase(getDataSource(config)) - // Short circuit for non-v2 destinations. - if (!isDestinationV2) { - return JdbcBufferedConsumerFactory.createAsync( - outputRecordCollector, - database, - sqlOperations, - namingResolver, - config, - catalog, - null, - NoopTyperDeduper(), - ) - } - val defaultNamespace = config[configSchemaKey].asText() - addDefaultNamespaceToStreams(catalog, defaultNamespace) return getV2MessageConsumer( config, catalog, @@ -290,8 +274,8 @@ abstract class AbstractJdbcDestination CatalogParser(sqlGenerator, override) } - .orElse(CatalogParser(sqlGenerator)) + .map { override: String -> CatalogParser(sqlGenerator, defaultNamespace, override) } + .orElse(CatalogParser(sqlGenerator, defaultNamespace)) .parseCatalog(catalog!!) val typerDeduper: TyperDeduper = buildTyperDeduper( diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/JdbcBufferedConsumerFactory.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/JdbcBufferedConsumerFactory.kt index a1f174b4e3c4..e23089c64762 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/JdbcBufferedConsumerFactory.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/JdbcBufferedConsumerFactory.kt @@ -62,7 +62,7 @@ object JdbcBufferedConsumerFactory { namingResolver: NamingConventionTransformer, config: JsonNode, catalog: ConfiguredAirbyteCatalog, - defaultNamespace: String?, + defaultNamespace: String, typerDeduper: TyperDeduper, dataTransformer: StreamAwareDataTransformer = IdentityDataTransformer(), optimalBatchSizeBytes: Long = DEFAULT_OPTIMAL_BATCH_SIZE_FOR_FLUSH, @@ -85,8 +85,7 @@ object JdbcBufferedConsumerFactory { optimalBatchSizeBytes ), catalog, - BufferManager((Runtime.getRuntime().maxMemory() * 0.2).toLong()), - Optional.ofNullable(defaultNamespace), + BufferManager(defaultNamespace, (Runtime.getRuntime().maxMemory() * 0.2).toLong()), FlushFailure(), Executors.newFixedThreadPool(2), AirbyteMessageDeserializer(dataTransformer) diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/StagingConsumerFactory.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/StagingConsumerFactory.kt index 2cd699c75995..d39482253134 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/StagingConsumerFactory.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/StagingConsumerFactory.kt @@ -48,7 +48,7 @@ private constructor( private val purgeStagingData: Boolean, private val typerDeduper: TyperDeduper?, private val parsedCatalog: ParsedCatalog?, - private val defaultNamespace: String?, + private val defaultNamespace: String, private val destinationColumns: JavaBaseConstants.DestinationColumns, // Optional fields private val bufferMemoryLimit: Optional, @@ -105,7 +105,9 @@ private constructor( purgeStagingData, typerDeduper, parsedCatalog, - defaultNamespace, + // If we don't set a default namespace, throw. This is required for staging + // destinations. + defaultNamespace!!, destinationColumns, bufferMemoryLimit, optimalBatchSizeBytes, @@ -148,8 +150,7 @@ private constructor( ), flusher, catalog!!, - BufferManager(getMemoryLimit(bufferMemoryLimit)), - Optional.ofNullable(defaultNamespace), + BufferManager(defaultNamespace, getMemoryLimit(bufferMemoryLimit)), FlushFailure(), Executors.newFixedThreadPool(5), AirbyteMessageDeserializer(dataTransformer), diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParser.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParser.kt index 07ae4123d338..88e4f5053b27 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParser.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParser.kt @@ -6,6 +6,7 @@ package io.airbyte.integrations.base.destination.typing_deduping import com.google.common.annotations.VisibleForTesting import io.airbyte.cdk.integrations.base.AirbyteExceptionHandler.Companion.addStringForDeinterpolation import io.airbyte.cdk.integrations.base.JavaBaseConstants +import io.airbyte.commons.json.Jsons import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream import io.github.oshai.kotlinlogging.KotlinLogging @@ -19,9 +20,20 @@ class CatalogParser @JvmOverloads constructor( private val sqlGenerator: SqlGenerator, - private val rawNamespace: String = JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE + private val defaultNamespace: String, + private val rawNamespace: String = JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE, ) { - fun parseCatalog(catalog: ConfiguredAirbyteCatalog): ParsedCatalog { + fun parseCatalog(orginalCatalog: ConfiguredAirbyteCatalog): ParsedCatalog { + // Don't mutate the original catalog, just operate on a copy of it + // This is... probably the easiest way we have to deep clone a protocol model object? + val catalog = Jsons.clone(orginalCatalog) + catalog.streams.onEach { + // Overwrite null namespaces + if (it.stream.namespace.isNullOrEmpty()) { + it.stream.namespace = defaultNamespace + } + } + // this code is bad and I feel bad // it's mostly a port of the old normalization logic to prevent tablename collisions. // tbh I have no idea if it works correctly. diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParserTest.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParserTest.kt index 9032fc5e42fb..f6ad003dac09 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParserTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParserTest.kt @@ -45,7 +45,7 @@ internal class CatalogParserTest { StreamId(namespace, name, rawNamespace, namespace + "_abab_" + name, namespace, name) } - parser = CatalogParser(sqlGenerator) + parser = CatalogParser(sqlGenerator, "default_namespace") } /** @@ -176,9 +176,22 @@ internal class CatalogParserTest { ) } + @Test + fun testDefaultNamespace() { + val catalog = + parser.parseCatalog( + ConfiguredAirbyteCatalog() + .withStreams( + listOf(stream(null, "a", Jsons.deserialize("""{"type": "object"}"""))) + ) + ) + + Assertions.assertEquals("default_namespace", catalog.streams[0].id.originalNamespace) + } + companion object { private fun stream( - namespace: String, + namespace: String?, name: String, schema: JsonNode = Jsons.deserialize( diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt index bcb5488acfa0..b28d16b2c077 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt @@ -1773,7 +1773,9 @@ abstract class BaseSqlGeneratorIntegrationTest