Skip to content

Commit

Permalink
Merge pull request #15 from gcggcg/gcg_kis-flow
Browse files Browse the repository at this point in the history
修复CallFunction调度业务函数并发读写问题&格式化README.md文档
  • Loading branch information
aceld authored Mar 29, 2024
2 parents 8ed1398 + fe5e08f commit 5675319
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 37 deletions.
78 changes: 42 additions & 36 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
# kis-flow


#### KisFlow(Keep It Simple Flowing)
#### KisFlow(Keep It Simple Flowing)

基于Golang的流式计算框架. 为保持简单的流动,强调在进行各种活动或工作时保持简洁、清晰、流畅的过程。

Expand All @@ -10,55 +9,47 @@
---

## KisFlow源代码

Github
Git: https://github.com/aceld/kis-flow

Gitee(China)
Git: https://gitee.com/Aceld/kis-flow



## 《KisFlow开发者文档》

https://www.yuque.com/aceld/kis-flow-doc


## KisFlow框架开发教程:《基于Golang的流式计算框架实战教程》

https://www.yuque.com/aceld/hsa94o




## KisFlow系统定位

KisFlow为业务上游计算层,上层接数仓/其他业务方ODS层、下游接本业务存储数据中心。<br />
![yuque_diagram (2)](https://github.com/aceld/kis-flow/assets/7778936/b9e1957a-2d11-45d9-84c1-e92c9ac833cc)


<a name="elhiR"></a>

## KisFlow整体架构图

| 层级 | 层级说明 | 包括子模块 |
| --- | --- | --- |
| 层级 | 层级说明 | 包括子模块 |
|-------|------------------------------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 流式计算层 | 为KisFlow上游计算层,直接对接业务存储及数仓ODS层,如上游可以为Mysql Binlog、日志、接口数据等,为被动消费模式,提供KisFlow实时计算能力。 | **KisFlow**:分布式批量消费者,一个KisFlow是由多个KisFunction组合。<br /><br />**KisConnectors**:计算数据流流中间状态持久存储及连接器。<br /><br />**KisFunctions**:支持算子表达式拼接,Connectors集成、策略配置、Stateful Function模式、Slink流式拼接等。<br /><br />**KisConfig:** KisFunction的绑定的流处理策略,可以绑定ReSource让Function具有固定的独立流处理能力。<br /><br />**KisSource:** 对接ODS的数据源 |
| 任务调度层 | 定时任务调度及执行器业务逻辑,包括任务调度平台、执行器管理、调度日志及用户管理等。提供KisFlow的定时任务、统计、聚合运算等调度计算能力。 | **任务调度平台可视化**:包括任务的运行报表、调度报表、成功比例、任务管理、配置管理、GLUE IDE等可视化管理平台。<br /><br />执行器管理**KisJobs**:Golang SDK及计算自定义业务逻辑、执行器的自动注册、任务触发、终止及摘除等。<br /><br />**执行器场景KisScenes:** 根据业务划分的逻辑任务集合。<br /><br />**调度日志及用户管理**:任务调度日志收集、调度详细、调度流程痕迹等。 |
| 任务调度层 | 定时任务调度及执行器业务逻辑,包括任务调度平台、执行器管理、调度日志及用户管理等。提供KisFlow的定时任务、统计、聚合运算等调度计算能力。 | **任务调度平台可视化**:包括任务的运行报表、调度报表、成功比例、任务管理、配置管理、GLUE IDE等可视化管理平台。<br /><br />执行器管理**KisJobs**:Golang SDK及计算自定义业务逻辑、执行器的自动注册、任务触发、终止及摘除等。<br /><br />**执行器场景KisScenes:** 根据业务划分的逻辑任务集合。<br /><br />**调度日志及用户管理**:任务调度日志收集、调度详细、调度流程痕迹等。 |

![KisFlow架构图drawio](https://github.com/aceld/kis-flow/assets/7778936/3b829bdb-600d-4ab9-9e62-e14f90737cc3)



![KisFlow架构设计-KisFlow整体结构 drawio](https://github.com/aceld/kis-flow/assets/7778936/efc1b29d-9dd4-4945-a35a-fb9a618002d7)


KisFlow是一种流式概念形态,具体表现的特征如下:<br />

1、一个KisFlow可以由任意KisFunction组成,且KisFlow可以动态的调整长度。<br />

2、一个KisFunction可以随时动态的加入到某个KisFlow中,且KisFlow和KisFlow之间的关系可以通过KisFunction的Load和Save节点的加入,进行动态的并流和分流动作。<br />

3、KisFlow在编程行为上,从面向流进行数据业务编程,变成了面向KisFunction的函数单计算逻辑的开发,接近FaaS(Function as a service)体系。

3、KisFlow在编程行为上,从面向流进行数据业务编程,变成了面向KisFunction的函数单计算逻辑的开发,接近FaaS(Function as a
service)体系。

## Example

Expand All @@ -71,6 +62,7 @@ https://github.com/aceld/kis-flow-usage
中文:https://www.yuque.com/aceld/kis-flow-doc

#### 安装KisFlow

```bash
$go get github.com/aceld/kis-flow
```
Expand All @@ -79,6 +71,7 @@ $go get github.com/aceld/kis-flow
<summary>1. Quick Start(快速开始)</summary>

### 案例源代码

https://github.com/aceld/kis-flow-usage/tree/main/1-quick_start

### 项目目录
Expand All @@ -94,7 +87,9 @@ https://github.com/aceld/kis-flow-usage/tree/main/1-quick_start
<img width="770" alt="image" src="https://github.com/aceld/kis-flow/assets/7778936/3747ed10-aba1-417e-a3c1-c6205a02444b">

### Main

> main.go
```go
package main

Expand Down Expand Up @@ -147,6 +142,7 @@ func init() {
### Function1

> faas_stu_score_avg.go
```go
package main

Expand Down Expand Up @@ -188,7 +184,9 @@ func AvgStuScore(ctx context.Context, flow kis.Flow, rows []*AvgStuScoreIn) erro
```

### Function2

> faas_stu_score_avg_print.go
```go
package main

Expand Down Expand Up @@ -220,6 +218,7 @@ func PrintStuAvgScore(ctx context.Context, flow kis.Flow, rows []*PrintStuAvgSco
```

### OutPut

```bash
Add KisPool FuncName=AvgStuScore
Add KisPool FuncName=PrintStuAvgScore
Expand All @@ -229,18 +228,18 @@ stuid: [101], avg score: [90]
stuid: [102], avg score: [76.66666666666667]
```


</details>


<details>
<summary>2. Quick Start With Config(快速开始)</summary>

### 案例源代码

https://github.com/aceld/kis-flow-usage/tree/main/2-quick_start_with_config

项目目录

```bash
├── Makefile
├── conf
Expand All @@ -253,19 +252,22 @@ https://github.com/aceld/kis-flow-usage/tree/main/2-quick_start_with_config
```

### Flow

<img width="770" alt="image" src="https://github.com/aceld/kis-flow/assets/7778936/3747ed10-aba1-417e-a3c1-c6205a02444b">

### Config

#### (1) Flow Config

> conf/flow-CalStuAvgScore.yml
```yaml
kistype: flow
status: 1
flow_name: CalStuAvgScore
flows:
- fname: AvgStuScore
- fname: PrintStuAvgScore
- fname: AvgStuScore
- fname: PrintStuAvgScore
```
#### (2) Function1 Config
Expand All @@ -277,9 +279,9 @@ kistype: func
fname: AvgStuScore
fmode: Calculate
source:
name: 学生学分
must:
- stu_id
name: 学生学分
must:
- stu_id
```
#### (3) Function2(Slink) Config
Expand All @@ -291,12 +293,13 @@ kistype: func
fname: PrintStuAvgScore
fmode: Expand
source:
name: 学生学分
must:
- stu_id
name: 学生学分
must:
- stu_id
```
### Main
> main.go
```go
Expand Down Expand Up @@ -344,7 +347,9 @@ func init() {
```

### Function1

> faas_stu_score_avg.go
```go
package main

Expand Down Expand Up @@ -386,7 +391,9 @@ func AvgStuScore(ctx context.Context, flow kis.Flow, rows []*AvgStuScoreIn) erro
```

### Function2

> faas_stu_score_avg_print.go
```go
package main

Expand Down Expand Up @@ -418,6 +425,7 @@ func PrintStuAvgScore(ctx context.Context, flow kis.Flow, rows []*PrintStuAvgSco
```

### OutPut

```bash
Add KisPool FuncName=AvgStuScore
Add KisPool FuncName=PrintStuAvgScore
Expand All @@ -433,23 +441,21 @@ stuid: [102], avg score: [76.66666666666667]

### 开发者

* 刘丹冰([@aceld](https://github.com/aceld))
* 刘丹冰([@aceld](https://github.com/aceld))
* 胡辰豪([@ChenHaoHu](https://github.com/ChenHaoHu))


Thanks to all the developers who contributed to KisFlow!

<a href="https://github.com/aceld/kis-flow/graphs/contributors">
<img src="https://contrib.rocks/image?repo=aceld/kis-flow" />
</a>


### 加入KisFlow 社区

| platform | Entry |
| ---- | ---- |
| <img src="https://user-images.githubusercontent.com/7778936/236775008-6bd488e3-249a-4d43-8885-7e3889e11e2d.png" width = "100" height = "100" alt="" align=center />| https://discord.gg/xQ8Xxfyfcz|
| <img src="https://user-images.githubusercontent.com/7778936/236775137-5381f8a6-f534-49c4-8628-e52bf245c3bc.jpeg" width = "100" height = "100" alt="" align=center /> | 加微信: `ace_ld` 或扫二维码,备注`flow`即可。</br><img src="https://user-images.githubusercontent.com/7778936/236781258-2f0371bd-5797-49e8-a74c-680e9f15843d.png" width = "150" height = "150" alt="" align=center /> |
|<img src="https://user-images.githubusercontent.com/7778936/236778547-9cdadfb6-0f62-48ac-851a-b940389038d0.jpeg" width = "100" height = "100" alt="" align=center />|<img src="https://s1.ax1x.com/2020/07/07/UFyUdx.th.jpg" height = "150" alt="" align=center /> **WeChat Public Account** |
|<img src="https://user-images.githubusercontent.com/7778936/236779000-70f16c8f-0eec-4b5f-9faa-e1d5229a43e0.png" width = "100" height = "100" alt="" align=center />|<img src="https://s1.ax1x.com/2020/07/07/UF6Y9S.th.png" width = "150" height = "150" alt="" align=center /> **QQ Group** |
| platform | Entry |
|----------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| <img src="https://user-images.githubusercontent.com/7778936/236775008-6bd488e3-249a-4d43-8885-7e3889e11e2d.png" width = "100" height = "100" alt="" align=center /> | https://discord.gg/xQ8Xxfyfcz |
| <img src="https://user-images.githubusercontent.com/7778936/236775137-5381f8a6-f534-49c4-8628-e52bf245c3bc.jpeg" width = "100" height = "100" alt="" align=center /> | 加微信: `ace_ld` 或扫二维码,备注`flow`即可。</br><img src="https://user-images.githubusercontent.com/7778936/236781258-2f0371bd-5797-49e8-a74c-680e9f15843d.png" width = "150" height = "150" alt="" align=center /> |
| <img src="https://user-images.githubusercontent.com/7778936/236778547-9cdadfb6-0f62-48ac-851a-b940389038d0.jpeg" width = "100" height = "100" alt="" align=center /> | <img src="https://s1.ax1x.com/2020/07/07/UFyUdx.th.jpg" height = "150" alt="" align=center /> **WeChat Public Account** |
| <img src="https://user-images.githubusercontent.com/7778936/236779000-70f16c8f-0eec-4b5f-9faa-e1d5229a43e0.png" width = "100" height = "100" alt="" align=center /> | <img src="https://s1.ax1x.com/2020/07/07/UF6Y9S.th.png" width = "150" height = "150" alt="" align=center /> **QQ Group** |

3 changes: 2 additions & 1 deletion kis/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ func (pool *kisPool) FaaS(fnName string, f FaaS) {

// CallFunction 调度 Function
func (pool *kisPool) CallFunction(ctx context.Context, fnName string, flow Flow) error {

pool.fnLock.RLock() // 读锁
defer pool.fnLock.RUnlock()
if funcDesc, ok := pool.fnRouter[fnName]; ok {

// 被调度Function的形参列表
Expand Down

0 comments on commit 5675319

Please sign in to comment.