Skip to content

Commit

Permalink
Merge pull request #601 from dubbo-x/zk_client
Browse files Browse the repository at this point in the history
Imp: Zk client
  • Loading branch information
AlexStocks authored Jun 17, 2020
2 parents 669301f + df758dc commit 024f7b2
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 70 deletions.
101 changes: 44 additions & 57 deletions remoting/zookeeper/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,21 +42,23 @@ const (
)

var (
errNilZkClientConn = perrors.New("zookeeperclient{conn} is nil")
errNilZkClientConn = perrors.New("zookeeper client{conn} is nil")
errNilChildren = perrors.Errorf("has none children")
errNilNode = perrors.Errorf("node does not exist")
)

// ZookeeperClient ...
type ZookeeperClient struct {
name string
ZkAddrs []string
sync.RWMutex // for conn
Conn *zk.Conn
Timeout time.Duration
exit chan struct{}
Wait sync.WaitGroup
eventRegistry map[string][]*chan struct{}
name string
ZkAddrs []string
sync.RWMutex // for conn
Conn *zk.Conn
Timeout time.Duration
exit chan struct{}
Wait sync.WaitGroup

eventRegistry map[string][]*chan struct{}
eventRegistryLock sync.RWMutex
}

// StateToString ...
Expand Down Expand Up @@ -114,12 +116,11 @@ func ValidateZookeeperClient(container zkClientFacade, opts ...Option) error {
var (
err error
)
opions := &Options{}
options := &Options{}
for _, opt := range opts {
opt(opions)
opt(options)
}
connected := false
err = nil

lock := container.ZkClientLock()
url := container.GetUrl()
Expand All @@ -128,18 +129,18 @@ func ValidateZookeeperClient(container zkClientFacade, opts ...Option) error {
defer lock.Unlock()

if container.ZkClient() == nil {
//in dubbo ,every registry only connect one node ,so this is []string{r.Address}
// in dubbo, every registry only connect one node, so this is []string{r.Address}
timeout, err := time.ParseDuration(url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT))
if err != nil {
logger.Errorf("timeout config %v is invalid ,err is %v",
url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT), err.Error())
return perrors.WithMessagef(err, "newZookeeperClient(address:%+v)", url.Location)
}
zkAddresses := strings.Split(url.Location, ",")
newClient, err := newZookeeperClient(opions.zkName, zkAddresses, timeout)
newClient, err := newZookeeperClient(options.zkName, zkAddresses, timeout)
if err != nil {
logger.Warnf("newZookeeperClient(name{%s}, zk address{%v}, timeout{%d}) = error{%v}",
opions.zkName, url.Location, timeout.String(), err)
options.zkName, url.Location, timeout.String(), err)
return perrors.WithMessagef(err, "newZookeeperClient(address:%+v)", url.Location)
}
container.SetZkClient(newClient)
Expand All @@ -157,8 +158,8 @@ func ValidateZookeeperClient(container zkClientFacade, opts ...Option) error {
}

if connected {
logger.Info("Connect to zookeeper successfully, name{%s}, zk address{%v}", opions.zkName, url.Location)
container.WaitGroup().Add(1) //zk client start successful, then registry wg +1
logger.Info("Connect to zookeeper successfully, name{%s}, zk address{%v}", options.zkName, url.Location)
container.WaitGroup().Add(1) // zk client start successful, then registry wg +1
}

return perrors.WithMessagef(err, "newZookeeperClient(address:%+v)", url.PrimitiveURL)
Expand Down Expand Up @@ -214,31 +215,25 @@ func NewMockZookeeperClient(name string, timeout time.Duration, opts ...Option)
eventRegistry: make(map[string][]*chan struct{}),
}

opions := &Options{}
options := &Options{}
for _, opt := range opts {
opt(opions)
opt(options)
}

// connect to zookeeper
if opions.ts != nil {
ts = opions.ts
if options.ts != nil {
ts = options.ts
} else {
ts, err = zk.StartTestCluster(1, nil, nil)
if err != nil {
return nil, nil, nil, perrors.WithMessagef(err, "zk.Connect")
}
}

//callbackChan := make(chan zk.Event)
//f := func(event zk.Event) {
// callbackChan <- event
//}

z.Conn, event, err = ts.ConnectWithOptions(timeout)
if err != nil {
return nil, nil, nil, perrors.WithMessagef(err, "zk.Connect")
}
//z.wait.Add(1)

return ts, z, event, nil
}
Expand All @@ -255,11 +250,10 @@ func (z *ZookeeperClient) HandleZkEvent(session <-chan zk.Event) {
logger.Infof("zk{path:%v, name:%s} connection goroutine game over.", z.ZkAddrs, z.name)
}()

