zwz9599/flink-sql-k8s-runner

flink-sql-k8s-runner


A lightweight Flink SQL job runner for submitting SQL-based streaming jobs to Kubernetes in application mode via flink-k8s-operator. It reads a SQL file and executes its statements through the Flink Table API, supporting SET, RESET, and BEGIN STATEMENT SET / END syntax. It works with any Flink SQL connector bundled into the JAR.

Features

  • SQL File Execution — Read a .sql file and execute all statements sequentially
  • Full Flink SQL Syntax — Supports SET/RESET config, DDL, DML, and BEGIN STATEMENT SET/END blocks
  • Application Mode Deployment — Each SQL job runs as an independent Flink cluster via FlinkDeployment CRD
  • Built-in Connectors — MySQL CDC, Kafka, StarRocks included in the shaded JAR
  • ConfigMap Integration — SQL files are mounted from Kubernetes ConfigMaps
  • Extensible — Add any Flink connector by editing pom.xml, rebuild and redeploy
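
To make "execute all statements sequentially" concrete, here is a minimal sketch of how a script could be split into individual statements before execution. It is not the code in SqlK8sRunner.java: the class name SqlScriptSplitter and its behavior (dropping `--` line comments, ignoring semicolons inside quoted literals) are assumptions for illustration only.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not the project's actual code): splits a SQL script into statements. */
final class SqlScriptSplitter {

    /** Splits on ';' while ignoring semicolons inside quotes and "--" line comments. */
    static List<String> split(String script) {
        List<String> statements = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        char quote = 0;            // active quote character (' or `), 0 when outside quotes
        boolean inComment = false; // true while inside a "--" line comment
        for (int i = 0; i < script.length(); i++) {
            char c = script.charAt(i);
            if (inComment) {
                if (c == '\n') {
                    inComment = false;
                    current.append(c);
                }
                continue; // comment text is dropped
            }
            if (quote != 0) {
                current.append(c);
                if (c == quote) {
                    quote = 0; // a doubled quote ('') simply closes and reopens the literal
                }
                continue;
            }
            if (c == '-' && i + 1 < script.length() && script.charAt(i + 1) == '-') {
                inComment = true;
                i++; // skip the second dash
            } else if (c == '\'' || c == '`') {
                quote = c;
                current.append(c);
            } else if (c == ';') {
                addIfNotBlank(statements, current);
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        addIfNotBlank(statements, current); // trailing statement without ';'
        return statements;
    }

    private static void addIfNotBlank(List<String> out, StringBuilder sb) {
        String s = sb.toString().trim();
        if (!s.isEmpty()) {
            out.add(s);
        }
    }
}
```

Each returned string has its terminating semicolon removed, so the statements can be handed to an executor one at a time in file order.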

Built-in Connector Versions

| Connector | Version | Notes |
|---|---|---|
| Flink SQL MySQL CDC | 3.6.0-1.20 | For Flink 1.20 |
| MySQL Connector Java | 8.0.33 | JDBC driver for CDC |
| Flink Connector Kafka | 3.4.0-1.20 | For Flink 1.20 |
| StarRocks Connector | 1.2.14_flink-1.20 | Stream Load sink |
| Flink JSON | 1.20.0 | Kafka format support |

To add more connectors (e.g., Elasticsearch, Iceberg, Hudi, Doris), simply add the corresponding dependency to pom.xml and rebuild.

Architecture

┌──────────────────────────────────────────────────┐
│  flink-k8s-operator                              │
│                                                  │
│  FlinkDeployment (application mode)              │
│       │                                          │
│       ▼                                          │
│  ┌────────────────────────────────────────────┐ │
│  │  Dedicated Flink Cluster (1 JobManager     │ │
│  │  + N TaskManagers) — lifecycle per job     │ │
│  │                                            │ │
│  │  ┌──────────────────────────────────────┐ │ │
│  │  │  flink-sql-k8s-runner.jar            │ │ │
│  │  │  → reads SQL from ConfigMap volume   │ │ │
│  │  │  → executes via Table API            │ │ │
│  │  └──────────────────────────────────────┘ │ │
│  └────────────────────────────────────────────┘ │
└──────────────────────────────────────────────────┘

In application mode, each FlinkDeployment creates a dedicated Flink cluster whose lifecycle is bound to the SQL job. When the job finishes or fails, the operator handles the cluster according to the FlinkDeployment spec, so no manual cluster management is needed.

Quick Start

1. Build the JAR

mvn clean package -DskipTests

The shaded JAR will be at target/flink-sql-k8s-runner-1.0.jar.

2. Build the Docker Image

docker build -t flink-sql-k8s-runner:latest -f docker/Dockerfile .

3. Deploy to Kubernetes

kubectl apply -f yaml/flink-pvc.yaml
kubectl apply -f yaml/flink-sql-config.yaml
kubectl apply -f yaml/flink-deployment.yaml

4. Write Your SQL

Place your SQL file in the sql/ directory. Example SQL files are provided in sql/examples/:

| File | Description |
|---|---|
| mysql-cdc-to-kafka.sql | MySQL CDC source → Kafka sink |
| kafka-to-starrocks.sql | Kafka source → StarRocks sink (with BEGIN STATEMENT SET) |

Replace <PLACEHOLDER> values with your actual configuration before deploying.
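
A forgotten placeholder typically only surfaces once the job fails to connect, so a pre-deploy check can help. The sketch below scans SQL text for unresolved placeholders. It assumes placeholders are written as angle-bracketed upper-case tokens such as <MYSQL_HOST>; that name, the PlaceholderCheck class, and the matching rule are illustrative and not taken from the example files.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical pre-deploy check: lists placeholders that were not replaced. */
final class PlaceholderCheck {

    // Assumption: placeholders look like <UPPER_SNAKE_CASE>. The lookbehind skips
    // type arguments such as ARRAY<STRING>, where '<' directly follows a word character.
    private static final Pattern PLACEHOLDER =
            Pattern.compile("(?<![A-Za-z0-9_])<[A-Z][A-Z0-9_]*>");

    /** Returns every placeholder-like token in the SQL text, in order of appearance. */
    static List<String> findUnresolved(String sql) {
        List<String> found = new ArrayList<>();
        Matcher m = PLACEHOLDER.matcher(sql);
        while (m.find()) {
            found.add(m.group());
        }
        return found;
    }
}
```

This is a heuristic: an expression like `a <B> c` would also be reported, so treat a non-empty result as a prompt to review the file rather than as a hard error.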

SQL Syntax Support

| Syntax | Behavior |
|---|---|
| SET 'key' = 'value' | Set Flink configuration |
| RESET 'key' | Reset configuration to default |
| CREATE TABLE ... | DDL: register source/sink tables |
| INSERT INTO ... | DML: execute and await completion |
| BEGIN STATEMENT SET; ... END; | Group multiple INSERTs into one execution |
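
The grouping behavior of BEGIN STATEMENT SET / END can be sketched as a planning step over statements that have already been split. The StatementPlanner class below is hypothetical and does not reflect how SqlK8sRunner.java is actually written. It classifies each statement and collects the INSERTs between BEGIN STATEMENT SET and END into a single execution unit.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

/** Hypothetical planner: turns a statement list into execution units. */
final class StatementPlanner {

    enum Kind { SET, RESET, BEGIN_STATEMENT_SET, END, INSERT, OTHER }

    /** Classifies a statement by its leading keywords (case-insensitive). */
    static Kind classify(String statement) {
        String s = statement.trim().toUpperCase(Locale.ROOT).replaceAll("\\s+", " ");
        if (s.equals("BEGIN STATEMENT SET")) return Kind.BEGIN_STATEMENT_SET;
        if (s.equals("END")) return Kind.END;
        if (s.equals("SET") || s.startsWith("SET ")) return Kind.SET;
        if (s.equals("RESET") || s.startsWith("RESET ")) return Kind.RESET;
        if (s.startsWith("INSERT ")) return Kind.INSERT;
        return Kind.OTHER; // DDL such as CREATE TABLE, and anything else
    }

    /** INSERTs inside a statement set form one unit; every other statement is its own unit. */
    static List<List<String>> plan(List<String> statements) {
        List<List<String>> units = new ArrayList<>();
        List<String> openSet = null; // non-null while inside BEGIN STATEMENT SET ... END
        for (String statement : statements) {
            Kind kind = classify(statement);
            if (kind == Kind.BEGIN_STATEMENT_SET) {
                if (openSet != null) {
                    throw new IllegalArgumentException("Nested BEGIN STATEMENT SET");
                }
                openSet = new ArrayList<>();
            } else if (kind == Kind.END) {
                if (openSet == null) {
                    throw new IllegalArgumentException("END without BEGIN STATEMENT SET");
                }
                units.add(openSet);
                openSet = null;
            } else if (openSet != null) {
                if (kind != Kind.INSERT) {
                    throw new IllegalArgumentException("Only INSERT is allowed in a statement set");
                }
                openSet.add(statement);
            } else {
                units.add(List.of(statement));
            }
        }
        if (openSet != null) {
            throw new IllegalArgumentException("Missing END for BEGIN STATEMENT SET");
        }
        return units;
    }
}
```

In a real runner, a multi-INSERT unit would map naturally onto Flink's StatementSet (createStatementSet, addInsertSql, execute), so that all INSERTs in the block are submitted as one job. This sketch stops at planning and has no Flink dependency.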

Project Structure

flink-sql-k8s-runner/
├── docker/Dockerfile              # Docker image definition
├── pom.xml                        # Maven build with shade plugin
├── sql/examples/                  # Example SQL files
│   ├── mysql-cdc-to-kafka.sql
│   └── kafka-to-starrocks.sql
├── src/main/java/.../SqlK8sRunner.java  # Main entry point
└── yaml/                          # Kubernetes manifests
    ├── flink-deployment.yaml       # FlinkDeployment (application mode)
    ├── flink-pvc.yaml              # PVC for checkpoints/savepoints
    └── flink-sql-config.yaml       # ConfigMap for SQL files

Configuration

Flink configs are managed via SET statements in SQL files. Key configs:

| Config | Description | Default |
|---|---|---|
| state.checkpoints.dir | Checkpoint storage path | file:///opt/flink/checkpoints |
| execution.checkpointing.interval | Checkpoint interval | 30s |
| execution.checkpointing.mode | Checkpoint mode | EXACTLY_ONCE |
| state.backend.type | State backend | hashmap |
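
As an illustration of how SET and RESET statements translate into configuration changes, the sketch below parses the `SET 'key' = 'value'` and `RESET 'key'` forms and applies them to a plain map. SetStatementParser is a hypothetical name, and the map stands in for Flink's configuration. RESET is modeled as removing the override, on the assumption that the default then applies again.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical parser for SET 'key' = 'value' and RESET 'key' statements. */
final class SetStatementParser {

    private static final Pattern SET =
            Pattern.compile("(?is)^\\s*SET\\s+'([^']+)'\\s*=\\s*'([^']*)'\\s*;?\\s*$");
    private static final Pattern RESET =
            Pattern.compile("(?is)^\\s*RESET\\s+'([^']+)'\\s*;?\\s*$");

    /** Applies one SET or RESET statement to the map; returns false if it is neither. */
    static boolean apply(String statement, Map<String, String> config) {
        Matcher set = SET.matcher(statement);
        if (set.matches()) {
            config.put(set.group(1), set.group(2));
            return true;
        }
        Matcher reset = RESET.matcher(statement);
        if (reset.matches()) {
            config.remove(reset.group(1)); // dropping the override restores the default
            return true;
        }
        return false;
    }
}
```

For example, applying `SET 'execution.checkpointing.interval' = '30s';` stores the key with the value 30s, and a later `RESET 'execution.checkpointing.interval'` removes it again.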

Requirements

  • Flink 1.20+
  • Kubernetes with flink-kubernetes-operator 1.10.0+ installed (1.14.0 recommended)
  • Java 11+
  • Maven 3.6+

License

Apache License 2.0


https://github.com/zwz9599/flink-sql-k8s-runner
