Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Imp: Zk client #601

Merged
merged 7 commits into from
Jun 17, 2020
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 44 additions & 57 deletions remoting/zookeeper/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,21 +42,23 @@ const (
)

var (
errNilZkClientConn = perrors.New("zookeeperclient{conn} is nil")
errNilZkClientConn = perrors.New("zookeeper client{conn} is nil")
errNilChildren = perrors.Errorf("has none children")
errNilNode = perrors.Errorf("node does not exist")
)

// ZookeeperClient ...
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe u can add some comments for this struct or just using "nolint" instead.

type ZookeeperClient struct {
name string
ZkAddrs []string
sync.RWMutex // for conn
Conn *zk.Conn
Timeout time.Duration
exit chan struct{}
Wait sync.WaitGroup
eventRegistry map[string][]*chan struct{}
name string
ZkAddrs []string
sync.RWMutex // for conn
Conn *zk.Conn
Timeout time.Duration
exit chan struct{}
Wait sync.WaitGroup

eventRegistry map[string][]*chan struct{}
eventRegistryLock sync.RWMutex
}

// StateToString ...
Expand Down Expand Up @@ -114,12 +116,11 @@ func ValidateZookeeperClient(container zkClientFacade, opts ...Option) error {
var (
err error
)
opions := &Options{}
options := &Options{}
for _, opt := range opts {
opt(opions)
opt(options)
}
connected := false
err = nil

lock := container.ZkClientLock()
url := container.GetUrl()
Expand All @@ -128,18 +129,18 @@ func ValidateZookeeperClient(container zkClientFacade, opts ...Option) error {
defer lock.Unlock()

if container.ZkClient() == nil {
//in dubbo ,every registry only connect one node ,so this is []string{r.Address}
// in dubbo, every registry only connect one node, so this is []string{r.Address}
timeout, err := time.ParseDuration(url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT))
if err != nil {
logger.Errorf("timeout config %v is invalid ,err is %v",
url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT), err.Error())
return perrors.WithMessagef(err, "newZookeeperClient(address:%+v)", url.Location)
}
zkAddresses := strings.Split(url.Location, ",")
newClient, err := newZookeeperClient(opions.zkName, zkAddresses, timeout)
newClient, err := newZookeeperClient(options.zkName, zkAddresses, timeout)
if err != nil {
logger.Warnf("newZookeeperClient(name{%s}, zk address{%v}, timeout{%d}) = error{%v}",
opions.zkName, url.Location, timeout.String(), err)
options.zkName, url.Location, timeout.String(), err)
return perrors.WithMessagef(err, "newZookeeperClient(address:%+v)", url.Location)
}
container.SetZkClient(newClient)
Expand All @@ -157,8 +158,8 @@ func ValidateZookeeperClient(container zkClientFacade, opts ...Option) error {
}

if connected {
logger.Info("Connect to zookeeper successfully, name{%s}, zk address{%v}", opions.zkName, url.Location)
container.WaitGroup().Add(1) //zk client start successful, then registry wg +1
logger.Info("Connect to zookeeper successfully, name{%s}, zk address{%v}", options.zkName, url.Location)
container.WaitGroup().Add(1) // zk client start successful, then registry wg +1
}

return perrors.WithMessagef(err, "newZookeeperClient(address:%+v)", url.PrimitiveURL)
Expand Down Expand Up @@ -214,31 +215,25 @@ func NewMockZookeeperClient(name string, timeout time.Duration, opts ...Option)
eventRegistry: make(map[string][]*chan struct{}),
}

opions := &Options{}
options := &Options{}
for _, opt := range opts {
opt(opions)
opt(options)
}

// connect to zookeeper
if opions.ts != nil {
ts = opions.ts
if options.ts != nil {
ts = options.ts
} else {
ts, err = zk.StartTestCluster(1, nil, nil)
if err != nil {
return nil, nil, nil, perrors.WithMessagef(err, "zk.Connect")
}
}

//callbackChan := make(chan zk.Event)
//f := func(event zk.Event) {
// callbackChan <- event
//}

z.Conn, event, err = ts.ConnectWithOptions(timeout)
if err != nil {
return nil, nil, nil, perrors.WithMessagef(err, "zk.Connect")
}
//z.wait.Add(1)

return ts, z, event, nil
}
Expand All @@ -255,11 +250,10 @@ func (z *ZookeeperClient) HandleZkEvent(session <-chan zk.Event) {
logger.Infof("zk{path:%v, name:%s} connection goroutine game over.", z.ZkAddrs, z.name)
}()

