Skip to content

Commit

Permalink
Merge pull request #692 from wenxuwan/master
Browse files Browse the repository at this point in the history
Fix: zk lost event
  • Loading branch information
AlexStocks committed Aug 6, 2020
2 parents 269714a + 9b4745f commit c4fbd60
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 10 deletions.
1 change: 1 addition & 0 deletions common/constant/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ const (
DEFAULT_RETRIES_INT = 2
DEFAULT_PROTOCOL = "dubbo"
DEFAULT_REG_TIMEOUT = "10s"
DEFAULT_REG_TTL = "15m"
DEFAULT_CLUSTER = "failover"
DEFAULT_FAILBACK_TIMES = "3"
DEFAULT_FAILBACK_TIMES_INT = 3
Expand Down
1 change: 1 addition & 0 deletions common/constant/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ const (
ROLE_KEY = "registry.role"
REGISTRY_DEFAULT_KEY = "registry.default"
REGISTRY_TIMEOUT_KEY = "registry.timeout"
REGISTRY_TTL_KEY = "registry.ttl"
)

const (
Expand Down
2 changes: 2 additions & 0 deletions config/registry_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type RegistryConfig struct {
// I changed "type" to "protocol" ,the same as "protocol" field in java class RegistryConfig
TimeoutStr string `yaml:"timeout" default:"5s" json:"timeout,omitempty" property:"timeout"` // unit: second
Group string `yaml:"group" json:"group,omitempty" property:"group"`
TTL string `yaml:"ttl" default:"10m" json:"ttl,omitempty" property:"ttl"` // unit: minute
// for registry
Address string `yaml:"address" json:"address,omitempty" property:"address"`
Username string `yaml:"username" json:"username,omitempty" property:"username"`
Expand Down Expand Up @@ -118,6 +119,7 @@ func (c *RegistryConfig) getUrlMap(roleType common.RoleType) url.Values {
urlMap.Set(constant.ROLE_KEY, strconv.Itoa(int(roleType)))
urlMap.Set(constant.REGISTRY_KEY, c.Protocol)
urlMap.Set(constant.REGISTRY_TIMEOUT_KEY, c.TimeoutStr)
urlMap.Set(constant.REGISTRY_TTL_KEY, c.TTL)
for k, v := range c.Params {
urlMap.Set(k, v)
}
Expand Down
2 changes: 2 additions & 0 deletions registry/zookeeper/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,8 @@ func (r *zkRegistry) getListener(conf *common.URL) (*RegistryConfigurationListen

var zkListener *RegistryConfigurationListener
dataListener := r.dataListener
ttl := r.GetParam(constant.REGISTRY_TTL_KEY, constant.DEFAULT_REG_TTL)
conf.SetParam(constant.REGISTRY_TTL_KEY, ttl)
dataListener.mutex.Lock()
defer dataListener.mutex.Unlock()
if r.dataListener.subscribed[conf.ServiceKey()] != nil {
Expand Down
45 changes: 35 additions & 10 deletions remoting/zookeeper/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ import (
"github.com/apache/dubbo-go/remoting"
)

var (
defaultTTL = 15 * time.Minute
)

// nolint
type ZkEventListener struct {
client *ZookeeperClient
Expand Down Expand Up @@ -197,10 +201,20 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listen

var (
failTimes int
ttl time.Duration
event chan struct{}
zkEvent zk.Event
)
event = make(chan struct{}, 4)
ttl = defaultTTL
if conf != nil {
timeout, err := time.ParseDuration(conf.GetParam(constant.REGISTRY_TTL_KEY, constant.DEFAULT_REG_TTL))
if err == nil {
ttl = timeout
} else {
logger.Warnf("wrong configuration for registry ttl, error:=%+v, using default value %v instead", err, defaultTTL)
}
}
defer close(event)
for {
// get current children for a zkPath
Expand Down Expand Up @@ -302,18 +316,29 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listen
}(dubboPath, listener)
}
}
select {
case zkEvent = <-childEventCh:
logger.Warnf("get a zookeeper zkEvent{type:%s, server:%s, path:%s, state:%d-%s, err:%s}",
zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, StateToString(zkEvent.State), zkEvent.Err)
if zkEvent.Type != zk.EventNodeChildrenChanged {
continue
// Periodically update provider information
ticker := time.NewTicker(ttl)
WATCH:
for {
select {
case <-ticker.C:
l.handleZkNodeEvent(zkEvent.Path, children, listener)
case zkEvent = <-childEventCh:
logger.Warnf("get a zookeeper zkEvent{type:%s, server:%s, path:%s, state:%d-%s, err:%s}",
zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, StateToString(zkEvent.State), zkEvent.Err)
ticker.Stop()
if zkEvent.Type != zk.EventNodeChildrenChanged {
break WATCH
}
l.handleZkNodeEvent(zkEvent.Path, children, listener)
break WATCH
case <-l.client.Done():
logger.Warnf("client.done(), listen(path{%s}) goroutine exit now...", zkPath)
ticker.Stop()
return
}
l.handleZkNodeEvent(zkEvent.Path, children, listener)
case <-l.client.Done():
logger.Warnf("client.done(), listen(path{%s}) goroutine exit now...", zkPath)
return
}

}
}

Expand Down

0 comments on commit c4fbd60

Please sign in to comment.