A Change Data Capture (CDC) pipeline that streams data from MariaDB to S3-compatible object storage using Apache Flink and Apache Hudi. The project demonstrates how to build a data lake with ACID transactions that supports both batch and streaming workloads.
This pipeline implements a complete CDC solution:
- Source: MariaDB with binary logging enabled for Change Data Capture
- Processing: Apache Flink processes CDC events in real-time
- Storage Format: Apache Hudi provides ACID transactions and efficient upserts
- Object Storage: MinIO (S3-compatible) for local development and testing
| Component | Version | Purpose |
|---|---|---|
| Apache Flink | 1.19.1 | Stream processing engine |
| Apache Hudi | 1.0.2 | Data lake table format with ACID support |
| MySQL CDC Connector | 3.2.0 | Debezium-based CDC connector for MariaDB/MySQL |
| JDBC Connector | 3.2.0-1.19 | Flink JDBC connector |
| MySQL JDBC Driver | 8.0.33 | Database connectivity |
| Hadoop MapReduce Core | 3.3.6 | Parquet file writing support |
| Flink S3 Filesystem | 1.19.1 | S3A filesystem integration |
| MariaDB | 10.6.14 | Source database |
| MinIO | latest | S3-compatible object storage |
The Flink cluster includes the following JARs (automatically downloaded during Docker build):
- `hudi-flink1.19-bundle-1.0.2.jar` (57 MB)
- `flink-sql-connector-mysql-cdc-3.2.0.jar` (21 MB)
- `flink-connector-jdbc-3.2.0-1.19.jar` (379 KB)
- `flink-s3-fs-hadoop-1.19.1.jar` (31 MB)
- `mysql-connector-j-8.0.33.jar` (2.4 MB)
- `hadoop-mapreduce-client-core-3.3.6.jar` (1.7 MB)
```bash
docker compose up -d
```

This starts all required services:
- MinIO on ports 9000 (API) and 9001 (Console)
- MariaDB on port 3306 with sample data pre-loaded
- Flink JobManager on port 8081
- Two Flink TaskManager instances for parallel processing
Wait approximately 30 seconds for all services to be healthy, then submit the CDC pipeline:
```bash
docker exec jobmanager /opt/flink/bin/sql-client.sh embedded -f /opt/flink/job.sql
```

You should see output confirming the job submission:

```
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: <job-id>
```
Flink Web UI: http://localhost:8081
- View job status, task metrics, and execution plan
- Monitor checkpointing and backpressure
- Access job logs and exceptions
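The running job can also be inspected from the Flink SQL client itself. Flink 1.17+ supports job management statements in `sql-client.sh`; a minimal sketch (replace `<job-id>` with the ID printed at submission time):

```sql
-- List all jobs on the cluster together with their IDs and status.
SHOW JOBS;

-- Stop the CDC job once you are done experimenting.
STOP JOB '<job-id>';
```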
MinIO Console: http://localhost:9001
- Login: minioadmin / minioadmin
- Browse the `hudi-bucket` bucket to see CDC data
- View Hudi table metadata and Parquet files
Query the source table in MariaDB to confirm the sample data is present:

```bash
docker exec apache_flink_and_hudi-mariadb-1 mysql -uroot -prootpassword \
  -e "SELECT * FROM mydatabase.products"
```

View the directory structure written to MinIO:

```bash
docker exec minio ls -lR /data/hudi-bucket/
```

Or use the MinIO client:

```bash
docker exec minio mc ls --recursive myminio/hudi-bucket/
```

Insert a new row in the source database to see CDC in action:

```bash
docker exec apache_flink_and_hudi-mariadb-1 mysql -uroot -prootpassword \
  -e "INSERT INTO mydatabase.products VALUES (5, 'Product E', 59.99)"
```

The change is captured and written to MinIO after the next checkpoint completes (default checkpoint interval: 60s).
The CDC pipeline creates the following structure in MinIO:
```
hudi-bucket/
└── my_products/
    ├── .hoodie/
    │   ├── .aux/                            # Auxiliary metadata
    │   ├── .schema__XLDIR__/                # Schema information
    │   ├── .temp__XLDIR__/                  # Temporary files during writes
    │   ├── hoodie.properties/               # Table configuration
    │   └── timeline/                        # Commit timeline (ACID transactions)
    │       ├── 20251020210907535.commit     # Completed commits
    │       ├── 20251020210907535.inflight   # In-progress writes
    │       └── ...
    └── <partition>/
        └── <file-id>_<write-token>_<timestamp>.parquet
            └── <erasure-coding-id>/
                └── part.1                   # Actual Parquet data file
```
- Table Type: COPY_ON_WRITE (optimized for read performance)
- File Format: Apache Parquet (columnar storage)
- Commit Timeline: `.hoodie/timeline/` contains commit metadata for time-travel queries
- Schema Evolution: Managed in `.hoodie/.schema__XLDIR__/`
- ACID Guarantees: Hudi ensures atomic commits visible in the timeline
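Because every commit is recorded in the timeline, the table can also be read back from Flink SQL, either as a one-off snapshot or as a continuous stream of new commits. Below is a minimal sketch using the Hudi Flink read options `read.streaming.enabled` and `read.start-commit`; option names can vary between Hudi releases, and the commit instant shown is just the example from the timeline above:

```sql
-- Read the Hudi table back in the Flink SQL client; schema and path must
-- match the sink table defined in jobs/job.sql.
CREATE TABLE my_products_read (
  id INT PRIMARY KEY NOT ENFORCED,
  name VARCHAR,
  price DECIMAL(10, 2)
) WITH (
  'connector' = 'hudi',
  'path' = 's3a://hudi-bucket/my_products/',
  'table.type' = 'COPY_ON_WRITE',
  'read.streaming.enabled' = 'true',          -- keep polling the timeline for new commits
  'read.start-commit' = '20251020210907535'   -- or 'earliest' for a full history read
);

SELECT * FROM my_products_read;
```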
The CDC pipeline is configured in `jobs/job.sql`:

```sql
-- Hudi sink table with S3/MinIO configuration
CREATE TABLE my_products (
  id INT PRIMARY KEY NOT ENFORCED,
  name VARCHAR,
  price DECIMAL(10, 2)
) WITH (
  'connector' = 'hudi',
  'path' = 's3a://hudi-bucket/my_products/',
  'table.type' = 'COPY_ON_WRITE',
  'hoodie.write.lock.provider' = 'org.apache.hudi.client.transaction.lock.InProcessLockProvider',
  'write.metadata.enabled' = 'false',
  ...
);

-- MySQL CDC source
CREATE TEMPORARY TABLE products (
  id INT,
  name VARCHAR,
  price DECIMAL(10, 2),
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'scan.startup.mode' = 'initial',  -- Read existing data then stream changes
  'hostname' = 'mariadb',
  'database-name' = 'mydatabase',
  'table-name' = 'products'
);
```
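`jobs/job.sql` then wires the source to the sink with an INSERT statement. Conceptually it boils down to something like the following sketch (see the file itself for the exact statement and any additional options):

```sql
-- Continuously upsert CDC events from the MySQL CDC source into the Hudi table.
INSERT INTO my_products
SELECT id, name, price FROM products;
```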
The MariaDB instance is pre-configured for CDC:

- Binary logging: `ON`
- Binary log format: `ROW` (required for CDC)
- Configuration file: `sql/mariadb.cnf`
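To double-check these settings on the running container, the relevant server variables can be queried through the same `docker exec ... mysql` pattern used above:

```sql
-- Both should confirm that ROW-based binary logging is enabled.
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';
```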
MinIO is configured for S3-compatible access:
- Endpoint: http://minio:9000
- Access Key: minioadmin
- Secret Key: minioadmin
- Path-style access: enabled
```
.
├── Dockerfile              # Flink image with all required JARs
├── docker-compose.yml      # Service orchestration
├── conf/
│   └── core-site.xml       # Hadoop S3A filesystem configuration
├── jobs/
│   └── job.sql             # Flink SQL CDC pipeline definition
├── sql/
│   ├── init.sql            # MariaDB initial schema and data
│   └── mariadb.cnf         # MariaDB binary log configuration
└── README.md
```
Check the status of running jobs via the Flink REST API:

```bash
curl -s http://localhost:8081/jobs | python3 -m json.tool
```

Inspect the Flink logs:

```bash
docker logs jobmanager
docker logs apache_flink_and_hudi-taskmanager-1
```

Verify that binary logging is enabled in MariaDB:

```bash
docker exec apache_flink_and_hudi-mariadb-1 mysql -uroot -prootpassword \
  -e "SHOW VARIABLES LIKE 'log_bin%'"
```

Check MinIO connectivity and the bucket contents:

```bash
docker exec minio mc alias set test http://localhost:9000 minioadmin minioadmin
docker exec minio mc ls test/hudi-bucket
```

Stop all services and remove volumes:

```bash
docker compose down -v
```

This removes all containers, networks, and data volumes.
This CDC pipeline pattern is suitable for:
- Data Lake Ingestion: Stream operational data to data lakes for analytics
- Real-time Reporting: Make transactional data available for BI tools with low latency
- Event-driven Architectures: Trigger downstream processes based on database changes
- Data Replication: Maintain synchronized copies across environments
- Audit and Compliance: Capture all data changes with ACID guarantees
- Checkpoint Interval: Set to 60 seconds (configurable in `job.sql`)
- Parallelism: 2 TaskManager instances with configurable task slots
- Hudi Compaction: Automatic compaction disabled for simplicity (enable for production)
- S3 Writes: Batched based on checkpoint interval
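For example, the checkpoint interval and default parallelism can be tuned with standard Flink `SET` statements (the interval is configurable in `job.sql`, as noted above). The values below are illustrative; the actual settings live in `jobs/job.sql`:

```sql
-- Shorter checkpoint intervals flush CDC changes to MinIO sooner,
-- at the cost of more frequent S3 writes and commits.
SET 'execution.checkpointing.interval' = '60s';
SET 'parallelism.default' = '2';
```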
This project is provided as-is for educational and development purposes.