Skip to content

Commit

Permalink
feat(nacos): support reuse nacos client
Browse files Browse the repository at this point in the history
  • Loading branch information
whalecold committed Nov 1, 2023
1 parent eef5d99 commit 4495421
Show file tree
Hide file tree
Showing 8 changed files with 334 additions and 28 deletions.
10 changes: 6 additions & 4 deletions client/circuit_breaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ func WithCircuitBreaker(dest, src string, nacosClient nacos.Client, opts utils.O
f(&param)
}

cbSuite := initCircuitBreaker(param, dest, src, nacosClient)
uniqueID := nacos.GetUniqueID()

cbSuite := initCircuitBreaker(param, dest, src, nacosClient, uniqueID)

return []client.Option{
client.WithCircuitBreaker(cbSuite),
Expand All @@ -52,7 +54,7 @@ func WithCircuitBreaker(dest, src string, nacosClient nacos.Client, opts utils.O
return err
}
// cancel the configuration listener when client is closed.
return nacosClient.DeregisterConfig(param)
return nacosClient.DeregisterConfig(param, uniqueID)
}),
}
}
Expand All @@ -77,7 +79,7 @@ func genServiceCBKey(toService, method string) string {
}

func initCircuitBreaker(param vo.ConfigParam, dest, src string,
nacosClient nacos.Client,
nacosClient nacos.Client, uniqueID int64,
) *circuitbreak.CBSuite {
cb := circuitbreak.NewCBSuite(genServiceCBKeyWithRPCInfo)
lcb := utils.ThreadSafeSet{}
Expand All @@ -104,7 +106,7 @@ func initCircuitBreaker(param vo.ConfigParam, dest, src string,
}
}

nacosClient.RegisterConfigCallback(param, onChangeCallback)
nacosClient.RegisterConfigCallback(param, onChangeCallback, uniqueID)

return cb
}
10 changes: 6 additions & 4 deletions client/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,19 +39,21 @@ func WithRetryPolicy(dest, src string, nacosClient nacos.Client, opts utils.Opti
f(&param)
}

rc := initRetryContainer(param, dest, nacosClient)
uniqueID := nacos.GetUniqueID()

rc := initRetryContainer(param, dest, nacosClient, uniqueID)
return []client.Option{
client.WithRetryContainer(rc),
client.WithCloseCallbacks(rc.Close),
client.WithCloseCallbacks(func() error {
// cancel the configuration listener when client is closed.
return nacosClient.DeregisterConfig(param)
return nacosClient.DeregisterConfig(param, uniqueID)
}),
}
}

func initRetryContainer(param vo.ConfigParam, dest string,
nacosClient nacos.Client,
nacosClient nacos.Client, uniqueID int64,
) *retry.Container {
retryContainer := retry.NewRetryContainerWithPercentageLimit()

Expand Down Expand Up @@ -87,7 +89,7 @@ func initRetryContainer(param vo.ConfigParam, dest string,
}
}

nacosClient.RegisterConfigCallback(param, onChangeCallback)
nacosClient.RegisterConfigCallback(param, onChangeCallback, uniqueID)

return retryContainer
}
10 changes: 6 additions & 4 deletions client/rpc_timeout.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,19 @@ func WithRPCTimeout(dest, src string, nacosClient nacos.Client, opts utils.Optio
f(&param)
}

uniqueID := nacos.GetUniqueID()

return []client.Option{
client.WithTimeoutProvider(initRPCTimeoutContainer(param, dest, nacosClient)),
client.WithTimeoutProvider(initRPCTimeoutContainer(param, dest, nacosClient, uniqueID)),
client.WithCloseCallbacks(func() error {
// cancel the configuration listener when client is closed.
return nacosClient.DeregisterConfig(param)
return nacosClient.DeregisterConfig(param, uniqueID)
}),
}
}

func initRPCTimeoutContainer(param vo.ConfigParam, dest string,
nacosClient nacos.Client,
nacosClient nacos.Client, uniqueID int64,
) rpcinfo.TimeoutProvider {
rpcTimeoutContainer := rpctimeout.NewContainer()

Expand All @@ -63,7 +65,7 @@ func initRPCTimeoutContainer(param vo.ConfigParam, dest string,
rpcTimeoutContainer.NotifyPolicyChange(configs)
}

nacosClient.RegisterConfigCallback(param, onChangeCallback)
nacosClient.RegisterConfigCallback(param, onChangeCallback, uniqueID)

return rpcTimeoutContainer
}
2 changes: 1 addition & 1 deletion example/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/cloudwego/kitex v0.7.2
github.com/cloudwego/kitex-examples v0.2.0
github.com/kitex-contrib/config-nacos v0.1.1
github.com/nacos-group/nacos-sdk-go v1.1.4
)

