Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resumable Full Refresh sync for mssql #37451

Merged
merged 69 commits into from
May 7, 2024
Merged

Resumable Full Refresh sync for mssql #37451

merged 69 commits into from
May 7, 2024

Conversation

rodireich
Copy link
Contributor

@rodireich rodireich commented Apr 19, 2024

The implementation for Mssql references MySql's implementation: #36932

Copy link

vercel bot commented Apr 19, 2024

The latest updates on your projects. Learn more about Vercel for Git ↗︎

1 Ignored Deployment
Name Status Preview Comments Updated (UTC)
airbyte-docs ⬜️ Ignored (Inspect) Visit Preview May 7, 2024 5:49am

@octavia-squidington-iii octavia-squidington-iii added area/connectors Connector related issues CDK Connector Development Kit connectors/source/mssql labels Apr 19, 2024
@rodireich rodireich changed the base branch from master to xiaohan/cdk-rfr-interface April 23, 2024 22:25
@rodireich rodireich marked this pull request as ready for review April 30, 2024 23:49
@rodireich rodireich requested a review from a team as a code owner April 30, 2024 23:49
@rodireich rodireich changed the title RFR for mssql Resumable Full Refresh sync for mssql Apr 30, 2024
@octavia-squidington-iv octavia-squidington-iv requested review from a team April 30, 2024 23:50
@@ -39,7 +41,7 @@ import org.mockito.Mockito
"The static variables are updated in subclasses for convenience, and cannot be final."
)
abstract class JdbcSourceAcceptanceTest<S : Source, T : TestDatabase<*, T, *>> {
@JvmField protected var testdb: T = createTestDatabase()
@JvmField protected var testdb: T? = null
Copy link
Contributor Author

@rodireich rodireich May 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: This line created a new test db every time an instance was created, leaving it open, while test themselves opened another test db in setup.
This left test db's that were never torn down

@@ -281,6 +282,7 @@ public void close() {
bgThread.stop = true;
}
super.close();
MssqlDebeziumStateUtil.disposeInitialState();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: Because concurrent mssql tests are reusing threads in a fixed thread pool, it is necessary to dispose of cached state so the next test using this thread will start fresh.
Cached initial state is tied to a thread (ThreadLocal<>)

Copy link
Contributor

@xiaohansong xiaohansong left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pretty clean! I'm halfway through the PR and will finish later.

? new CdcState().withState(initialDebeziumState)
: stateManager.getCdcStateManager().getCdcState();
cdcStreamsForInitialOrderedColumnLoad(stateManager.getCdcStateManager(), catalog, savedOffsetStillPresentOnServer);
final CdcState stateToBeUsed = getCdcState(database, catalog, stateManager, savedOffsetStillPresentOnServer);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should rename this to initialState or initialStateToBeUsed

Copy link
Contributor

@xiaohansong xiaohansong left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

@@ -227,9 +221,11 @@ private AutoCloseableIterator<AirbyteMessage> augmentWithState(final AutoCloseab
final Long syncCheckpointRecords = config.get(SYNC_CHECKPOINT_RECORDS_PROPERTY) != null ? config.get(SYNC_CHECKPOINT_RECORDS_PROPERTY).asLong()
: DebeziumIteratorConstants.SYNC_CHECKPOINT_RECORDS;

initialLoadStateManager.setStreamStateForIncrementalRunSupplier(streamStateForIncrementalRunSupplier);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I realized this will be a bug, I reproduced it on mysql and postgres. I think mssql is fine because we set streamStateForIncrementalRunSupplier the same for both full refresh and incremental refresh - so just FYI

Since we share initialLoadStateManager, and we setup the iterators before actually iterating through them, the full refresh setup will overwrite the ones used in incremental iterators.

In postgres and mysql my solution was to provide a optional streamStateForIncrementalRunSupplier and in state manager it will default to emptyObject. So I think we achieved the same thing but just with different approach.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's take this offline. I'd like to understand how mssql is different.

@@ -23,7 +23,8 @@
import org.junit.jupiter.api.parallel.ExecutionMode;

@TestInstance(Lifecycle.PER_METHOD)
@Execution(ExecutionMode.CONCURRENT)
@Execution(ExecutionMode.SAME_THREAD)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this still required?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops. forgot to change back. thanks

|:--------|:-----------| :---------------------------------------------------------------------------------------------------------------- |:------------------------------------------------------------------------------------------------------------------------------------------------|
| 4.0.15 | 2024-04-22 | [37541](https://github.com/airbytehq/airbyte/pull/37541) | Adopt latest CDK. reduce excessive logs. |
|:--------|:-----------|:------------------------------------------------------------------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------|
| 4.0.16 | 2024-04-30 | [37451](https://github.com/airbytehq/airbyte/pull/37451) | Resumable full refresh read of tables. |
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note: this needs to be a minor update! and also we need to figure out how to highlight the minimum platform requirement in this doc.

this.pairToOrderedColInfo = pairToOrderedColInfo;
this.pairToOrderedColLoadStatus = MssqlInitialLoadStateManager.initPairToOrderedColumnLoadStatusMap(initialLoadStreams.pairToInitialLoadStatus());
}

public MssqlInitialLoadStreamStateManager(final ConfiguredAirbyteCatalog catalog,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can delete this now

@rodireich rodireich changed the base branch from xiaohan/cdk-rfr-interface to master May 3, 2024 22:47
@rodireich rodireich merged commit 5432fab into master May 7, 2024
30 checks passed
@rodireich rodireich deleted the rodi/5th-7016 branch May 7, 2024 14:36
throw new RuntimeException(String.format("Stream %s was not provided with an appropriate cursor", stream.getStream().getName()));
}
final CursorBasedStatus cursorBasedStatus = new CursorBasedStatus();
final Optional<String> maybeCursorField = Optional.ofNullable(cursorInfoOptional.get().getCursorField());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@stephane-airbyte actually this change by @rodireich should have fixed the issue that we were discussing here

#39143 (comment)

Noticing the use of Optional and ifPresent function below which were not checked in the previous version. So I will close that issue.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes. This is indeed fixing it. Thank you

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the discussion that led to this investigation!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area/connectors Connector related issues area/documentation Improvements or additions to documentation CDK Connector Development Kit connectors/source/mssql
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants