Skip to content

Commit

Permalink
change CaaS Function Type, add return value:interface{}
Browse files Browse the repository at this point in the history
  • Loading branch information
aceld committed Apr 3, 2024
1 parent dc5d9a3 commit dd634e6
Show file tree
Hide file tree
Showing 6 changed files with 15 additions and 11 deletions.
12 changes: 8 additions & 4 deletions conn/kis_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,16 @@ func (conn *KisConnector) Init() error {
}

// Call 调用Connector 外挂存储逻辑的读写操作
func (conn *KisConnector) Call(ctx context.Context, flow kis.Flow, args interface{}) error {
if err := kis.Pool().CallConnector(ctx, flow, conn, args); err != nil {
return err
func (conn *KisConnector) Call(ctx context.Context, flow kis.Flow, args interface{}) (interface{}, error) {
var result interface{}
var err error

result, err = kis.Pool().CallConnector(ctx, flow, conn, args)
if err != nil {
return nil, err
}

return nil
return result, nil
}

func (conn *KisConnector) GetName() string {
Expand Down
2 changes: 1 addition & 1 deletion kis/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ type Connector interface {
// Init 初始化Connector所关联的存储引擎链接等
Init() error
// Call 调用Connector 外挂存储逻辑的读写操作
Call(ctx context.Context, flow Flow, args interface{}) error
Call(ctx context.Context, flow Flow, args interface{}) (interface{}, error)
// GetId 获取Connector的ID
GetId() string
// GetName 获取Connector的名称
Expand Down
4 changes: 2 additions & 2 deletions kis/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func (pool *kisPool) CaaS(cname string, fname string, mode common.KisMode, c Caa
}

// CallConnector 调度 Connector
func (pool *kisPool) CallConnector(ctx context.Context, flow Flow, conn Connector, args interface{}) error {
func (pool *kisPool) CallConnector(ctx context.Context, flow Flow, conn Connector, args interface{}) (interface{}, error) {
pool.cLock.RLock() // 读锁
defer pool.cLock.RUnlock()
fn := flow.GetThisFunction()
Expand All @@ -225,7 +225,7 @@ func (pool *kisPool) CallConnector(ctx context.Context, flow Flow, conn Connecto

log.Logger().ErrorFX(ctx, "CName:%s FName:%s mode:%s Can not find in KisPool, Not Added.\n", conn.GetName(), fnConf.FName, mode)

return errors.New(fmt.Sprintf("CName:%s FName:%s mode:%s Can not find in KisPool, Not Added.", conn.GetName(), fnConf.FName, mode))
return nil, errors.New(fmt.Sprintf("CName:%s FName:%s mode:%s Can not find in KisPool, Not Added.", conn.GetName(), fnConf.FName, mode))
}

// GetFlows 得到全部的Flow
Expand Down
2 changes: 1 addition & 1 deletion kis/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type connInitRouter map[string]ConnInit
Connector Call
*/
// CaaS Connector的存储读取业务实现
type CaaS func(context.Context, Connector, Function, Flow, interface{}) error
type CaaS func(context.Context, Connector, Function, Flow, interface{}) (interface{}, error)

// connFuncRouter 通过FunctionName索引到CaaS回调存储业务的映射关系
// key: Function Name
Expand Down
4 changes: 2 additions & 2 deletions test/caas/caas_demo1.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ import (

// type CaaS func(context.Context, Connector, Function, Flow, interface{}) error

func CaasDemoHanler1(ctx context.Context, conn kis.Connector, fn kis.Function, flow kis.Flow, args interface{}) error {
func CaasDemoHanler1(ctx context.Context, conn kis.Connector, fn kis.Function, flow kis.Flow, args interface{}) (interface{}, error) {
fmt.Printf("===> In CaasDemoHanler1: flowName: %s, cName:%s, fnName:%s, mode:%s\n",
flow.GetName(), conn.GetName(), fn.GetConfig().FName, fn.GetConfig().FMode)

fmt.Printf("Params = %+v\n", conn.GetConfig().Params)

fmt.Printf("===> Call Connector CaasDemoHanler1, args from funciton: %s\n", args)

return nil
return nil, nil
}
2 changes: 1 addition & 1 deletion test/faas/faas_demo2.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func FuncDemo2Handler(ctx context.Context, flow kis.Flow) error {
return err
}

if conn.Call(ctx, flow, row) != nil {
if _, err := conn.Call(ctx, flow, row); err != nil {
log.Logger().ErrorFX(ctx, "FuncDemo2Handler(): Call err = %s\n", err.Error())
return err
}
Expand Down

0 comments on commit dd634e6

Please sign in to comment.