Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MSSQL and debezium improvements #44759

Merged
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
f4b8180
test
rodireich Aug 26, 2024
6590035
test
rodireich Aug 26, 2024
ad1ea1a
test
rodireich Aug 26, 2024
c09dbd2
test
rodireich Aug 26, 2024
95ac561
test
rodireich Aug 26, 2024
9960d38
test
rodireich Aug 26, 2024
45bd4c0
test
rodireich Aug 26, 2024
470e13a
test
rodireich Aug 26, 2024
8fcdf0c
test
rodireich Aug 26, 2024
bd5498b
test
rodireich Aug 26, 2024
a73c6d5
test
rodireich Aug 26, 2024
27e5319
test
rodireich Aug 26, 2024
9995f12
test
rodireich Aug 27, 2024
e11c15d
test
rodireich Aug 27, 2024
90ba205
test
rodireich Aug 27, 2024
abba05d
test
rodireich Aug 27, 2024
7f9ece1
test
rodireich Aug 27, 2024
0778be1
Merge branch 'master' into 6192-db-sources-mssql-javalangillegalargum…
rodireich Aug 27, 2024
c3ab36f
test
rodireich Aug 27, 2024
202ed06
test
rodireich Aug 28, 2024
97e89a2
Merge branch 'master' into 6192-db-sources-mssql-javalangillegalargum…
rodireich Aug 28, 2024
93d5951
test
rodireich Aug 28, 2024
3089475
test
rodireich Aug 28, 2024
45a9c0b
Merge branch 'master' into 6192-db-sources-mssql-javalangillegalargum…
rodireich Aug 28, 2024
388468e
test
rodireich Aug 28, 2024
1f5553f
test
rodireich Aug 28, 2024
486cfdc
ignore regular mssql doc from sidebar, replace with custom sidebar ob…
evantahler Aug 28, 2024
0819f8b
Merge branch 'master' into 6192-db-sources-mssql-javalangillegalargum…
rodireich Aug 28, 2024
da95ce7
version bump
rodireich Aug 28, 2024
5cb022b
Merge branch 'master' into 6192-db-sources-mssql-javalangillegalargum…
rodireich Aug 28, 2024
78f7487
sanity
rodireich Aug 28, 2024
b9dedcc
Merge branch 'master' into 6192-db-sources-mssql-javalangillegalargum…
rodireich Aug 28, 2024
b873398
sanity
rodireich Aug 28, 2024
c9518df
use published cdk
rodireich Aug 28, 2024
7230252
use published cdk
rodireich Aug 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ corresponds to that version.

