
[Feature][Connector-V2][SqlServer CDC] support sqlserver schema evolution#10890

Open
CloverDew wants to merge 12 commits into apache:dev from CloverDew:feature/cdc-sqlserver-schema-evolution-on-flink

@CloverDew
Contributor

Purpose of this pull request

This PR implements schema evolution support for SQL Server CDC, enabling the SQL Server CDC connector to detect and propagate DDL changes.

The demand comes from: #10282

Key Components:

  1. SqlServerSchemaChangeResolver: core resolver that detects schema changes by comparing the previous catalog schema with Debezium's table change payload.
  2. SqlServerStreamingChangeEventSource: extends Debezium's streaming source.
  3. Integration with SqlServerIncrementalSource: implements the SupportSchemaEvolution interface with the supported types.

Based on: #10648. Review should start from commit 1187917 (commit message: implements sqlserver cdc). Please wait for #10648 to be merged; otherwise, the test results of SqlServerCDCIT on the Flink engine will be unstable.

How was this patch tested?

  1. Unit Tests:
  • SqlServerSchemaChangeResolverTest: tests column diff logic, rename detection, table path parsing, and edge cases
  • SqlServerSourceConfigFactoryTest: tests the configuration factory
  • SqlServerDialectTest: tests dialect behavior

  2. E2E Tests:
  • SqlServerCDCIT with dedicated schema change test cases: add column operations, drop column operations, alter column type changes, sp_rename operations
  • Test configuration: sqlservercdc_to_sqlserver_with_schema_change.conf

Check list

@CloverDew CloverDew force-pushed the feature/cdc-sqlserver-schema-evolution-on-flink branch from 830a124 to 0bdbe16 Compare May 15, 2026 07:50
Contributor

@DanielLeens DanielLeens left a comment

Thanks for working on this. I went through the latest head and retraced the SQL Server CDC schema-evolution path end to end. The overall direction makes sense, but I still found one blocking correctness issue before this can be merged.

What this PR solves

  • User pain point
    Today SQL Server CDC schema changes are hard to propagate safely through SeaTunnel, especially when column-level DDL needs to become downstream schema evolution events.
  • Fix approach
    This PR adds SQL Server-specific schema diff / event resolution logic and an integration test that exercises the Flink schema-evolution path.
  • One-line summary
    The feature is valuable, but the current rename heuristic can still translate some real DROP + ADD changes into RENAME, which changes the downstream DDL semantics.

Runtime path I checked

SQL Server CDC captures DDL
  -> SqlServerSchemaChangeResolver.resolve(...)
      -> diffColumns(...) [SqlServerSchemaChangeResolver.java:227-245]
      -> pairRenameColumns(...) [247-304]
      -> emit RENAME / ADD / DROP events
  -> downstream schema evolution sink applies those events

Key findings

  1. The normal path absolutely hits this code: SQL Server column changes must go through SqlServerSchemaChangeResolver before SeaTunnel can emit schema-change events.
  2. The latest implementation does cover the happy-path rename case.
  3. But pairRenameColumns(...) still treats “same ordinal + same type” as sufficient evidence for rename.
  4. That is not semantically safe, because a real DROP old_col + ADD new_col same_type same_position is not equivalent to RENAME old_col TO new_col for downstream sinks.

Findings

Issue 1: DROP + ADD can still be collapsed into RENAME, which changes downstream schema-evolution semantics

  • Location: seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/SqlServerSchemaChangeResolver.java:247-304
  • Description:
    This logic sits directly in the SQL Server CDC schema-resolution hot path. pairRenameColumns(...) pairs dropped/added columns by ordinal and type and then emits a rename. That is too aggressive: in real SQL Server DDL, “drop one column and add another compatible column in the same position” is not the same operation as rename.
  • Risk:
    This is a blocking correctness issue. Downstream sinks may apply a rename when the real intent was drop+add, which can change column lineage, target-side DDL behavior, and user-visible schema semantics.
  • Best improvement:
    Option A: only emit RENAME when the upstream change source gives strong rename evidence; otherwise fall back to DROP + ADD.
    Option B: keep the heuristic, but add stricter guards and still prefer the conservative DROP + ADD path whenever the intent is not provable.
  • Severity: High
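
The conservative behavior described in Option A can be illustrated with a minimal sketch. This is not the PR's implementation: the class name, the `diff` method, the string-based events, and the name-to-type maps are all hypothetical simplifications, and identifier case-insensitivity and type-change handling are deliberately left out. The only point is that a dropped/added pair becomes a RENAME solely when an explicit rename mapping says so.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; not the SqlServerSchemaChangeResolver code.
final class ConservativeRenameSketch {

    /**
     * Diffs two column maps (column name -> type). A dropped/added pair is
     * reported as RENAME only when explicitRenames maps the old name to the
     * new name and the type is unchanged; otherwise it stays DROP + ADD.
     */
    static List<String> diff(Map<String, String> before,
                             Map<String, String> after,
                             Map<String, String> explicitRenames) {
        List<String> events = new ArrayList<>();

        // Columns present only in the new schema, in declaration order.
        Map<String, String> added = new LinkedHashMap<>(after);
        added.keySet().removeAll(before.keySet());

        for (Map.Entry<String, String> old : before.entrySet()) {
            if (after.containsKey(old.getKey())) {
                continue; // column still exists (type changes are out of scope here)
            }
            String target = explicitRenames.get(old.getKey());
            if (target != null && old.getValue().equals(added.get(target))) {
                events.add("RENAME " + old.getKey() + " TO " + target);
                added.remove(target);
            } else {
                // No provable rename intent: fall back to the conservative path.
                events.add("DROP " + old.getKey());
            }
        }
        for (Map.Entry<String, String> col : added.entrySet()) {
            events.add("ADD " + col.getKey() + " " + col.getValue());
        }
        return events;
    }
}
```

With an empty rename map, replacing `old_col` by `new_col` of the same type yields a DROP followed by an ADD; the same schemas plus an explicit `old_col -> new_col` mapping yield a single RENAME.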

Test / CI notes

  • SqlServerCDCIT.testWithSchemaEvolution() covers the end-to-end path and uses condition-based waiting rather than new fixed sleeps, so I did not see a new flaky-test pattern there.
  • The current Build signal is still incomplete/red from GitHub metadata. The visible failed run only shows the early workflow jobs being cancelled, so I could not attribute a SQL Server-specific CI failure from the exposed logs alone.

Conclusion: can merge after fixes

  1. Blocking items
  • Issue 1: the current rename pairing is not semantically safe and can emit the wrong schema-change operation on the main path.
  2. Suggested but non-blocking improvements
  • Add a regression case that proves a same-ordinal / same-type replace-column scenario is preserved as DROP + ADD rather than being upgraded to RENAME.

Overall, the direction is good and the integration coverage helps, but the rename detection still needs to become more conservative before this is safe to merge.

@CloverDew CloverDew force-pushed the feature/cdc-sqlserver-schema-evolution-on-flink branch from ef2bff3 to 940c243 Compare May 16, 2026 12:03
Contributor

@DanielLeens DanielLeens left a comment

Thanks for the update. I re-reviewed the latest head from the full SQL Server schema-evolution path, and the key blocker from the previous round is fixed now: the resolver no longer treats a generic drop/add shape as a rename unless the DDL explicitly says so.

What problem this PR solves

  • User pain point
    SQL Server CDC did not have a reliable schema-evolution event path, especially when add/drop/rename/modify changes had to be translated into downstream SchemaChangeEvents.
  • Fix approach
    The PR introduces SqlServerSchemaChangeResolver to convert Debezium schema-change records into SeaTunnel column events, with unit and e2e coverage for add/drop/rename/modify paths.
  • One-line summary
    This adds SQL Server CDC schema-evolution event resolution and propagation.

1. Code change review

1.1 Core logic analysis

The runtime path is:

Debezium SQL Server schema-change SourceRecord
  -> SqlServerSchemaChangeResolver.support(record)
  -> resolve(record, catalogTables)
     -> resolveTablePath(...)
     -> getCurrentTable(...)
     -> parseSpRenameColumns(ddl)
     -> diffColumns(...)
        -> pairRenameColumns(...)
        -> add / drop / modify / change events
  -> downstream schema operator / coordinator consumes AlterTableColumnsEvent

The important latest-head behavior is here:

if (!StringUtils.equalsIgnoreCase(addedName, explicitRenames.get(removedName))) {
    continue;
}
if (!sameDefinitionExceptName(removed.value, convertedAdded)) {
    continue;
}

That means rename is now gated by an explicit sp_rename ... 'COLUMN' signal, instead of being inferred from any matching drop/add pair with the same definition.
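
To make the "explicit signal" idea concrete, here is a rough sketch of how old-to-new column names might be extracted from `sp_rename ... 'COLUMN'` statements. The actual parseSpRenameColumns(...) body is not shown in this thread, so the class name, the regular expression, and the helper below are assumptions. The sketch only handles positional arguments and does not cope with bracketed identifiers that themselves contain dots.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration only; the real parser may differ substantially.
final class SpRenameParseSketch {

    // Matches: sp_rename 'schema.table.old', 'new', 'COLUMN' (positional arguments only).
    private static final Pattern SP_RENAME_COLUMN = Pattern.compile(
            "sp_rename\\s+N?'([^']+)'\\s*,\\s*N?'([^']+)'\\s*,\\s*N?'COLUMN'",
            Pattern.CASE_INSENSITIVE);

    /** Returns old column name -> new column name for every column-level sp_rename in the DDL. */
    static Map<String, String> parseExplicitRenames(String ddl) {
        Map<String, String> renames = new LinkedHashMap<>();
        if (ddl == null) {
            return renames;
        }
        Matcher m = SP_RENAME_COLUMN.matcher(ddl);
        while (m.find()) {
            String qualifiedOld = m.group(1);
            // Keep only the last path segment, i.e. the column name.
            String oldName = qualifiedOld.substring(qualifiedOld.lastIndexOf('.') + 1);
            renames.put(stripBrackets(oldName), stripBrackets(m.group(2)));
        }
        return renames;
    }

    /** Removes one pair of surrounding [brackets], if present. */
    static String stripBrackets(String identifier) {
        String s = identifier.trim();
        if (s.length() >= 2 && s.startsWith("[") && s.endsWith("]")) {
            return s.substring(1, s.length() - 1);
        }
        return s;
    }
}
```

A plain `ALTER TABLE ... DROP COLUMN` / `ADD` pair, or a table-level `sp_rename` without the `'COLUMN'` argument, produces an empty map here, so a caller gating on this map would keep such changes as ADD + DROP.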

The new tests cover both branches:

  • explicit rename -> AlterTableChangeColumnEvent
  • non-explicit rename-like DDL -> keep ADD + DROP

And the e2e rename SQL now forces real post-switch row updates so the CDC stream actually emits verifiable events.

1.2 Compatibility impact

Fully compatible from the source-review perspective. This is an additive SQL Server CDC schema-evolution capability and I do not see a new protocol/default/serialization regression on the latest head.

1.3 Performance / side effects

The resolver work is schema-change-event scoped, not row-hot-path scoped. I do not see a new CPU, memory, GC, concurrency, retry, idempotency, or resource-release problem in the current implementation.

1.4 Error handling and logging

No new source-level blocking issue found.

2. Code quality evaluation

2.1 Code style

The latest implementation is much clearer than the earlier heuristic-only version. The rename decision is now concentrated in parseSpRenameColumns(...) and pairRenameColumns(...), which makes the behavior easier to reason about.

2.2 Test coverage and stability

  • The unit tests cover add, explicit rename, non-explicit rename, and bracketed identifiers.
  • The e2e test runs the full add -> drop -> rename -> modify evolution sequence.
  • I did not find a flaky-test pattern in the newly added rename path.

Test stability rating: Stable.

2.3 Documentation

I am not expanding the docs gap into a new blocker in this round. The main previous logic issue is fixed on the latest head.

3. Architecture

3.1 Solution quality

This is a precise long-term direction. Using explicit SQL Server rename DDL as the strong signal is much safer than generic shape-based inference.

3.2 Maintainability

Good. Future SQL Server schema-change support can continue to build on the same “DDL signal + table_changes diff” structure.

3.3 Extensibility

Good reuse potential for future column-level schema changes.

3.4 Historical compatibility

No new historical-compatibility concern found on the latest head.

4. Issue summary

No new source-level blocking issue found.

5. Merge conclusion

Conclusion: can merge

  1. Blocking items
    No source blocker from the latest head.

  2. Suggested follow-ups
    None.

From the source-review side, the latest head looks good to me. The rename false-positive risk that blocked the earlier version is addressed in the current implementation. The remaining gate is just waiting for the Build check to finish.

Contributor

@DanielLeens DanielLeens left a comment

Sorry for missing this in the last review; that's on me. I re-reviewed the latest head from scratch, and I want to flag one more item that I should have caught earlier. This is a carryover from the previous version, not something introduced by the final "fix ci" commit.

What problem this PR solves

  • User pain point
    SQL Server CDC needs to translate upstream DDL into SeaTunnel schema-change events so downstream Flink schema-evolution sinks can keep the target table aligned.
  • Fix approach
    This PR adds SQL Server schema-change resolution and wires the events into the shared Flink schema-evolution path.
  • One sentence
    The SQL Server-specific direction is good, but the latest head still carries one blocking recovery-chain bug in the shared Flink schema-evolution layer.

Simple example: if SQL Server emits ALTER TABLE orders ADD region VARCHAR(20) while the Flink job is checkpointing, the source side now waits for sink-side schema application. But after a failure/recovery window, one subtask can still remember "I already processed this epoch" before the actual ALTER TABLE finished, and then ACK the replayed event without reapplying the DDL.

1. Code change review

1.1 Core logic analysis

Runtime chain I checked:

SQL Server CDC schema change
  -> SqlServerSchemaChangeResolver.resolve(...)
  -> Flink schema operator / coordinator emits SchemaChangeEvent
  -> BroadcastSchemaSinkOperator.handleBroadcastedSchemaChange(...)
      -> emitApplySchemaEventToSink(event, epoch)
      -> FlinkSinkWriter.handleSchemaChangeEvent(...)
          -> applySchemaChange(...)
          -> sendSchemaChangeAck(...)
  -> LocalSchemaCoordinator tracks applied epochs and acks

The important current-head pieces are:

emitApplySchemaEventToSink(event, epoch);
lastProcessedEpoch.put(tableId, epoch);

in BroadcastSchemaSinkOperator, while the real sink-side completion still happens later in:

((SupportSchemaEvolutionSinkWriter) sinkWriter).applySchemaChange(schemaChangeEvent);
sendSchemaChangeAck(schemaChangeEvent, epoch, subtaskId, success);

That means the operator state advances before the actual sink DDL is durably known to have succeeded.

Key findings:

  1. The normal SQL Server schema-evolution path absolutely hits this shared Flink layer.
  2. The ACK timing improvement is real: the writer now ACKs only after applySchemaChange(...) finishes.
  3. But BroadcastSchemaSinkOperator still snapshots lastProcessedEpoch too early.
  4. On recovery, that early state can turn an unfinished DDL into a duplicate fast-path ACK.

Logic correctness in the failure path:

Schema event broadcast
  -> BroadcastSchemaSinkOperator forwards event downstream
  -> BroadcastSchemaSinkOperator immediately writes lastProcessedEpoch
  -> checkpoint captures that operator state
  -> job fails before FlinkSinkWriter finishes ALTER TABLE
  -> restore
  -> same schema event is replayed
  -> BroadcastSchemaSinkOperator sees epoch <= lastProcessedEpoch
  -> treats it as already processed and sends ACK without reapplying DDL
  -> coordinator can release data for the new schema even though the sink table was never altered

1.2 Compatibility impact

Partially incompatible in practice if this bug is left in place, because recovery can make the sink-side schema state diverge from the source-side event history. API/config/default-value compatibility is otherwise unchanged by this specific finding.

1.3 Performance / side effects

The main risk is not CPU or GC. It is recovery correctness, idempotency, and state consistency across failures. Once lastProcessedEpoch gets ahead of the real sink DDL, replay can no longer trust the duplicate-detection shortcut.

1.4 Error handling and logging

Issue 1: lastProcessedEpoch is persisted before the sink writer actually finishes the schema change

  • Location: seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/schema/BroadcastSchemaSinkOperator.java:169-176
  • Problem description:
    This code is on the main Flink schema-evolution path for this PR. handleBroadcastedSchemaChange(...) forwards the event to the sink writer and immediately records the epoch as processed. But the real DDL completion still happens later in FlinkSinkWriter.handleSchemaChangeEvent(...) when applySchemaChange(...) returns and the writer sends the ACK.
  • Potential risk:
    If a checkpoint captures lastProcessedEpoch after the forward but before the real sink-side DDL finishes, recovery can replay the same event and take the duplicate branch (epoch <= lastProcessedEpoch) instead of reapplying the schema change. That can release post-DDL rows while the sink table is still on the old schema. This is a blocking recovery / correctness issue.
  • Best improvement suggestion:
    Move the "already processed" state transition behind the real sink-side completion boundary. In practice, lastProcessedEpoch should only advance after the operator can prove the schema change completed successfully, or it should be reconstructed from an ACK-backed state source rather than from the pre-ACK forward step.
  • Severity: High
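
The difference between the two state-transition points can be shown with a small toy model. It is not the BroadcastSchemaSinkOperator code: the class, its methods, and the in-memory snapshot/restore are hypothetical stand-ins for Flink operator state, and the `advanceOnForward` flag exists only to contrast the current behavior with the suggested one.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical toy model of epoch tracking; not the actual Flink operator.
final class EpochTrackerSketch {
    private final boolean advanceOnForward;
    private final Map<String, Long> lastProcessedEpoch = new HashMap<>();

    EpochTrackerSketch(boolean advanceOnForward) {
        this.advanceOnForward = advanceOnForward;
    }

    /** Returns true if the event must be applied by the sink, false if it is treated as a duplicate. */
    boolean onSchemaEvent(String tableId, long epoch) {
        Long last = lastProcessedEpoch.get(tableId);
        if (last != null && epoch <= last) {
            return false; // duplicate branch: ACK without reapplying the DDL
        }
        if (advanceOnForward) {
            // Behavior described in Issue 1: state advances before the sink finishes.
            lastProcessedEpoch.put(tableId, epoch);
        }
        return true;
    }

    /** Called once the sink writer reports the outcome of applying the schema change. */
    void onSinkAck(String tableId, long epoch, boolean success) {
        if (success) {
            lastProcessedEpoch.merge(tableId, epoch, Long::max);
        }
    }

    /** Stand-in for a checkpoint of the operator state. */
    Map<String, Long> snapshot() {
        return new HashMap<>(lastProcessedEpoch);
    }

    /** Stand-in for restoring from a checkpoint. */
    static EpochTrackerSketch restore(boolean advanceOnForward, Map<String, Long> state) {
        EpochTrackerSketch tracker = new EpochTrackerSketch(advanceOnForward);
        tracker.lastProcessedEpoch.putAll(state);
        return tracker;
    }
}
```

In this model, forwarding epoch 5, taking a snapshot, and restoring before any ACK makes the replayed epoch 5 look like a duplicate when `advanceOnForward` is true, whereas with `advanceOnForward` set to false the replayed event is applied again and only a successful `onSinkAck` marks it as processed.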

2. Code quality evaluation

2.1 Code style

The SQL Server resolver itself looks clearer on the latest head, and the shared Flink code is readable. The blocker is semantic rather than stylistic.

2.2 Test coverage and stability

The added SQL Server tests cover the happy path well, and I did not see a new flaky-test pattern in the newly added assertions. Stability rating: Stable.

Missing coverage that matters for this blocker:

  • recovery after BroadcastSchemaSinkOperator has checkpointed the epoch but before FlinkSinkWriter.applySchemaChange(...) completes;
  • replay of the same epoch after partial progress.

2.3 Documentation

No extra doc blocker from this round. The remaining issue is in the shared runtime semantics, not the SQL Server-specific explanation.

3. Architecture

3.1 Solution elegance

The direction is still a precise long-term fix, but the recovery boundary is not fully closed yet.

3.2 Maintainability

The current split between "forwarded" and "actually applied" is understandable, but the processed-epoch state still sits on the wrong side of that boundary.

3.3 Extensibility

This matters beyond SQL Server. The same shared Flink schema-evolution layer will affect every connector that reuses it.

3.4 Historical compatibility

Runtime compatibility across failover is still the concern here. A recovered job can misclassify an unfinished schema change as completed.

4. Issue summary

| No. | Issue | Location | Severity |
| --- | --- | --- | --- |
| Issue 1 | lastProcessedEpoch advances before the real sink-side schema change completes | seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/schema/BroadcastSchemaSinkOperator.java:169-176 | High |

5. Merge conclusion

Conclusion: can merge after fixes

  1. Blocking items
  • Issue 1: close the recovery gap so replay cannot ACK an unfinished schema change as already processed.
  2. Suggested but non-blocking follow-ups
  • Add one focused recovery test that covers "epoch checkpointed in broadcast operator, DDL not yet finished in sink writer, then restore".

Overall, I still like the SQL Server-specific direction and the rename-path fixes from the previous round. But with the current shared Flink state transition, I do not think this latest head is safe to merge yet.

@github-actions github-actions Bot removed the reviewed label May 18, 2026
@CloverDew
Contributor Author

Hello, the issue you raised comes from another pull request (#10648). I kept those commits in this SQL Server CDC PR because, without the fixes in #10648, the results of testWithSchemaEvolution in SqlServerCDCIT are unstable when running on Flink. I will therefore wait for #10648 to be merged and then rebase, which keeps the SQL Server CDC implementation concise. If the issue you pointed out does exist, I will wait until both pull requests are merged and then open a new pull request to address it. Thank you.

@DanielLeens
Contributor

Thanks for clarifying this.

I agree that the shared Flink schema-evolution changes are also being worked on in #10648, and I should have stated that dependency more explicitly.

The reason I still treated it as a blocker on this PR is that the current head of #10890 still carries those shared-layer changes in the actual diff and runtime path. On the latest head here, BroadcastSchemaSinkOperator still advances lastProcessedEpoch before the sink-side applySchemaChange(...) completion boundary, so from a merge-decision perspective I still cannot clear that path yet.

If your plan is to drop that carried shared change once #10648 lands, or to rework it directly in this PR, please push that as a new commit and I will gladly re-review the latest head.
