Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

修复CallFunction调度业务函数并发读写问题&格式化README.md文档 #15

Merged
merged 1 commit into from
Mar 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 42 additions & 36 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
# kis-flow


#### KisFlow(Keep It Simple Flowing)
#### KisFlow(Keep It Simple Flowing)

基于Golang的流式计算框架. 为保持简单的流动,强调在进行各种活动或工作时保持简洁、清晰、流畅的过程。

Expand All @@ -10,55 +9,47 @@
---

## KisFlow源代码

Github
Git: https://github.com/aceld/kis-flow

Gitee(China)
Git: https://gitee.com/Aceld/kis-flow



## 《KisFlow开发者文档》

https://www.yuque.com/aceld/kis-flow-doc


## KisFlow框架开发教程:《基于Golang的流式计算框架实战教程》

https://www.yuque.com/aceld/hsa94o




## KisFlow系统定位

KisFlow为业务上游计算层,上层接数仓/其他业务方ODS层、下游接本业务存储数据中心。<br />
![yuque_diagram (2)](https://github.com/aceld/kis-flow/assets/7778936/b9e1957a-2d11-45d9-84c1-e92c9ac833cc)


<a name="elhiR"></a>

## KisFlow整体架构图

| 层级 | 层级说明 | 包括子模块 |
| --- | --- | --- |
| 层级 | 层级说明 | 包括子模块 |
|-------|------------------------------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 流式计算层 | 为KisFlow上游计算层,直接对接业务存储及数仓ODS层,如上游可以为Mysql Binlog、日志、接口数据等,为被动消费模式,提供KisFlow实时计算能力。 | **KisFlow**:分布式批量消费者,一个KisFlow是由多个KisFunction组合。<br /><br />**KisConnectors**:计算数据流流中间状态持久存储及连接器。<br /><br />**KisFunctions**:支持算子表达式拼接,Connectors集成、策略配置、Stateful Function模式、Slink流式拼接等。<br /><br />**KisConfig:** KisFunction的绑定的流处理策略,可以绑定ReSource让Function具有固定的独立流处理能力。<br /><br />**KisSource:** 对接ODS的数据源 |
| 任务调度层 | 定时任务调度及执行器业务逻辑,包括任务调度平台、执行器管理、调度日志及用户管理等。提供KisFlow的定时任务、统计、聚合运算等调度计算能力。 | **任务调度平台可视化**:包括任务的运行报表、调度报表、成功比例、任务管理、配置管理、GLUE IDE等可视化管理平台。<br /><br />执行器管理**KisJobs**:Golang SDK及计算自定义业务逻辑、执行器的自动注册、任务触发、终止及摘除等。<br /><br />**执行器场景KisScenes:** 根据业务划分的逻辑任务集合。<br /><br />**调度日志及用户管理**:任务调度日志收集、调度详细、调度流程痕迹等。 |
| 任务调度层 | 定时任务调度及执行器业务逻辑,包括任务调度平台、执行器管理、调度日志及用户管理等。提供KisFlow的定时任务、统计、聚合运算等调度计算能力。 | **任务调度平台可视化**:包括任务的运行报表、调度报表、成功比例、任务管理、配置管理、GLUE IDE等可视化管理平台。<br /><br />执行器管理**KisJobs**:Golang SDK及计算自定义业务逻辑、执行器的自动注册、任务触发、终止及摘除等。<br /><br />**执行器场景KisScenes:** 根据业务划分的逻辑任务集合。<br /><br />**调度日志及用户管理**:任务调度日志收集、调度详细、调度流程痕迹等。 |

![KisFlow架构图drawio](https://github.com/aceld/kis-flow/assets/7778936/3b829bdb-600d-4ab9-9e62-e14f90737cc3)



![KisFlow架构设计-KisFlow整体结构 drawio](https://github.com/aceld/kis-flow/assets/7778936/efc1b29d-9dd4-4945-a35a-fb9a618002d7)


KisFlow是一种流式概念形态,具体表现的特征如下:<br />

1、一个KisFlow可以由任意KisFunction组成,且KisFlow可以动态的调整长度。<br />

2、一个KisFunction可以随时动态的加入到某个KisFlow中,且KisFlow和KisFlow之间的关系可以通过KisFunction的Load和Save节点的加入,进行动态的并流和分流动作。<br />

3、KisFlow在编程行为上,从面向流进行数据业务编程,变成了面向KisFunction的函数单计算逻辑的开发,接近FaaS(Function as a service)体系。

3、KisFlow在编程行为上,从面向流进行数据业务编程,变成了面向KisFunction的函数单计算逻辑的开发,接近FaaS(Function as a
service)体系。

## Example

Expand All @@ -71,6 +62,7 @@ https://github.com/aceld/kis-flow-usage
中文:https://www.yuque.com/aceld/kis-flow-doc

#### 安装KisFlow

```bash
$go get github.com/aceld/kis-flow
```
Expand All @@ -79,6 +71,7 @@ $go get github.com/aceld/kis-flow
<summary>1. Quick Start(快速开始)</summary>

### 案例源代码

https://github.com/aceld/kis-flow-usage/tree/main/1-quick_start

### 项目目录
Expand All @@ -94,7 +87,9 @@ https://github.com/aceld/kis-flow-usage/tree/main/1-quick_start
<img width="770" alt="image" src="https://github.com/aceld/kis-flow/assets/7778936/3747ed10-aba1-417e-a3c1-c6205a02444b">

### Main

> main.go

```go
package main

Expand Down Expand Up @@ -147,6 +142,7 @@ func init() {
### Function1

> faas_stu_score_avg.go

```go
package main

Expand Down Expand Up @@ -188,7 +184,9 @@ func AvgStuScore(ctx context.Context, flow kis.Flow, rows []*AvgStuScoreIn) erro
```

### Function2

> faas_stu_score_avg_print.go

```go
package main

Expand Down Expand Up @@ -220,6 +218,7 @@ func PrintStuAvgScore(ctx context.Context, flow kis.Flow, rows []*PrintStuAvgSco
```

### OutPut

```bash
Add KisPool FuncName=AvgStuScore
Add KisPool FuncName=PrintStuAvgScore
Expand All @@ -229,18 +228,18 @@ stuid: [101], avg score: [90]
stuid: [102], avg score: [76.66666666666667]
```


</details>


<details>
<summary>2. Quick Start With Config(快速开始)</summary>

### 案例源代码

https://github.com/aceld/kis-flow-usage/tree/main/2-quick_start_with_config

项目目录

```bash
├── Makefile
├── conf
Expand All @@ -253,19 +252,22 @@ https://github.com/aceld/kis-flow-usage/tree/main/2-quick_start_with_config
```

### Flow

<img width="770" alt="image" src="https://github.com/aceld/kis-flow/assets/7778936/3747ed10-aba1-417e-a3c1-c6205a02444b">

### Config

#### (1) Flow Config

> conf/flow-CalStuAvgScore.yml

```yaml
kistype: flow
status: 1
flow_name: CalStuAvgScore
flows:
- fname: AvgStuScore
- fname: PrintStuAvgScore
- fname: AvgStuScore
- fname: PrintStuAvgScore
```

#### (2) Function1 Config
Expand All @@ -277,9 +279,9 @@ kistype: func
fname: AvgStuScore
fmode: Calculate
source:
name: 学生学分
must:
- stu_id
name: 学生学分
must:
- stu_id
```

#### (3) Function2(Slink) Config
Expand All @@ -291,12 +293,13 @@ kistype: func
fname: PrintStuAvgScore
fmode: Expand
source:
name: 学生学分
must:
- stu_id
name: 学生学分
must:
- stu_id
```

### Main

> main.go

```go
Expand Down Expand Up @@ -344,7 +347,9 @@ func init() {
```

### Function1

> faas_stu_score_avg.go

```go
package main

Expand Down Expand Up @@ -386,7 +391,9 @@ func AvgStuScore(ctx context.Context, flow kis.Flow, rows []*AvgStuScoreIn) erro
```

### Function2

> faas_stu_score_avg_print.go

```go
package main

Expand Down Expand Up @@ -418,6 +425,7 @@ func PrintStuAvgScore(ctx context.Context, flow kis.Flow, rows []*PrintStuAvgSco
```

### OutPut

```bash
Add KisPool FuncName=AvgStuScore
Add KisPool FuncName=PrintStuAvgScore
Expand All @@ -433,23 +441,21 @@ stuid: [102], avg score: [76.66666666666667]

### 开发者

* 刘丹冰([@aceld](https://github.com/aceld))
* 刘丹冰([@aceld](https://github.com/aceld))
* 胡辰豪([@ChenHaoHu](https://github.com/ChenHaoHu))


Thanks to all the developers who contributed to KisFlow!

<a href="https://github.com/aceld/kis-flow/graphs/contributors">
<img src="https://contrib.rocks/image?repo=aceld/kis-flow" />
</a>


### 加入KisFlow 社区

| platform | Entry |
| ---- | ---- |
| <img src="https://user-images.githubusercontent.com/7778936/236775008-6bd488e3-249a-4d43-8885-7e3889e11e2d.png" width = "100" height = "100" alt="" align=center />| https://discord.gg/xQ8Xxfyfcz|
| <img src="https://user-images.githubusercontent.com/7778936/236775137-5381f8a6-f534-49c4-8628-e52bf245c3bc.jpeg" width = "100" height = "100" alt="" align=center /> | 加微信: `ace_ld` 或扫二维码,备注`flow`即可。</br><img src="https://user-images.githubusercontent.com/7778936/236781258-2f0371bd-5797-49e8-a74c-680e9f15843d.png" width = "150" height = "150" alt="" align=center /> |
|<img src="https://user-images.githubusercontent.com/7778936/236778547-9cdadfb6-0f62-48ac-851a-b940389038d0.jpeg" width = "100" height = "100" alt="" align=center />|<img src="https://s1.ax1x.com/2020/07/07/UFyUdx.th.jpg" height = "150" alt="" align=center /> **WeChat Public Account** |
|<img src="https://user-images.githubusercontent.com/7778936/236779000-70f16c8f-0eec-4b5f-9faa-e1d5229a43e0.png" width = "100" height = "100" alt="" align=center />|<img src="https://s1.ax1x.com/2020/07/07/UF6Y9S.th.png" width = "150" height = "150" alt="" align=center /> **QQ Group** |
| platform | Entry |
|----------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| <img src="https://user-images.githubusercontent.com/7778936/236775008-6bd488e3-249a-4d43-8885-7e3889e11e2d.png" width = "100" height = "100" alt="" align=center /> | https://discord.gg/xQ8Xxfyfcz |
| <img src="https://user-images.githubusercontent.com/7778936/236775137-5381f8a6-f534-49c4-8628-e52bf245c3bc.jpeg" width = "100" height = "100" alt="" align=center /> | 加微信: `ace_ld` 或扫二维码,备注`flow`即可。</br><img src="https://user-images.githubusercontent.com/7778936/236781258-2f0371bd-5797-49e8-a74c-680e9f15843d.png" width = "150" height = "150" alt="" align=center /> |
| <img src="https://user-images.githubusercontent.com/7778936/236778547-9cdadfb6-0f62-48ac-851a-b940389038d0.jpeg" width = "100" height = "100" alt="" align=center /> | <img src="https://s1.ax1x.com/2020/07/07/UFyUdx.th.jpg" height = "150" alt="" align=center /> **WeChat Public Account** |
| <img src="https://user-images.githubusercontent.com/7778936/236779000-70f16c8f-0eec-4b5f-9faa-e1d5229a43e0.png" width = "100" height = "100" alt="" align=center /> | <img src="https://s1.ax1x.com/2020/07/07/UF6Y9S.th.png" width = "150" height = "150" alt="" align=center /> **QQ Group** |

3 changes: 2 additions & 1 deletion kis/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ func (pool *kisPool) FaaS(fnName string, f FaaS) {

// CallFunction 调度 Function
func (pool *kisPool) CallFunction(ctx context.Context, fnName string, flow Flow) error {

pool.fnLock.RLock() // 读锁
defer pool.fnLock.RUnlock()
if funcDesc, ok := pool.fnRouter[fnName]; ok {

// 被调度Function的形参列表
Expand Down