Skip to content

Commit

Permalink
Merge pull request #985 from jack15083/fix_zk_reconnect_issue
Browse files Browse the repository at this point in the history
Fix: consumer invoker cache set nil after the ZK connection is lost
  • Loading branch information
AlexStocks authored Feb 3, 2021
2 parents 0abac1d + ab56cd8 commit 6a63a99
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 37 deletions.
5 changes: 5 additions & 0 deletions config/reference_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,11 @@ func (c *ReferenceConfig) GenericLoad(id string) {
c.Implement(genericService)
}

// GetInvoker get invoker from ReferenceConfig
func (c *ReferenceConfig) GetInvoker() protocol.Invoker {
return c.invoker
}

func publishConsumerDefinition(url *common.URL) {
if remoteMetadataService, err := extension.GetRemoteMetadataService(); err == nil && remoteMetadataService != nil {
remoteMetadataService.PublishServiceDefinition(url)
Expand Down
27 changes: 13 additions & 14 deletions registry/zookeeper/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,23 +223,22 @@ func (r *zkRegistry) registerTempZookeeperNode(root string, node string) error {

// try to register the node
zkPath, err = r.client.RegisterTemp(root, node)
if err != nil {
logger.Errorf("Register temp node(root{%s}, node{%s}) = error{%v}", root, node, perrors.WithStack(err))
if perrors.Cause(err) == zk.ErrNodeExists {
// should delete the old node
logger.Info("Register temp node failed, try to delete the old and recreate (root{%s}, node{%s}) , ignore!", root, node)
if err = r.client.Delete(zkPath); err == nil {
_, err = r.client.RegisterTemp(root, node)
}
if err != nil {
logger.Errorf("Recreate the temp node failed, (root{%s}, node{%s}) = error{%v}", root, node, perrors.WithStack(err))
}
if err == nil {
return nil
}

if perrors.Cause(err) == zk.ErrNodeExists {
if err = r.client.Delete(zkPath); err == nil {
_, err = r.client.RegisterTemp(root, node)
}

if err == nil {
return nil
}
return perrors.WithMessagef(err, "RegisterTempNode(root{%s}, node{%s})", root, node)
}
logger.Debugf("Create a zookeeper node:%s", zkPath)

return nil
logger.Errorf("Register temp node(root{%s}, node{%s}) = error{%v}", root, node, perrors.WithStack(err))
return perrors.WithMessagef(err, "RegisterTempNode(root{%s}, node{%s})", root, node)
}

func (r *zkRegistry) getListener(conf *common.URL) (*RegistryConfigurationListener, error) {
Expand Down
9 changes: 6 additions & 3 deletions remoting/zookeeper/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ const (
// ConnDelay connection delay interval
ConnDelay = 3
// MaxFailTimes max fail times
MaxFailTimes = 15
MaxFailTimes = 3
)

var (
Expand Down Expand Up @@ -259,14 +259,14 @@ func (z *ZookeeperClient) HandleZkEvent(session <-chan zk.Event) {
switch (int)(event.State) {
case (int)(zk.StateDisconnected):
logger.Warnf("zk{addr:%s} state is StateDisconnected, so close the zk client{name:%s}.", z.ZkAddrs, z.name)
z.stop()
z.Lock()
conn := z.Conn
z.Conn = nil
z.Unlock()
if conn != nil {
conn.Close()
}
z.stop()
return
case (int)(zk.EventNodeDataChanged), (int)(zk.EventNodeChildrenChanged):
logger.Infof("zkClient{%s} get zk node changed event{path:%s}", z.name, event.Path)
Expand Down Expand Up @@ -555,7 +555,7 @@ func (z *ZookeeperClient) GetChildrenW(path string) ([]string, <-chan zk.Event,
if err == zk.ErrNoNode {
return nil, nil, errNilNode
}
logger.Errorf("zk.ChildrenW(path{%s}) = error(%v)", path, err)
logger.Warnf("zk.ChildrenW(path{%s}) = error(%v)", path, err)
return nil, nil, perrors.WithMessagef(err, "zk.ChildrenW(path:%s)", path)
}
if stat == nil {
Expand Down Expand Up @@ -637,6 +637,9 @@ func (z *ZookeeperClient) SetContent(zkPath string, content []byte, version int3

// getConn gets zookeeper connection safely
func (z *ZookeeperClient) getConn() *zk.Conn {
if z == nil {
return nil
}
z.RLock()
defer z.RUnlock()
return z.Conn
Expand Down
1 change: 1 addition & 0 deletions remoting/zookeeper/facade.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ LOOP:
break
}
failTimes++
logger.Warnf("ZK reconnect failed %d times", failTimes)
if MaxFailTimes <= failTimes {
failTimes = MaxFailTimes
}
Expand Down
41 changes: 21 additions & 20 deletions remoting/zookeeper/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,11 @@ func (l *ZkEventListener) ListenServiceNodeEvent(zkPath string, listener remotin
go func(zkPath string, listener remoting.DataListener) {
if l.listenServiceNodeEvent(zkPath, listener) {
listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeDel})
l.pathMapLock.Lock()
delete(l.pathMap, zkPath)
l.pathMapLock.Unlock()
}
logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath)
logger.Warnf("ListenServiceNodeEvent->listenSelf(zk path{%s}) goroutine exit now", zkPath)
}(zkPath, listener)
}

Expand All @@ -87,7 +90,7 @@ func (l *ZkEventListener) listenServiceNodeEvent(zkPath string, listener ...remo

select {
case zkEvent = <-keyEventCh:
logger.Warnf("get a zookeeper zkEvent{type:%s, server:%s, path:%s, state:%d-%s, err:%s}",
logger.Warnf("get a zookeeper keyEventCh{type:%s, server:%s, path:%s, state:%d-%s, err:%s}",
zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, StateToString(zkEvent.State), zkEvent.Err)
switch zkEvent.Type {
case zk.EventNodeDataChanged:
Expand Down Expand Up @@ -146,6 +149,7 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
} else {
logger.Errorf("path{%s} child nodes changed, zk.Children() = error{%v}", zkPath, perrors.WithStack(err))
}
return
}

// a node was added -- listen the new node
Expand All @@ -165,19 +169,21 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
newNode, perrors.WithStack(connErr))
}

if !listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeAdd, Content: string(content)}) {
if !listener.DataChange(remoting.Event{Path: newNode, Action: remoting.EventTypeAdd, Content: string(content)}) {
continue
}
// listen l service node
l.wg.Add(1)
go func(node string, zkPath string, listener remoting.DataListener) {
logger.Infof("delete zkNode{%s}", node)
go func(node string, listener remoting.DataListener) {
if l.listenServiceNodeEvent(node, listener) {
logger.Infof("delete content{%s}", node)
listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeDel})
logger.Warnf("delete zkNode{%s}", node)
listener.DataChange(remoting.Event{Path: node, Action: remoting.EventTypeDel})
l.pathMapLock.Lock()
delete(l.pathMap, zkPath)
l.pathMapLock.Unlock()
}
logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath)
}(newNode, zkPath, listener)
logger.Warnf("handleZkNodeEvent->listenSelf(zk path{%s}) goroutine exit now", node)
}(newNode, listener)
}

// old node was deleted
Expand All @@ -188,12 +194,7 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
}

oldNode = path.Join(zkPath, n)
logger.Warnf("delete zkPath{%s}", oldNode)

if err != nil {
logger.Errorf("NewURL(i{%s}) = error{%v}", n, perrors.WithStack(err))
continue
}
logger.Warnf("delete oldNode{%s}", oldNode)
listener.DataChange(remoting.Event{Path: oldNode, Action: remoting.EventTypeDel})
}
}
Expand Down Expand Up @@ -304,10 +305,10 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listen
if l.listenServiceNodeEvent(zkPath) {
listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeDel})
l.pathMapLock.Lock()
defer l.pathMapLock.Unlock()
delete(l.pathMap, zkPath)
l.pathMapLock.Unlock()
}
logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath)
logger.Warnf("listenDirEvent->listenSelf(zk path{%s}) goroutine exit now", zkPath)
}(dubboPath, listener)

