feat: Add GaussDB CDC connector with critical bug fixes #4195
Closed · liumingjian wants to merge 12 commits into apache:master from liumingjian:feature/gaussdb-cdc-connector
Conversation
## Summary

Add a complete GaussDB CDC connector implementation with support for snapshot and streaming modes using GaussDB's mppdb_decoding logical replication plugin.

## Key Features

- Full snapshot support for initial data capture
- Streaming CDC using GaussDB logical replication
- Support for all common GaussDB data types
- Configurable connection pooling and retry mechanisms
- Comprehensive test suite

## Critical Bug Fixes

### 1. Default Value Converter Issue

- **Problem**: GaussDB returns function calls (e.g., `pg_systimestamp()`, `CURRENT_TIMESTAMP`) as default values, causing Debezium to fail when trying to use them as actual values
- **Solution**: Created `GaussDBDefaultValueConverter`, which handles function-based defaults by returning `Optional.empty()` for them
- **Files**:
  - `io/debezium/connector/gaussdb/GaussDBDefaultValueConverter.java` (new)
  - `io/debezium/connector/gaussdb/GaussDBSchema.java` (modified)

### 2. Missing Source Info Fields

- **Problem**: The Debezium envelope requires several fields (version, connector, name, snapshot) in the source struct, but they were not being set, causing validation errors
- **Solution**: Added all required source info fields to both the snapshot and streaming source struct builders
- **Files**:
  - `GaussDBScanFetchTask.java:294-339` (snapshot)
  - `GaussDBStreamFetchTask.java:495-551` (streaming)

## Implementation Details

### Core Components

- **GaussDBSource**: Main source implementation
- **GaussDBDialect**: Dialect for GaussDB-specific SQL and behavior
- **GaussDBConnection**: Connection management with retry logic
- **GaussDBReplicationConnection**: Logical replication connection handling
- **GaussDBScanFetchTask**: Snapshot data reading
- **GaussDBStreamFetchTask**: Streaming CDC data reading

### Configuration

- Hostname, port, database, username, password
- Plugin name (mppdb_decoding)
- Slot name for logical replication
- Connection timeout and retry settings
- Snapshot fetch size

### Testing

- Unit tests for all major components
- Integration tests for snapshot and streaming modes
- Data type compatibility tests
- Boundary condition tests

## Verified Configuration

- ✅ GaussDB `wal_level = logical` (required for CDC)
- ✅ `mppdb_decoding` plugin available and functional
- ✅ Replication slot creation and management working

## Known Issues

- Integration test `testReadSingleTableAllRecords` times out (fetch task execution issue under investigation)
- Requires further debugging of the Flink job initialization flow

## Dependencies

- GaussDB JDBC driver (included in lib/)
- Debezium PostgreSQL connector (for replication protocol)
- Flink CDC base framework

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
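The function-default handling described under "Critical Bug Fixes" could be sketched as follows. Class and method names mirror the description above, but the detection heuristic is an assumption for illustration; the actual `GaussDBDefaultValueConverter` may inspect the parsed default expression differently.

```java
import java.util.Optional;

// Illustrative sketch: return Optional.empty() for function-based column
// defaults (pg_systimestamp(), CURRENT_TIMESTAMP, nextval(...)) so that
// Debezium does not attempt to use them as literal values.
public class DefaultValueSketch {

    static Optional<String> convertDefault(String defaultExpr) {
        if (defaultExpr == null) {
            return Optional.empty();
        }
        String trimmed = defaultExpr.trim();
        boolean functionBased =
                trimmed.endsWith(")") // e.g. pg_systimestamp(), nextval('seq'::regclass)
                        || trimmed.equalsIgnoreCase("CURRENT_TIMESTAMP")
                        || trimmed.equalsIgnoreCase("CURRENT_DATE")
                        || trimmed.equalsIgnoreCase("CURRENT_TIME");
        return functionBased ? Optional.empty() : Optional.of(trimmed);
    }

    public static void main(String[] args) {
        System.out.println(convertDefault("pg_systimestamp()"));
        System.out.println(convertDefault("'default text'"));
    }
}
```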
- Add SERIAL/BIGSERIAL support in GaussDBTypeUtils
- Bundle PostgreSQL driver in flink-sql-connector-gaussdb-cdc to avoid NoClassDefFoundError
- Implement GaussDbE2eITCase with robust replication slot cleanup
- Improve replication connection stability with KeepAlive and socket timeout settings
- Add diagnostic logging for GaussDB replication stream
…eption

- Add getSlotNameForBackfillTask() to GaussDBSourceConfig returning slotName_backfill
- Add backfillReplicationConnection field to GaussDBSourceFetchTaskContext
- Add isBoundedRead() method to detect bounded reads (SnapshotSplit or StreamSplit with endOffset)
- Modify createReplicationStream() to use the backfill connection for bounded reads
- Add endingPos field and reachEnd() check for bounded stream reads
- Add 10ms stabilization delay after stream.start()

This fixes the replication slot contention issue where GaussDB would fail with an EOFException when trying to restart the replication stream during the backfill phase.

Tested E2E:
- Snapshot: 2 records captured successfully
- INSERT: id=103 captured and synced to MySQL
- UPDATE: id=101 value change captured and synced
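The bounded-read detection and end-of-stream check named in this commit (isBoundedRead(), endingPos, reachEnd()) could look roughly like the following; the signatures here are assumptions for illustration, not the PR's exact code.

```java
// Sketch of bounded-read handling for the backfill phase, assuming the
// split metadata exposes a snapshot flag and an optional end offset.
public class BoundedReadSketch {

    // A snapshot split is always bounded; a stream split is bounded only
    // when an explicit end offset (endingPos) has been assigned to it.
    static boolean isBoundedRead(boolean isSnapshotSplit, Long endOffset) {
        return isSnapshotSplit || endOffset != null;
    }

    // True once the replication stream has caught up to the end position,
    // at which point the backfill task can stop reading and release its slot.
    static boolean reachEnd(long currentPos, long endingPos) {
        return currentPos >= endingPos;
    }
}
```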
… support

- Resolved TableId catalog mismatch in GaussDBSourceFetchTaskContext
- Implemented robust polling mechanism in E2E tests for reliable verification
- Added native UTF-8 encoding support for Chinese character synchronization
- Enhanced documentation including READMEs and setup guides
- Cleaned up duplicate JAR conflicts in deployment process
- Add distributed GaussDB CDC test script (test_gaussdb_distributed_cdc.sh)
- Add distributed sync verification script (check_distributed_sync_result.sh)
- Add distributed Flink SQL configuration (gaussdb_distributed_to_mysql.sql)
- Tune state TTL and upsert-materialize configuration to support DELETE synchronization
- Add a detailed Chinese end-to-end test manual (GAUSSDB_CDC_E2E_TEST_MANUAL.md)
- Support complete verification flows for both centralized and distributed deployments
- Implement GaussDBJdbcDialect with MERGE INTO syntax for upsert operations
- Add GaussDBJdbcDialectFactory for SPI discovery with jdbc:gaussdb:// URL
- Add deployment script deploy_gaussdb_to_gaussdb.sh with dialect injection
- Add test scripts for GaussDB-to-GaussDB CDC testing
- Add Flink SQL configuration for distributed GaussDB to GaussDB sync
- Update GAUSSDB_CDC_E2E_TEST_MANUAL.md with GaussDB Sink documentation
- Add flink-connector-jdbc dependency to pom.xml for dialect compilation
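A MERGE INTO upsert statement of the kind this dialect would generate can be sketched for a hypothetical table t(id, val) keyed on id; the actual GaussDBJdbcDialect may differ in quoting, aliasing, and how it enumerates columns.

```java
// Hedged sketch of a GaussDB-style MERGE INTO upsert: update the row when
// the key matches, insert it otherwise. Parameters are bound per row.
public class MergeIntoSketch {

    static String upsertSql(String table) {
        return "MERGE INTO " + table + " t"
                + " USING (SELECT ? AS id, ? AS val) s"
                + " ON (t.id = s.id)"
                + " WHEN MATCHED THEN UPDATE SET t.val = s.val"
                + " WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)";
    }

    public static void main(String[] args) {
        System.out.println(upsertSql("orders"));
    }
}
```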
- Add JDBC sink batch write settings (5000 rows, 2s interval)
- Increase source fetch.size to 10000 and chunk.size to 50000
- Set Flink parallelism.default to 4
- Add pipeline.object-reuse for memory optimization
- Increase checkpoint interval to 5 minutes
- Upgrade TaskManager: 4GB memory, 6 slots, 0.4 managed fraction
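The cluster-level settings in this commit map onto standard Flink configuration keys; a flink-conf.yaml fragment such as the following is a plausible rendering (the sink batch and source fetch settings live in the connector DDL instead, and the exact file in this PR may differ):

```yaml
parallelism.default: 4
pipeline.object-reuse: true
execution.checkpointing.interval: 5min
taskmanager.memory.process.size: 4g
taskmanager.numberOfTaskSlots: 6
taskmanager.memory.managed.fraction: 0.4
```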
Test script improvements:
- test_gaussdb_to_gaussdb_cdc.sh: add a Step 0 that resets the Flink cluster, using docker-compose down/up to guarantee a clean environment
- run_gaussdb_to_gaussdb_test.sh: add a wait_for_cdc_stream_ready() function that waits for the CDC slots to become active before inserting test data
- run_gaussdb_to_gaussdb_test.sh: change init_test_env() to check whether the tables exist instead of DROP/RECREATE

Deployment script improvements:
- deploy_gaussdb_to_gaussdb.sh: clean up stale replication slots on all DNs
- deploy_gaussdb_to_gaussdb.sh: add distributed source table creation logic
- deploy_gaussdb_to_gaussdb.sh: insert seed data so the CDC snapshot phase has data and correctly transitions to the stream phase

Key fixes:
- Fixed empty tables causing CDC to skip the stream phase
- Fixed the DN1 CDC slot not being activated
- Eliminated interference from stale Flink state and leftover slots

Test results: all three INSERT/UPDATE/DELETE tests passed
- Implemented unique backfill slot naming using split ID hashes to prevent parallel split conflicts
- Disabled dropSlotOnClose for backfill connections to handle transient reconnection robustly
- Added explicit backfill slot cleanup in GaussDBScanFetchTask using try-finally
- Integrated is_snapshot metadata field for differentiated record routing
- Improved performance test script with 100k volume verification and data integrity checks
- Verified 100k snapshot at ~6.1k records/second with 100% data integrity
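The per-split slot naming this commit describes could be derived as below: the base slot name plus a suffix computed from the split ID, so parallel backfill tasks never contend for the same replication slot. The helper name and hashing scheme are assumptions for illustration.

```java
// Sketch of unique backfill slot naming from a split ID hash.
public class BackfillSlotName {

    static String backfillSlotName(String baseSlot, String splitId) {
        // PostgreSQL-style replication slot names allow only lowercase
        // letters, digits, and underscores, so encode the hash as lowercase
        // hex and mask off the sign bit to keep it non-negative.
        String hash = Integer.toHexString(splitId.hashCode() & 0x7fffffff);
        return baseSlot + "_backfill_" + hash;
    }
}
```

Because the suffix is a pure function of the split ID, a restarted task recomputes the same slot name and can reattach to its own slot without colliding with other splits.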
Your project has two branches, one called cdc and one called cdc-patch; which one is usable? And is your GaussDB openGauss or the commercial GaussDB?

Could you share your contact information? Let's discuss this privately.

Author
15521328037