Draft

test #70218
@@ -19,6 +19,7 @@ import io.airbyte.cdk.protocol.AirbyteValueProtobufEncoder
import io.airbyte.cdk.util.Jsons
import io.airbyte.protocol.protobuf.AirbyteRecordMessage
import io.airbyte.protocol.protobuf.AirbyteRecordMessage.AirbyteRecordMessageProtobuf
import io.github.oshai.kotlinlogging.KotlinLogging
import java.math.BigDecimal
import java.net.URL
import java.time.OffsetDateTime
@@ -62,12 +63,14 @@ fun <R> valueForProtobufEncoding(fve: FieldValueEncoder<R>): Any? {
}
}

val log = KotlinLogging.logger {}
fun NativeRecordPayload.toProtobuf(
schema: Set<FieldOrMetaField>,
recordMessageBuilder: AirbyteRecordMessageProtobuf.Builder,
valueBuilder: AirbyteRecordMessage.AirbyteValueProtobuf.Builder
): AirbyteRecordMessageProtobuf.Builder {
return recordMessageBuilder.apply {
log.info { "*** schema: $schema" }
schema
.sortedBy { it.id }
.forEachIndexed { index, field ->
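This hunk introduces io.github.oshai's kotlin-logging for ad-hoc schema debugging. Since toProtobuf is invoked once per record (see the accept() call sites further down), the info-level dump above repeats for every record emitted. A minimal sketch of the same pattern at a quieter level, assuming only the kotlin-logging API already imported here; the helper name is illustrative and not part of this change:

import io.github.oshai.kotlinlogging.KotlinLogging

// File-level logger, created once and reused by everything in the file.
private val log = KotlinLogging.logger {}

// Illustrative helper: the lambda defers building the message string until
// debug logging is actually enabled, so steady-state runs pay almost nothing.
fun logSchemaForDebugging(schema: Set<Any?>) {
    log.debug { "schema: $schema" }
}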
@@ -30,6 +30,7 @@ import io.airbyte.protocol.protobuf.AirbyteMessage.AirbyteMessageProtobuf
import io.airbyte.protocol.protobuf.AirbyteRecordMessage.AirbyteRecordMessageProtobuf
import io.airbyte.protocol.protobuf.AirbyteRecordMessage.AirbyteValueProtobuf
import io.airbyte.protocol.protobuf.AirbyteRecordMessageMetaOuterClass
import io.github.oshai.kotlinlogging.KotlinLogging
import java.time.Clock
import java.time.ZoneOffset

@@ -114,10 +115,12 @@ sealed class FeedBootstrap<T : Feed>(
outputDataChannel.close()
}

val log = KotlinLogging.logger {}
override fun accept(
recordData: NativeRecordPayload,
changes: Map<Field, FieldValueChange>?
) {
log.info { "*** schema: ${stream.schema}" }
if (changes.isNullOrEmpty()) {
acceptWithoutChanges(recordData.toJson())
} else {
@@ -227,11 +230,13 @@ sealed class FeedBootstrap<T : Feed>(
socketProtobufOutputConsumer.close()
}

val log = KotlinLogging.logger {}
val valueVBuilder = AirbyteValueProtobuf.newBuilder()!!
override fun accept(
recordData: NativeRecordPayload,
changes: Map<Field, FieldValueChange>?
) {
log.info { "*** schema: ${stream.schema}" }
if (changes.isNullOrEmpty()) {
acceptWithoutChanges(
recordData.toProtobuf(stream.schema, defaultRecordData, valueVBuilder)
@@ -1,3 +1,3 @@
testExecutionConcurrency=1
JunitMethodExecutionTimeout=5m
cdkVersion=0.1.77
cdkVersion=local
@@ -148,40 +148,52 @@ class MySqlSourceDebeziumOperations(
val transactionMillis: Long = source["ts_ms"].asLong()
val transactionOffsetDateTime: OffsetDateTime =
OffsetDateTime.ofInstant(Instant.ofEpochMilli(transactionMillis), ZoneOffset.UTC)
resultRow[CommonMetaField.CDC_UPDATED_AT.id] =
FieldValueEncoder(
transactionOffsetDateTime,
CommonMetaField.CDC_UPDATED_AT.type.jsonEncoder as JsonEncoder<Any>
)

resultRow[CommonMetaField.CDC_DELETED_AT.id] =
FieldValueEncoder(
if (isDelete) transactionOffsetDateTime else null,
(if (isDelete) CommonMetaField.CDC_DELETED_AT.type.jsonEncoder else NullCodec)
as JsonEncoder<Any>
)
if (stream.schema.any { it.id == CommonMetaField.CDC_UPDATED_AT.id }) {
resultRow[CommonMetaField.CDC_UPDATED_AT.id] =
FieldValueEncoder(
transactionOffsetDateTime,
CommonMetaField.CDC_UPDATED_AT.type.jsonEncoder as JsonEncoder<Any>
)
}

if (stream.schema.any { it.id == CommonMetaField.CDC_DELETED_AT.id }) {
resultRow[CommonMetaField.CDC_DELETED_AT.id] =
FieldValueEncoder(
if (isDelete) transactionOffsetDateTime else null,
(if (isDelete) CommonMetaField.CDC_DELETED_AT.type.jsonEncoder else NullCodec)
as JsonEncoder<Any>
)
}

// Set _ab_cdc_log_file and _ab_cdc_log_pos meta-field values.
val position = MySqlSourceCdcPosition(source["file"].asText(), source["pos"].asLong())

resultRow[MySqlSourceCdcMetaFields.CDC_LOG_FILE.id] =
FieldValueEncoder(
position.fileName,
MySqlSourceCdcMetaFields.CDC_LOG_FILE.type.jsonEncoder as JsonEncoder<Any>
)
if (stream.schema.any { it.id == MySqlSourceCdcMetaFields.CDC_LOG_FILE.id }) {
resultRow[MySqlSourceCdcMetaFields.CDC_LOG_FILE.id] =
FieldValueEncoder(
position.fileName,
MySqlSourceCdcMetaFields.CDC_LOG_FILE.type.jsonEncoder as JsonEncoder<Any>
)
}

resultRow[MySqlSourceCdcMetaFields.CDC_LOG_POS.id] =
FieldValueEncoder(
position.position.toDouble(),
MySqlSourceCdcMetaFields.CDC_LOG_POS.type.jsonEncoder as JsonEncoder<Any>
)
if (stream.schema.any { it.id == MySqlSourceCdcMetaFields.CDC_LOG_POS.id }) {
resultRow[MySqlSourceCdcMetaFields.CDC_LOG_POS.id] =
FieldValueEncoder(
position.position.toDouble(),
MySqlSourceCdcMetaFields.CDC_LOG_POS.type.jsonEncoder as JsonEncoder<Any>
)
}

// Set the _ab_cdc_cursor meta-field value.
resultRow[MySqlSourceCdcMetaFields.CDC_CURSOR.id] =
FieldValueEncoder(
position.cursorValue,
MySqlSourceCdcMetaFields.CDC_CURSOR.type.jsonEncoder as JsonEncoder<Any>
)

if (stream.schema.any { it.id == MySqlSourceCdcMetaFields.CDC_CURSOR.id }) {
// Set the _ab_cdc_cursor meta-field value.
resultRow[MySqlSourceCdcMetaFields.CDC_CURSOR.id] =
FieldValueEncoder(
position.cursorValue,
MySqlSourceCdcMetaFields.CDC_CURSOR.type.jsonEncoder as JsonEncoder<Any>
)
}

// Return a DeserializedRecord instance.
return DeserializedRecord(resultRow, emptyMap())
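Each of the guarded assignments in this hunk follows the same pattern: a CDC meta-field is written into resultRow only if the configured stream's schema contains a field with that id. A possible refactor sketch that factors the check into one helper, assuming the same imports as the surrounding file; the setIfInSchema name and the exact map type are assumptions, not part of this diff:

// Hypothetical helper (not in the PR): centralizes the "only emit meta-fields the
// configured schema declares" check applied above to _ab_cdc_updated_at,
// _ab_cdc_deleted_at, _ab_cdc_log_file, _ab_cdc_log_pos and _ab_cdc_cursor.
@Suppress("UNCHECKED_CAST")
private fun setIfInSchema(
    resultRow: MutableMap<String, FieldValueEncoder<*>>, // assumed shape of the row under construction
    stream: Stream,
    metaField: MetaField,
    value: Any?,
    encoder: JsonEncoder<*>,
) {
    if (stream.schema.any { it.id == metaField.id }) {
        resultRow[metaField.id] = FieldValueEncoder(value, encoder as JsonEncoder<Any>)
    }
}

// Usage, mirroring the _ab_cdc_updated_at case above:
// setIfInSchema(
//     resultRow, stream, CommonMetaField.CDC_UPDATED_AT,
//     transactionOffsetDateTime, CommonMetaField.CDC_UPDATED_AT.type.jsonEncoder,
// )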
@@ -4,6 +4,7 @@ package io.airbyte.integrations.source.mysql
import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node.ObjectNode
import com.mysql.cj.MysqlType
import io.airbyte.cdk.StreamIdentifier
import io.airbyte.cdk.command.OpaqueStateValue
import io.airbyte.cdk.data.JsonEncoder
import io.airbyte.cdk.discover.CdcIntegerMetaFieldType
@@ -68,13 +69,14 @@ import io.airbyte.cdk.read.WhereClauseNode
import io.airbyte.cdk.read.WhereNode
import io.airbyte.cdk.read.cdc.DebeziumOffset
import io.airbyte.cdk.util.Jsons
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
import io.micronaut.context.annotation.Primary
import jakarta.inject.Singleton
import java.time.OffsetDateTime

@Singleton
@Primary
class MySqlSourceOperations :
class MySqlSourceOperations(val configuredCatalog: ConfiguredAirbyteCatalog,) :
JdbcMetadataQuerier.FieldTypeMapper, SelectQueryGenerator, JdbcAirbyteStreamFactory {

override val globalCursor: MetaField = MySqlSourceCdcMetaFields.CDC_CURSOR
@@ -86,7 +88,11 @@ class MySqlSourceOperations :
CommonMetaField.CDC_DELETED_AT,
MySqlSourceCdcMetaFields.CDC_LOG_FILE,
MySqlSourceCdcMetaFields.CDC_LOG_POS
)
).filter { metaField ->
configuredCatalog.streams.any { configuredStream ->
configuredStream.stream?.jsonSchema?.get("properties")?.has(metaField.id) == true
}
}.toSet()

@Suppress("UNCHECKED_CAST")
override fun decorateRecordData(
@@ -129,28 +135,45 @@ class MySqlSourceOperations :
stream: Stream,
recordData: ObjectNode
) {
recordData.set<JsonNode>(
CommonMetaField.CDC_UPDATED_AT.id,
CdcOffsetDateTimeMetaFieldType.jsonEncoder.encode(timestamp),
)
recordData.set<JsonNode>(
MySqlSourceCdcMetaFields.CDC_LOG_POS.id,
CdcIntegerMetaFieldType.jsonEncoder.encode(0),
)
/*
val airbyteSteam = configuredCatalog.streams.mapNotNull { it.stream }.first { stream.id == StreamIdentifier.from(it) }
val jsonSchemaProperties: JsonNode = airbyteSteam.jsonSchema["properties"]
*/

// jsonSchemaProperties.properties().firstOrNull { it.key == CommonMetaField.CDC_UPDATED_AT.id }?.run {
if (stream.schema.any { it.id == CommonMetaField.CDC_UPDATED_AT.id }) {
recordData.set<JsonNode>(
CommonMetaField.CDC_UPDATED_AT.id,
CdcOffsetDateTimeMetaFieldType.jsonEncoder.encode(timestamp),
)
}
// }
if (stream.schema.any { it.id == MySqlSourceCdcMetaFields.CDC_LOG_POS.id }) {
recordData.set<JsonNode>(
MySqlSourceCdcMetaFields.CDC_LOG_POS.id,
CdcIntegerMetaFieldType.jsonEncoder.encode(0),
)
}
if (globalStateValue == null) {
return
}
val offset: DebeziumOffset =
MySqlSourceDebeziumOperations.deserializeStateUnvalidated(globalStateValue).offset
val position: MySqlSourceCdcPosition = MySqlSourceDebeziumOperations.position(offset)
recordData.set<JsonNode>(
MySqlSourceCdcMetaFields.CDC_LOG_FILE.id,
CdcStringMetaFieldType.jsonEncoder.encode(position.fileName),
)
recordData.set<JsonNode>(
MySqlSourceCdcMetaFields.CDC_LOG_POS.id,
CdcIntegerMetaFieldType.jsonEncoder.encode(position.position),
)

if (stream.schema.any { it.id == MySqlSourceCdcMetaFields.CDC_LOG_FILE.id }) {
recordData.set<JsonNode>(
MySqlSourceCdcMetaFields.CDC_LOG_FILE.id,
CdcStringMetaFieldType.jsonEncoder.encode(position.fileName),
)
}

if (stream.schema.any { it.id == MySqlSourceCdcMetaFields.CDC_LOG_POS.id }) {
recordData.set<JsonNode>(
MySqlSourceCdcMetaFields.CDC_LOG_POS.id,
CdcIntegerMetaFieldType.jsonEncoder.encode(position.position),
)
}
}

override fun toFieldType(c: JdbcMetadataQuerier.ColumnMetadata): FieldType =