| Version | Date | Pull Request | Subject |
|:-----------|:-----------|:-------------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.44.18 | 2024-08-22 | [\#44505](https://github.com/airbytehq/airbyte/pull/44505) | Improve handling of incoming debezium change events |
| 0.44.17 | 2024-08-27 | [\#44832](https://github.com/airbytehq/airbyte/pull/44832) | Fix issues where some error messages with upper cases do not get matched by the error translation framework. |
| 0.44.16 | 2024-08-22 | [\#44505](https://github.com/airbytehq/airbyte/pull/44505) | Destinations: add sqlgenerator testing for mixed-case stream name |
| 0.44.15 | ?????????? | [\#?????](https://github.com/airbytehq/airbyte/pull/?????) | ????? |
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.44.17
version=0.44.18
Original file line number Diff line number Diff line change
Expand Up @@ -6,29 +6,32 @@ package io.airbyte.cdk.integrations.debezium.internals
import com.fasterxml.jackson.databind.JsonNode
import io.airbyte.commons.json.Jsons
import io.debezium.engine.ChangeEvent
import io.github.oshai.kotlinlogging.KotlinLogging

class ChangeEventWithMetadata(private val event: ChangeEvent<String?, String?>) {
private val eventKeyAsJson: JsonNode = Jsons.deserialize(event.key())
private val eventValueAsJson: JsonNode = Jsons.deserialize(event.value())
private val snapshotMetadata: SnapshotMetadata? =
SnapshotMetadata.Companion.fromString(eventValueAsJson["source"]["snapshot"].asText())

fun event(): ChangeEvent<String?, String?> {
return event
}
private val LOGGER = KotlinLogging.logger {}

fun eventKeyAsJson(): JsonNode {
return eventKeyAsJson
}
class ChangeEventWithMetadata(private val event: ChangeEvent<String?, String?>) {
val eventKeyAsJson: JsonNode?
postamar marked this conversation as resolved.
Show resolved Hide resolved
get() =
event
.key()
?.let { Jsons.deserialize(it) }
.also { it ?: LOGGER.warn { "Event key is null $event" } }
val eventValueAsJson: JsonNode?
get() =
event
.value()
?.let { Jsons.deserialize(it) }
.also { it ?: LOGGER.warn { "Event value is null $event" } }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could spam the logs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we get to the situation where value or key are null then something is seriously misoncifugred, as was the case on the oncall ticket.
I'd rather know what the malformed debezium event looks like. I couldn't figure out the root cause until I was able to get to this information.


fun eventValueAsJson(): JsonNode {
return eventValueAsJson
}
val snapshotMetadata: SnapshotMetadata?
get() {
val metadataKey = eventValueAsJson?.get("source")?.get("snapshot")?.asText()
return metadataKey?.let {
return SnapshotMetadata.fromString(metadataKey)
}
postamar marked this conversation as resolved.
Show resolved Hide resolved
}

val isSnapshotEvent: Boolean
get() = SnapshotMetadata.Companion.isSnapshotEventMetadata(snapshotMetadata)

fun snapshotMetadata(): SnapshotMetadata? {
return snapshotMetadata
}
get() = SnapshotMetadata.isSnapshotEventMetadata(snapshotMetadata)
}
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ class DebeziumRecordIterator<T>(
hasSnapshotFinished = !changeEventWithMetadata.isSnapshotEvent

if (isEventLogged) {
val source: JsonNode? = changeEventWithMetadata.eventValueAsJson()["source"]
val source: JsonNode? = changeEventWithMetadata.eventValueAsJson?.get("source")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

kotlin doesn't have ?[] unfortunately

LOGGER.info {
"CDC events queue poll(): " +
"returned a change event with \"source\": $source."
Expand Down Expand Up @@ -340,8 +340,8 @@ class DebeziumRecordIterator<T>(
* snapshots) t: truncate, m: message
*/
fun isEventTypeHandled(event: ChangeEventWithMetadata): Boolean {
event.eventValueAsJson()["op"]?.asText()?.let {
return it in listOf("c", "u", "d", "r", "t")
event.eventValueAsJson?.get("op")?.asText()?.let {
return it in listOf("c", "u", "d", "t")
postamar marked this conversation as resolved.
Show resolved Hide resolved
}
?: return false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ class RelationalDbDebeziumEventConverter(
private val emittedAt: Instant
) : DebeziumEventConverter {
override fun toAirbyteMessage(event: ChangeEventWithMetadata): AirbyteMessage {
val debeziumEvent = event.eventValueAsJson()
val before: JsonNode? = debeziumEvent.get(DebeziumEventConverter.Companion.BEFORE_EVENT)
val after: JsonNode? = debeziumEvent.get(DebeziumEventConverter.Companion.AFTER_EVENT)
val debeziumEvent = event.eventValueAsJson
val before: JsonNode? = debeziumEvent?.get(DebeziumEventConverter.Companion.BEFORE_EVENT)
val after: JsonNode? = debeziumEvent?.get(DebeziumEventConverter.Companion.AFTER_EVENT)
val source: JsonNode =
checkNotNull(debeziumEvent.get(DebeziumEventConverter.Companion.SOURCE_EVENT)) {
checkNotNull(debeziumEvent?.get(DebeziumEventConverter.Companion.SOURCE_EVENT)) {
"ChangeEvent contains no source record $debeziumEvent"
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class DebeziumRecordIteratorTest {
"c, true",
"u, true",
"d, true",
"r, true",
"r, false",
postamar marked this conversation as resolved.
Show resolved Hide resolved
"t, true",
"m, false",
"badVal, false",
Expand Down
6 changes: 3 additions & 3 deletions airbyte-integrations/connectors/source-mssql/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.44.17'
cdkVersionRequired = '0.44.18'
features = ['db-sources']
useLocalCdk = false
useLocalCdk = true
}

java {
Expand All @@ -25,7 +25,7 @@ application {

dependencies {
implementation 'com.microsoft.sqlserver:mssql-jdbc:12.6.1.jre11'
implementation 'io.debezium:debezium-embedded:2.6.1.Final'
implementation 'io.debezium:debezium-embedded:2.7.1.Final'
implementation 'io.debezium:debezium-connector-sqlserver:2.6.2.Final'
implementation 'org.codehaus.plexus:plexus-utils:3.4.2'

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
dockerImageTag: 4.1.9
dockerImageTag: 4.1.10
dockerRepository: airbyte/source-mssql
documentationUrl: https://docs.airbyte.com/integrations/sources/mssql
githubIssueLabel: source-mssql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public static Properties getDebeziumProperties(final JdbcDatabase database, fina
} else {
// If not in snapshot mode, initial will make sure that a snapshot is taken if the transaction log
// is rotated out. This will also end up read streaming changes from the transaction_log.
props.setProperty("snapshot.mode", "initial");
props.setProperty("snapshot.mode", "when_needed");
}

props.setProperty("snapshot.isolation.mode", "read_committed");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@ public MssqlCdcTargetPosition(final Lsn targetLsn) {
public boolean reachedTargetPosition(final ChangeEventWithMetadata changeEventWithMetadata) {
if (changeEventWithMetadata.isSnapshotEvent()) {
return false;
} else if (SnapshotMetadata.LAST == changeEventWithMetadata.snapshotMetadata()) {
} else if (SnapshotMetadata.LAST == changeEventWithMetadata.getSnapshotMetadata()) {
LOGGER.info("Signalling close because Snapshot is complete");
return true;
} else {
final Lsn recordLsn = extractLsn(changeEventWithMetadata.eventValueAsJson());
final Lsn recordLsn = extractLsn(changeEventWithMetadata.getEventValueAsJson());
final boolean isEventLSNAfter = targetLsn.compareTo(recordLsn) <= 0;
if (isEventLSNAfter) {
LOGGER.info("Signalling close because record's LSN : " + recordLsn + " is after target LSN : " + targetLsn);
Expand Down Expand Up @@ -122,7 +122,7 @@ public boolean isEventAheadOffset(Map<String, String> offset, ChangeEventWithMet
if (offset == null || offset.size() != 1) {
return false;
}
final Lsn eventLsn = extractLsn(event.eventValueAsJson());
final Lsn eventLsn = extractLsn(event.getEventValueAsJson());
final Lsn offsetLsn = offsetToLsn(offset);
return eventLsn.compareTo(offsetLsn) > 0;
}
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/mssql.md
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,7 @@ WHERE actor_definition_id ='b5ea17b1-f170-46dc-bc31-cc744ca984c1' AND (configura

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:------------------------------------------------------------------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------|
| 4.1.10 | 2024-08-27 | [44759](https://github.com/airbytehq/airbyte/pull/44759) | Improve null safety in parsing debezium change events. |
| 4.1.9 | 2024-08-27 | [44841](https://github.com/airbytehq/airbyte/pull/44841) | Adopt latest CDK. |
| 4.1.8 | 2024-08-08 | [43410](https://github.com/airbytehq/airbyte/pull/43410) | Adopt latest CDK. |
| 4.1.7 | 2024-08-06 | [42869](https://github.com/airbytehq/airbyte/pull/42869) | Adopt latest CDK. |
Expand Down
30 changes: 30 additions & 0 deletions docs/integrations/sources/mssql/mssql-troubleshooting.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Troubleshooting Microsoft SQL Server (MSSQL) Sources

## Connector Limitations

### Adding columns to existing tables with CDC

When working with source SQL Server (MSSQL) in CDC mode, Making alteration to a table such as `ALTER TABLE <table> ADD <column>` will not automatically be reflected in the CDC stream.
The easiest way of making CDC match the new structure of a table. You can disable and re-enable CDC on the table. This will create a new capture instance for the table with the new structure:
1. Disabling CDC on the table:
```sql
EXEC sys.sp_cdc_disable_table
@source_schema = N'<schema>',
@source_name = N'<table>',
@capture_instance = N'<capture instance (typically schema_table)>'
```
2. Enabling CDC on the table:
```sql
EXEC sys.sp_cdc_enable_table
@source_schema = N'<schema>',
@source_name = N'<table>',
@role_name = NULL
```
Note: You may want to set a `@role_name` or any other arguments similarly to how they were set when CDC was enabled in the first place.

You can validate which columns are being captured by running the following query:
```sql
EXEC sys.sp_cdc_get_captured_columns
@capture_instance = N'<capture instance (typically schema_table)>';
```

30 changes: 23 additions & 7 deletions docusaurus/sidebars.js
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ function getSourceConnectors() {
"readme",
"postgres",
"mongodb-v2",
"mssql",
"mysql",
]);
}
Expand Down Expand Up @@ -147,6 +148,22 @@ const sourceMysql = {
],
};

const sourceMssql = {
type: "category",
label: "MS SQL Server (MSSQL)",
link: {
type: "doc",
id: "integrations/sources/mssql",
},
items: [
{
type: "doc",
label: "Troubleshooting",
id: "integrations/sources/mssql/mssql-troubleshooting",
},
],
};

const destinationS3 = {
type: "category",
label: "S3",
Expand All @@ -156,15 +173,15 @@ const destinationS3 = {
},
items: [
{
type: "doc",
label: "Migration Guide",
id: "integrations/destinations/s3-migrations",
type: "doc",
label: "Migration Guide",
id: "integrations/destinations/s3-migrations",
},
{
type: "doc",
label: "Troubleshooting",
id: "integrations/destinations/s3/s3-troubleshooting",
}
},
],
};

Expand Down Expand Up @@ -350,6 +367,7 @@ const connectorCatalog = {
sourcePostgres,
sourceMongoDB,
sourceMysql,
sourceMssql,
...getSourceConnectors(),
].sort((itemA, itemB) => itemA.label.localeCompare(itemB.label)),
},
Expand Down Expand Up @@ -573,9 +591,7 @@ module.exports = {
type: "doc",
id: "operator-guides/upgrading-airbyte",
},
items: [
"managing-airbyte/connector-updates"
],
items: ["managing-airbyte/connector-updates"],
},
{
type: "category",
Expand Down
Loading