[Feature][Connector-V2][SqlServer CDC] support sqlserver schema evolution #10890
CloverDew wants to merge 12 commits
DanielLeens
left a comment
Thanks for working on this. I went through the latest head and retraced the SQL Server CDC schema-evolution path end to end. The overall direction makes sense, but I still found one blocking correctness issue before this can be merged.
What this PR solves
- User pain point
  Today, SQL Server CDC schema changes are hard to propagate safely through SeaTunnel, especially when column-level DDL needs to become downstream schema evolution events.
- Fix approach
  This PR adds SQL Server-specific schema diff / event resolution logic and an integration test that exercises the Flink schema-evolution path.
- One-line summary
  The feature is valuable, but the current rename heuristic can still translate some real `DROP + ADD` changes into `RENAME`, which changes the downstream DDL semantics.
Runtime path I checked
SQL Server CDC captures DDL
-> SqlServerSchemaChangeResolver.resolve(...)
-> diffColumns(...) [SqlServerSchemaChangeResolver.java:227-245]
-> pairRenameColumns(...) [247-304]
-> emit RENAME / ADD / DROP events
-> downstream schema evolution sink applies those events
Key findings
- The normal path absolutely hits this code: SQL Server column changes must go through `SqlServerSchemaChangeResolver` before SeaTunnel can emit schema-change events.
- The latest implementation does cover the happy-path rename case.
- But `pairRenameColumns(...)` still treats “same ordinal + same type” as sufficient evidence for rename.
- That is not semantically safe, because a real `DROP old_col` + `ADD new_col same_type same_position` is not equivalent to `RENAME old_col TO new_col` for downstream sinks.
Findings
Issue 1: DROP + ADD can still be collapsed into RENAME, which changes downstream schema-evolution semantics
- Location:
  `seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/SqlServerSchemaChangeResolver.java:247-304`
- Description:
  This logic sits directly in the SQL Server CDC schema-resolution hot path. `pairRenameColumns(...)` pairs dropped/added columns by ordinal and type and then emits a rename. That is too aggressive: in real SQL Server DDL, “drop one column and add another compatible column in the same position” is not the same operation as rename.
- Risk:
  This is a blocking correctness issue. Downstream sinks may apply a rename when the real intent was drop+add, which can change column lineage, target-side DDL behavior, and user-visible schema semantics.
- Best improvement:
  Option A: only emit `RENAME` when the upstream change source gives strong rename evidence; otherwise fall back to `DROP + ADD`.
  Option B: keep the heuristic, but add stricter guards and still prefer the conservative `DROP + ADD` path whenever the intent is not provable.
- Severity: High
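To make Option A concrete, here is a minimal sketch of evidence-gated pairing. It is not the resolver's actual code: `ConservativeRenamePairing`, `Col`, and the `explicitRenames` map (lowercase old name to new name) are hypothetical names introduced only for illustration, and events are plain strings rather than SeaTunnel event objects.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

class ConservativeRenamePairing {

    /** Hypothetical, simplified column descriptor (not a SeaTunnel class). */
    static final class Col {
        final String name;
        final String type;
        final int ordinal;

        Col(String name, String type, int ordinal) {
            this.name = name;
            this.type = type;
            this.ordinal = ordinal;
        }
    }

    /**
     * Emits RENAME only when explicitRenames (assumed: lowercase old name -> new name)
     * names the pair and the types match. Ordinal is deliberately ignored as evidence.
     */
    static List<String> resolve(List<Col> dropped, List<Col> added, Map<String, String> explicitRenames) {
        List<String> events = new ArrayList<>();
        Map<String, Col> remainingAdded = new LinkedHashMap<>();
        for (Col c : added) {
            remainingAdded.put(c.name.toLowerCase(Locale.ROOT), c);
        }
        for (Col old : dropped) {
            String target = explicitRenames.get(old.name.toLowerCase(Locale.ROOT));
            Col candidate = target == null ? null : remainingAdded.get(target.toLowerCase(Locale.ROOT));
            if (candidate != null && candidate.type.equalsIgnoreCase(old.type)) {
                // Strong evidence: the upstream DDL explicitly renamed this column.
                remainingAdded.remove(target.toLowerCase(Locale.ROOT));
                events.add("RENAME " + old.name + " TO " + candidate.name);
            } else {
                // No provable intent: stay conservative.
                events.add("DROP " + old.name);
            }
        }
        for (Col c : remainingAdded.values()) {
            events.add("ADD " + c.name + " " + c.type);
        }
        return events;
    }
}
```

With an empty `explicitRenames` map, a dropped `old_col INT` and an added `new_col INT` at the same ordinal stay as `DROP old_col` plus `ADD new_col INT`; only an explicit `old_col -> new_col` entry turns the pair into a rename.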
Test / CI notes
- `SqlServerCDCIT.testWithSchemaEvolution()` covers the end-to-end path and uses condition-based waiting rather than new fixed sleeps, so I did not see a new flaky-test pattern there.
- According to GitHub metadata, the current `Build` signal is still incomplete/red. The visible failed run only shows the early workflow jobs being cancelled, so I could not attribute a SQL Server-specific CI failure from the exposed logs alone.
Conclusion: can merge after fixes
- Blocking items
- Issue 1: the current rename pairing is not semantically safe and can emit the wrong schema-change operation on the main path.
- Suggested but non-blocking improvements
- Add a regression case that proves a same-ordinal / same-type replace-column scenario is preserved as `DROP + ADD` rather than being upgraded to `RENAME`.
Overall, the direction is good and the integration coverage helps, but the rename detection still needs to become more conservative before this is safe to merge.
DanielLeens
left a comment
Thanks for the update. I re-reviewed the latest head from the full SQL Server schema-evolution path, and the key blocker from the previous round is fixed now: the resolver no longer treats a generic drop/add shape as a rename unless the DDL explicitly says so.
What problem this PR solves
- User pain point
  SQL Server CDC did not have a reliable schema-evolution event path, especially when add/drop/rename/modify changes had to be translated into downstream `SchemaChangeEvent`s.
- Fix approach
  The PR introduces `SqlServerSchemaChangeResolver` to convert Debezium schema-change records into SeaTunnel column events, with unit and e2e coverage for add/drop/rename/modify paths.
- One-line summary
  This adds SQL Server CDC schema-evolution event resolution and propagation.
1. Code change review
1.1 Core logic analysis
The runtime path is:
Debezium SQL Server schema-change SourceRecord
-> SqlServerSchemaChangeResolver.support(record)
-> resolve(record, catalogTables)
-> resolveTablePath(...)
-> getCurrentTable(...)
-> parseSpRenameColumns(ddl)
-> diffColumns(...)
-> pairRenameColumns(...)
-> add / drop / modify / change events
-> downstream schema operator / coordinator consumes AlterTableColumnsEvent
The important latest-head behavior is here:
if (!StringUtils.equalsIgnoreCase(addedName, explicitRenames.get(removedName))) {
continue;
}
if (!sameDefinitionExceptName(removed.value, convertedAdded)) {
continue;
}
That means rename is now gated by an explicit `sp_rename ... 'COLUMN'` signal, instead of being inferred from any matching drop/add pair with the same definition.
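For readers who have not seen the SQL Server side, the explicit signal is a call such as `EXEC sp_rename 'dbo.orders.old_col', 'new_col', 'COLUMN';`. The sketch below shows one way such a statement could be turned into an old-name to new-name map. It is an assumption-laden illustration, not the PR's `parseSpRenameColumns(...)`: the class `SpRenameSketch` and its regex are hypothetical, it handles only the positional argument form, and it splits on the last dot, so bracketed identifiers that themselves contain dots are out of scope.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class SpRenameSketch {

    // Positional form only: sp_rename '<schema.table.column>', '<new_name>', 'COLUMN'
    // An optional N prefix (Unicode string literal) is tolerated on each argument.
    private static final Pattern SP_RENAME_COLUMN = Pattern.compile(
            "sp_rename\\s+N?'([^']+)'\\s*,\\s*N?'([^']+)'\\s*,\\s*N?'COLUMN'",
            Pattern.CASE_INSENSITIVE);

    /** Returns lowercase old column name -> lowercase new column name. */
    static Map<String, String> parse(String ddl) {
        Map<String, String> renames = new LinkedHashMap<>();
        if (ddl == null) {
            return renames;
        }
        Matcher m = SP_RENAME_COLUMN.matcher(ddl);
        while (m.find()) {
            String objectName = m.group(1);
            // The column is the last dot-separated part of the object name.
            String oldName = objectName.substring(objectName.lastIndexOf('.') + 1);
            renames.put(normalize(oldName), normalize(m.group(2)));
        }
        return renames;
    }

    /** Strips one pair of surrounding brackets and lowercases for lookup. */
    static String normalize(String identifier) {
        String s = identifier.trim();
        if (s.length() >= 2 && s.startsWith("[") && s.endsWith("]")) {
            s = s.substring(1, s.length() - 1);
        }
        return s.toLowerCase(Locale.ROOT);
    }
}
```

Names are lowercased here only so the map can be used for case-insensitive lookup; a real resolver would keep the original casing for the emitted event. A plain `ALTER TABLE ... DROP COLUMN` / `ADD` pair yields an empty map, which is what keeps it on the `ADD + DROP` path.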
The new tests cover both branches:
- explicit rename -> `AlterTableChangeColumnEvent`
- non-explicit rename-like DDL -> keep `ADD + DROP`
And the e2e rename SQL now forces real post-switch row updates so the CDC stream actually emits verifiable events.
1.2 Compatibility impact
Fully compatible from the source-review perspective. This is an additive SQL Server CDC schema-evolution capability and I do not see a new protocol/default/serialization regression on the latest head.
1.3 Performance / side effects
The resolver work is schema-change-event scoped, not row-hot-path scoped. I do not see a new CPU, memory, GC, concurrency, retry, idempotency, or resource-release problem in the current implementation.
1.4 Error handling and logging
No new source-level blocking issue found.
2. Code quality evaluation
2.1 Code style
The latest implementation is much clearer than the earlier heuristic-only version. The rename decision is now concentrated in parseSpRenameColumns(...) and pairRenameColumns(...), which makes the behavior easier to reason about.
2.2 Test coverage and stability
- The unit tests cover add, explicit rename, non-explicit rename, and bracketed identifiers.
- The e2e test runs the full add -> drop -> rename -> modify evolution sequence.
- I did not find a flaky-test pattern in the newly added rename path.
Test stability rating: Stable.
2.3 Documentation
I am not expanding the docs gap into a new blocker in this round. The main previous logic issue is fixed on the latest head.
3. Architecture
3.1 Solution quality
This is a precise long-term direction. Using explicit SQL Server rename DDL as the strong signal is much safer than generic shape-based inference.
3.2 Maintainability
Good. Future SQL Server schema-change support can continue to build on the same “DDL signal + table_changes diff” structure.
3.3 Extensibility
Good reuse potential for future column-level schema changes.
3.4 Historical compatibility
No new historical-compatibility concern found on the latest head.
4. Issue summary
No new source-level blocking issue found.
5. Merge conclusion
Conclusion: can merge
- Blocking items
  No source blocker from the latest head.
- Suggested follow-ups
  None.
From the source-review side, the latest head looks good to me. The rename false-positive risk that blocked the earlier version is addressed in the current implementation. The remaining gate is just waiting for the Build check to finish.
DanielLeens
left a comment
Sorry for missing this in the last review; that's on me. I re-reviewed the latest head from scratch, and I want to flag one more item that I should have caught earlier. This is a carryover from the previous version, not something introduced by the final `fix ci` commit.
What problem this PR solves
- User pain point
  SQL Server CDC needs to translate upstream DDL into SeaTunnel schema-change events so downstream Flink schema-evolution sinks can keep the target table aligned.
- Fix approach
  This PR adds SQL Server schema-change resolution and wires the events into the shared Flink schema-evolution path.
- One sentence
  The SQL Server-specific direction is good, but the latest head still carries one blocking recovery-chain bug in the shared Flink schema-evolution layer.
Simple example: if SQL Server emits ALTER TABLE orders ADD region VARCHAR(20) while the Flink job is checkpointing, the source side now waits for sink-side schema application. But after a failure/recovery window, one subtask can still remember "I already processed this epoch" before the actual ALTER TABLE finished, and then ACK the replayed event without reapplying the DDL.
1. Code change review
1.1 Core logic analysis
Runtime chain I checked:
SQL Server CDC schema change
-> SqlServerSchemaChangeResolver.resolve(...)
-> Flink schema operator / coordinator emits SchemaChangeEvent
-> BroadcastSchemaSinkOperator.handleBroadcastedSchemaChange(...)
-> emitApplySchemaEventToSink(event, epoch)
-> FlinkSinkWriter.handleSchemaChangeEvent(...)
-> applySchemaChange(...)
-> sendSchemaChangeAck(...)
-> LocalSchemaCoordinator tracks applied epochs and acks
The important current-head pieces are:
emitApplySchemaEventToSink(event, epoch);
lastProcessedEpoch.put(tableId, epoch);
in `BroadcastSchemaSinkOperator`, while the real sink-side completion still happens later in:
((SupportSchemaEvolutionSinkWriter) sinkWriter).applySchemaChange(schemaChangeEvent);
sendSchemaChangeAck(schemaChangeEvent, epoch, subtaskId, success);
That means the operator state advances before the actual sink DDL is durably known to have succeeded.
Key findings:
- The normal SQL Server schema-evolution path absolutely hits this shared Flink layer.
- The ACK timing improvement is real: the writer now ACKs only after `applySchemaChange(...)` finishes.
- But `BroadcastSchemaSinkOperator` still snapshots `lastProcessedEpoch` too early.
- On recovery, that early state can turn an unfinished DDL into a duplicate fast-path ACK.
Logic correctness in the failure path:
Schema event broadcast
-> BroadcastSchemaSinkOperator forwards event downstream
-> BroadcastSchemaSinkOperator immediately writes lastProcessedEpoch
-> checkpoint captures that operator state
-> job fails before FlinkSinkWriter finishes ALTER TABLE
-> restore
-> same schema event is replayed
-> BroadcastSchemaSinkOperator sees epoch <= lastProcessedEpoch
-> treats it as already processed and sends ACK without reapplying DDL
-> coordinator can release data for the new schema even though the sink table was never altered
1.2 Compatibility impact
Partially incompatible in practice if this bug is left in place, because recovery can make the sink-side schema state diverge from the source-side event history. API/config/default-value compatibility is otherwise unchanged by this specific finding.
1.3 Performance / side effects
The main risk is not CPU or GC. It is recovery correctness, idempotency, and state consistency across failures. Once lastProcessedEpoch gets ahead of the real sink DDL, replay can no longer trust the duplicate-detection shortcut.
1.4 Error handling and logging
Issue 1: lastProcessedEpoch is persisted before the sink writer actually finishes the schema change
- Location:
  `seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/schema/BroadcastSchemaSinkOperator.java:169-176`
- Problem description:
  This code is on the main Flink schema-evolution path for this PR. `handleBroadcastedSchemaChange(...)` forwards the event to the sink writer and immediately records the epoch as processed. But the real DDL completion still happens later in `FlinkSinkWriter.handleSchemaChangeEvent(...)`, when `applySchemaChange(...)` returns and the writer sends the ACK.
- Potential risk:
  If a checkpoint captures `lastProcessedEpoch` after the forward but before the real sink-side DDL finishes, recovery can replay the same event and take the duplicate branch (`epoch <= lastProcessedEpoch`) instead of reapplying the schema change. That can release post-DDL rows while the sink table is still on the old schema. This is a blocking recovery / correctness issue.
- Best improvement suggestion:
  Move the "already processed" state transition behind the real sink-side completion boundary. In practice, `lastProcessedEpoch` should only advance after the operator can prove the schema change completed successfully, or it should be reconstructed from an ACK-backed state source rather than from the pre-ACK forward step.
- Severity: High
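One possible shape for that suggestion, sketched in isolation: the duplicate check reads only ACK-backed state, and forwarding an event records nothing that a checkpoint would persist. `AckBackedEpochTracker` and its methods are hypothetical names for illustration; this does not use Flink state APIs and is not the code in `BroadcastSchemaSinkOperator`.

```java
import java.util.HashMap;
import java.util.Map;

class AckBackedEpochTracker {

    // Forwarded to the sink writer but not yet acknowledged. Transient: never checkpointed.
    private final Map<String, Long> inFlight = new HashMap<>();

    // Highest epoch per table whose DDL the sink writer confirmed. Only this is checkpointed.
    private final Map<String, Long> lastAppliedEpoch = new HashMap<>();

    /** Duplicate fast path: true only if a successful ACK already covered this epoch. */
    boolean isDuplicate(String tableId, long epoch) {
        Long applied = lastAppliedEpoch.get(tableId);
        return applied != null && epoch <= applied;
    }

    /** Called when the event is forwarded downstream; does not touch checkpointed state. */
    void onForwarded(String tableId, long epoch) {
        inFlight.put(tableId, epoch);
    }

    /** Called when the sink writer reports the outcome of applySchemaChange. */
    void onAck(String tableId, long epoch, boolean success) {
        Long pending = inFlight.get(tableId);
        if (pending != null && pending == epoch) {
            inFlight.remove(tableId);
        }
        if (success) {
            lastAppliedEpoch.merge(tableId, epoch, Long::max);
        }
    }

    /** State that would go into a checkpoint. */
    Map<String, Long> snapshot() {
        return new HashMap<>(lastAppliedEpoch);
    }

    /** Rebuilds a tracker from checkpointed state; in-flight epochs are intentionally lost. */
    static AckBackedEpochTracker restore(Map<String, Long> snapshot) {
        AckBackedEpochTracker tracker = new AckBackedEpochTracker();
        tracker.lastAppliedEpoch.putAll(snapshot);
        return tracker;
    }
}
```

In the failure scenario above, a snapshot taken between `onForwarded` and `onAck` does not contain the epoch, so the replayed event is reapplied instead of being fast-path ACKed. The trade-off is that a DDL which did finish just before the failure may be applied twice after restore, so this approach assumes the sink-side `applySchemaChange` is idempotent or tolerates an already-applied change.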
2. Code quality evaluation
2.1 Code style
The SQL Server resolver itself looks clearer on the latest head, and the shared Flink code is readable. The blocker is semantic rather than stylistic.
2.2 Test coverage and stability
The added SQL Server tests cover the happy path well, and I did not see a new flaky-test pattern in the newly added assertions. Stability rating: Stable.
Missing coverage that matters for this blocker:
- recovery after `BroadcastSchemaSinkOperator` has checkpointed the epoch but before `FlinkSinkWriter.applySchemaChange(...)` completes;
- replay of the same epoch after partial progress.
2.3 Documentation
No extra doc blocker from this round. The remaining issue is in the shared runtime semantics, not the SQL Server-specific explanation.
3. Architecture
3.1 Solution elegance
The direction is still a precise long-term fix, but the recovery boundary is not fully closed yet.
3.2 Maintainability
The current split between "forwarded" and "actually applied" is understandable, but the processed-epoch state still sits on the wrong side of that boundary.
3.3 Extensibility
This matters beyond SQL Server. The same shared Flink schema-evolution layer will affect every connector that reuses it.
3.4 Historical compatibility
Runtime compatibility across failover is still the concern here. A recovered job can misclassify an unfinished schema change as completed.
4. Issue summary
| No. | Issue | Location | Severity |
|---|---|---|---|
| Issue 1 | `lastProcessedEpoch` advances before the real sink-side schema change completes | `seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/schema/BroadcastSchemaSinkOperator.java:169-176` | High |
5. Merge conclusion
Conclusion: can merge after fixes
- Blocking items
- Issue 1: close the recovery gap so replay cannot ACK an unfinished schema change as already processed.
- Suggested but non-blocking follow-ups
- Add one focused recovery test that covers "epoch checkpointed in broadcast operator, DDL not yet finished in sink writer, then restore".
Overall, I still like the SQL Server-specific direction and the rename-path fixes from the previous round. But with the current shared Flink state transition, I do not think this latest head is safe to merge yet.
Hello, the issue you reviewed is from another pull request (#10648). The reason I kept these commits in SQL Server CDC is that without the fixes in #10648, the results of [...]
Thanks for clarifying this. I agree that the shared Flink schema-evolution changes are also being worked on in #10648, and I should have stated that dependency more explicitly. The reason I still treated it as a blocker on this PR is that the current head of [...] If your plan is to drop that carried shared change once [...]
Purpose of this pull request
This PR implements schema evolution support for SQL Server CDC, enabling the SQL Server CDC connector to detect and propagate DDL changes.
The demand comes from: #10282
Key Components:
Based on: #10648 (review should start from commit 1187917, commit message: implements sqlserver cdc). Please wait for #10648 to be merged; otherwise, the test results of SqlServerCDCIT on the Flink engine will be unstable.
How was this patch tested?
- SqlServerSchemaChangeResolverTest: tests column diff logic, rename detection, table path parsing, and edge cases
- SqlServerSourceConfigFactoryTest: tests the configuration factory
- SqlServerDialectTest: tests dialect behavior
- SqlServerCDCIT with dedicated schema change test cases: add column operations, drop column operations, alter column type changes, sp_rename operations
- Test configuration: sqlservercdc_to_sqlserver_with_schema_change.conf