Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

change CaaS Function Type, add return value:interface{} #18

Merged
merged 1 commit into from
Apr 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions conn/kis_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,16 @@ func (conn *KisConnector) Init() error {
}

// Call 调用Connector 外挂存储逻辑的读写操作
func (conn *KisConnector) Call(ctx context.Context, flow kis.Flow, args interface{}) error {
if err := kis.Pool().CallConnector(ctx, flow, conn, args); err != nil {
return err
func (conn *KisConnector) Call(ctx context.Context, flow kis.Flow, args interface{}) (interface{}, error) {
var result interface{}
var err error

result, err = kis.Pool().CallConnector(ctx, flow, conn, args)
if err != nil {
return nil, err
}

return nil
return result, nil
}

func (conn *KisConnector) GetName() string {
Expand Down
2 changes: 1 addition & 1 deletion kis/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ type Connector interface {
// Init 初始化Connector所关联的存储引擎链接等
Init() error
// Call 调用Connector 外挂存储逻辑的读写操作
Call(ctx context.Context, flow Flow, args interface{}) error
Call(ctx context.Context, flow Flow, args interface{}) (interface{}, error)
// GetId 获取Connector的ID
GetId() string
// GetName 获取Connector的名称
Expand Down
4 changes: 2 additions & 2 deletions kis/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func (pool *kisPool) CaaS(cname string, fname string, mode common.KisMode, c Caa
}

// CallConnector 调度 Connector
func (pool *kisPool) CallConnector(ctx context.Context, flow Flow, conn Connector, args interface{}) error {
func (pool *kisPool) CallConnector(ctx context.Context, flow Flow, conn Connector, args interface{}) (interface{}, error) {
pool.cLock.RLock() // 读锁
defer pool.cLock.RUnlock()
fn := flow.GetThisFunction()
Expand All @@ -225,7 +225,7 @@ func (pool *kisPool) CallConnector(ctx context.Context, flow Flow, conn Connecto

log.Logger().ErrorFX(ctx, "CName:%s FName:%s mode:%s Can not find in KisPool, Not Added.\n", conn.GetName(), fnConf.FName, mode)

return errors.New(fmt.Sprintf("CName:%s FName:%s mode:%s Can not find in KisPool, Not Added.", conn.GetName(), fnConf.FName, mode))
return nil, errors.New(fmt.Sprintf("CName:%s FName:%s mode:%s Can not find in KisPool, Not Added.", conn.GetName(), fnConf.FName, mode))
}

// GetFlows 得到全部的Flow
Expand Down
2 changes: 1 addition & 1 deletion kis/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type connInitRouter map[string]ConnInit
Connector Call
*/
// CaaS Connector的存储读取业务实现
type CaaS func(context.Context, Connector, Function, Flow, interface{}) error
type CaaS func(context.Context, Connector, Function, Flow, interface{}) (interface{}, error)

// connFuncRouter 通过FunctionName索引到CaaS回调存储业务的映射关系
// key: Function Name
Expand Down
4 changes: 2 additions & 2 deletions test/caas/caas_demo1.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ import (

// type CaaS func(context.Context, Connector, Function, Flow, interface{}) error

func CaasDemoHanler1(ctx context.Context, conn kis.Connector, fn kis.Function, flow kis.Flow, args interface{}) error {
func CaasDemoHanler1(ctx context.Context, conn kis.Connector, fn kis.Function, flow kis.Flow, args interface{}) (interface{}, error) {
fmt.Printf("===> In CaasDemoHanler1: flowName: %s, cName:%s, fnName:%s, mode:%s\n",
flow.GetName(), conn.GetName(), fn.GetConfig().FName, fn.GetConfig().FMode)

fmt.Printf("Params = %+v\n", conn.GetConfig().Params)

fmt.Printf("===> Call Connector CaasDemoHanler1, args from funciton: %s\n", args)

return nil
return nil, nil
}
2 changes: 1 addition & 1 deletion test/faas/faas_demo2.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func FuncDemo2Handler(ctx context.Context, flow kis.Flow) error {
return err
}

if conn.Call(ctx, flow, row) != nil {
if _, err := conn.Call(ctx, flow, row); err != nil {
log.Logger().ErrorFX(ctx, "FuncDemo2Handler(): Call err = %s\n", err.Error())
return err
}
Expand Down