通过提供初始输入并传递处理后的输出以供下一阶段使用,从而允许在一系列阶段中进行数据处理。 Pipeline模式为管道模式,也称为流水线模式。 通过预先设定好的一系列的阶段来处理输入的数据,每个阶段的输出即是下一个阶段的输入。 模型图如下:
Flume:
从图中可以看出,整个流水线内数据流转是从上游到下游,上游的输出是下游的输入,按阶段依次执行。
- Source: 表示数据来源,比如:RcoketSource、 RabbitSource, KafkaSource, HttpSource等。
- Channel:表示对数据进行处理的组件,比如:JsonChannel,对数据进行json转换和处理。
- Sink:表示数据落地或下沉的地方,比如:KafkaSink,表示数据发送到指定的kafka;DbSInk表示数据落地到DB。
可以看出,Pipeline是由Source(必须有),Channel(不一定需要),Sink(必须有)三种类型的组件自由组合而成的。
后续考虑到,在某些特定场景下,数据可能并不一定会有有个特定的下沉,同时在每个channel中, 我们可以做的事情更多, 所以在实现的版本中,并没有要求一定需要在pipeline的最后需要接入一个sink
- 参考flume、netty(内部存在pipeline)等框架的底层实现及设计思路
- 灵活的DAG编排流程
- 快速开发框架,启动快速
- 业务、框架关注点分离
- 统一资源管理,采用池化思想
- 插件关注点完全分离,高度可拓展
- 提供框架级别的日志采集,完成Metric分析
- 内置告警机制(可通过dingtalk告警配置)
- 可通过系统框架级别日志,快速构建各种业务监控大盘,TPS, QPS, 吞吐等核心性能指标快速可视
- 欢迎fork或提出宝贵的修改经验
- 多元的数据接入需求
- 复杂的算法调用过程,冗长的链路流程
- DAG编排流程
- 存在多种数据中间状态,多个数据处理分支
- 处理计算量大
- 数据分发
- 可覆盖较多链路场景
- 千万级别的数据量下验证
🧨🧨🧨🧨🧨🧨🧨🧨🧨 如果你的设计,需满足以上场景,那么欢迎你👏🏻使用它,fork && advice, 🌈🌈🌈🌈🌈🌈 🧨🧨🧨🧨🧨🧨🧨
可直接使用该工程开发,但是建议使用脚手架快速创建流程进行开发,这样能做到框架和业务分离,十分友好。
<module>pipeline-common</module><!--公共层-->
<module>pipeline-framework</module><!--框架层-->
<module>pipeline-plugin</module><!--插件层-->
<module>pipeline-resource</module><!--资源层-->
如果启动看到该信息,说明需要添加启动参数:
usage: pipeline [-c] [-h] [-r]
-c,--config config file path for program, required param
-h,--help help information for program pipeline app
-r,--res res file path for program, required param
-c --config 流程配置文件路径
-r --res 资源文件路径
启动参数:
-c="/Users/dick/workspace/mine/pipeline-mvp/pipeline-framework/src/main/resources/app.json" -r="/Users/dick/workspace/mine/pipeline-mvp/pipeline-framework/src/main/resources/resource.json"
VM options:
-Dlogging.config=logback.xml
资源文件格式,采用json。 框架resource资源路径下存在一个 app.json (样例代码)
具体实际应用场景,可通过 -c 命令参数指定其他流程定义文件
{
"app": {
"name": "pipe-first-demo",
"emergency": "dick"
},
"sls": {
"enable": true,
"loggerKeys": [
"count"
]
},
"monitor": {
"enable": "true",
"dingTalkUrl": "",
"dingTalkSecret": "",
"alertName": "pipeline告警"
},
"process": {
"sources": [
{
"name": "fake_source",
"className": "com.rany.ops.framework.core.source.FakeSource",
"config": {
"timeIntervalMs": 3000
},
"next": [
"fake_channel"
]
}
],
"channels": [
{
"name": "fake_channel",
"className": "com.rany.ops.framework.core.channel.DummyChannel",
"config": {},
"next": []
}
],
"sinks": [
{
"name": "fake_sink",
"className": "xxxx",
"config": {}
}
]
}
}
"app": {
"name": "pipe-first-demo", //应用名
"emergency": "dick" // 紧急联系人
}
"sls": {
"enable": true, // 是否开启日志记录功能
"loggerKeys": [
"count" // 最终流程处理完成后需要打印哪些字段
]
}
"monitor": {
"enable": "true", //是否开启
"dingTalkUrl": "", //钉钉告警机器人
"dingTalkSecret": "", //secret
"alertName": "pipeline告警" //alertName
}
"process": {
"sources": [
{
"name": "fake_source",
"className": "com.rany.ops.framework.core.source.FakeSource",
"config": {
"timeIntervalMs": 3000
},
"convertor": {
"className": "",
"config": {
}
}
"next": [
"fake_channel"
]
}
],
"channels": [
{
"name": "fake_channel",
"className": "com.rany.ops.framework.core.channel.DummyChannel",
"config": {
},
"next": []
}
],
"sinks": [
{
"name": "fake_sink",
"className": "xxxx",
"config": {
}
}
]
}
自定义流程核心配置。需保证内部不会成环(DAG),框架层面启动时候会做校验。
- source: 流程起点
- convertor: 当source启动后,会将消息进行下发,存在很多时候,需要对MQ或者其他数据源中的数据进行一定格式的转换,因此可以在source配置节点中额外配置一个convertor, 具体抽象可以参看 MessageConvertor.convert(Object object)方法
- channel: 流程数据处理环节
- 数据加工: 可以对数据进行加工并扭转到下一工作channel,或进行数据下沉
- sink: 流程结束
- 数据下沉: 可以将由前置流程处理完成后的数据进行存储或二次分发
资源文件格式,采用json,每个资源的name需要独一无二。 框架resource资源路径下存在一个 resource.json (样例代码) 。具体实际应用场景,可通过 -r 命令参数指定其他资源文件。
[
{
"name": "counter",
"className": "com.rany.ops.framework.resource.DummyResource",
"configMap": {
"initValue": 20
},
"instanceNum": 1
}
]
框架层面内置告警通知:
机器人接入:
需配置dingTalkUrl && dingTalkSecret 即可。
"monitor": {
"enable": "true", #开关
"dingTalkUrl": "",
"dingTalkSecret": "",
"mobiles": [
"1866848xxxx"
],
"alertName": "pipeline告警"
}