A lightweight Flink SQL job runner for submitting SQL-based streaming jobs to Kubernetes via flink-k8s-operator in application mode. It reads SQL files and executes them using the Flink Table API. It supports `SET`, `RESET`, and `BEGIN STATEMENT SET` / `END` syntax, and works with any Flink SQL connector bundled into the shaded JAR.
- SQL File Execution: Read a `.sql` file and execute all statements sequentially
- Full Flink SQL Syntax: Supports `SET`/`RESET` config, DDL, DML, and `BEGIN STATEMENT SET`/`END` blocks
- Application Mode Deployment: Each SQL job runs as an independent Flink cluster via the `FlinkDeployment` CRD
- Built-in Connectors: MySQL CDC, Kafka, and StarRocks are included in the shaded JAR
- ConfigMap Integration: SQL files are mounted from Kubernetes ConfigMaps
- Extensible: Add any Flink connector by editing `pom.xml`, then rebuild and redeploy
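
To illustrate what "execute all statements sequentially" involves, here is a minimal sketch of splitting a SQL script into individual statements. It is not the project's actual `SqlK8sRunner` code: the class name `SqlScriptSplitter` and its behavior (splitting on `;`, ignoring semicolons inside quotes, dropping `--` line comments) are assumptions made for illustration only.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper for illustration; not taken from SqlK8sRunner.java. */
final class SqlScriptSplitter {

    /** Splits a script on ';', ignoring semicolons inside quotes and "--" comments. */
    static List<String> split(String script) {
        List<String> statements = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        char quote = 0; // active quote character (' or `), or 0 when outside quotes
        int i = 0;
        while (i < script.length()) {
            char c = script.charAt(i);
            if (quote != 0) {
                // Inside a quoted literal or identifier: copy everything verbatim.
                current.append(c);
                if (c == quote) {
                    quote = 0;
                }
                i++;
            } else if (c == '\'' || c == '`') {
                quote = c;
                current.append(c);
                i++;
            } else if (c == '-' && i + 1 < script.length() && script.charAt(i + 1) == '-') {
                // Line comment: skip up to (not including) the end of the line.
                while (i < script.length() && script.charAt(i) != '\n') {
                    i++;
                }
            } else if (c == ';') {
                addIfNotBlank(statements, current);
                current.setLength(0);
                i++;
            } else {
                current.append(c);
                i++;
            }
        }
        addIfNotBlank(statements, current); // trailing statement without ';'
        return statements;
    }

    private static void addIfNotBlank(List<String> out, StringBuilder sb) {
        String s = sb.toString().trim();
        if (!s.isEmpty()) {
            out.add(s);
        }
    }

    public static void main(String[] args) {
        String script = "SET 'a' = 'x;y'; -- comment; ignored\nCREATE TABLE t (id INT);";
        for (String stmt : split(script)) {
            System.out.println("[" + stmt + "]");
        }
    }
}
```

A doubled quote such as `'it''s'` is handled implicitly: the scanner closes and immediately reopens the literal, so semicolons on either side remain protected. Block comments (`/* ... */`) are not handled in this sketch.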
| Connector | Version | Notes |
|---|---|---|
| Flink SQL MySQL CDC | 3.6.0-1.20 | For Flink 1.20 |
| MySQL Connector Java | 8.0.33 | JDBC driver for CDC |
| Flink Connector Kafka | 3.4.0-1.20 | For Flink 1.20 |
| StarRocks Connector | 1.2.14_flink-1.20 | Stream Load sink |
| Flink JSON | 1.20.0 | Kafka format support |
To add more connectors (e.g., Elasticsearch, Iceberg, Hudi, Doris), add the corresponding dependency to `pom.xml` and rebuild.
┌──────────────────────────────────────────────────┐
│ flink-k8s-operator │
│ │
│ FlinkDeployment (application mode) │
│ │ │
│ ▼ │
│ ┌────────────────────────────────────────────┐ │
│ │ Dedicated Flink Cluster (1 JobManager │ │
│ │ + N TaskManagers) — lifecycle per job │ │
│ │ │ │
│ │ ┌──────────────────────────────────────┐ │ │
│ │ │ flink-sql-k8s-runner.jar │ │ │
│ │ │ → reads SQL from ConfigMap volume │ │ │
│ │ │ → executes via Table API │ │ │
│ │ └──────────────────────────────────────┘ │ │
│ └────────────────────────────────────────────┘ │
└──────────────────────────────────────────────────┘
In application mode, each `FlinkDeployment` creates a dedicated Flink cluster whose lifecycle is bound to the SQL job. When the job finishes or fails, the operator manages the cluster accordingly.
Build the shaded JAR:

    mvn clean package -DskipTests

The shaded JAR will be at `target/flink-sql-k8s-runner-1.0.jar`.

Build the Docker image:

    docker build -t flink-sql-k8s-runner:latest -f docker/Dockerfile .

Apply the Kubernetes manifests:

    kubectl apply -f yaml/flink-pvc.yaml
    kubectl apply -f yaml/flink-sql-config.yaml
    kubectl apply -f yaml/flink-deployment.yaml

Place your SQL file in the `sql/` directory. Example SQL files are provided in `sql/examples/`:
| File | Description |
|---|---|
| `mysql-cdc-to-kafka.sql` | MySQL CDC source → Kafka sink |
| `kafka-to-starrocks.sql` | Kafka source → StarRocks sink (with `BEGIN STATEMENT SET`) |
Replace `<PLACEHOLDER>` values with your actual configuration before deploying.
| Syntax | Behavior |
|---|---|
| `SET 'key' = 'value'` | Set Flink configuration |
| `RESET 'key'` | Reset configuration to default |
| `CREATE TABLE ...` | DDL: register source/sink tables |
| `INSERT INTO ...` | DML: execute and await completion |
| `BEGIN STATEMENT SET; ... END;` | Group multiple INSERTs into one execution |
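
The behaviors in the table above suggest a dispatch step between parsing and execution. The following sketch shows one way to classify already-split statements and collect the INSERTs between `BEGIN STATEMENT SET` and `END` into a single unit. The names `StatementPlanner`, `Kind`, and `Step` are hypothetical, and the logic is an assumption rather than the runner's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

/** Hypothetical planner for illustration; not taken from SqlK8sRunner.java. */
final class StatementPlanner {

    enum Kind { SET, RESET, DDL, INSERT, STATEMENT_SET }

    /** One unit of execution: a single statement, or all INSERTs of a statement set. */
    static final class Step {
        final Kind kind;
        final List<String> sql;

        Step(Kind kind, List<String> sql) {
            this.kind = kind;
            this.sql = sql;
        }
    }

    /** Classifies statements (without trailing ';') and groups statement-set blocks. */
    static List<Step> plan(List<String> statements) {
        List<Step> steps = new ArrayList<>();
        List<String> group = null; // non-null while inside BEGIN STATEMENT SET ... END
        for (String raw : statements) {
            String stmt = raw.trim();
            // Normalize case and whitespace for keyword matching only.
            String upper = stmt.toUpperCase(Locale.ROOT).replaceAll("\\s+", " ");
            if (upper.equals("BEGIN STATEMENT SET")) {
                if (group != null) {
                    throw new IllegalArgumentException("Nested BEGIN STATEMENT SET");
                }
                group = new ArrayList<>();
            } else if (upper.equals("END")) {
                if (group == null) {
                    throw new IllegalArgumentException("END without BEGIN STATEMENT SET");
                }
                steps.add(new Step(Kind.STATEMENT_SET, group));
                group = null;
            } else if (group != null) {
                if (!upper.startsWith("INSERT ")) {
                    throw new IllegalArgumentException("Only INSERT is allowed in a statement set: " + stmt);
                }
                group.add(stmt);
            } else {
                steps.add(new Step(classify(upper), List.of(stmt)));
            }
        }
        if (group != null) {
            throw new IllegalArgumentException("BEGIN STATEMENT SET without END");
        }
        return steps;
    }

    private static Kind classify(String upper) {
        if (upper.equals("SET") || upper.startsWith("SET ")) return Kind.SET;
        if (upper.equals("RESET") || upper.startsWith("RESET ")) return Kind.RESET;
        if (upper.startsWith("INSERT ")) return Kind.INSERT;
        return Kind.DDL; // everything else is treated as DDL in this sketch
    }
}
```

A runner could hand each `STATEMENT_SET` step to a Flink `StatementSet` (one `addInsertSql` call per INSERT, followed by `execute()`), so that all grouped INSERTs are submitted as one job. That wiring requires the Flink dependencies and is left out here.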
flink-sql-k8s-runner/
├── docker/Dockerfile # Docker image definition
├── pom.xml # Maven build with shade plugin
├── sql/examples/ # Example SQL files
│ ├── mysql-cdc-to-kafka.sql
│ └── kafka-to-starrocks.sql
├── src/main/java/.../SqlK8sRunner.java # Main entry point
└── yaml/ # Kubernetes manifests
├── flink-deployment.yaml # FlinkDeployment (application mode)
├── flink-pvc.yaml # PVC for checkpoints/savepoints
└── flink-sql-config.yaml # ConfigMap for SQL files
Flink configs are managed via SET statements in SQL files. Key configs:
| Config | Description | Default |
|---|---|---|
| `state.checkpoints.dir` | Checkpoint storage path | `file:///opt/flink/checkpoints` |
| `execution.checkpointing.interval` | Checkpoint interval | `30s` |
| `execution.checkpointing.mode` | Checkpoint mode | `EXACTLY_ONCE` |
| `state.backend.type` | State backend | `hashmap` |
- Flink 1.20+
- Kubernetes with flink-kubernetes-operator 1.10.0+ installed (1.14.0 recommended)
- Java 11+
- Maven 3.6+
Apache License 2.0