Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Imp: Zk client #601

Merged
merged 7 commits into from
Jun 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 44 additions & 57 deletions remoting/zookeeper/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,21 +42,23 @@ const (
)

var (
errNilZkClientConn = perrors.New("zookeeperclient{conn} is nil")
errNilZkClientConn = perrors.New("zookeeper client{conn} is nil")
errNilChildren = perrors.Errorf("has none children")
errNilNode = perrors.Errorf("node does not exist")
)

// ZookeeperClient ...
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe u can add some comments for this struct or just using "nolint" instead.

type ZookeeperClient struct {
name string
ZkAddrs []string
sync.RWMutex // for conn
Conn *zk.Conn
Timeout time.Duration
exit chan struct{}
Wait sync.WaitGroup
eventRegistry map[string][]*chan struct{}
name string
ZkAddrs []string
sync.RWMutex // for conn
Conn *zk.Conn
Timeout time.Duration
exit chan struct{}
Wait sync.WaitGroup

eventRegistry map[string][]*chan struct{}
eventRegistryLock sync.RWMutex
}

// StateToString ...
Expand Down Expand Up @@ -114,12 +116,11 @@ func ValidateZookeeperClient(container zkClientFacade, opts ...Option) error {
var (
err error
)
opions := &Options{}
options := &Options{}
for _, opt := range opts {
opt(opions)
opt(options)
}
connected := false
err = nil

lock := container.ZkClientLock()
url := container.GetUrl()
Expand All @@ -128,18 +129,18 @@ func ValidateZookeeperClient(container zkClientFacade, opts ...Option) error {
defer lock.Unlock()

if container.ZkClient() == nil {
//in dubbo ,every registry only connect one node ,so this is []string{r.Address}
// in dubbo, every registry only connect one node, so this is []string{r.Address}
timeout, err := time.ParseDuration(url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT))
if err != nil {
logger.Errorf("timeout config %v is invalid ,err is %v",
url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT), err.Error())
return perrors.WithMessagef(err, "newZookeeperClient(address:%+v)", url.Location)
}
zkAddresses := strings.Split(url.Location, ",")
newClient, err := newZookeeperClient(opions.zkName, zkAddresses, timeout)
newClient, err := newZookeeperClient(options.zkName, zkAddresses, timeout)
if err != nil {
logger.Warnf("newZookeeperClient(name{%s}, zk address{%v}, timeout{%d}) = error{%v}",
opions.zkName, url.Location, timeout.String(), err)
options.zkName, url.Location, timeout.String(), err)
return perrors.WithMessagef(err, "newZookeeperClient(address:%+v)", url.Location)
}
container.SetZkClient(newClient)
Expand All @@ -157,8 +158,8 @@ func ValidateZookeeperClient(container zkClientFacade, opts ...Option) error {
}

if connected {
logger.Info("Connect to zookeeper successfully, name{%s}, zk address{%v}", opions.zkName, url.Location)
container.WaitGroup().Add(1) //zk client start successful, then registry wg +1
logger.Info("Connect to zookeeper successfully, name{%s}, zk address{%v}", options.zkName, url.Location)
container.WaitGroup().Add(1) // zk client start successful, then registry wg +1
}

return perrors.WithMessagef(err, "newZookeeperClient(address:%+v)", url.PrimitiveURL)
Expand Down Expand Up @@ -214,31 +215,25 @@ func NewMockZookeeperClient(name string, timeout time.Duration, opts ...Option)
eventRegistry: make(map[string][]*chan struct{}),
}

opions := &Options{}
options := &Options{}
for _, opt := range opts {
opt(opions)
opt(options)
}

// connect to zookeeper
if opions.ts != nil {
ts = opions.ts
if options.ts != nil {
ts = options.ts
} else {
ts, err = zk.StartTestCluster(1, nil, nil)
if err != nil {
return nil, nil, nil, perrors.WithMessagef(err, "zk.Connect")
}
}

//callbackChan := make(chan zk.Event)
//f := func(event zk.Event) {
// callbackChan <- event
//}

z.Conn, event, err = ts.ConnectWithOptions(timeout)
if err != nil {
return nil, nil, nil, perrors.WithMessagef(err, "zk.Connect")
}
//z.wait.Add(1)

return ts, z, event, nil
}
Expand All @@ -255,11 +250,10 @@ func (z *ZookeeperClient) HandleZkEvent(session <-chan zk.Event) {
logger.Infof("zk{path:%v, name:%s} connection goroutine game over.", z.ZkAddrs, z.name)
}()