require (
Expand Down Expand Up @@ -37,7 +38,6 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/gls v0.0.0-20220109145502-612d0167dce5 // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/nacos-group/nacos-sdk-go v1.1.4 // indirect
github.com/oleiade/lane v1.0.1 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
Expand Down
99 changes: 88 additions & 11 deletions nacos/nacos.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package nacos

import (
"bytes"
"sync"
"text/template"

"github.com/cloudwego/kitex/pkg/klog"
Expand All @@ -26,13 +27,30 @@ import (
"github.com/nacos-group/nacos-sdk-go/vo"
)

// callbackHandler ...
type callbackHandler func(namespace, group, dataId, data string)

type configParam struct {
DataID string
Group string
}

// NOTE: the nacos client use namespace + dataID + group as cache key, and the namespace
// in client is fixed.
func configParamKey(in vo.ConfigParam) configParam {
return configParam{
DataID: in.DataId,
Group: in.Group,
}
}

// Client the wrapper of nacos client.
type Client interface {
SetParser(ConfigParser)
ClientConfigParam(cpc *ConfigParamConfig) (vo.ConfigParam, error)
ServerConfigParam(cpc *ConfigParamConfig) (vo.ConfigParam, error)
RegisterConfigCallback(vo.ConfigParam, func(string, ConfigParser))
DeregisterConfig(vo.ConfigParam) error
RegisterConfigCallback(vo.ConfigParam, func(string, ConfigParser), int64)
DeregisterConfig(vo.ConfigParam, int64) error
}

type client struct {
Expand All @@ -42,6 +60,9 @@ type client struct {
groupTemplate *template.Template
serverDataIDTemplate *template.Template
clientDataIDTemplate *template.Template

handlerMutex sync.RWMutex
handlers map[configParam]map[int64]callbackHandler
}

// Options nacos config options. All the fields have default value.
Expand Down Expand Up @@ -121,6 +142,7 @@ func NewClient(opts Options) (Client, error) {
groupTemplate: groupTemplate,
serverDataIDTemplate: serverDataIDTemplate,
clientDataIDTemplate: clientDataIDTemplate,
handlers: map[configParam]map[int64]callbackHandler{},
}
return c, nil
}
Expand Down Expand Up @@ -174,19 +196,77 @@ func (c *client) configParam(cpc *ConfigParamConfig, t *template.Template) (vo.C
}

// DeregisterConfig deregister the config.
func (c *client) DeregisterConfig(cfg vo.ConfigParam) error {
return c.ncli.CancelListenConfig(cfg)
func (c *client) DeregisterConfig(cfg vo.ConfigParam, uniqueID int64) error {
key := configParamKey(cfg)
klog.Debugf("deregister key %v for uniqueID %d", key, uniqueID)
c.handlerMutex.Lock()
defer c.handlerMutex.Unlock()
handlers, ok := c.handlers[key]
if ok {
delete(handlers, uniqueID)
}
if len(handlers) == 0 {
klog.Debugf("the handlers for key %v is empty, cancel listen config from nacos", key)
return c.ncli.CancelListenConfig(cfg)
}
return nil
}

func (c *client) onChange(namespace, group, dataId, data string) {
handlers := make([]callbackHandler, 0, 5)
c.handlerMutex.RLock()
key := configParam{
DataID: dataId,
Group: group,
}
for _, handler := range c.handlers[key] {
handlers = append(handlers, handler)
}
c.handlerMutex.RUnlock()

for _, handler := range handlers {
handler(namespace, group, dataId, data)
}
}

func (c *client) listenConfig(param vo.ConfigParam, uniqueID int64) {
key := configParamKey(param)
klog.Debugf("register key %v for uniqueID %d", key, uniqueID)
c.handlerMutex.Lock()
handlers, ok := c.handlers[key]
if !ok {
handlers = map[int64]callbackHandler{}
c.handlers[key] = handlers
}
handlers[uniqueID] = param.OnChange
c.handlerMutex.Unlock()

if !ok {
klog.Debugf("the first time %v register, listen config from nacos", key)
err := c.ncli.ListenConfig(vo.ConfigParam{
DataId: param.DataId,
Group: param.Group,
Content: param.Content,
DatumId: param.DatumId,
Type: param.Type,
OnChange: c.onChange,
})
if err != nil {
panic(err)
}
}
}

// RegisterConfigCallback register the callback function to nacos client.
func (c *client) RegisterConfigCallback(param vo.ConfigParam,
callback func(string, ConfigParser),
callback func(string, ConfigParser), uniqueID int64,
) {
param.OnChange = func(namespace, group, dataId, data string) {
klog.Debugf("[nacos] config %s updated, namespace %s group %s dataId %s data %s",
param.DataId, namespace, group, dataId, data)
klog.Debugf("[nacos] uniqueID %d config %s updated, namespace %s group %s dataId %s data %s",
uniqueID, param.DataId, namespace, group, dataId, data)
callback(data, c.parser)
}

data, err := c.ncli.GetConfig(param)
// the nacos client has handled the not exist error.
if err != nil {
Expand All @@ -195,8 +275,5 @@ func (c *client) RegisterConfigCallback(param vo.ConfigParam,

callback(data, c.parser)

err = c.ncli.ListenConfig(param)
if err != nil {
panic(err)
}
c.listenConfig(param, uniqueID)
}
Loading

0 comments on commit 4495421

Please sign in to comment.