diff --git a/client/circuit_breaker.go b/client/circuit_breaker.go index 5c5a9bf..f5ab377 100644 --- a/client/circuit_breaker.go +++ b/client/circuit_breaker.go @@ -42,7 +42,9 @@ func WithCircuitBreaker(dest, src string, nacosClient nacos.Client, opts utils.O f(¶m) } - cbSuite := initCircuitBreaker(param, dest, src, nacosClient) + uniqueID := nacos.GetUniqueID() + + cbSuite := initCircuitBreaker(param, dest, src, nacosClient, uniqueID) return []client.Option{ client.WithCircuitBreaker(cbSuite), @@ -52,7 +54,7 @@ func WithCircuitBreaker(dest, src string, nacosClient nacos.Client, opts utils.O return err } // cancel the configuration listener when client is closed. - return nacosClient.DeregisterConfig(param) + return nacosClient.DeregisterConfig(param, uniqueID) }), } } @@ -77,7 +79,7 @@ func genServiceCBKey(toService, method string) string { } func initCircuitBreaker(param vo.ConfigParam, dest, src string, - nacosClient nacos.Client, + nacosClient nacos.Client, uniqueID int64, ) *circuitbreak.CBSuite { cb := circuitbreak.NewCBSuite(genServiceCBKeyWithRPCInfo) lcb := utils.ThreadSafeSet{} @@ -104,7 +106,7 @@ func initCircuitBreaker(param vo.ConfigParam, dest, src string, } } - nacosClient.RegisterConfigCallback(param, onChangeCallback) + nacosClient.RegisterConfigCallback(param, onChangeCallback, uniqueID) return cb } diff --git a/client/retry.go b/client/retry.go index 26ba913..b75f326 100644 --- a/client/retry.go +++ b/client/retry.go @@ -39,19 +39,21 @@ func WithRetryPolicy(dest, src string, nacosClient nacos.Client, opts utils.Opti f(¶m) } - rc := initRetryContainer(param, dest, nacosClient) + uniqueID := nacos.GetUniqueID() + + rc := initRetryContainer(param, dest, nacosClient, uniqueID) return []client.Option{ client.WithRetryContainer(rc), client.WithCloseCallbacks(rc.Close), client.WithCloseCallbacks(func() error { // cancel the configuration listener when client is closed. - return nacosClient.DeregisterConfig(param) + return nacosClient.DeregisterConfig(param, uniqueID) }), } } func initRetryContainer(param vo.ConfigParam, dest string, - nacosClient nacos.Client, + nacosClient nacos.Client, uniqueID int64, ) *retry.Container { retryContainer := retry.NewRetryContainerWithPercentageLimit() @@ -87,7 +89,7 @@ func initRetryContainer(param vo.ConfigParam, dest string, } } - nacosClient.RegisterConfigCallback(param, onChangeCallback) + nacosClient.RegisterConfigCallback(param, onChangeCallback, uniqueID) return retryContainer } diff --git a/client/rpc_timeout.go b/client/rpc_timeout.go index e919c52..2f0af06 100644 --- a/client/rpc_timeout.go +++ b/client/rpc_timeout.go @@ -39,17 +39,19 @@ func WithRPCTimeout(dest, src string, nacosClient nacos.Client, opts utils.Optio f(¶m) } + uniqueID := nacos.GetUniqueID() + return []client.Option{ - client.WithTimeoutProvider(initRPCTimeoutContainer(param, dest, nacosClient)), + client.WithTimeoutProvider(initRPCTimeoutContainer(param, dest, nacosClient, uniqueID)), client.WithCloseCallbacks(func() error { // cancel the configuration listener when client is closed. - return nacosClient.DeregisterConfig(param) + return nacosClient.DeregisterConfig(param, uniqueID) }), } } func initRPCTimeoutContainer(param vo.ConfigParam, dest string, - nacosClient nacos.Client, + nacosClient nacos.Client, uniqueID int64, ) rpcinfo.TimeoutProvider { rpcTimeoutContainer := rpctimeout.NewContainer() @@ -63,7 +65,7 @@ func initRPCTimeoutContainer(param vo.ConfigParam, dest string, rpcTimeoutContainer.NotifyPolicyChange(configs) } - nacosClient.RegisterConfigCallback(param, onChangeCallback) + nacosClient.RegisterConfigCallback(param, onChangeCallback, uniqueID) return rpcTimeoutContainer } diff --git a/example/go.mod b/example/go.mod index 57728e5..7563fbc 100644 --- a/example/go.mod +++ b/example/go.mod @@ -6,6 +6,7 @@ require ( github.com/cloudwego/kitex v0.7.2 github.com/cloudwego/kitex-examples v0.2.0 github.com/kitex-contrib/config-nacos v0.1.1 + github.com/nacos-group/nacos-sdk-go v1.1.4 ) require ( @@ -37,7 +38,6 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/gls v0.0.0-20220109145502-612d0167dce5 // indirect github.com/modern-go/reflect2 v1.0.2 // indirect - github.com/nacos-group/nacos-sdk-go v1.1.4 // indirect github.com/oleiade/lane v1.0.1 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect diff --git a/nacos/nacos.go b/nacos/nacos.go index 2a87ad7..29cbb20 100644 --- a/nacos/nacos.go +++ b/nacos/nacos.go @@ -16,6 +16,7 @@ package nacos import ( "bytes" + "sync" "text/template" "github.com/cloudwego/kitex/pkg/klog" @@ -26,13 +27,30 @@ import ( "github.com/nacos-group/nacos-sdk-go/vo" ) +// callbackHandler ... +type callbackHandler func(namespace, group, dataId, data string) + +type configParam struct { + DataID string + Group string +} + +// NOTE: the nacos client use namespace + dataID + group as cache key, and the namespace +// in client is fixed. +func configParamKey(in vo.ConfigParam) configParam { + return configParam{ + DataID: in.DataId, + Group: in.Group, + } +} + // Client the wrapper of nacos client. type Client interface { SetParser(ConfigParser) ClientConfigParam(cpc *ConfigParamConfig) (vo.ConfigParam, error) ServerConfigParam(cpc *ConfigParamConfig) (vo.ConfigParam, error) - RegisterConfigCallback(vo.ConfigParam, func(string, ConfigParser)) - DeregisterConfig(vo.ConfigParam) error + RegisterConfigCallback(vo.ConfigParam, func(string, ConfigParser), int64) + DeregisterConfig(vo.ConfigParam, int64) error } type client struct { @@ -42,6 +60,9 @@ type client struct { groupTemplate *template.Template serverDataIDTemplate *template.Template clientDataIDTemplate *template.Template + + handlerMutex sync.RWMutex + handlers map[configParam]map[int64]callbackHandler } // Options nacos config options. All the fields have default value. @@ -121,6 +142,7 @@ func NewClient(opts Options) (Client, error) { groupTemplate: groupTemplate, serverDataIDTemplate: serverDataIDTemplate, clientDataIDTemplate: clientDataIDTemplate, + handlers: map[configParam]map[int64]callbackHandler{}, } return c, nil } @@ -174,19 +196,77 @@ func (c *client) configParam(cpc *ConfigParamConfig, t *template.Template) (vo.C } // DeregisterConfig deregister the config. -func (c *client) DeregisterConfig(cfg vo.ConfigParam) error { - return c.ncli.CancelListenConfig(cfg) +func (c *client) DeregisterConfig(cfg vo.ConfigParam, uniqueID int64) error { + key := configParamKey(cfg) + klog.Debugf("deregister key %v for uniqueID %d", key, uniqueID) + c.handlerMutex.Lock() + defer c.handlerMutex.Unlock() + handlers, ok := c.handlers[key] + if ok { + delete(handlers, uniqueID) + } + if len(handlers) == 0 { + klog.Debugf("the handlers for key %v is empty, cancel listen config from nacos", key) + return c.ncli.CancelListenConfig(cfg) + } + return nil +} + +func (c *client) onChange(namespace, group, dataId, data string) { + handlers := make([]callbackHandler, 0, 5) + c.handlerMutex.RLock() + key := configParam{ + DataID: dataId, + Group: group, + } + for _, handler := range c.handlers[key] { + handlers = append(handlers, handler) + } + c.handlerMutex.RUnlock() + + for _, handler := range handlers { + handler(namespace, group, dataId, data) + } +} + +func (c *client) listenConfig(param vo.ConfigParam, uniqueID int64) { + key := configParamKey(param) + klog.Debugf("register key %v for uniqueID %d", key, uniqueID) + c.handlerMutex.Lock() + handlers, ok := c.handlers[key] + if !ok { + handlers = map[int64]callbackHandler{} + c.handlers[key] = handlers + } + handlers[uniqueID] = param.OnChange + c.handlerMutex.Unlock() + + if !ok { + klog.Debugf("the first time %v register, listen config from nacos", key) + err := c.ncli.ListenConfig(vo.ConfigParam{ + DataId: param.DataId, + Group: param.Group, + Content: param.Content, + DatumId: param.DatumId, + Type: param.Type, + OnChange: c.onChange, + }) + if err != nil { + panic(err) + } + } } // RegisterConfigCallback register the callback function to nacos client. func (c *client) RegisterConfigCallback(param vo.ConfigParam, - callback func(string, ConfigParser), + callback func(string, ConfigParser), uniqueID int64, ) { param.OnChange = func(namespace, group, dataId, data string) { - klog.Debugf("[nacos] config %s updated, namespace %s group %s dataId %s data %s", - param.DataId, namespace, group, dataId, data) + klog.Debugf("[nacos] uniqueID %d config %s updated, namespace %s group %s dataId %s data %s", + uniqueID, param.DataId, namespace, group, dataId, data) callback(data, c.parser) } + data, err := c.ncli.GetConfig(param) // the nacos client has handled the not exist error. if err != nil { @@ -195,8 +275,5 @@ func (c *client) RegisterConfigCallback(param vo.ConfigParam, callback(data, c.parser) - err = c.ncli.ListenConfig(param) - if err != nil { - panic(err) - } + c.listenConfig(param, uniqueID) } diff --git a/nacos/nacos_test.go b/nacos/nacos_test.go new file mode 100644 index 0000000..fcc0c67 --- /dev/null +++ b/nacos/nacos_test.go @@ -0,0 +1,192 @@ +// Copyright 2023 CloudWeGo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package nacos + +import ( + "fmt" + "sync" + "testing" + + "github.com/nacos-group/nacos-sdk-go/model" + "github.com/nacos-group/nacos-sdk-go/vo" + "github.com/stretchr/testify/assert" +) + +type fakeNacos struct { + sync.RWMutex + handlers map[configParam]callbackHandler +} + +func (fn *fakeNacos) GetConfig(param vo.ConfigParam) (string, error) { + return "", nil +} +func (fn *fakeNacos) PublishConfig(param vo.ConfigParam) (bool, error) { + return false, nil +} + +func (fn *fakeNacos) DeleteConfig(param vo.ConfigParam) (bool, error) { + return false, nil +} + +func (fn *fakeNacos) ListenConfig(params vo.ConfigParam) (err error) { + fn.Lock() + defer fn.Unlock() + fn.handlers[configParamKey(params)] = params.OnChange + return nil +} + +func (fn *fakeNacos) CancelListenConfig(params vo.ConfigParam) (err error) { + fn.Lock() + defer fn.Unlock() + delete(fn.handlers, configParamKey(params)) + return nil +} + +func (fn *fakeNacos) SearchConfig(param vo.SearchConfigParam) (*model.ConfigPage, error) { + return nil, nil +} + +func (fn *fakeNacos) PublishAggr(param vo.ConfigParam) (published bool, err error) { + return false, nil +} + +func (fn *fakeNacos) change(cfg configParam, data string) { + fn.Lock() + defer fn.Unlock() + + handler, ok := fn.handlers[cfg] + if !ok { + return + } + fmt.Println("find handers ", fn.handlers, "cfg ", cfg, " data", data) + handler("", cfg.Group, cfg.DataID, data) +} + +func TestRegisterAndDeRegister(t *testing.T) { + fake := &fakeNacos{ + handlers: map[configParam]callbackHandler{}, + } + c := &client{ + ncli: fake, + handlers: map[configParam]map[int64]callbackHandler{}, + } + + var gotlock sync.Mutex + gots := make(map[configParam]map[int64]string) + key := configParam{ + Group: "g1", + DataID: "d1", + } + + id1 := GetUniqueID() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + // register + c.RegisterConfigCallback(vo.ConfigParam{ + DataId: "d1", + Group: "g1", + }, func(s string, cp ConfigParser) { + gotlock.Lock() + defer gotlock.Unlock() + ids, ok := gots[key] + if !ok { + ids = map[int64]string{} + gots[key] = ids + } + ids[id1] = s + }, id1) + }() + + id2 := GetUniqueID() + wg.Add(1) + go func() { + defer wg.Done() + c.RegisterConfigCallback(vo.ConfigParam{ + DataId: "d1", + Group: "g1", + }, func(s string, cp ConfigParser) { + gotlock.Lock() + defer gotlock.Unlock() + ids, ok := gots[key] + if !ok { + ids = map[int64]string{} + gots[key] = ids + } + ids[id2] = s + }, id2) + }() + wg.Wait() + + // first change + fake.change(configParam{ + DataID: "d1", + Group: "g1", + }, "first change") + + assert.Equal(t, map[configParam]map[int64]string{ + { + Group: "g1", + DataID: "d1", + }: { + id1: "first change", + id2: "first change", + }, + }, gots) + + // second change + c.DeregisterConfig(vo.ConfigParam{ + DataId: "d1", + Group: "g1", + }, id2) + + fake.change(configParam{ + DataID: "d1", + Group: "g1", + }, "second change") + + assert.Equal(t, map[configParam]map[int64]string{ + { + Group: "g1", + DataID: "d1", + }: { + id1: "second change", + id2: "first change", + }, + }, gots) + + // third change + c.DeregisterConfig(vo.ConfigParam{ + DataId: "d1", + Group: "g1", + }, id1) + + fake.change(configParam{ + DataID: "d1", + Group: "g1", + }, "third change") + + assert.Equal(t, map[configParam]map[int64]string{ + { + Group: "g1", + DataID: "d1", + }: { + id1: "second change", + id2: "first change", + }, + }, gots) +} diff --git a/nacos/uid.go b/nacos/uid.go new file mode 100644 index 0000000..7d25ea8 --- /dev/null +++ b/nacos/uid.go @@ -0,0 +1,28 @@ +// Copyright 2023 CloudWeGo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package nacos + +import "sync/atomic" + +var uniqueID atomic.Int64 + +func init() { + uniqueID.Store(0) +} + +// GetUniqueID get the unique id +func GetUniqueID() int64 { + return uniqueID.Add(1) +} diff --git a/server/limiter.go b/server/limiter.go index 9912d69..b870a34 100644 --- a/server/limiter.go +++ b/server/limiter.go @@ -40,11 +40,14 @@ func WithLimiter(dest string, nacosClient nacos.Client, opts utils.Options) serv for _, f := range opts.NacosCustomFunctions { f(¶m) } - - return server.WithLimit(initLimitOptions(param, dest, nacosClient)) + uniqueID := nacos.GetUniqueID() + server.RegisterShutdownHook(func() { + nacosClient.DeregisterConfig(param, uniqueID) + }) + return server.WithLimit(initLimitOptions(param, dest, nacosClient, uniqueID)) } -func initLimitOptions(param vo.ConfigParam, dest string, nacosClient nacos.Client) *limit.Option { +func initLimitOptions(param vo.ConfigParam, dest string, nacosClient nacos.Client, uniqueID int64) *limit.Option { var updater atomic.Value opt := &limit.Option{} opt.UpdateControl = func(u limit.Updater) { @@ -71,6 +74,6 @@ func initLimitOptions(param vo.ConfigParam, dest string, nacosClient nacos.Clien } } - nacosClient.RegisterConfigCallback(param, onChangeCallback) + nacosClient.RegisterConfigCallback(param, onChangeCallback, uniqueID) return opt }