LOOP:
for {
select {
case <-z.exit:
break LOOP
return
case event = <-session:
logger.Warnf("client{%s} get a zookeeper event{type:%s, server:%s, path:%s, state:%d-%s, err:%v}",
z.name, event.Type, event.Server, event.Path, event.State, StateToString(event.State), event.Err)
Expand All @@ -274,11 +268,10 @@ LOOP:
if conn != nil {
conn.Close()
}

break LOOP
return
case (int)(zk.EventNodeDataChanged), (int)(zk.EventNodeChildrenChanged):
logger.Infof("zkClient{%s} get zk node changed event{path:%s}", z.name, event.Path)
z.RLock()
z.eventRegistryLock.RLock()
for p, a := range z.eventRegistry {
if strings.HasPrefix(p, event.Path) {
logger.Infof("send event{state:zk.EventNodeDataChange, Path:%s} notify event to path{%s} related listener",
Expand All @@ -288,16 +281,18 @@ LOOP:
}
}
}
z.RUnlock()
z.eventRegistryLock.RUnlock()
case (int)(zk.StateConnecting), (int)(zk.StateConnected), (int)(zk.StateHasSession):
if state == (int)(zk.StateHasSession) {
continue
}
z.eventRegistryLock.RLock()
if a, ok := z.eventRegistry[event.Path]; ok && 0 < len(a) {
for _, e := range a {
*e <- struct{}{}
}
}
z.eventRegistryLock.RUnlock()
}
state = (int)(event.State)
}
Expand All @@ -310,30 +305,29 @@ func (z *ZookeeperClient) RegisterEvent(zkPath string, event *chan struct{}) {
return
}

z.Lock()
z.eventRegistryLock.Lock()
defer z.eventRegistryLock.Unlock()
a := z.eventRegistry[zkPath]
a = append(a, event)

z.eventRegistry[zkPath] = a
logger.Debugf("zkClient{%s} register event{path:%s, ptr:%p}", z.name, zkPath, event)
z.Unlock()
}

// UnregisterEvent ...
func (z *ZookeeperClient) UnregisterEvent(zkPath string, event *chan struct{}) {
if zkPath == "" {
return
}
z.Lock()
defer z.Unlock()

z.eventRegistryLock.Lock()
defer z.eventRegistryLock.Unlock()
infoList, ok := z.eventRegistry[zkPath]
if !ok {
return
}
for i, e := range infoList {
if e == event {
arr := infoList
infoList = append(arr[:i], arr[i+1:]...)
infoList = append(infoList[:i], infoList[i+1:]...)
logger.Infof("zkClient{%s} unregister event{path:%s, event:%p}", z.name, zkPath, event)
}
}
Expand Down Expand Up @@ -393,11 +387,11 @@ func (z *ZookeeperClient) Close() {
z.Conn = nil
z.Unlock()
if conn != nil {
logger.Warnf("zkClient Conn{name:%s, zk addr:%s} exit now.", z.name, conn.SessionID())
logger.Infof("zkClient Conn{name:%s, zk addr:%d} exit now.", z.name, conn.SessionID())
conn.Close()
}

logger.Warnf("zkClient{name:%s, zk addr:%s} exit now.", z.name, z.ZkAddrs)
logger.Infof("zkClient{name:%s, zk addr:%s} exit now.", z.name, z.ZkAddrs)
}

// Create will create the node recursively, which means that if the parent node is absent,
Expand Down Expand Up @@ -428,9 +422,9 @@ func (z *ZookeeperClient) CreateWithValue(basePath string, value []byte) error {

if err != nil {
if err == zk.ErrNodeExists {
logger.Debugf("zk.create(\"%s\") exists\n", tmpPath)
logger.Debugf("zk.create(\"%s\") exists", tmpPath)
} else {
logger.Errorf("zk.create(\"%s\") error(%v)\n", tmpPath, perrors.WithStack(err))
logger.Errorf("zk.create(\"%s\") error(%v)", tmpPath, perrors.WithStack(err))
return perrors.WithMessagef(err, "zk.Create(path:%s)", basePath)
}
}
Expand All @@ -441,11 +435,7 @@ func (z *ZookeeperClient) CreateWithValue(basePath string, value []byte) error {

// Delete ...
func (z *ZookeeperClient) Delete(basePath string) error {
var (
err error
)

err = errNilZkClientConn
err := errNilZkClientConn
conn := z.getConn()
if conn != nil {
err = conn.Delete(basePath, -1)
Expand All @@ -458,25 +448,22 @@ func (z *ZookeeperClient) Delete(basePath string) error {
func (z *ZookeeperClient) RegisterTemp(basePath string, node string) (string, error) {
var (
err error
data []byte
zkPath string
tmpPath string
)

err = errNilZkClientConn
data = []byte("")
zkPath = path.Join(basePath) + "/" + node
conn := z.getConn()
if conn != nil {
tmpPath, err = conn.Create(zkPath, data, zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
tmpPath, err = conn.Create(zkPath, []byte(""), zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
}

//if err != nil && err != zk.ErrNodeExists {
if err != nil {
logger.Warnf("conn.Create(\"%s\", zk.FlagEphemeral) = error(%v)\n", zkPath, perrors.WithStack(err))
logger.Warnf("conn.Create(\"%s\", zk.FlagEphemeral) = error(%v)", zkPath, perrors.WithStack(err))
return zkPath, perrors.WithStack(err)
}
logger.Debugf("zkClient{%s} create a temp zookeeper node:%s\n", z.name, tmpPath)
logger.Debugf("zkClient{%s} create a temp zookeeper node:%s", z.name, tmpPath)

return tmpPath, nil
}
Expand All @@ -501,11 +488,11 @@ func (z *ZookeeperClient) RegisterTempSeq(basePath string, data []byte) (string,

logger.Debugf("zookeeperClient.RegisterTempSeq(basePath{%s}) = tempPath{%s}", basePath, tmpPath)
if err != nil && err != zk.ErrNodeExists {
logger.Errorf("zkClient{%s} conn.Create(\"%s\", \"%s\", zk.FlagEphemeral|zk.FlagSequence) error(%v)\n",
logger.Errorf("zkClient{%s} conn.Create(\"%s\", \"%s\", zk.FlagEphemeral|zk.FlagSequence) error(%v)",
z.name, basePath, string(data), err)
return "", perrors.WithStack(err)
}
logger.Debugf("zkClient{%s} create a temp zookeeper node:%s\n", z.name, tmpPath)
logger.Debugf("zkClient{%s} create a temp zookeeper node:%s", z.name, tmpPath)

return tmpPath, nil
}
Expand Down
53 changes: 30 additions & 23 deletions remoting/zookeeper/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@
package zookeeper

import (
"fmt"
"testing"
"time"
)

import (
"github.com/dubbogo/go-zookeeper/zk"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think u can rollback the testify/require to testify/assert package.

Copy link
Author

@gaoxinge gaoxinge Jun 15, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

an example

func TestSomething(t *testing.T) {
	var err error

	err = perrors.Errorf("something1")
	assert.NoError(t, err)

	err = perrors.Errorf("something2")
	assert.NoError(t, err)
}
  • use testify/assert will output two errors on console:
=== RUN   TestSomething
--- FAIL: TestSomething (0.00s)
    client_test.go:151: 
        	Error Trace:	client_test.go:151
        	Error:      	Received unexpected error:
        	            	something1
        	            	github.com/apache/dubbo-go/remoting/zookeeper.TestSomething
        	            		D:/GoPath/src/github.com/apache/dubbo-go/remoting/zookeeper/client_test.go:150
        	            	testing.tRunner
        	            		D:/Go/src/testing/testing.go:909
        	            	runtime.goexit
        	            		D:/Go/src/runtime/asm_amd64.s:1357
        	Test:       	TestSomething
    client_test.go:154: 
        	Error Trace:	client_test.go:154
        	Error:      	Received unexpected error:
        	            	something2
        	            	github.com/apache/dubbo-go/remoting/zookeeper.TestSomething
        	            		D:/GoPath/src/github.com/apache/dubbo-go/remoting/zookeeper/client_test.go:153
        	            	testing.tRunner
        	            		D:/Go/src/testing/testing.go:909
        	            	runtime.goexit
        	            		D:/Go/src/runtime/asm_amd64.s:1357
        	Test:       	TestSomething
FAIL
  • use testify/require just output the first error and stop the test case intermediately

why I prefer testify/require

  • Because errors in test case may have some dependency: first error causes second error, then I think it's better to use testify/require, which only shows the first error.
  • Using testify/assert may output too many errors, but developers only need to focus on fixing the first error.

@AlexStocks

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

However, it is impossible that we test the same error case in one Testxxx.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@AlexStocks Ok, I revert require to assert.

)

import (
"github.com/apache/dubbo-go/common/logger"
)

func verifyEventStateOrder(t *testing.T, c <-chan zk.Event, expectedStates []zk.State, source string) {
Expand All @@ -35,7 +38,7 @@ func verifyEventStateOrder(t *testing.T, c <-chan zk.Event, expectedStates []zk.
if !ok {
t.Fatalf("unexpected channel close for %s", source)
}
fmt.Println(event)
logger.Debug(event)
if event.Type != zk.EventSession {
continue
}
Expand Down Expand Up @@ -77,7 +80,7 @@ func verifyEventStateOrder(t *testing.T, c <-chan zk.Event, expectedStates []zk.

func Test_newMockZookeeperClient(t *testing.T) {
ts, z, event, err := NewMockZookeeperClient("test", 15*time.Second)
assert.NoError(t, err)
require.NoError(t, err)
defer ts.Stop()
states := []zk.State{zk.StateConnecting, zk.StateConnected, zk.StateHasSession}
verifyEventStateOrder(t, event, states, "event channel")
Expand All @@ -87,49 +90,53 @@ func Test_newMockZookeeperClient(t *testing.T) {
}

func TestCreate(t *testing.T) {
ts, z, event, _ := NewMockZookeeperClient("test", 15*time.Second)
ts, z, event, err := NewMockZookeeperClient("test", 15*time.Second)
require.NoError(t, err)
defer ts.Stop()
err := z.Create("test1/test2/test3/test4")
assert.NoError(t, err)
err = z.Create("test1/test2/test3/test4")
require.NoError(t, err)

states := []zk.State{zk.StateConnecting, zk.StateConnected, zk.StateHasSession}
verifyEventStateOrder(t, event, states, "event channel")
}

func TestCreateDelete(t *testing.T) {
ts, z, event, _ := NewMockZookeeperClient("test", 15*time.Second)
ts, z, event, err := NewMockZookeeperClient("test", 15*time.Second)
require.NoError(t, err)
defer ts.Stop()

states := []zk.State{zk.StateConnecting, zk.StateConnected, zk.StateHasSession}
verifyEventStateOrder(t, event, states, "event channel")
err := z.Create("/test1/test2/test3/test4")
assert.NoError(t, err)
err2 := z.Delete("/test1/test2/test3/test4")
assert.NoError(t, err2)
//verifyEventOrder(t, event, []zk.EventType{zk.EventNodeCreated}, "event channel")
err = z.Create("/test1/test2/test3/test4")
require.NoError(t, err)
err = z.Delete("/test1/test2/test3/test4")
require.NoError(t, err)
// verifyEventOrder(t, event, []zk.EventType{zk.EventNodeCreated}, "event channel")
}

func TestRegisterTemp(t *testing.T) {
ts, z, event, _ := NewMockZookeeperClient("test", 15*time.Second)
ts, z, event, err := NewMockZookeeperClient("test", 15*time.Second)
require.NoError(t, err)
defer ts.Stop()
err := z.Create("/test1/test2/test3")
assert.NoError(t, err)
err = z.Create("/test1/test2/test3")
require.NoError(t, err)

tmpath, err := z.RegisterTemp("/test1/test2/test3", "test4")
assert.NoError(t, err)
assert.Equal(t, "/test1/test2/test3/test4", tmpath)
require.NoError(t, err)
require.Equal(t, "/test1/test2/test3/test4", tmpath)
states := []zk.State{zk.StateConnecting, zk.StateConnected, zk.StateHasSession}
verifyEventStateOrder(t, event, states, "event channel")
}

func TestRegisterTempSeq(t *testing.T) {
ts, z, event, _ := NewMockZookeeperClient("test", 15*time.Second)
ts, z, event, err := NewMockZookeeperClient("test", 15*time.Second)
require.NoError(t, err)
defer ts.Stop()
err := z.Create("/test1/test2/test3")
assert.NoError(t, err)
err = z.Create("/test1/test2/test3")
require.NoError(t, err)
tmpath, err := z.RegisterTempSeq("/test1/test2/test3", []byte("test"))
assert.NoError(t, err)
assert.Equal(t, "/test1/test2/test3/0000000000", tmpath)
require.NoError(t, err)
require.Equal(t, "/test1/test2/test3/0000000000", tmpath)
states := []zk.State{zk.StateConnecting, zk.StateConnected, zk.StateHasSession}
verifyEventStateOrder(t, event, states, "event channel")
}
Expand Down