Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor zk dynamic configuration listener #1665

Merged
merged 1 commit into from
Jan 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 23 additions & 11 deletions cluster/router/v3router/router_chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"dubbo.apache.org/dubbo-go/v3/config"
"dubbo.apache.org/dubbo-go/v3/config_center"
"dubbo.apache.org/dubbo-go/v3/protocol"
"dubbo.apache.org/dubbo-go/v3/remoting"
)

// RouterChain contains all uniform router logic
Expand All @@ -46,31 +47,36 @@ type RouterChain struct {

// nolint
func NewUniformRouterChain() (router.PriorityRouter, error) {
// 1. add mesh route listener
// 1. Add mesh route listener
r := &RouterChain{}
rootConfig := config.GetRootConfig()
dynamicConfiguration := conf.GetEnvInstance().GetDynamicConfiguration()
if dynamicConfiguration == nil {
logger.Infof("[Mesh Router] Config center does not start, please check if the configuration center has been properly configured in dubbogo.yml")
logger.Infof("[NewUniformRouterChain] Config center does not start, please check if the configuration center has been properly configured in dubbogo.yml")
return nil, nil
}
dynamicConfiguration.AddListener(rootConfig.Application.Name, r)

// 2. try to get mesh route configuration, default key is "dubbo.io.MESHAPPRULE" with group "dubbo"
// 2. Try to get mesh rules configuration, default key is "dubbo.io.MESHAPPRULE" with group "dubbo"
key := rootConfig.Application.Name + constant.MeshRouteSuffix
group := rootConfig.ConfigCenter.Group
if group == "" {
group = constant.Dubbo
}
dynamicConfiguration.AddListener(group+constant.PathSeparator+key, r)
meshRouteValue, err := dynamicConfiguration.GetProperties(key, config_center.WithGroup(rootConfig.ConfigCenter.Group))
if err != nil {
// the mesh route may not be initialized now
logger.Warnf("Can not get mesh route for key=%s, error=%v", key, err)
// The mesh rules may not be initialized now
logger.Warnf("[NewUniformRouterChain]Can not get mesh rules for group=%s, key=%s, error=%+v", rootConfig.ConfigCenter.Group, key, err)
return r, nil
}
logger.Debugf("Successfully get mesh route:%s", meshRouteValue)
logger.Debugf("[NewUniformRouterChain]Successfully get mesh rules:%s", meshRouteValue)
routes, err := parseRoute(meshRouteValue)
if err != nil {
logger.Warnf("Parse mesh route failed, error=%v", err)
logger.Warnf("[NewUniformRouterChain]Parse mesh rules failed, error=%+v", err)
return nil, err
}
r.routers = routes
logger.Infof("[NewUniformRouterChain]Successfully init mesh rules with:\n%s", meshRouteValue)
return r, nil
}

Expand All @@ -84,13 +90,19 @@ func (r *RouterChain) Route(invokers []protocol.Invoker, url *common.URL, invoca

// Process process route config change event
func (r *RouterChain) Process(event *config_center.ConfigChangeEvent) {
logger.Debugf("RouteChain process event:\n%+v", event)
logger.Infof("[RouteChain]Process config change event:%+v", event)
if event.ConfigType == remoting.EventTypeDel {
r.routers = nil
return
}
routers, err := parseRoute(event.Value.(string))
if err != nil {
logger.Warnf("[RouteChain]Parse new mesh route config error, %+v "+
"and we will use the original mesh rule configuration.", err)
return
}
r.routers = routers
// todo delete router
logger.Infof("[RouteChain]Parse Mesh Rule Success.")
}

// Name get name of ConnCheckerRouter
Expand All @@ -108,7 +120,7 @@ func (r *RouterChain) URL() *common.URL {
return nil
}

// parseFromConfigToRouters parse virtualService and destinationRule yaml file bytes to target router list
// Deprecated parseFromConfigToRouters parse virtualService and destinationRule yaml file bytes to target router list
func parseFromConfigToRouters(virtualServiceConfig, destinationRuleConfig []byte) ([]*UniformRouter, error) {
var virtualServiceConfigList []*config.VirtualServiceConfig
destRuleConfigsMap := make(map[string]map[string]map[string]string)
Expand Down
7 changes: 2 additions & 5 deletions common/constant/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,8 @@ const (
)

const (
DefaultWeight = 100 //
DefaultWarmup = 10 * 60 // in java here is 10*60*1000 because of System.currentTimeMillis() is measured in milliseconds & in go time.Unix() is second
)

const (
DefaultWeight = 100 //
DefaultWarmup = 10 * 60 // in java here is 10*60*1000 because of System.currentTimeMillis() is measured in milliseconds & in go time.Unix() is second
DefaultLoadBalance = "random"
DefaultRetries = "2"
DefaultRetriesInt = 2
Expand Down
1 change: 1 addition & 0 deletions common/constant/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ const (
PortKey = "port"
ProtocolKey = "protocol"
PathSeparator = "/"
DotSeparator = "."
CommaSeparator = ","
SslEnabledKey = "ssl-enabled"
// ParamsTypeKey key used in pass through invoker factory, to define param type
Expand Down
2 changes: 1 addition & 1 deletion config/consumer_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (cc *ConsumerConfig) Init(rc *RootConfig) error {
// try to use interface name defined by pb
triplePBService, ok := reference.(common.TriplePBService)
if !ok {
logger.Errorf("Dubbogo cannot get interface name with reference = %s."+
logger.Errorf("Dubbo-go cannot get interface name with reference = %s."+
"Please run the command 'go install github.com/dubbogo/tools/cmd/protoc-gen-go-triple@latest' to get the latest "+
"protoc-gen-go-triple, and then re-generate your pb file again by this tool."+
"If you are not using pb serialization, please set 'interfaceName' field in reference config to let dubbogo get the interface name.", key)
Expand Down
29 changes: 22 additions & 7 deletions config_center/zookeeper/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package zookeeper
import (
"encoding/base64"
"strconv"
"strings"
"sync"
)

Expand Down Expand Up @@ -65,8 +66,9 @@ type zookeeperDynamicConfiguration struct {

func newZookeeperDynamicConfiguration(url *common.URL) (*zookeeperDynamicConfiguration, error) {
c := &zookeeperDynamicConfiguration{
url: url,
rootPath: "/" + url.GetParam(constant.ConfigNamespaceKey, config_center.DefaultGroup) + "/config",
url: url,
// TODO adapt config center config
rootPath: "/dubbo/config",
}
logger.Infof("[Zookeeper ConfigCenter] New Zookeeper ConfigCenter with Configuration: %+v, url = %+v", c, c.GetURL())
if v, ok := config.GetRootConfig().ConfigCenter.Params["base64"]; ok {
Expand All @@ -93,13 +95,26 @@ func newZookeeperDynamicConfiguration(url *common.URL) (*zookeeperDynamicConfigu

// Start listener
c.listener = zookeeper.NewZkEventListener(c.client)
c.cacheListener = NewCacheListener(c.rootPath)
c.listener.ListenServiceEvent(url, c.rootPath, c.cacheListener)
c.cacheListener = NewCacheListener(c.rootPath, c.listener)
c.listener.ListenConfigurationEvent(c.rootPath, c.cacheListener)
return c, nil
}

func (c *zookeeperDynamicConfiguration) AddListener(key string, listener config_center.ConfigurationListener, opions ...config_center.Option) {
c.cacheListener.AddListener(key, listener)
// AddListener add listener for key
// TODO this method should has a parameter 'group', and it does not now, so we should concat group and key with '/' manually
func (c *zookeeperDynamicConfiguration) AddListener(key string, listener config_center.ConfigurationListener, options ...config_center.Option) {
qualifiedKey := buildPath(c.rootPath, key)
c.cacheListener.AddListener(qualifiedKey, listener)
}

// buildPath build path and format
func buildPath(rootPath, subPath string) string {
path := strings.TrimRight(rootPath+pathSeparator+subPath, pathSeparator)
if !strings.HasPrefix(path, pathSeparator) {
path = pathSeparator + path
}
path = strings.ReplaceAll(path, "//", "/")
return path
}

func (c *zookeeperDynamicConfiguration) RemoveListener(key string, listener config_center.ConfigurationListener, opions ...config_center.Option) {
Expand All @@ -118,7 +133,7 @@ func (c *zookeeperDynamicConfiguration) GetProperties(key string, opts ...config
if len(tmpOpts.Group) != 0 {
key = tmpOpts.Group + "/" + key
} else {
key = c.GetURL().GetParam(constant.ConfigNamespaceKey, config_center.DefaultGroup)
key = c.GetURL().GetParam(constant.ConfigNamespaceKey, config_center.DefaultGroup) + "/" + key
}
content, _, err := c.client.GetContent(c.rootPath + "/" + key)
if err != nil {
Expand Down
56 changes: 27 additions & 29 deletions config_center/zookeeper/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,27 +24,33 @@ import (

import (
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/logger"
"dubbo.apache.org/dubbo-go/v3/config"
"dubbo.apache.org/dubbo-go/v3/config_center"
"dubbo.apache.org/dubbo-go/v3/remoting"
"dubbo.apache.org/dubbo-go/v3/remoting/zookeeper"
)

// CacheListener defines keyListeners and rootPath
type CacheListener struct {
keyListeners sync.Map
rootPath string
// key is zkNode Path and value is set of listeners
keyListeners sync.Map
zkEventListener *zookeeper.ZkEventListener
rootPath string
}

// NewCacheListener creates a new CacheListener
func NewCacheListener(rootPath string) *CacheListener {
return &CacheListener{rootPath: rootPath}
func NewCacheListener(rootPath string, listener *zookeeper.ZkEventListener) *CacheListener {
return &CacheListener{zkEventListener: listener, rootPath: rootPath}
}

// AddListener will add a listener if loaded
func (l *CacheListener) AddListener(key string, listener config_center.ConfigurationListener) {
// FIXME do not use Client.ExistW, cause it has a bug(can not watch zk node that do not exist)
_, _, _, err := l.zkEventListener.Client.Conn.ExistsW(key)
// reference from https://stackoverflow.com/questions/34018908/golang-why-dont-we-have-a-set-datastructure
// make a map[your type]struct{} like set in java
if err != nil {
return
}
listeners, loaded := l.keyListeners.LoadOrStore(key, map[config_center.ConfigurationListener]struct{}{listener: {}})
if loaded {
listeners.(map[config_center.ConfigurationListener]struct{})[listener] = struct{}{}
Expand All @@ -62,36 +68,28 @@ func (l *CacheListener) RemoveListener(key string, listener config_center.Config

// DataChange changes all listeners' event
func (l *CacheListener) DataChange(event remoting.Event) bool {
changeType := event.Action
if event.Content == "" {
// meanings new node
return true
}
var key string
// TODO use common way
if strings.HasSuffix(event.Path, constant.MeshRouteSuffix) {
key = config.GetRootConfig().Application.Name
} else {
key = l.pathToKey(event.Path)
changeType = remoting.EventTypeDel
}
if key != "" {
if listeners, ok := l.keyListeners.Load(key); ok {
for listener := range listeners.(map[config_center.ConfigurationListener]struct{}) {
listener.Process(&config_center.ConfigChangeEvent{Key: key, Value: event.Content, ConfigType: event.Action})
}
return true

if listeners, ok := l.keyListeners.Load(event.Path); ok {
for listener := range listeners.(map[config_center.ConfigurationListener]struct{}) {
listener.Process(&config_center.ConfigChangeEvent{
Key: l.pathToKey(event.Path),
Value: event.Content,
ConfigType: changeType,
})
}
return true
}
return false
}

func (l *CacheListener) pathToKey(path string) string {
key := strings.Replace(strings.Replace(path, l.rootPath+"/", "", -1), "/", ".", -1)
if strings.HasSuffix(key, constant.ConfiguratorSuffix) ||
strings.HasSuffix(key, constant.TagRouterRuleSuffix) ||
strings.HasSuffix(key, constant.ConditionRouterRuleSuffix) {
// governance config, so we remove the "dubbo." prefix
key = key[strings.Index(key, ".")+1:]
if len(path) == 0 {
return path
}
logger.Debugf("pathToKey path:%s, key:%s\n", path, key)
return key
groupKey := strings.Replace(strings.Replace(path, l.rootPath+constant.PathSeparator, "", -1), constant.PathSeparator, constant.DotSeparator, -1)
return groupKey[strings.Index(groupKey, constant.DotSeparator)+1:]
}
2 changes: 1 addition & 1 deletion remoting/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (

// DataListener defines common data listener interface
type DataListener interface {
DataChange(eventType Event) bool // bool is return for interface implement is interesting
DataChange(event Event) bool // bool is return for interface implement is interesting
}

//////////////////////////////////////////
Expand Down
2 changes: 1 addition & 1 deletion remoting/zookeeper/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func ValidateZookeeperClient(container ZkClientFacade, zkName string) error {
timeout := url.GetParamDuration(constant.ConfigTimeoutKey, constant.DefaultRegTimeout)

zkAddresses := strings.Split(url.Location, ",")
logger.Infof("[Zookeeper Client] New zookeeper client with name = %s, zkAddress = %s, timeout = %d", zkName, url.Location, timeout.String())
logger.Infof("[Zookeeper Client] New zookeeper client with name = %s, zkAddress = %s, timeout = %s", zkName, url.Location, timeout.String())
newClient, cltErr := gxzookeeper.NewZookeeperClient(zkName, zkAddresses, true, gxzookeeper.WithZkTimeOut(timeout))
if cltErr != nil {
logger.Warnf("newZookeeperClient(name{%s}, zk address{%v}, timeout{%d}) = error{%v}",
Expand Down
Loading