Skip to content

Commit

Permalink
sopport customise decode function.
Browse files Browse the repository at this point in the history
  • Loading branch information
whalecold committed Jun 30, 2023
1 parent 4a73b75 commit f604a76
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 46 deletions.
11 changes: 6 additions & 5 deletions client/client_suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,21 @@ package client

import (
"github.com/cloudwego/kitex/client"
"github.com/nacos-group/nacos-sdk-go/clients/config_client"

"github.com/kitex-contrib/config-nacos/nacos"
)

// NacosClientSuite nacos client config suite, configure retry timeout limit and circuitbreak dynamically from nacos.
type NacosClientSuite struct {
nacosClient config_client.IConfigClient
nacosClient nacos.Client
service string
fns []nacos.CustomFunction
}

// NewSuite ...
func NewSuite(service string, cli config_client.IConfigClient, cfs ...nacos.CustomFunction) *NacosClientSuite {
func NewSuite(service string, cli nacos.Client,
cfs ...nacos.CustomFunction,
) *NacosClientSuite {
return &NacosClientSuite{
service: service,
nacosClient: cli,
Expand All @@ -39,7 +40,7 @@ func NewSuite(service string, cli config_client.IConfigClient, cfs ...nacos.Cust

// Options return a list client.Option
func (s *NacosClientSuite) Options() []client.Option {
opts := make([]client.Option, 0, 5)
opts = append(opts, WithRetryPolicy(s.service, s.nacosClient)...)
opts := make([]client.Option, 0, 8)
opts = append(opts, WithRetryPolicy(s.service, s.nacosClient, s.fns...)...)
return opts
}
27 changes: 15 additions & 12 deletions client/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package client

import (
"github.com/kitex-contrib/config-nacos/nacos"
"github.com/nacos-group/nacos-sdk-go/clients/config_client"
"github.com/nacos-group/nacos-sdk-go/vo"

"github.com/cloudwego/kitex/client"
Expand All @@ -29,7 +28,9 @@ const (
)

// WithRetryPolicy sets the retry policy from nacos config center.
func WithRetryPolicy(dest string, nacosClient config_client.IConfigClient, cfs ...nacos.CustomFunction) []client.Option {
func WithRetryPolicy(dest string, nacosClient nacos.Client,
cfs ...nacos.CustomFunction,
) []client.Option {
param := nacos.NaocsConfigParam(&nacos.ConfigParamConfig{
Category: retryConfigName,
ClientServiceName: dest,
Expand All @@ -39,41 +40,43 @@ func WithRetryPolicy(dest string, nacosClient config_client.IConfigClient, cfs .
client.WithRetryContainer(initRetryContainer(param, dest, nacosClient)),
client.WithCloseCallbacks(func() error {
// cancel the config listener when client is closed.
return nacosClient.CancelListenConfig(param)
return nacosClient.DeregisterConfig(param)
}),
}
}

type retryConfigs map[string]*retry.Policy

func initRetryContainer(param vo.ConfigParam, dest string, nacosClient config_client.IConfigClient) *retry.Container {
func initRetryContainer(param vo.ConfigParam, dest string,
nacosClient nacos.Client,
) *retry.Container {
retryContainer := retry.NewRetryContainer()

onChangeCallback := func(data string) {
onChangeCallback := func(data string, parser nacos.ConfigParser) {
rcs := retryConfigs{}
err := nacos.Unmarshal(param.Type, data, rcs)
err := parser.Decode(param.Type, data, rcs)
if err != nil {
klog.Warnf("[nacos] %s client nacos retry: unmarshal data %s failed: %s, skip...", dest, data, err)
return
}
for method, pilocy := range rcs {
if pilocy.BackupPolicy != nil && pilocy.FailurePolicy != nil {

for method, policy := range rcs {
if policy.BackupPolicy != nil && policy.FailurePolicy != nil {
klog.Warnf("[nacos] %s client policy for method %s BackupPolicy and FailurePolicy must not be set at same time",
dest, method)
continue
}
if pilocy.BackupPolicy == nil && pilocy.FailurePolicy == nil {
if policy.BackupPolicy == nil && policy.FailurePolicy == nil {
klog.Warnf("[nacos] %s client policy for method %s BackupPolicy and FailurePolicy must not be empty at same time",
dest, method)
continue
}
retryContainer.NotifyPolicyChange(method, *pilocy)
retryContainer.NotifyPolicyChange(method, *policy)
}
}

nacos.RegistryConfigUpdateCallback(dest,
nacosClient.RegisterConfigCallback(dest,
retryConfigName,
nacosClient,
param,
onChangeCallback,
)
Expand Down
8 changes: 4 additions & 4 deletions nacos/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,11 @@ func render(name, format string, cpc *ConfigParamConfig) string {
return tpl.String()
}

// NaocsConfigParam Get nacos config group from environment variables
// NaocsConfigParam Get nacos config from environment variables. All the parameters can be customised with CustomFunction.
// ConfigParam explain:
// 1. Type: data format, only support json and yaml, JSON by default. customize it use CustomFunction.
// 2. Context: empty by default, customize it use CustomFunction.
// 3. Group: DEFAULT_GROUP by default. Customize it by CustomFunction or use specified format. ref: nacos/env.go:46
// 1. Type: data format, support JSON YMAL, JSON by default. Could extend it by implementing the ConfigParser interface.
// 2. Context: empty by default it use CustomFunction.
// 3. Group: DEFAULT_GROUP by default.
// 4. DataId: {{.ClientServiceName}}.{{.ServerServiceName}}.{{.Category}} by default. Customize it by CustomFunction or
// use specified format. ref: nacos/env.go:46
func NaocsConfigParam(cpc *ConfigParamConfig, cfs ...CustomFunction) vo.ConfigParam {
Expand Down
66 changes: 41 additions & 25 deletions nacos/nacos.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,30 @@
package nacos

import (
"fmt"

"github.com/cloudwego/kitex/pkg/klog"
"github.com/nacos-group/nacos-sdk-go/clients"
"github.com/nacos-group/nacos-sdk-go/clients/config_client"
"github.com/nacos-group/nacos-sdk-go/common/constant"
"github.com/nacos-group/nacos-sdk-go/vo"
"sigs.k8s.io/yaml"
)

// NewDefaultNacosClient Create a default Nacos client
// Client the wrapper of nacos client.
type Client interface {
SetParser(ConfigParser)
RegisterConfigCallback(string, string, vo.ConfigParam, func(string, ConfigParser))
DeregisterConfig(vo.ConfigParam) error
}

type client struct {
ncli config_client.IConfigClient
// support customise parser
parser ConfigParser
}

// DefaultClient Create a default Nacos client
// It can create a client with default config by env variable.
// See: env.go
func NewDefaultNacosClient() (config_client.IConfigClient, error) {
func DefaultClient() (Client, error) {
sc := []constant.ServerConfig{
*constant.NewServerConfig(NacosAddr(), uint64(NacosPort())),
}
Expand All @@ -38,45 +48,51 @@ func NewDefaultNacosClient() (config_client.IConfigClient, error) {
NotLoadCacheAtStart: true,
CustomLogger: NewCustomNacosLogger(),
}
return clients.NewConfigClient(
nacosClient, err := clients.NewConfigClient(
vo.NacosClientParam{
ClientConfig: &cc,
ServerConfigs: sc,
},
)
if err != nil {
return nil, err
}
c := &client{
ncli: nacosClient,
parser: defaultConfigParse(),
}
return c, nil
}

// RegistryConfigUpdateCallback registry the callback function to nacos client.
func RegistryConfigUpdateCallback(dest, category string,
nacosClient config_client.IConfigClient,
// SetParser support customise parser
func (c *client) SetParser(parser ConfigParser) {
c.parser = parser
}

// DeregisterConfig deregister the config.
func (c *client) DeregisterConfig(cfg vo.ConfigParam) error {
return c.ncli.CancelListenConfig(cfg)
}

// RegisterConfigCallback register the callback function to nacos client.
func (c *client) RegisterConfigCallback(dest, category string,
param vo.ConfigParam,
callback func(string),
callback func(string, ConfigParser),
) {
param.OnChange = func(namespace, group, dataId, data string) {
klog.Debugf("[nacos] %s client %s config updated, namespace %s group %s dataId %s data %s",
dest, category, namespace, group, dataId, data)
callback(data)
callback(data, c.parser)
}
data, err := nacosClient.GetConfig(param)
data, err := c.ncli.GetConfig(param)
if err != nil && !IsNotExistError(err) {
panic(err)
}

callback(data)
callback(data, c.parser)

err = nacosClient.ListenConfig(param)
err = c.ncli.ListenConfig(param)
if err != nil {
panic(err)
}
}

// Unmarshal unmarshals the data to struct in specified format.
func Unmarshal(kind vo.ConfigType, data string, config interface{}) error {
switch kind {
case vo.YAML, vo.JSON:
// support json and yaml since YAML is a superset of JSON, it can parse JSON using a YAML parser
return yaml.Unmarshal([]byte(data), config)
default:
return fmt.Errorf("unsupport config data type %s", kind)
}
}
47 changes: 47 additions & 0 deletions nacos/parser.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright 2023 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package nacos

import (
"fmt"

"github.com/nacos-group/nacos-sdk-go/vo"
"sigs.k8s.io/yaml"
)

var _ ConfigParser = &parser{}

// ConfigParser the parser for nacos config.
type ConfigParser interface {
Decode(kind vo.ConfigType, data string, config interface{}) error
}

type parser struct{}

// Decode decodes the data to struct in specified format.
func (p *parser) Decode(kind vo.ConfigType, data string, config interface{}) error {
switch kind {
case vo.YAML, vo.JSON:
// since YAML is a superset of JSON, it can parse JSON using a YAML parser
return yaml.Unmarshal([]byte(data), config)
default:
return fmt.Errorf("unsupported config data type %s", kind)
}
}

// DefaultConfigParse default nacos config parser.
func defaultConfigParse() ConfigParser {
return &parser{}
}

0 comments on commit f604a76

Please sign in to comment.