Skip to content

Commit 969b562

Browse files
authored
feat: support function config (#192)
1 parent 4fab108 commit 969b562

File tree

14 files changed

+419
-162
lines changed

14 files changed

+419
-162
lines changed

clients/gofs/gofs.go

Lines changed: 36 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,22 @@ type moduleWrapper struct {
7373
registerErr error
7474
}
7575

76+
func (m *moduleWrapper) AddInitFunc(initFunc func(context.Context) error) *moduleWrapper {
77+
parentInit := m.initFunc
78+
if parentInit != nil {
79+
m.initFunc = func(ctx context.Context) error {
80+
err := parentInit(ctx)
81+
if err != nil {
82+
return err
83+
}
84+
return initFunc(ctx)
85+
}
86+
} else {
87+
m.initFunc = initFunc
88+
}
89+
return m
90+
}
91+
7692
func (c *fsClient) Register(module string, wrapper *moduleWrapper) FSClient {
7793
if c.err != nil {
7894
return c
@@ -200,23 +216,29 @@ func Custom(init func(ctx context.Context) error, execute func(ctx context.Conte
200216
}
201217

202218
type FunctionContext struct {
203-
c *fsClient
219+
c *fsClient
220+
name string
221+
module string
204222
}
205223

206224
func (c *FunctionContext) GetState(ctx context.Context, key string) ([]byte, error) {
207-
return c.c.rpc.GetState(ctx, key)
225+
return c.c.rpc.GetState(c.warpContext(ctx), key)
208226
}
209227

210228
func (c *FunctionContext) PutState(ctx context.Context, key string, value []byte) error {
211-
return c.c.rpc.PutState(ctx, key, value)
229+
return c.c.rpc.PutState(c.warpContext(ctx), key, value)
212230
}
213231

214232
func (c *FunctionContext) Write(ctx context.Context, payload []byte) error {
215-
return c.c.rpc.Write(ctx, payload)
233+
return c.c.rpc.Write(c.warpContext(ctx), payload)
216234
}
217235

218236
func (c *FunctionContext) Read(ctx context.Context) ([]byte, error) {
219-
return c.c.rpc.Read(ctx)
237+
return c.c.rpc.Read(c.warpContext(ctx))
238+
}
239+
240+
func (c *FunctionContext) GetConfig(ctx context.Context) (map[string]string, error) {
241+
return c.c.rpc.GetConfig(c.warpContext(ctx))
220242
}
221243

222244
type funcCtxKey struct{}
@@ -241,15 +263,6 @@ func (c *fsClient) Run() error {
241263
if funcName == "" {
242264
return fmt.Errorf("%s is not set", FSFunctionName)
243265
}
244-
funcCtx := &FunctionContext{c: c}
245-
if c.rpc == nil {
246-
rpc, err := newFSRPCClient()
247-
if err != nil {
248-
return err
249-
}
250-
c.rpc = rpc
251-
}
252-
ctx := c.rpc.GetContext(context.WithValue(context.Background(), funcCtxKey{}, funcCtx), funcName)
253266
module := os.Getenv(FSModuleName)
254267
if module == "" {
255268
module = DefaultModule
@@ -258,6 +271,15 @@ func (c *fsClient) Run() error {
258271
if !ok {
259272
return fmt.Errorf("module %s not found", module)
260273
}
274+
funcCtx := &FunctionContext{c: c, name: funcName, module: module}
275+
if c.rpc == nil {
276+
rpc, err := newFSRPCClient()
277+
if err != nil {
278+
return err
279+
}
280+
c.rpc = rpc
281+
}
282+
ctx := funcCtx.warpContext(context.WithValue(context.Background(), funcCtxKey{}, funcCtx))
261283
m.fsClient = c
262284
err := m.initFunc(ctx)
263285
if err != nil {

clients/gofs/gofs_socket.go

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,19 @@ import (
2525
"os"
2626
"strings"
2727

28+
"google.golang.org/grpc/metadata"
29+
2830
"github.com/functionstream/function-stream/fs/runtime/external/model"
2931
"google.golang.org/grpc"
3032
"google.golang.org/grpc/credentials/insecure"
31-
"google.golang.org/grpc/metadata"
3233
)
3334

35+
func (c *FunctionContext) warpContext(parent context.Context) context.Context {
36+
return metadata.NewOutgoingContext(parent, metadata.New(map[string]string{
37+
"name": c.name,
38+
}))
39+
}
40+
3441
type fsRPCClient struct {
3542
grpcCli model.FunctionClient
3643
}
@@ -65,13 +72,6 @@ func newFSRPCClient() (*fsRPCClient, error) {
6572
return &fsRPCClient{grpcCli: client}, nil
6673
}
6774

68-
func (c *fsRPCClient) GetContext(parent context.Context, funcName string) context.Context {
69-
md := metadata.New(map[string]string{
70-
"name": funcName,
71-
})
72-
return metadata.NewOutgoingContext(parent, md)
73-
}
74-
7575
func (c *fsRPCClient) RegisterSchema(ctx context.Context, schema string) error {
7676
_, err := c.grpcCli.RegisterSchema(ctx, &model.RegisterSchemaRequest{Schema: schema})
7777
if err != nil {
@@ -124,6 +124,14 @@ func (c *fsRPCClient) ListStates(ctx context.Context, path string) ([]string, er
124124
return res.Keys, nil
125125
}
126126

127+
func (c *fsRPCClient) GetConfig(ctx context.Context) (map[string]string, error) {
128+
res, err := c.grpcCli.GetConfig(ctx, &model.GetConfigRequest{})
129+
if err != nil {
130+
return nil, err
131+
}
132+
return res.Config, nil
133+
}
134+
127135
func (c *fsRPCClient) loadModule(_ *moduleWrapper) {
128136
// no-op
129137
}

clients/gofs/gofs_wasmfs.go

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -48,17 +48,17 @@ func process() {
4848
}
4949
}
5050

51+
func (c *FunctionContext) warpContext(parent context.Context) context.Context {
52+
return parent
53+
}
54+
5155
type fsRPCClient struct {
5256
}
5357

5458
func newFSRPCClient() (*fsRPCClient, error) {
5559
return &fsRPCClient{}, nil
5660
}
5761

58-
func (c *fsRPCClient) GetContext(parent context.Context, funcName string) context.Context {
59-
return context.Background()
60-
}
61-
6262
func (c *fsRPCClient) RegisterSchema(ctx context.Context, schema string) error {
6363
_, err := syscall.Write(registerSchemaFd, []byte(schema))
6464
if err != nil {
@@ -68,19 +68,23 @@ func (c *fsRPCClient) RegisterSchema(ctx context.Context, schema string) error {
6868
}
6969

7070
func (c *fsRPCClient) Write(ctx context.Context, payload []byte) error {
71-
panic("rpc write not implemented")
71+
panic("rpc write not supported")
7272
}
7373

7474
func (c *fsRPCClient) Read(ctx context.Context) ([]byte, error) {
75-
panic("rpc read not implemented")
75+
panic("rpc read not supported")
7676
}
7777

7878
func (c *fsRPCClient) GetState(ctx context.Context, key string) ([]byte, error) {
79-
panic("rpc get state not implemented")
79+
panic("rpc get state not supported")
8080
}
8181

8282
func (c *fsRPCClient) PutState(ctx context.Context, key string, value []byte) error {
83-
panic("rpc put state not implemented")
83+
panic("rpc put state not supported")
84+
}
85+
86+
func (c *fsRPCClient) GetConfig(ctx context.Context) (map[string]string, error) {
87+
panic("rpc get config not supported")
8488
}
8589

8690
func (c *fsRPCClient) loadModule(m *moduleWrapper) {

fs/api/func_ctx.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,4 +28,5 @@ type FunctionContext interface {
2828
ListStates(ctx context.Context, startInclusive string, endExclusive string) ([]string, error)
2929
DeleteState(ctx context.Context, key string) error
3030
Write(record contube.Record) error
31+
GetConfig() map[string]string
3132
}

fs/func_ctx_impl.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ package fs
1919
import (
2020
"context"
2121

22+
"github.com/functionstream/function-stream/common/model"
23+
2224
"github.com/functionstream/function-stream/fs/api"
2325
"github.com/functionstream/function-stream/fs/contube"
2426
"github.com/pkg/errors"
@@ -28,12 +30,13 @@ var ErrStateStoreNotLoaded = errors.New("state store not loaded")
2830

2931
type funcCtxImpl struct {
3032
api.FunctionContext
33+
function *model.Function
3134
stateStore api.StateStore
3235
sink chan<- contube.Record
3336
}
3437

35-
func newFuncCtxImpl(store api.StateStore) *funcCtxImpl {
36-
return &funcCtxImpl{stateStore: store}
38+
func newFuncCtxImpl(function *model.Function, store api.StateStore) *funcCtxImpl {
39+
return &funcCtxImpl{function: function, stateStore: store}
3740
}
3841

3942
func (f *funcCtxImpl) checkStateStore() error {
@@ -79,6 +82,10 @@ func (f *funcCtxImpl) Write(record contube.Record) error {
7982
return nil
8083
}
8184

85+
func (f *funcCtxImpl) GetConfig() map[string]string {
86+
return f.function.Config
87+
}
88+
8289
func (f *funcCtxImpl) setSink(sink chan<- contube.Record) {
8390
f.sink = sink
8491
}

fs/func_ctx_impl_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import (
2424
)
2525

2626
func TestFuncCtx_NilStore(t *testing.T) {
27-
f := newFuncCtxImpl(nil)
27+
f := newFuncCtxImpl(nil, nil)
2828
assert.ErrorIs(t, f.PutState(context.Background(), "key", []byte("value")), ErrStateStoreNotLoaded)
2929
_, err := f.GetState(context.Background(), "key")
3030
assert.ErrorIs(t, err, ErrStateStoreNotLoaded)

fs/manager.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,7 @@ func (fm *functionManagerImpl) StartFunction(f *model.Function) error { // TODO:
243243
return fmt.Errorf("failed to create state store: %w", err)
244244
}
245245

246-
funcCtx := newFuncCtxImpl(store)
246+
funcCtx := newFuncCtxImpl(f, store)
247247
instanceLogger := fm.log.SubLogger("functionName", f.Name, "instanceIndex", int(i), "runtimeType", runtimeConfig.Type)
248248
instance := fm.options.instanceFactory.NewFunctionInstance(f, funcCtx, i, instanceLogger)
249249
fm.functionsLock.Lock()

0 commit comments

Comments
 (0)