LOOP:
for {
select {
case <-z.exit:
break LOOP
return
case event = <-session:
logger.Warnf("client{%s} get a zookeeper event{type:%s, server:%s, path:%s, state:%d-%s, err:%v}",
z.name, event.Type, event.Server, event.Path, event.State, StateToString(event.State), event.Err)
Expand All @@ -274,11 +268,10 @@ LOOP:
if conn != nil {
conn.Close()
}

break LOOP
return
case (int)(zk.EventNodeDataChanged), (int)(zk.EventNodeChildrenChanged):
logger.Infof("zkClient{%s} get zk node changed event{path:%s}", z.name, event.Path)
z.RLock()
z.eventRegistryLock.RLock()
for p, a := range z.eventRegistry {
if strings.HasPrefix(p, event.Path) {
logger.Infof("send event{state:zk.EventNodeDataChange, Path:%s} notify event to path{%s} related listener",
Expand All @@ -288,16 +281,18 @@ LOOP:
}
}
}
z.RUnlock()
z.eventRegistryLock.RUnlock()
case (int)(zk.StateConnecting), (int)(zk.StateConnected), (int)(zk.StateHasSession):
if state == (int)(zk.StateHasSession) {
continue
}
z.eventRegistryLock.RLock()
if a, ok := z.eventRegistry[event.Path]; ok && 0 < len(a) {
for _, e := range a {
*e <- struct{}{}
}
}
z.eventRegistryLock.RUnlock()
}
state = (int)(event.State)
}
Expand All @@ -310,30 +305,29 @@ func (z *ZookeeperClient) RegisterEvent(zkPath string, event *chan struct{}) {
return
}

z.Lock()
z.eventRegistryLock.Lock()
defer z.eventRegistryLock.Unlock()
a := z.eventRegistry[zkPath]
a = append(a, event)

z.eventRegistry[zkPath] = a
logger.Debugf("zkClient{%s} register event{path:%s, ptr:%p}", z.name, zkPath, event)
z.Unlock()
}

