@jiangxinmeng1 jiangxinmeng1 commented Nov 3, 2025

User description

What type of PR is this?

  • API-change
  • BUG
  • Improvement
  • Documentation
  • Feature
  • Test and CI
  • Code Refactoring

Which issue(s) this PR fixes:

issue #22718

What this PR does / why we need it:

This PR implements a robust retry mechanism for Change Data Capture (CDC) operations with improved error handling and thread-safety.
Key Changes:

  • Structured Retry Error Handling

    • Added ParseRetryableError() and GenerateRetryableError() functions
    • Implemented error format: retryable error::<retry_count>:<error_message>
  • Thread-Safe DbTableInfo

    • Converted public fields to private with mutex protection and getter/setter methods
    • Added retry tracking fields: retryStartTS, retryTimes
  • Enhanced Reader/Sinker Components

    • Updated table reader to track and persist retry attempts with timestamps
    • Changed watermark updater to interface type for better testability
    • Fixed concurrent access issues in table scanner by removing global handling flag
  • Testing and Code Cleanup

    • Added comprehensive CDC test suite (cdc_test.go, 646 lines)
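
To illustrate the "Thread-Safe DbTableInfo" item, here is a minimal sketch of private retry fields guarded by a mutex. It is not the PR's code: the struct is reduced to the two fields named above, and the field types and the method names RecordRetry, ResetRetry and RetryState are assumptions made for this example.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// DbTableInfo is a hypothetical reduction of the PR's struct. Only the two
// retry-tracking fields named in the description are modeled, and their
// types (Unix nanoseconds, int) are assumptions.
type DbTableInfo struct {
	mu           sync.RWMutex
	retryStartTS int64
	retryTimes   int
}

// RecordRetry increments the counter and, on the first retry of a streak,
// remembers when the streak began. Both fields change under one lock, so a
// reader never sees a count without its matching start time.
func (t *DbTableInfo) RecordRetry(nowUnixNano int64) int {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.retryTimes == 0 {
		t.retryStartTS = nowUnixNano
	}
	t.retryTimes++
	return t.retryTimes
}

// ResetRetry clears the streak, for example after a successful round.
func (t *DbTableInfo) ResetRetry() {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.retryStartTS, t.retryTimes = 0, 0
}

// RetryState returns a consistent snapshot of both fields.
func (t *DbTableInfo) RetryState() (startUnixNano int64, times int) {
	t.mu.RLock()
	defer t.mu.RUnlock()
	return t.retryStartTS, t.retryTimes
}

func main() {
	info := &DbTableInfo{}
	var wg sync.WaitGroup
	// Concurrent writers are safe because every access goes through the mutex.
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			info.RecordRetry(time.Now().UnixNano())
		}()
	}
	wg.Wait()
	_, n := info.RetryState()
	fmt.Println("retries recorded:", n)
}
```

Returning both fields from one getter, instead of two separate getters, avoids a caller reading a start time and a count that belong to different retry streaks.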


PR Type

Bug fix, Enhancement, Tests


Description

  • Implement CDC retry mechanism with transaction rollback on connection errors

    • Track retry attempts with timestamp and count in error messages
    • Rollback transactions when ErrConnDone is encountered
    • Add ParseRetryableError and GenerateRetryableError utility functions
  • Refactor watermark updater to use interface instead of concrete type

    • Change CDCWatermarkUpdater to WatermarkUpdater interface across codebase
    • Improve testability and decoupling
  • Add comprehensive CDC retry test suite with mock components

    • Test retry from zero, rollback, table truncation, and CDC restart scenarios
    • Implement MockWatermarkUpdater and MockEngineSink for testing
  • Improve retry logic with configurable durations and max retry limits

    • Add separate retry durations for reader and sinker operations
    • Implement max retry times limit for reader operations
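
The retry count carried in error messages can be sketched as a pair of encode/decode helpers. The following assumes the format quoted in the PR description, retryable error::<retry_count>:<error_message>, taken literally. The lower-case function names and their signatures are hypothetical stand-ins, not the signatures of ParseRetryableError and GenerateRetryableError in the PR.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
	"strings"
)

// retryablePrefix follows the format quoted in the PR description:
//   retryable error::<retry_count>:<error_message>
const retryablePrefix = "retryable error::"

// generateRetryableError is a hypothetical encoder for the format above.
func generateRetryableError(retryCount int, message string) string {
	return fmt.Sprintf("%s%d:%s", retryablePrefix, retryCount, message)
}

// parseRetryableError is a hypothetical decoder. The last result is false
// when msg is not in the retryable format.
func parseRetryableError(msg string) (int, string, bool) {
	if !strings.HasPrefix(msg, retryablePrefix) {
		return 0, "", false
	}
	rest := strings.TrimPrefix(msg, retryablePrefix)
	// Cut at the first colon only, so colons inside the message survive.
	countStr, message, found := strings.Cut(rest, ":")
	if !found {
		return 0, "", false
	}
	n, err := strconv.Atoi(countStr)
	if err != nil || n < 0 {
		return 0, "", false
	}
	return n, message, true
}

func main() {
	cause := errors.New("dial tcp: connection refused")
	msg := generateRetryableError(3, cause.Error())
	fmt.Println(msg) // retryable error::3:dial tcp: connection refused

	n, text, ok := parseRetryableError(msg)
	fmt.Println(n, text, ok) // 3 dial tcp: connection refused true
}
```

Encoding the count in the message lets it survive being persisted as a plain string, at the cost of a parser that must tolerate colons in the original error text.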

Diagram Walkthrough

flowchart LR
  A["Error Detection"] -->|"ErrConnDone"| B["Rollback Transaction"]
  B --> C["Generate Retry Error"]
  C --> D["Track Retry State"]
  D --> E["Retry with Backoff"]
  E -->|"Success"| F["Update Watermark"]
  E -->|"Max Retries"| G["Mark as Failed"]
  H["WatermarkUpdater Interface"] -->|"Replaces"| I["CDCWatermarkUpdater"]
  J["Mock Components"] -->|"Enable"| K["Unit Tests"]
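
The retry path in the flowchart (detect ErrConnDone, roll back, retry with backoff, stop at a maximum) could look roughly like the sketch below. The txn interface, sendWithRetry, errMaxRetries and the linear backoff are all assumptions for illustration; only sql.ErrConnDone is a real identifier from Go's database/sql package, and the PR's actual control flow may differ.

```go
package main

import (
	"database/sql"
	"errors"
	"fmt"
	"time"
)

// txn is a hypothetical minimal view of a sink transaction.
type txn interface {
	Send() error
	Rollback() error
}

// errMaxRetries marks the "Mark as Failed" branch of the diagram.
var errMaxRetries = errors.New("max retries exceeded")

// sendWithRetry retries Send only when the connection is gone. Each failed
// attempt is rolled back first. Any other error is returned immediately.
func sendWithRetry(t txn, maxRetries int, backoff time.Duration) (int, error) {
	retries := 0
	for {
		err := t.Send()
		if err == nil {
			return retries, nil // the caller may now advance the watermark
		}
		if !errors.Is(err, sql.ErrConnDone) {
			return retries, err
		}
		// Connection is gone: discard the partial transaction before retrying.
		if rbErr := t.Rollback(); rbErr != nil {
			return retries, fmt.Errorf("rollback after %v: %w", err, rbErr)
		}
		if retries >= maxRetries {
			return retries, fmt.Errorf("%w after %d retries: %v", errMaxRetries, retries, err)
		}
		retries++
		time.Sleep(backoff * time.Duration(retries)) // linear backoff, an arbitrary choice
	}
}

// flakyTxn fails with ErrConnDone a fixed number of times, then succeeds.
type flakyTxn struct {
	failures  int
	rollbacks int
}

func (f *flakyTxn) Send() error {
	if f.failures > 0 {
		f.failures--
		return sql.ErrConnDone
	}
	return nil
}

func (f *flakyTxn) Rollback() error {
	f.rollbacks++
	return nil
}

func main() {
	t := &flakyTxn{failures: 2}
	retries, err := sendWithRetry(t, 5, time.Millisecond)
	fmt.Println(retries, t.rollbacks, err) // 2 2 <nil>
}
```

Rolling back before every retry keeps each attempt independent, so a retried batch does not build on a half-applied transaction.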

File Walkthrough

Relevant files

Enhancement (5 files)

  • reader.go: Implement retry tracking with timestamp and count (+11/-5)
  • types.go: Add retry configuration constants and fields (+6/-1)
  • util.go: Add retry error parsing and generation functions (+39/-0)
  • watermark_updater.go: Convert to interface-based design pattern (+2/-2)
  • cdc_exector.go: Refactor to use WatermarkUpdater interface and add setter methods (+55/-21)

Bug fix (1 file)

  • sinker.go: Add connection error detection and retry logic (+30/-12)

Tests (5 files)

  • util_test.go: Add comprehensive tests for retry error parsing (+83/-0)
  • cdc_test.go: Update test signatures for new retry error format (+3/-3)
  • cdc_test.go: Add comprehensive CDC retry scenario tests (+656/-0)
  • cdc_testutil.go: Implement mock sink and watermark updater for testing (+525/-11)
  • change_handle_test.go: Update test calls for new retry error return values (+6/-6)
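
As a rough picture of why moving from the concrete CDCWatermarkUpdater to a WatermarkUpdater interface helps testing, the sketch below defines a small interface, an in-memory mock, and a consumer that depends only on the interface. The method set (UpdateWatermark, GetWatermark), the commitBatch helper and the mock's fields are assumptions for this example. The PR's real interface and its MockWatermarkUpdater in cdc_testutil.go are not shown on this page and are likely larger.

```go
package main

import (
	"fmt"
	"sync"
)

// WatermarkUpdater is a hypothetical, reduced interface. The method names
// and signatures are assumptions, not the PR's definition.
type WatermarkUpdater interface {
	UpdateWatermark(table string, ts int64) error
	GetWatermark(table string) (int64, bool)
}

// MockWatermarkUpdater keeps watermarks in memory and counts updates so a
// test can assert on them without a database.
type MockWatermarkUpdater struct {
	mu    sync.Mutex
	marks map[string]int64
	Calls int
}

// Compile-time check that the mock satisfies the interface.
var _ WatermarkUpdater = (*MockWatermarkUpdater)(nil)

func NewMockWatermarkUpdater() *MockWatermarkUpdater {
	return &MockWatermarkUpdater{marks: make(map[string]int64)}
}

func (m *MockWatermarkUpdater) UpdateWatermark(table string, ts int64) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.Calls++
	m.marks[table] = ts
	return nil
}

func (m *MockWatermarkUpdater) GetWatermark(table string) (int64, bool) {
	m.mu.Lock()
	defer m.mu.Unlock()
	ts, ok := m.marks[table]
	return ts, ok
}

// commitBatch is a hypothetical consumer. Because it takes the interface,
// a test can pass the mock where production code passes the real updater.
func commitBatch(u WatermarkUpdater, table string, batchEndTS int64) error {
	if cur, ok := u.GetWatermark(table); ok && batchEndTS <= cur {
		return nil // already covered, nothing to advance
	}
	return u.UpdateWatermark(table, batchEndTS)
}

func main() {
	m := NewMockWatermarkUpdater()
	_ = commitBatch(m, "db.t1", 10)
	_ = commitBatch(m, "db.t1", 5) // stale batch, no update issued
	ts, _ := m.GetWatermark("db.t1")
	fmt.Println(ts, m.Calls) // 10 1
}
```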


Labels

  • kind/bug: Something isn't working
  • Review effort 4/5
  • size/XXL: Denotes a PR that changes 2000+ lines


5 participants