Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: zk lost event #692

Merged
merged 9 commits into from
Aug 6, 2020
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ Finished List:
* [Nacos](https://github.com/apache/dubbo-go/pull/522)
* [Zookeeper](https://github.com/apache/dubbo-go/pull/633)
* [Etcd](https://github.com/apache/dubbo-go/blob/9a5990d9a9c3d5e6633c0d7d926c156416bcb931/metadata/report/etcd/report.go)
* [Consul](https://github.com/apache/dubbo-go/pull/633)

- Service discovery
* [Nacos](https://github.com/apache/dubbo-go/blob/9a5990d9a9c3d5e6633c0d7d926c156416bcb931/registry/nacos/service_discovery.go)
Expand Down
3 changes: 2 additions & 1 deletion README_CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ Apache License, Version 2.0
* [Nacos](https://github.com/apache/dubbo-go/pull/522)
* [Zookeeper](https://github.com/apache/dubbo-go/pull/633)
* [Etcd](https://github.com/apache/dubbo-go/blob/9a5990d9a9c3d5e6633c0d7d926c156416bcb931/metadata/report/etcd/report.go)
* [Consul](https://github.com/apache/dubbo-go/pull/633)

- 服务发现
* [Nacos](https://github.com/apache/dubbo-go/blob/9a5990d9a9c3d5e6633c0d7d926c156416bcb931/registry/nacos/service_discovery.go)
Expand Down Expand Up @@ -179,7 +180,7 @@ go test ./... -coverprofile=coverage.txt -covermode=atomic

## [User List](https://github.com/apache/dubbo-go/issues/2)

若你正在使用 [apache/dubbo-go](github.com/apache/dubbo-go) 且认为其有用或者向对其做改进,请忝列贵司信息于 [用户列表](https://github.com/apache/dubbo-go/issues/2),以便我们知晓之
若你正在使用 [apache/dubbo-go](github.com/apache/dubbo-go) 且认为其有用或者想对其做改进,请添列贵司信息于 [用户列表](https://github.com/apache/dubbo-go/issues/2),以便我们知晓

<div>
<table>
Expand Down
1 change: 1 addition & 0 deletions common/constant/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ const (
DEFAULT_RETRIES_INT = 2
DEFAULT_PROTOCOL = "dubbo"
DEFAULT_REG_TIMEOUT = "10s"
DEFAULT_REG_TTL = "15m"
DEFAULT_CLUSTER = "failover"
DEFAULT_FAILBACK_TIMES = "3"
DEFAULT_FAILBACK_TIMES_INT = 3
Expand Down
1 change: 1 addition & 0 deletions common/constant/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ const (
ROLE_KEY = "registry.role"
REGISTRY_DEFAULT_KEY = "registry.default"
REGISTRY_TIMEOUT_KEY = "registry.timeout"
REGISTRY_TTL_KEY = "registry.ttl"
)

const (
Expand Down
2 changes: 2 additions & 0 deletions config/registry_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type RegistryConfig struct {
// I changed "type" to "protocol" ,the same as "protocol" field in java class RegistryConfig
TimeoutStr string `yaml:"timeout" default:"5s" json:"timeout,omitempty" property:"timeout"` // unit: second
Group string `yaml:"group" json:"group,omitempty" property:"group"`
TTL string `yaml:"ttl" default:"10m" json:"ttl,omitempty" property:"ttl"` // unit: minute
// for registry
Address string `yaml:"address" json:"address,omitempty" property:"address"`
Username string `yaml:"username" json:"username,omitempty" property:"username"`
Expand Down Expand Up @@ -118,6 +119,7 @@ func (c *RegistryConfig) getUrlMap(roleType common.RoleType) url.Values {
urlMap.Set(constant.ROLE_KEY, strconv.Itoa(int(roleType)))
urlMap.Set(constant.REGISTRY_KEY, c.Protocol)
urlMap.Set(constant.REGISTRY_TIMEOUT_KEY, c.TimeoutStr)
urlMap.Set(constant.REGISTRY_TTL_KEY, c.TTL)
for k, v := range c.Params {
urlMap.Set(k, v)
}
Expand Down
2 changes: 2 additions & 0 deletions registry/zookeeper/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,8 @@ func (r *zkRegistry) getListener(conf *common.URL) (*RegistryConfigurationListen

var zkListener *RegistryConfigurationListener
dataListener := r.dataListener
ttl := r.GetParam(constant.REGISTRY_TTL_KEY, constant.DEFAULT_REG_TTL)
conf.SetParam(constant.REGISTRY_TTL_KEY, ttl)
dataListener.mutex.Lock()
defer dataListener.mutex.Unlock()
if r.dataListener.subscribed[conf.ServiceKey()] != nil {
Expand Down
45 changes: 35 additions & 10 deletions remoting/zookeeper/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ import (
"github.com/apache/dubbo-go/remoting"
)

var (
DefaultTTL = 15 * time.Minute
zouyx marked this conversation as resolved.
Show resolved Hide resolved
)

// nolint
type ZkEventListener struct {
client *ZookeeperClient
Expand Down Expand Up @@ -197,10 +201,20 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listen

var (
failTimes int
ttl time.Duration
event chan struct{}
zkEvent zk.Event
)
event = make(chan struct{}, 4)
ttl = DefaultTTL
if conf != nil {
timeout, err := time.ParseDuration(conf.GetParam(constant.REGISTRY_TTL_KEY, constant.DEFAULT_REG_TTL))
if err == nil {
ttl = timeout
} else {
logger.Warnf("wrong configuration for registry ttl, error:=%+v", err)
wenxuwan marked this conversation as resolved.
Show resolved Hide resolved
}
}
defer close(event)
for {
// get current children for a zkPath
Expand Down Expand Up @@ -302,18 +316,29 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listen
}(dubboPath, listener)
}
}
select {
case zkEvent = <-childEventCh:
logger.Warnf("get a zookeeper zkEvent{type:%s, server:%s, path:%s, state:%d-%s, err:%s}",
zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, StateToString(zkEvent.State), zkEvent.Err)
if zkEvent.Type != zk.EventNodeChildrenChanged {
continue
// Periodically update provider information
ticker := time.NewTicker(ttl)
WATCH:
for {
select {
case <-ticker.C:
l.handleZkNodeEvent(zkEvent.Path, children, listener)
case zkEvent = <-childEventCh:
logger.Warnf("get a zookeeper zkEvent{type:%s, server:%s, path:%s, state:%d-%s, err:%s}",
zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, StateToString(zkEvent.State), zkEvent.Err)
ticker.Stop()
if zkEvent.Type != zk.EventNodeChildrenChanged {
break WATCH
}
l.handleZkNodeEvent(zkEvent.Path, children, listener)
break WATCH
case <-l.client.Done():
logger.Warnf("client.done(), listen(path{%s}) goroutine exit now...", zkPath)
ticker.Stop()
return
}
l.handleZkNodeEvent(zkEvent.Path, children, listener)
case <-l.client.Done():
logger.Warnf("client.done(), listen(path{%s}) goroutine exit now...", zkPath)
return
}

}
}

Expand Down