LOOP:
for {
select {
case <-z.exit:
break LOOP
return
case event = <-session:
logger.Warnf("client{%s} get a zookeeper event{type:%s, server:%s, path:%s, state:%d-%s, err:%v}",
z.name, event.Type, event.Server, event.Path, event.State, StateToString(event.State), event.Err)
Expand All @@ -274,11 +268,10 @@ LOOP:
if conn != nil {
conn.Close()
}

break LOOP
return
case (int)(zk.EventNodeDataChanged), (int)(zk.EventNodeChildrenChanged):
logger.Infof("zkClient{%s} get zk node changed event{path:%s}", z.name, event.Path)
z.RLock()
z.eventRegistryLock.RLock()
for p, a := range z.eventRegistry {
if strings.HasPrefix(p, event.Path) {
logger.Infof("send event{state:zk.EventNodeDataChange, Path:%s} notify event to path{%s} related listener",
Expand All @@ -288,16 +281,18 @@ LOOP:
}
}
}
z.RUnlock()
z.eventRegistryLock.RUnlock()
case (int)(zk.StateConnecting), (int)(zk.StateConnected), (int)(zk.StateHasSession):
if state == (int)(zk.StateHasSession) {
continue
}
z.eventRegistryLock.RLock()
if a, ok := z.eventRegistry[event.Path]; ok && 0 < len(a) {
for _, e := range a {
*e <- struct{}{}
}
}
z.eventRegistryLock.RUnlock()
}
state = (int)(event.State)
}
Expand All @@ -310,30 +305,29 @@ func (z *ZookeeperClient) RegisterEvent(zkPath string, event *chan struct{}) {
return
}

z.Lock()
z.eventRegistryLock.Lock()
defer z.eventRegistryLock.Unlock()
a := z.eventRegistry[zkPath]
a = append(a, event)

z.eventRegistry[zkPath] = a
logger.Debugf("zkClient{%s} register event{path:%s, ptr:%p}", z.name, zkPath, event)
z.Unlock()
}