// UnregisterEvent ...
func (z *ZookeeperClient) UnregisterEvent(zkPath string, event *chan struct{}) {
if zkPath == "" {
return
}
z.Lock()
defer z.Unlock()

z.eventRegistryLock.Lock()
defer z.eventRegistryLock.Unlock()
infoList, ok := z.eventRegistry[zkPath]
if !ok {
return
}
for i, e := range infoList {
if e == event {
arr := infoList
infoList = append(arr[:i], arr[i+1:]...)
infoList = append(infoList[:i], infoList[i+1:]...)
logger.Infof("zkClient{%s} unregister event{path:%s, event:%p}", z.name, zkPath, event)
}
}
Expand Down Expand Up @@ -393,11 +387,11 @@ func (z *ZookeeperClient) Close() {
z.Conn = nil
z.Unlock()
if conn != nil {
logger.Warnf("zkClient Conn{name:%s, zk addr:%s} exit now.", z.name, conn.SessionID())
logger.Infof("zkClient Conn{name:%s, zk addr:%d} exit now.", z.name, conn.SessionID())
conn.Close()
}

logger.Warnf("zkClient{name:%s, zk addr:%s} exit now.", z.name, z.ZkAddrs)
logger.Infof("zkClient{name:%s, zk addr:%s} exit now.", z.name, z.ZkAddrs)
}

// Create will create the node recursively, which means that if the parent node is absent,
Expand Down Expand Up @@ -428,9 +422,9 @@ func (z *ZookeeperClient) CreateWithValue(basePath string, value []byte) error {

if err != nil {
if err == zk.ErrNodeExists {
logger.Debugf("zk.create(\"%s\") exists\n", tmpPath)
logger.Debugf("zk.create(\"%s\") exists", tmpPath)
} else {
logger.Errorf("zk.create(\"%s\") error(%v)\n", tmpPath, perrors.WithStack(err))
logger.Errorf("zk.create(\"%s\") error(%v)", tmpPath, perrors.WithStack(err))
return perrors.WithMessagef(err, "zk.Create(path:%s)", basePath)
}
}
Expand All @@ -441,11 +435,7 @@ func (z *ZookeeperClient) CreateWithValue(basePath string, value []byte) error {

// Delete ...
func (z *ZookeeperClient) Delete(basePath string) error {
var (
err error
)

err = errNilZkClientConn
err := errNilZkClientConn
conn := z.getConn()
if conn != nil {
err = conn.Delete(basePath, -1)
Expand All @@ -458,25 +448,22 @@ func (z *ZookeeperClient) Delete(basePath string) error {
func (z *ZookeeperClient) RegisterTemp(basePath string, node string) (string, error) {
var (
err error
data []byte
zkPath string
tmpPath string
)

err = errNilZkClientConn
data = []byte("")
zkPath = path.Join(basePath) + "/" + node
conn := z.getConn()
if conn != nil {
tmpPath, err = conn.Create(zkPath, data, zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
tmpPath, err = conn.Create(zkPath, []byte(""), zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
}

//if err != nil && err != zk.ErrNodeExists {
if err != nil {
logger.Warnf("conn.Create(\"%s\", zk.FlagEphemeral) = error(%v)\n", zkPath, perrors.WithStack(err))
logger.Warnf("conn.Create(\"%s\", zk.FlagEphemeral) = error(%v)", zkPath, perrors.WithStack(err))
return zkPath, perrors.WithStack(err)
}
logger.Debugf("zkClient{%s} create a temp zookeeper node:%s\n", z.name, tmpPath)
logger.Debugf("zkClient{%s} create a temp zookeeper node:%s", z.name, tmpPath)

return tmpPath, nil
}
Expand All @@ -501,11 +488,11 @@ func (z *ZookeeperClient) RegisterTempSeq(basePath string, data []byte) (string,

logger.Debugf("zookeeperClient.RegisterTempSeq(basePath{%s}) = tempPath{%s}", basePath, tmpPath)
if err != nil && err != zk.ErrNodeExists {
logger.Errorf("zkClient{%s} conn.Create(\"%s\", \"%s\", zk.FlagEphemeral|zk.FlagSequence) error(%v)\n",
logger.Errorf("zkClient{%s} conn.Create(\"%s\", \"%s\", zk.FlagEphemeral|zk.FlagSequence) error(%v)",
z.name, basePath, string(data), err)
return "", perrors.WithStack(err)
}
logger.Debugf("zkClient{%s} create a temp zookeeper node:%s\n", z.name, tmpPath)
logger.Debugf("zkClient{%s} create a temp zookeeper node:%s", z.name, tmpPath)

return tmpPath, nil
}
Expand Down
33 changes: 20 additions & 13 deletions remoting/zookeeper/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package zookeeper

import (
"fmt"
"testing"
"time"
)
Expand All @@ -28,14 +27,18 @@ import (
"github.com/stretchr/testify/assert"
)

import (
"github.com/apache/dubbo-go/common/logger"
)

func verifyEventStateOrder(t *testing.T, c <-chan zk.Event, expectedStates []zk.State, source string) {
for _, state := range expectedStates {
for {
event, ok := <-c
if !ok {
t.Fatalf("unexpected channel close for %s", source)
}
fmt.Println(event)
logger.Debug(event)
if event.Type != zk.EventSession {
continue
}
Expand Down Expand Up @@ -87,32 +90,35 @@ func Test_newMockZookeeperClient(t *testing.T) {
}

func TestCreate(t *testing.T) {
ts, z, event, _ := NewMockZookeeperClient("test", 15*time.Second)
ts, z, event, err := NewMockZookeeperClient("test", 15*time.Second)
assert.NoError(t, err)
defer ts.Stop()
err := z.Create("test1/test2/test3/test4")
err = z.Create("test1/test2/test3/test4")
assert.NoError(t, err)

states := []zk.State{zk.StateConnecting, zk.StateConnected, zk.StateHasSession}
verifyEventStateOrder(t, event, states, "event channel")
}

func TestCreateDelete(t *testing.T) {
ts, z, event, _ := NewMockZookeeperClient("test", 15*time.Second)
ts, z, event, err := NewMockZookeeperClient("test", 15*time.Second)
assert.NoError(t, err)
defer ts.Stop()

states := []zk.State{zk.StateConnecting, zk.StateConnected, zk.StateHasSession}
verifyEventStateOrder(t, event, states, "event channel")
err := z.Create("/test1/test2/test3/test4")
err = z.Create("/test1/test2/test3/test4")
assert.NoError(t, err)
err = z.Delete("/test1/test2/test3/test4")
assert.NoError(t, err)
err2 := z.Delete("/test1/test2/test3/test4")
assert.NoError(t, err2)
//verifyEventOrder(t, event, []zk.EventType{zk.EventNodeCreated}, "event channel")
// verifyEventOrder(t, event, []zk.EventType{zk.EventNodeCreated}, "event channel")
}

func TestRegisterTemp(t *testing.T) {
ts, z, event, _ := NewMockZookeeperClient("test", 15*time.Second)
ts, z, event, err := NewMockZookeeperClient("test", 15*time.Second)
assert.NoError(t, err)
defer ts.Stop()
err := z.Create("/test1/test2/test3")
err = z.Create("/test1/test2/test3")
assert.NoError(t, err)

tmpath, err := z.RegisterTemp("/test1/test2/test3", "test4")
Expand All @@ -123,9 +129,10 @@ func TestRegisterTemp(t *testing.T) {
}

func TestRegisterTempSeq(t *testing.T) {
ts, z, event, _ := NewMockZookeeperClient("test", 15*time.Second)
ts, z, event, err := NewMockZookeeperClient("test", 15*time.Second)
assert.NoError(t, err)
defer ts.Stop()
err := z.Create("/test1/test2/test3")
err = z.Create("/test1/test2/test3")
assert.NoError(t, err)
tmpath, err := z.RegisterTempSeq("/test1/test2/test3", []byte("test"))
assert.NoError(t, err)
Expand Down

0 comments on commit 024f7b2

Please sign in to comment.