Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: consumer invoker cache set nil after the ZK connection is lost #985

Merged
merged 20 commits into from
Feb 3, 2021
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions config/reference_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,9 +257,15 @@ func (c *ReferenceConfig) GenericLoad(id string) {
c.Implement(genericService)
}

// GetInvoker get invoker from ReferenceConfig
func (c *ReferenceConfig) GetInvoker() protocol.Invoker {
return c.invoker
}

jack15083 marked this conversation as resolved.
Show resolved Hide resolved
// postProcessConfig asks registered ConfigPostProcessor to post-process the current ReferenceConfig.
func (c *ReferenceConfig) postProcessConfig(url *common.URL) {
for _, p := range extension.GetConfigPostProcessors() {
p.PostProcessReferenceConfig(url)
}

jack15083 marked this conversation as resolved.
Show resolved Hide resolved
}
5 changes: 3 additions & 2 deletions registry/zookeeper/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,14 +226,15 @@ func (r *zkRegistry) registerTempZookeeperNode(root string, node string) error {
if err != nil {
logger.Errorf("Register temp node(root{%s}, node{%s}) = error{%v}", root, node, perrors.WithStack(err))
if perrors.Cause(err) == zk.ErrNodeExists {
return nil
// should delete the old node
logger.Info("Register temp node failed, try to delete the old and recreate (root{%s}, node{%s}) , ignore!", root, node)
/*logger.Info("Register temp node failed, try to delete the old and recreate (root{%s}, node{%s}) , ignore!", root, node)
if err = r.client.Delete(zkPath); err == nil {
_, err = r.client.RegisterTemp(root, node)
}
if err != nil {
logger.Errorf("Recreate the temp node failed, (root{%s}, node{%s}) = error{%v}", root, node, perrors.WithStack(err))
}
}*/
jack15083 marked this conversation as resolved.
Show resolved Hide resolved
}
return perrors.WithMessagef(err, "RegisterTempNode(root{%s}, node{%s})", root, node)
}
Expand Down
5 changes: 4 additions & 1 deletion remoting/zookeeper/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,14 +259,14 @@ func (z *ZookeeperClient) HandleZkEvent(session <-chan zk.Event) {
switch (int)(event.State) {
case (int)(zk.StateDisconnected):
logger.Warnf("zk{addr:%s} state is StateDisconnected, so close the zk client{name:%s}.", z.ZkAddrs, z.name)
z.stop()
z.Lock()
conn := z.Conn
z.Conn = nil
z.Unlock()
if conn != nil {
conn.Close()
}
z.stop()
jack15083 marked this conversation as resolved.
Show resolved Hide resolved
return
case (int)(zk.EventNodeDataChanged), (int)(zk.EventNodeChildrenChanged):
logger.Infof("zkClient{%s} get zk node changed event{path:%s}", z.name, event.Path)
Expand Down Expand Up @@ -637,6 +637,9 @@ func (z *ZookeeperClient) SetContent(zkPath string, content []byte, version int3

// getConn gets zookeeper connection safely
func (z *ZookeeperClient) getConn() *zk.Conn {
if z == nil {
return nil
}
z.RLock()
defer z.RUnlock()
return z.Conn
Expand Down
16 changes: 8 additions & 8 deletions remoting/zookeeper/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (l *ZkEventListener) ListenServiceNodeEvent(zkPath string, listener remotin
if l.listenServiceNodeEvent(zkPath, listener) {
listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeDel})
}
logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath)
logger.Warnf("ListenServiceNodeEvent->listenSelf(zk path{%s}) goroutine exit now", zkPath)
}(zkPath, listener)
}

Expand All @@ -87,7 +87,7 @@ func (l *ZkEventListener) listenServiceNodeEvent(zkPath string, listener ...remo

select {
case zkEvent = <-keyEventCh:
logger.Warnf("get a zookeeper zkEvent{type:%s, server:%s, path:%s, state:%d-%s, err:%s}",
logger.Warnf("get a zookeeper keyEventCh{type:%s, server:%s, path:%s, state:%d-%s, err:%s}",
zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, StateToString(zkEvent.State), zkEvent.Err)
switch zkEvent.Type {
case zk.EventNodeDataChanged:
Expand Down Expand Up @@ -174,7 +174,7 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
logger.Infof("delete content{%s}", node)
listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeDel})
}
logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath)
logger.Warnf("handleZkNodeEvent->listenSelf(zk path{%s}) goroutine exit now", zkPath)
jack15083 marked this conversation as resolved.
Show resolved Hide resolved
}(newNode, zkPath, listener)
}

Expand All @@ -185,13 +185,13 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
continue
}

oldNode = path.Join(zkPath, n)
logger.Warnf("delete zkPath{%s}", oldNode)

if err != nil {
logger.Errorf("NewURL(i{%s}) = error{%v}", n, perrors.WithStack(err))
continue
}

oldNode = path.Join(zkPath, n)
logger.Warnf("delete zkPath{%s}", oldNode)
listener.DataChange(remoting.Event{Path: oldNode, Action: remoting.EventTypeDel})
}
}
Expand Down Expand Up @@ -305,7 +305,7 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listen
defer l.pathMapLock.Unlock()
jack15083 marked this conversation as resolved.
Show resolved Hide resolved
delete(l.pathMap, zkPath)
}
logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath)
logger.Warnf("listenDirEvent->listenSelf(zk path{%s}) goroutine exit now", zkPath)
}(dubboPath, listener)

// listen sub path recursive
Expand All @@ -327,7 +327,7 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listen
case <-ticker.C:
l.handleZkNodeEvent(zkPath, children, listener)
case zkEvent = <-childEventCh:
logger.Warnf("get a zookeeper zkEvent{type:%s, server:%s, path:%s, state:%d-%s, err:%s}",
logger.Warnf("get a zookeeper childEventCh{type:%s, server:%s, path:%s, state:%d-%s, err:%s}",
zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, StateToString(zkEvent.State), zkEvent.Err)
ticker.Stop()
if zkEvent.Type != zk.EventNodeChildrenChanged {
Expand Down