Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MSSQL and debezium improvements #44759

Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
f4b8180
test
rodireich Aug 26, 2024
6590035
test
rodireich Aug 26, 2024
ad1ea1a
test
rodireich Aug 26, 2024
c09dbd2
test
rodireich Aug 26, 2024
95ac561
test
rodireich Aug 26, 2024
9960d38
test
rodireich Aug 26, 2024
45bd4c0
test
rodireich Aug 26, 2024
470e13a
test
rodireich Aug 26, 2024
8fcdf0c
test
rodireich Aug 26, 2024
bd5498b
test
rodireich Aug 26, 2024
a73c6d5
test
rodireich Aug 26, 2024
27e5319
test
rodireich Aug 26, 2024
9995f12
test
rodireich Aug 27, 2024
e11c15d
test
rodireich Aug 27, 2024
90ba205
test
rodireich Aug 27, 2024
abba05d
test
rodireich Aug 27, 2024
7f9ece1
test
rodireich Aug 27, 2024
0778be1
Merge branch 'master' into 6192-db-sources-mssql-javalangillegalargum…
rodireich Aug 27, 2024
c3ab36f
test
rodireich Aug 27, 2024
202ed06
test
rodireich Aug 28, 2024
97e89a2
Merge branch 'master' into 6192-db-sources-mssql-javalangillegalargum…
rodireich Aug 28, 2024
93d5951
test
rodireich Aug 28, 2024
3089475
test
rodireich Aug 28, 2024
45a9c0b
Merge branch 'master' into 6192-db-sources-mssql-javalangillegalargum…
rodireich Aug 28, 2024
388468e
test
rodireich Aug 28, 2024
1f5553f
test
rodireich Aug 28, 2024
486cfdc
ignore regular mssql doc from sidebar, replace with custom sidebar ob…
evantahler Aug 28, 2024
0819f8b
Merge branch 'master' into 6192-db-sources-mssql-javalangillegalargum…
rodireich Aug 28, 2024
da95ce7
version bump
rodireich Aug 28, 2024
5cb022b
Merge branch 'master' into 6192-db-sources-mssql-javalangillegalargum…
rodireich Aug 28, 2024
78f7487
sanity
rodireich Aug 28, 2024
b9dedcc
Merge branch 'master' into 6192-db-sources-mssql-javalangillegalargum…
rodireich Aug 28, 2024
b873398
sanity
rodireich Aug 28, 2024
c9518df
use published cdk
rodireich Aug 28, 2024
7230252
use published cdk
rodireich Aug 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,29 +6,22 @@ package io.airbyte.cdk.integrations.debezium.internals
import com.fasterxml.jackson.databind.JsonNode
import io.airbyte.commons.json.Jsons
import io.debezium.engine.ChangeEvent
import io.github.oshai.kotlinlogging.KotlinLogging

