Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resumable Full Refresh sync for mssql #37451

Merged
merged 69 commits into from
May 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
69 commits
Select commit Hold shift + click to select a range
000d09e
poc pr
xiaohansong Mar 29, 2024
701f879
test
xiaohansong Apr 1, 2024
6961a94
Merge remote-tracking branch 'origin/master' into xiaohan/poc-rfr
xiaohansong Apr 1, 2024
3d7e013
merge to head change
xiaohansong Apr 1, 2024
41848ed
Merge remote-tracking branch 'origin/master' into xiaohan/poc-rfr
xiaohansong Apr 4, 2024
3907d9d
save work
xiaohansong Apr 4, 2024
9164bfd
save work
xiaohansong Apr 5, 2024
92b71b6
Merge remote-tracking branch 'origin/master' into xiaohan/poc-rfr
xiaohansong Apr 5, 2024
651f088
save work
xiaohansong Apr 5, 2024
8183126
Merge remote-tracking branch 'origin/master' into xiaohan/cdk-rfr-int…
xiaohansong Apr 5, 2024
566e344
cdk change for rfr
xiaohansong Apr 5, 2024
c81fa4c
some clean up
xiaohansong Apr 5, 2024
ff6ce90
update interface
xiaohansong Apr 16, 2024
908d1da
Merge remote-tracking branch 'origin/master' into xiaohan/cdk-rfr-int…
xiaohansong Apr 16, 2024
760777a
Merge branch 'master' into xiaohan/cdk-rfr-interface
xiaohansong Apr 17, 2024
653c19b
mssql rfr implementation
rodireich Apr 19, 2024
c469c99
mssql rfr implementation
rodireich Apr 19, 2024
4ffe6b2
mssql rfr implementation
rodireich Apr 19, 2024
37fd000
mssql rfr implementation
rodireich Apr 20, 2024
09488ab
mssql rfr implementation
rodireich Apr 20, 2024
4d97b2d
update jdbc
xiaohansong Apr 23, 2024
eee5db6
Merge remote-tracking branch 'origin/master' into xiaohan/cdk-rfr-int…
xiaohansong Apr 23, 2024
6bd2b75
format
xiaohansong Apr 23, 2024
12997ec
Merge branch 'master' into xiaohan/cdk-rfr-interface
xiaohansong Apr 23, 2024
137dd86
Merge branch 'xiaohan/cdk-rfr-interface' into rodi/5th-7016
rodireich Apr 23, 2024
1d60fe6
test
rodireich Apr 23, 2024
f98f02f
mssql rfr
rodireich Apr 24, 2024
470e432
mssql rfr
rodireich Apr 26, 2024
8f118ad
Merge remote-tracking branch 'origin/master' into xiaohan/cdk-rfr-int…
xiaohansong Apr 26, 2024
90b9ca8
mssql rfr
rodireich Apr 28, 2024
ed4e7fe
Merge branch 'xiaohan/cdk-rfr-interface' into rodi/5th-7016
rodireich Apr 28, 2024
7cca19e
mssql rfr
rodireich Apr 28, 2024
0d8d3f8
mssql rfr
rodireich Apr 29, 2024
266ff96
mssql rfr
rodireich Apr 29, 2024
9a118ff
mssql rfr
rodireich Apr 29, 2024
34f4d47
mssql rfr
rodireich Apr 29, 2024
da2fbe1
mssql rfr
rodireich Apr 29, 2024
7e2e66e
mssql rfr
rodireich Apr 29, 2024
730b64c
mssql rfr
rodireich Apr 29, 2024
ccf2a3c
mssql rfr
rodireich Apr 29, 2024
f22f33c
mssql rfr
rodireich Apr 29, 2024
cb24220
mssql rfr
rodireich Apr 29, 2024
886c348
mssql rfr
rodireich Apr 29, 2024
d1800e5
mssql rfr
rodireich Apr 29, 2024
6e9299b
mssql rfr
rodireich Apr 29, 2024
05f470e
mssql rfr
rodireich Apr 29, 2024
e78b56e
mssql rfr
rodireich Apr 30, 2024
984c8a4
mssql rfr
rodireich Apr 30, 2024
18843f2
mssql rfr
rodireich Apr 30, 2024
3f04061
mssql rfr
rodireich Apr 30, 2024
d325718
mssql rfr
rodireich Apr 30, 2024
b25a0d6
mssql rfr
rodireich Apr 30, 2024
22aab75
mssql rfr
rodireich Apr 30, 2024
4462470
mssql rfr
rodireich Apr 30, 2024
f3b887d
mssql rfr
rodireich Apr 30, 2024
39f4b2d
mssql rfr
rodireich Apr 30, 2024
ca257d1
mssql rfr
rodireich Apr 30, 2024
afbb193
mssql rfr
rodireich Apr 30, 2024
b94f8e1
mssql rfr
rodireich Apr 30, 2024
2868a08
code sanity
rodireich May 2, 2024
0e4ccb5
Merge branch 'master' into rodi/5th-7016
rodireich May 3, 2024
3a941b8
mssql rfr
rodireich May 3, 2024
b337bd4
Merge branch 'master' into rodi/5th-7016
rodireich May 5, 2024
9fefb7a
mssql rfr
rodireich May 5, 2024
f9ba78c
mssql rfr
rodireich May 5, 2024
468972b
mssql rfr
rodireich May 5, 2024
dd61263
mssql rfr
rodireich May 6, 2024
7e8a773
mssql rfr
rodireich May 7, 2024
8b22e62
mssql rfr
rodireich May 7, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ internal class DefaultJdbcSourceAcceptanceTest :
JdbcSourceAcceptanceTest<
DefaultJdbcSourceAcceptanceTest.PostgresTestSource, BareBonesTestDatabase>() {
override fun config(): JsonNode {
return testdb.testConfigBuilder().build()
return testdb?.testConfigBuilder()?.build()!!
}

override fun source(): PostgresTestSource {
Expand Down Expand Up @@ -181,12 +181,15 @@ internal class DefaultJdbcSourceAcceptanceTest :
fun testCustomParametersOverwriteDefaultParametersExpectException() {
val connectionPropertiesUrl = "ssl=false"
val config =
getConfigWithConnectionProperties(
PSQL_CONTAINER,
testdb.databaseName,
connectionPropertiesUrl
)
val customParameters = parseJdbcParameters(config, JdbcUtils.CONNECTION_PROPERTIES_KEY, "&")
testdb?.let {
getConfigWithConnectionProperties(
PSQL_CONTAINER,
it.databaseName,
connectionPropertiesUrl
)
}
val customParameters =
parseJdbcParameters(config!!, JdbcUtils.CONNECTION_PROPERTIES_KEY, "&")
val defaultParameters = mapOf("ssl" to "true", "sslmode" to "require")
Assertions.assertThrows(IllegalArgumentException::class.java) {
JdbcDataSourceUtils.assertCustomParametersDontOverwriteDefaultParameters(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import org.mockito.Mockito
"The static variables are updated in subclasses for convenience, and cannot be final."
)
abstract class JdbcSourceAcceptanceTest<S : Source, T : TestDatabase<*, T, *>> {
@JvmField protected var testdb: T = createTestDatabase()
@JvmField protected var testdb: T? = null
Copy link
Contributor Author

@rodireich rodireich May 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: This line created a new test db every time an instance was created, leaving it open, while test themselves opened another test db in setup.
This left test db's that were never torn down


protected fun streamName(): String {
return TABLE_NAME
Expand Down Expand Up @@ -120,60 +120,60 @@ abstract class JdbcSourceAcceptanceTest<S : Source, T : TestDatabase<*, T, *>> {
testdb!!.with("ALTER SESSION SET NLS_DATE_FORMAT = 'YYYY-MM-DD'")
}
testdb
.with(
?.with(
createTableQuery(
getFullyQualifiedTableName(TABLE_NAME),
COLUMN_CLAUSE_WITH_PK,
primaryKeyClause(listOf("id"))
)
)
.with(
?.with(
"INSERT INTO %s(id, name, updated_at) VALUES (1, 'picard', '2004-10-19')",
getFullyQualifiedTableName(TABLE_NAME)
)
.with(
?.with(
"INSERT INTO %s(id, name, updated_at) VALUES (2, 'crusher', '2005-10-19')",
getFullyQualifiedTableName(TABLE_NAME)
)
.with(
?.with(
"INSERT INTO %s(id, name, updated_at) VALUES (3, 'vash', '2006-10-19')",
getFullyQualifiedTableName(TABLE_NAME)
)
.with(
?.with(
createTableQuery(
getFullyQualifiedTableName(TABLE_NAME_WITHOUT_PK),
COLUMN_CLAUSE_WITHOUT_PK,
""
)
)
.with(
?.with(
"INSERT INTO %s(id, name, updated_at) VALUES (1, 'picard', '2004-10-19')",
getFullyQualifiedTableName(TABLE_NAME_WITHOUT_PK)
)
.with(
?.with(
"INSERT INTO %s(id, name, updated_at) VALUES (2, 'crusher', '2005-10-19')",
getFullyQualifiedTableName(TABLE_NAME_WITHOUT_PK)
)
.with(
?.with(
"INSERT INTO %s(id, name, updated_at) VALUES (3, 'vash', '2006-10-19')",
getFullyQualifiedTableName(TABLE_NAME_WITHOUT_PK)
)
.with(
?.with(
createTableQuery(
getFullyQualifiedTableName(TABLE_NAME_COMPOSITE_PK),
COLUMN_CLAUSE_WITH_COMPOSITE_PK,
primaryKeyClause(listOf("first_name", "last_name"))
)
)
.with(
?.with(
"INSERT INTO %s(first_name, last_name, updated_at) VALUES ('first', 'picard', '2004-10-19')",
getFullyQualifiedTableName(TABLE_NAME_COMPOSITE_PK)
)
.with(
?.with(
"INSERT INTO %s(first_name, last_name, updated_at) VALUES ('second', 'crusher', '2005-10-19')",
getFullyQualifiedTableName(TABLE_NAME_COMPOSITE_PK)
)
.with(
?.with(
"INSERT INTO %s(first_name, last_name, updated_at) VALUES ('third', 'vash', '2006-10-19')",
getFullyQualifiedTableName(TABLE_NAME_COMPOSITE_PK)
)
Expand Down Expand Up @@ -774,11 +774,11 @@ abstract class JdbcSourceAcceptanceTest<S : Source, T : TestDatabase<*, T, *>> {

protected open fun executeStatementReadIncrementallyTwice() {
testdb
.with(
?.with(
"INSERT INTO %s (id, name, updated_at) VALUES (4, 'riker', '2006-10-19')",
getFullyQualifiedTableName(TABLE_NAME)
)
.with(
?.with(
"INSERT INTO %s (id, name, updated_at) VALUES (5, 'data', '2006-10-19')",
getFullyQualifiedTableName(TABLE_NAME)
)
Expand Down
4 changes: 2 additions & 2 deletions airbyte-integrations/connectors/source-mssql/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ plugins {
airbyteJavaConnector {
cdkVersionRequired = '0.31.5'
features = ['db-sources']
useLocalCdk = false
useLocalCdk = true
}

java {
// TODO: rewrite code to avoid javac wornings in the first place
// TODO: rewrite code to avoid javac warnings in the first place
compileJava {
options.compilerArgs += "-Xlint:-try,-rawtypes"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
dockerImageTag: 4.0.17
dockerImageTag: 4.0.18
dockerRepository: airbyte/source-mssql
documentationUrl: https://docs.airbyte.com/integrations/sources/mssql
githubIssueLabel: source-mssql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,20 @@ public enum ReplicationMethod {

@VisibleForTesting
static boolean isCdc(final JsonNode config) {
// new replication method config since version 0.4.0
if (config.hasNonNull(LEGACY_REPLICATION_FIELD) && config.get(LEGACY_REPLICATION_FIELD).isObject()) {
final JsonNode replicationConfig = config.get(LEGACY_REPLICATION_FIELD);
return ReplicationMethod.valueOf(replicationConfig.get(METHOD_FIELD).asText()) == ReplicationMethod.CDC;
}
// legacy replication method config before version 0.4.0
if (config.hasNonNull(LEGACY_REPLICATION_FIELD) && config.get(LEGACY_REPLICATION_FIELD).isTextual()) {
return ReplicationMethod.valueOf(config.get(LEGACY_REPLICATION_FIELD).asText()) == ReplicationMethod.CDC;
}
if (config.hasNonNull(REPLICATION_FIELD)) {
final JsonNode replicationConfig = config.get(REPLICATION_FIELD);
return ReplicationMethod.valueOf(replicationConfig.get(REPLICATION_TYPE_FIELD).asText()) == ReplicationMethod.CDC;
if (config != null) {
// new replication method config since version 0.4.0
if (config.hasNonNull(LEGACY_REPLICATION_FIELD) && config.get(LEGACY_REPLICATION_FIELD).isObject()) {
final JsonNode replicationConfig = config.get(LEGACY_REPLICATION_FIELD);
return ReplicationMethod.valueOf(replicationConfig.get(METHOD_FIELD).asText()) == ReplicationMethod.CDC;
}
// legacy replication method config before version 0.4.0
if (config.hasNonNull(LEGACY_REPLICATION_FIELD) && config.get(LEGACY_REPLICATION_FIELD).isTextual()) {
return ReplicationMethod.valueOf(config.get(LEGACY_REPLICATION_FIELD).asText()) == ReplicationMethod.CDC;
}
if (config.hasNonNull(REPLICATION_FIELD)) {
final JsonNode replicationConfig = config.get(REPLICATION_FIELD);
return ReplicationMethod.valueOf(replicationConfig.get(REPLICATION_TYPE_FIELD).asText()) == ReplicationMethod.CDC;
}
}

return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ public AirbyteMessage saveState(final Map<String, String> offset, final SchemaHi
state.put(IS_COMPRESSED, dbHistory.isCompressed());

final JsonNode asJson = Jsons.jsonNode(state);

LOGGER.info("debezium state offset: {}", Jsons.jsonNode(offset));

final CdcState cdcState = new CdcState().withState(asJson);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import io.airbyte.cdk.integrations.source.relationaldb.models.InternalModels.StateType;
import io.airbyte.cdk.integrations.source.relationaldb.state.StateManager;
import io.airbyte.commons.json.Jsons;
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import java.math.BigDecimal;
Expand Down Expand Up @@ -185,46 +185,46 @@ public static Map<io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair,

final Map<io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair, CursorBasedStatus> cursorBasedStatusMap = new HashMap<>();
streams.forEach(stream -> {
try {
final String name = stream.getStream().getName();
final String namespace = stream.getStream().getNamespace();
final String fullTableName =
getFullyQualifiedTableNameWithQuoting(namespace, name, quoteString);

final Optional<CursorInfo> cursorInfoOptional =
stateManager.getCursorInfo(new io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair(name, namespace));
if (cursorInfoOptional.isEmpty()) {
throw new RuntimeException(String.format("Stream %s was not provided with an appropriate cursor", stream.getStream().getName()));
}
final String name = stream.getStream().getName();
final String namespace = stream.getStream().getNamespace();
final String fullTableName =
getFullyQualifiedTableNameWithQuoting(namespace, name, quoteString);

LOGGER.info("Querying max cursor value for {}.{}", namespace, name);
final String cursorField = cursorInfoOptional.get().getCursorField();
final Optional<CursorInfo> cursorInfoOptional =
stateManager.getCursorInfo(new AirbyteStreamNameNamespacePair(name, namespace));
if (cursorInfoOptional.isEmpty()) {
throw new RuntimeException(String.format("Stream %s was not provided with an appropriate cursor", stream.getStream().getName()));
}
final CursorBasedStatus cursorBasedStatus = new CursorBasedStatus();
final Optional<String> maybeCursorField = Optional.ofNullable(cursorInfoOptional.get().getCursorField());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@stephane-airbyte actually this change by @rodireich should have fixed the issue that we were discussing here

#39143 (comment)

Noticing the use of Optional and ifPresent function below which were not checked in the previous version. So I will close that issue.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes. This is indeed fixing it. Thank you

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the discussion that led to this investigation!

maybeCursorField.ifPresent(cursorField -> {
LOGGER.info("Cursor {}. Querying max cursor value for {}.{}", cursorField, namespace, name);
final String quotedCursorField = getIdentifierWithQuoting(cursorField, quoteString);
final String cursorBasedSyncStatusQuery = String.format(MAX_CURSOR_VALUE_QUERY,
quotedCursorField,
fullTableName,
quotedCursorField,
quotedCursorField,
fullTableName);
final List<JsonNode> jsonNodes = database.bufferedResultSetQuery(conn -> conn.prepareStatement(cursorBasedSyncStatusQuery).executeQuery(),
resultSet -> JdbcUtils.getDefaultSourceOperations().rowToJson(resultSet));
final CursorBasedStatus cursorBasedStatus = new CursorBasedStatus();
cursorBasedStatus.setStateType(StateType.CURSOR_BASED);
cursorBasedStatus.setVersion(2L);
cursorBasedStatus.setStreamName(name);
cursorBasedStatus.setStreamNamespace(namespace);
final List<JsonNode> jsonNodes;
try {
jsonNodes = database.bufferedResultSetQuery(conn -> conn.prepareStatement(cursorBasedSyncStatusQuery).executeQuery(),
resultSet -> JdbcUtils.getDefaultSourceOperations().rowToJson(resultSet));
} catch (SQLException e) {
throw new RuntimeException("Failed to read max cursor value from %s.%s".formatted(namespace, name), e);
}
cursorBasedStatus.setCursorField(ImmutableList.of(cursorField));

if (!jsonNodes.isEmpty()) {
final JsonNode result = jsonNodes.get(0);
cursorBasedStatus.setCursor(result.get(cursorField).asText());
cursorBasedStatus.setCursorRecordCount((long) jsonNodes.size());
}

cursorBasedStatusMap.put(new io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair(name, namespace), cursorBasedStatus);
} catch (final SQLException e) {
throw new RuntimeException(e);
}
cursorBasedStatus.setStateType(StateType.CURSOR_BASED);
cursorBasedStatus.setVersion(2L);
cursorBasedStatus.setStreamName(name);
cursorBasedStatus.setStreamNamespace(namespace);
cursorBasedStatusMap.put(new AirbyteStreamNameNamespacePair(name, namespace), cursorBasedStatus);
});
});

return cursorBasedStatusMap;
Expand Down
Loading
Loading