// listen sub path recursive
Expand All @@ -329,7 +330,7 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listen
case <-ticker.C:
l.handleZkNodeEvent(zkPath, children, listener)
case zkEvent = <-childEventCh:
logger.Warnf("get a zookeeper zkEvent{type:%s, server:%s, path:%s, state:%d-%s, err:%s}",
logger.Warnf("get a zookeeper childEventCh{type:%s, server:%s, path:%s, state:%d-%s, err:%s}",
zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, StateToString(zkEvent.State), zkEvent.Err)
ticker.Stop()
if zkEvent.Type != zk.EventNodeChildrenChanged {
Expand All @@ -338,7 +339,7 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listen
l.handleZkNodeEvent(zkEvent.Path, children, listener)
break WATCH
case <-l.client.Done():
logger.Warnf("client.done(), listen(path{%s}) goroutine exit now...", zkPath)
logger.Warnf("watch client.done(), listen(path{%s}) goroutine exit now...", zkPath)
ticker.Stop()
return
}
Expand All @@ -360,7 +361,7 @@ func (l *ZkEventListener) ListenServiceEvent(conf *common.URL, zkPath string, li
l.wg.Add(1)
go func(zkPath string, listener remoting.DataListener) {
l.listenDirEvent(conf, zkPath, listener)
logger.Warnf("listenDirEvent(zkPath{%s}) goroutine exit now", zkPath)
logger.Warnf("ListenServiceEvent->listenDirEvent(zkPath{%s}) goroutine exit now", zkPath)
}(zkPath, listener)
}

Expand Down

0 comments on commit 6a63a99

Please sign in to comment.