class ChangeEventWithMetadata(private val event: ChangeEvent<String?, String?>) {
private val eventKeyAsJson: JsonNode = Jsons.deserialize(event.key())
private val eventValueAsJson: JsonNode = Jsons.deserialize(event.value())
private val snapshotMetadata: SnapshotMetadata? =
SnapshotMetadata.Companion.fromString(eventValueAsJson["source"]["snapshot"].asText())

fun event(): ChangeEvent<String?, String?> {
return event
}
private val LOGGER = KotlinLogging.logger {}

fun eventKeyAsJson(): JsonNode {
return eventKeyAsJson
}
class ChangeEventWithMetadata(private val event: ChangeEvent<String?, String?>) {
val eventKeyAsJson: JsonNode?
postamar marked this conversation as resolved.
Show resolved Hide resolved
get() = event.key()?.let { Jsons.deserialize(it) }.also { it ?: LOGGER.warn { "Event key is null $event" } }
val eventValueAsJson: JsonNode?
get() = event.value()?.let { Jsons.deserialize(it) }.also { it ?: LOGGER.warn { "Event value is null $event" } }

fun eventValueAsJson(): JsonNode {
return eventValueAsJson
}
val snapshotMetadata: SnapshotMetadata?
get() {
val metadataKey = eventValueAsJson?.get("source")?.get("snapshot")?.asText()
return metadataKey?.let { return SnapshotMetadata.fromString(metadataKey) }
}

val isSnapshotEvent: Boolean
get() = SnapshotMetadata.Companion.isSnapshotEventMetadata(snapshotMetadata)

fun snapshotMetadata(): SnapshotMetadata? {
return snapshotMetadata
}
get() = SnapshotMetadata.isSnapshotEventMetadata(snapshotMetadata)
}
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ class DebeziumRecordIterator<T>(
hasSnapshotFinished = !changeEventWithMetadata.isSnapshotEvent

if (isEventLogged) {
val source: JsonNode? = changeEventWithMetadata.eventValueAsJson()["source"]
val source: JsonNode? = changeEventWithMetadata.eventValueAsJson?.get("source")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

kotlin doesn't have ?[] unfortunately

LOGGER.info {
"CDC events queue poll(): " +
"returned a change event with \"source\": $source."
Expand Down Expand Up @@ -340,8 +340,10 @@ class DebeziumRecordIterator<T>(
* snapshots) t: truncate, m: message
*/
fun isEventTypeHandled(event: ChangeEventWithMetadata): Boolean {
event.eventValueAsJson()["op"]?.asText()?.let {
return it in listOf("c", "u", "d", "r", "t")
LOGGER.info { "*** isEventTypeHandled for ${event.eventValueAsJson}" }
LOGGER.info { "*** op ${event.eventValueAsJson?.get("op")?.asText()}" }
event.eventValueAsJson?.get("op")?.asText()?.let {
return it in listOf("c", "u", "d", "t")
postamar marked this conversation as resolved.
Show resolved Hide resolved
}
?: return false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ class RelationalDbDebeziumEventConverter(
private val emittedAt: Instant
) : DebeziumEventConverter {
override fun toAirbyteMessage(event: ChangeEventWithMetadata): AirbyteMessage {
val debeziumEvent = event.eventValueAsJson()
val before: JsonNode? = debeziumEvent.get(DebeziumEventConverter.Companion.BEFORE_EVENT)
val after: JsonNode? = debeziumEvent.get(DebeziumEventConverter.Companion.AFTER_EVENT)
val debeziumEvent = event.eventValueAsJson
val before: JsonNode? = debeziumEvent?.get(DebeziumEventConverter.Companion.BEFORE_EVENT)
val after: JsonNode? = debeziumEvent?.get(DebeziumEventConverter.Companion.AFTER_EVENT)
val source: JsonNode =
checkNotNull(debeziumEvent.get(DebeziumEventConverter.Companion.SOURCE_EVENT)) {
checkNotNull(debeziumEvent?.get(DebeziumEventConverter.Companion.SOURCE_EVENT)) {
"ChangeEvent contains no source record $debeziumEvent"
}

Expand Down
4 changes: 2 additions & 2 deletions airbyte-integrations/connectors/source-mssql/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ plugins {
airbyteJavaConnector {
cdkVersionRequired = '0.44.4'
features = ['db-sources']
useLocalCdk = false
useLocalCdk = true
}

java {
Expand All @@ -25,7 +25,7 @@ application {

dependencies {
implementation 'com.microsoft.sqlserver:mssql-jdbc:12.6.1.jre11'
implementation 'io.debezium:debezium-embedded:2.6.1.Final'
implementation 'io.debezium:debezium-embedded:2.7.1.Final'
implementation 'io.debezium:debezium-connector-sqlserver:2.6.2.Final'
implementation 'org.codehaus.plexus:plexus-utils:3.4.2'

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
dockerImageTag: 4.1.8
dockerImageTag: 4.1.9
dockerRepository: airbyte/source-mssql
documentationUrl: https://docs.airbyte.com/integrations/sources/mssql
githubIssueLabel: source-mssql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public static Properties getDebeziumProperties(final JdbcDatabase database, fina
} else {
// If not in snapshot mode, initial will make sure that a snapshot is taken if the transaction log
// is rotated out. This will also end up read streaming changes from the transaction_log.
props.setProperty("snapshot.mode", "initial");
props.setProperty("snapshot.mode", "recovery");
}

props.setProperty("snapshot.isolation.mode", "read_committed");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@ public MssqlCdcTargetPosition(final Lsn targetLsn) {
public boolean reachedTargetPosition(final ChangeEventWithMetadata changeEventWithMetadata) {
if (changeEventWithMetadata.isSnapshotEvent()) {
return false;
} else if (SnapshotMetadata.LAST == changeEventWithMetadata.snapshotMetadata()) {
} else if (SnapshotMetadata.LAST == changeEventWithMetadata.getSnapshotMetadata()) {
LOGGER.info("Signalling close because Snapshot is complete");
return true;
} else {
final Lsn recordLsn = extractLsn(changeEventWithMetadata.eventValueAsJson());
final Lsn recordLsn = extractLsn(changeEventWithMetadata.getEventValueAsJson());
final boolean isEventLSNAfter = targetLsn.compareTo(recordLsn) <= 0;
if (isEventLSNAfter) {
LOGGER.info("Signalling close because record's LSN : " + recordLsn + " is after target LSN : " + targetLsn);
Expand Down Expand Up @@ -122,7 +122,7 @@ public boolean isEventAheadOffset(Map<String, String> offset, ChangeEventWithMet
if (offset == null || offset.size() != 1) {
return false;
}
final Lsn eventLsn = extractLsn(event.eventValueAsJson());
final Lsn eventLsn = extractLsn(event.getEventValueAsJson());
final Lsn offsetLsn = offsetToLsn(offset);
return eventLsn.compareTo(offsetLsn) > 0;
}
Expand Down
Loading