Skip to content

Commit

Permalink
add flow.CommitRowBatch
Browse files Browse the repository at this point in the history
  • Loading branch information
aceld committed Mar 26, 2024
1 parent ec90a75 commit eae7443
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 0 deletions.
16 changes: 16 additions & 0 deletions flow/kis_flow_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/aceld/kis-flow/log"
"github.com/aceld/kis-flow/metrics"
"github.com/patrickmn/go-cache"
"reflect"
"time"
)

Expand All @@ -20,6 +21,21 @@ func (flow *KisFlow) CommitRow(row interface{}) error {
return nil
}

// CommitRowBatch 提交Flow数据, 批量数据
func (flow *KisFlow) CommitRowBatch(rows interface{}) error {
v := reflect.ValueOf(rows)
if v.Kind() != reflect.Slice {
return fmt.Errorf("Commit Data is not a slice")
}

for i := 0; i < v.Len(); i++ {
row := v.Index(i).Interface().(common.KisRow)
flow.buffer = append(flow.buffer, row)
}

return nil
}

// Input 得到flow当前执行Function的输入源数据
func (flow *KisFlow) Input() common.KisRowArr {
return flow.inPut
Expand Down
3 changes: 3 additions & 0 deletions kis/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ type Flow interface {
Link(fConf *config.KisFuncConfig, fParams config.FParam) error
// CommitRow 提交Flow数据到即将执行的Function层
CommitRow(row interface{}) error
// CommitRowBatch 提交Flow数据到即将执行的Function层(批量提交)
// row: Must be a slice
CommitRowBatch(row interface{}) error
// Input 得到flow当前执行Function的输入源数据
Input() common.KisRowArr
// GetName 得到Flow的名称
Expand Down
38 changes: 38 additions & 0 deletions test/kis_flow_commit_batch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package test

import (
"context"
"github.com/aceld/kis-flow/file"
"github.com/aceld/kis-flow/kis"
"github.com/aceld/kis-flow/log"
"testing"
)

func TestForkFlowCommitBatch(t *testing.T) {
ctx := context.Background()

// 1. 加载配置文件并构建Flow
if err := file.ConfigImportYaml("load_conf/"); err != nil {
panic(err)
}

// 2. 获取Flow
flow1 := kis.Pool().GetFlow("flowName1")

stringRows := []string{
"This is Data1 from Test",
"This is Data2 from Test",
"This is Data3 from Test",
}

// 3. 提交原始数据
if err := flow1.CommitRowBatch(stringRows); err != nil {
log.Logger().ErrorF("CommitRowBatch Error, err = %+v", err)
panic(err)
}

// 4. 执行flow1
if err := flow1.Run(ctx); err != nil {
panic(err)
}
}

0 comments on commit eae7443

Please sign in to comment.