// UnregisterEvent ...
func (z *ZookeeperClient) UnregisterEvent(zkPath string, event *chan struct{}) {
if zkPath == "" {
return
}
z.Lock()
defer z.Unlock()

z.eventRegistryLock.Lock()
defer z.eventRegistryLock.Unlock()
infoList, ok := z.eventRegistry[zkPath]
if !ok {
return
}
for i, e := range infoList {
if e == event {
arr := infoList
infoList = append(arr[:i], arr[i+1:]...)
infoList = append(infoList[:i], infoList[i+1:]...)
logger.Infof("zkClient{%s} unregister event{path:%s, event:%p}", z.name, zkPath, event)
}
}
Expand Down Expand Up @@ -393,11 +387,11 @@ func (z *ZookeeperClient) Close() {
z.Conn = nil
z.Unlock()
if conn != nil {
logger.Warnf("zkClient Conn{name:%s, zk addr:%s} exit now.", z.name, conn.SessionID())
logger.Infof("zkClient Conn{name:%s, zk addr:%d} exit now.", z.name, conn.SessionID())
conn.Close()
}

logger.Warnf("zkClient{name:%s, zk addr:%s} exit now.", z.name, z.ZkAddrs)
logger.Infof("zkClient{name:%s, zk addr:%s} exit now.", z.name, z.ZkAddrs)
}

// Create will create the node recursively, which means that if the parent node is absent,
Expand Down Expand Up @@ -428,9 +422,9 @@ func (z *ZookeeperClient) CreateWithValue(basePath string, value []byte) error {

if err != nil {
if err == zk.ErrNodeExists {
logger.Debugf("zk.create(\"%s\") exists\n", tmpPath)
logger.Debugf("zk.create(\"%s\") exists", tmpPath)
} else {
logger.Errorf("zk.create(\"%s\") error(%v)\n", tmpPath, perrors.WithStack(err))
logger.Errorf("zk.create(\"%s\") error(%v)", tmpPath, perrors.WithStack(err))
return perrors.WithMessagef(err, "zk.Create(path:%s)", basePath)
}
}
Expand All @@ -441,11 +435,7 @@ func (z *ZookeeperClient) CreateWithValue(basePath string, value []byte) error {

// Delete ...
func (z *ZookeeperClient) Delete(basePath string) error {
var (
err error
)

err = errNilZkClientConn
err := errNilZkClientConn
conn := z.getConn()
if conn != nil {
err = conn.Delete(basePath, -1)
Expand All @@ -458,25 +448,22 @@ func (z *ZookeeperClient) Delete(basePath string) error {
func (z *ZookeeperClient) RegisterTemp(basePath string, node string) (string, error) {
var (
err error
data []byte
zkPath string
tmpPath string
)

err = errNilZkClientConn
data = []byte("")
zkPath = path.Join(basePath) + "/" + node
conn := z.getConn()
if conn != nil {
tmpPath, err = conn.Create(zkPath, data, zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
tmpPath, err = conn.Create(zkPath, []byte(""), zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
}

//if err != nil && err != zk.ErrNodeExists {
if err != nil {
logger.Warnf("conn.Create(\"%s\", zk.FlagEphemeral) = error(%v)\n", zkPath, perrors.WithStack(err))
logger.Warnf("conn.Create(\"%s\", zk.FlagEphemeral) = error(%v)", zkPath, perrors.WithStack(err))
return zkPath, perrors.WithStack(err)
}
logger.Debugf("zkClient{%s} create a temp zookeeper node:%s\n", z.name, tmpPath)
logger.Debugf("zkClient{%s} create a temp zookeeper node:%s", z.name, tmpPath)

return tmpPath, nil
}
Expand All @@ -501,11 +488,11 @@ func (z *ZookeeperClient) RegisterTempSeq(basePath string, data []byte) (string,

logger.Debugf("zookeeperClient.RegisterTempSeq(basePath{%s}) = tempPath{%s}", basePath, tmpPath)
if err != nil && err != zk.ErrNodeExists {
logger.Errorf("zkClient{%s} conn.Create(\"%s\", \"%s\", zk.FlagEphemeral|zk.FlagSequence) error(%v)\n",
logger.Errorf("zkClient{%s} conn.Create(\"%s\", \"%s\", zk.FlagEphemeral|zk.FlagSequence) error(%v)",
z.name, basePath, string(data), err)
return "", perrors.WithStack(err)
}
logger.Debugf("zkClient{%s} create a temp zookeeper node:%s\n", z.name, tmpPath)
logger.Debugf("zkClient{%s} create a temp zookeeper node:%s", z.name, tmpPath)

return tmpPath, nil
}
Expand Down
33 changes: 20 additions & 13 deletions remoting/zookeeper/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package zookeeper

import (
"fmt"
"testing"
"time"
)
Expand All @@ -28,14 +27,18 @@ import (
"github.com/stretchr/testify/assert"
)

import (
"github.com/apache/dubbo-go/common/logger"
)

func verifyEventStateOrder(t *testing.T, c <-chan zk.Event, expectedStates []zk.State, source string) {
for _, state := range expectedStates {
for {
event, ok := <-c
if !ok {
t.Fatalf("unexpected channel close for %s", source)
}
fmt.Println(event)
logger.Debug(event)
if event.Type != zk.EventSession {
continue
}
Expand Down Expand Up @@ -87,32 +90,35 @@ func Test_newMockZookeeperClient(t *testing.T) {
}

func TestCreate(t *testing.T) {
ts, z, event, _ := NewMockZookeeperClient("test", 15*time.Second)
ts, z, event, err := NewMockZookeeperClient("test", 15*time.Second)
assert.NoError(t, err)
defer ts.Stop()
err := z.Create("test1/test2/test3/test4")
err = z.Create("test1/test2/test3/test4")
assert.NoError(t, err)

states := []zk.State{zk.StateConnecting, zk.StateConnected, zk.StateHasSession}
verifyEventStateOrder(t, event, states, "event channel")
}

func TestCreateDelete(t *testing.T) {
ts, z, event, _ := NewMockZookeeperClient("test", 15*time.Second)
ts, z, event, err := NewMockZookeeperClient("test", 15*time.Second)
assert.NoError(t, err)
defer ts.Stop()

states := []zk.State{zk.StateConnecting, zk.StateConnected, zk.StateHasSession}
verifyEventStateOrder(t, event, states, "event channel")
err := z.Create("/test1/test2/test3/test4")
err = z.Create("/test1/test2/test3/test4")
assert.NoError(t, err)
err = z.Delete("/test1/test2/test3/test4")
assert.NoError(t, err)
err2 := z.Delete("/test1/test2/test3/test4")
assert.NoError(t, err2)
//verifyEventOrder(t, event, []zk.EventType{zk.EventNodeCreated}, "event channel")
// verifyEventOrder(t, event, []zk.EventType{zk.EventNodeCreated}, "event channel")
}

func TestRegisterTemp(t *testing.T) {
ts, z, event, _ := NewMockZookeeperClient("test", 15*time.Second)
ts, z, event, err := NewMockZookeeperClient("test", 15*time.Second)
assert.NoError(t, err)
defer ts.Stop()
err := z.Create("/test1/test2/test3")
err = z.Create("/test1/test2/test3")
assert.NoError(t, err)

tmpath, err := z.RegisterTemp("/test1/test2/test3", "test4")
Expand All @@ -123,9 +129,10 @@ func TestRegisterTemp(t *testing.T) {
}

func TestRegisterTempSeq(t *testing.T) {
ts, z, event, _ := NewMockZookeeperClient("test", 15*time.Second)
ts, z, event, err := NewMockZookeeperClient("test", 15*time.Second)
assert.NoError(t, err)
defer ts.Stop()
err := z.Create("/test1/test2/test3")
err = z.Create("/test1/test2/test3")
assert.NoError(t, err)
tmpath, err := z.RegisterTempSeq("/test1/test2/test3", []byte("test"))
assert.NoError(t, err)
Expand Down