Skip to content

Commit

Permalink
Destinations: Refreshes: Track stream statuses in async framework (#3…
Browse files Browse the repository at this point in the history
  • Loading branch information
edgao authored Jun 10, 2024
1 parent a78647e commit 05fd09b
Show file tree
Hide file tree
Showing 16 changed files with 443 additions and 184 deletions.
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ corresponds to that version.

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.37.1 | 2024-06-10 | [\#38075](https://github.com/airbytehq/airbyte/pull/38075) | Destinations: Track stream statuses in async framework |
| 0.37.0 | 2024-06-10 | [\#38121](https://github.com/airbytehq/airbyte/pull/38121) | Destinations: Set default namespace via CatalogParser |
| 0.36.8 | 2024-06-07 | [\#38763](https://github.com/airbytehq/airbyte/pull/38763) | Increase Jackson message length limit |
| 0.36.7 | 2024-06-06 | [\#39220](https://github.com/airbytehq/airbyte/pull/39220) | Handle null messages in ConnectorExceptionUtil |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,9 @@
*/
package io.airbyte.cdk.integrations.destination

import java.util.*
import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage.AirbyteStreamStatus

/**
* @param recordsWritten The number of records written to the stream, or empty if the caller does
* not track this information. (this is primarily for backwards-compatibility with the legacy
* destinations framework; new implementations should always provide this information). If this
* value is empty, consumers should assume that the sync wrote nonzero records for this stream.
*/
data class StreamSyncSummary(val recordsWritten: Optional<Long>) {

companion object {
@JvmField val DEFAULT: StreamSyncSummary = StreamSyncSummary(Optional.empty())
}
}
data class StreamSyncSummary(
val recordsWritten: Long,
val terminalStatus: AirbyteStreamStatus,
)
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnCloseF
import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnStartFunction
import io.airbyte.commons.json.Jsons
import io.airbyte.protocol.models.v0.AirbyteMessage
import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage.AirbyteStreamStatus
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
import io.airbyte.protocol.models.v0.StreamDescriptor
import io.github.oshai.kotlinlogging.KotlinLogging
import java.util.Optional
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentMap
import java.util.concurrent.ExecutorService
Expand Down Expand Up @@ -71,6 +71,8 @@ constructor(

// Note that this map will only be populated for streams with nonzero records.
private val recordCounts: ConcurrentMap<StreamDescriptor, AtomicLong> = ConcurrentHashMap()
private val terminalStatusesFromSource: ConcurrentMap<StreamDescriptor, AirbyteStreamStatus> =
ConcurrentHashMap()

private var hasStarted = false
private var hasClosed = false
Expand Down Expand Up @@ -103,12 +105,43 @@ constructor(
airbyteMessageDeserializer.deserializeAirbyteMessage(
message,
)
if (AirbyteMessage.Type.RECORD == partialAirbyteMessage.type) {
validateRecord(partialAirbyteMessage)

partialAirbyteMessage.record?.streamDescriptor?.let {
getRecordCounter(it).incrementAndGet()
when (partialAirbyteMessage.type) {
AirbyteMessage.Type.RECORD -> {
validateRecord(partialAirbyteMessage)

partialAirbyteMessage.record?.streamDescriptor?.let {
getRecordCounter(it).incrementAndGet()

if (terminalStatusesFromSource.containsKey(it)) {
throw IllegalStateException(
"Received a record message after a terminal stream status for stream ${it.namespace}.${it.name}"
)
}
}
}
AirbyteMessage.Type.TRACE -> {
// There are many types of trace messages, but we only care about stream status
// messages with status=COMPLETE or INCOMPLETE.
// INCOMPLETE is a slightly misleading name - it actually means "Stream has stopped
// due to an interruption or error", i.e. failure
partialAirbyteMessage.trace?.streamStatus?.let {
val isTerminalStatus =
it.status == AirbyteStreamStatus.COMPLETE ||
it.status == AirbyteStreamStatus.INCOMPLETE
if (isTerminalStatus) {
val conflictsWithExistingStatus =
terminalStatusesFromSource.containsKey(it.streamDescriptor) &&
terminalStatusesFromSource[it.streamDescriptor] != it.status
if (conflictsWithExistingStatus) {
throw IllegalStateException(
"Received conflicting stream statuses for stream ${it.streamDescriptor.namespace}.${it.streamDescriptor.name}"
)
}
terminalStatusesFromSource[it.streamDescriptor] = it.status
}
}
}
else -> {}
}
bufferEnqueue.addRecord(
partialAirbyteMessage,
Expand All @@ -131,12 +164,18 @@ constructor(

val streamSyncSummaries =
streamNames.associate { streamDescriptor ->
// If we didn't receive a stream status message, assume success.
// Platform won't send us any stream status messages yet (since we're not declaring
// supportsRefresh in metadata), so we will always hit this case.
val terminalStatusFromSource =
terminalStatusesFromSource[streamDescriptor] ?: AirbyteStreamStatus.COMPLETE
StreamDescriptorUtils.withDefaultNamespace(
streamDescriptor,
bufferManager.defaultNamespace,
) to
StreamSyncSummary(
Optional.of(getRecordCounter(streamDescriptor).get()),
getRecordCounter(streamDescriptor).get(),
terminalStatusFromSource,
)
}
onClose.accept(hasFailed, streamSyncSummaries)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,22 @@ class BufferEnqueue(
message: PartialAirbyteMessage,
sizeInBytes: Int,
) {
if (message.type == AirbyteMessage.Type.RECORD) {
handleRecord(message, sizeInBytes)
} else if (message.type == AirbyteMessage.Type.STATE) {
stateManager.trackState(message, sizeInBytes.toLong())
when (message.type) {
AirbyteMessage.Type.RECORD -> {
handleRecord(message, sizeInBytes)
}
AirbyteMessage.Type.STATE -> {
stateManager.trackState(message, sizeInBytes.toLong())
}
else -> {}
}
}

private fun handleRecord(
message: PartialAirbyteMessage,
sizeInBytes: Int,
) {
val streamDescriptor = extractStateFromRecord(message)
val streamDescriptor = extractStreamDescriptorFromRecord(message)
val queue =
buffers.computeIfAbsent(
streamDescriptor,
Expand Down Expand Up @@ -87,7 +91,9 @@ class BufferEnqueue(
}

companion object {
private fun extractStateFromRecord(message: PartialAirbyteMessage): StreamDescriptor {
private fun extractStreamDescriptorFromRecord(
message: PartialAirbyteMessage
): StreamDescriptor {
return StreamDescriptor()
.withNamespace(message.record?.namespace)
.withName(message.record?.stream)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ package io.airbyte.cdk.integrations.destination.async.deser
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
import io.airbyte.commons.json.Jsons
import io.airbyte.protocol.models.v0.AirbyteMessage
import io.github.oshai.kotlinlogging.KotlinLogging

private val logger = KotlinLogging.logger {}

class AirbyteMessageDeserializer(
private val dataTransformer: StreamAwareDataTransformer = IdentityDataTransformer(),
Expand Down Expand Up @@ -53,8 +56,8 @@ class AirbyteMessageDeserializer(
partial.record?.data = null
} else if (AirbyteMessage.Type.STATE == msgType) {
partial.withSerialized(message)
} else {
throw RuntimeException(String.format("Unsupported message type: %s", msgType))
} else if (AirbyteMessage.Type.TRACE != msgType) {
logger.warn { "Unsupported message type: $msgType" }
}

return partial
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package io.airbyte.cdk.integrations.destination.async.model
import com.fasterxml.jackson.annotation.JsonProperty
import com.fasterxml.jackson.annotation.JsonPropertyDescription
import io.airbyte.protocol.models.v0.AirbyteMessage
import io.airbyte.protocol.models.v0.AirbyteTraceMessage
import java.util.Objects

class PartialAirbyteMessage {
Expand All @@ -26,6 +27,12 @@ class PartialAirbyteMessage {
@JsonProperty("state")
var state: PartialAirbyteStateMessage? = null

@get:JsonProperty("trace")
@set:JsonProperty("trace")
@JsonProperty("trace")
// These messages don't contain arbitrary blobs, so just directly reference the protocol struct.
var trace: AirbyteTraceMessage? = null

/**
* For record messages, this stores the serialized data blob (i.e.
* `Jsons.serialize(message.getRecord().getData())`). For state messages, this stores the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,8 @@ internal constructor(
* hasFailed=false, then it could be full success. if hasFailed=true, then going for partial
* success.
*/
onClose.accept(false, null)
// TODO what to do here?
onClose.accept(false, HashMap())
}

stateManager.listCommitted()!!.forEach(outputRecordCollector)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,21 @@
package io.airbyte.cdk.integrations.destination.buffered_stream_consumer

import io.airbyte.cdk.integrations.destination.StreamSyncSummary
import io.airbyte.commons.functional.CheckedBiConsumer
import io.airbyte.protocol.models.v0.StreamDescriptor

/**
* Interface allowing destination to specify clean up logic that must be executed after all
* record-related logic has finished.
*
* The map of StreamSyncSummaries MUST be non-null, but MAY be empty. Streams not present in the map
* will be treated as equivalent to [StreamSyncSummary.DEFAULT].
*
* The @JvmSuppressWildcards is here so that the 2nd parameter of accept stays a java
* Map<StreamDescriptor, StreamSyncSummary> rather than becoming a Map<StreamDescriptor, ? extends
* StreamSyncSummary>
*/
fun interface OnCloseFunction :
CheckedBiConsumer<
Boolean, @JvmSuppressWildcards Map<StreamDescriptor, StreamSyncSummary>, Exception>
fun interface OnCloseFunction {
@JvmSuppressWildcards
@Throws(Exception::class)
fun accept(
hasFailed: Boolean,
streamSyncSummaries: Map<StreamDescriptor, StreamSyncSummary>,
)
}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.37.0
version=0.37.1
Loading

0 comments on commit 05fd09b

Please sign in to comment.