Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Nacos client #1255

Merged
merged 26 commits into from
Jun 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
518be2b
build(deps): bump actions/cache from v2.1.4 to v2.1.5
dependabot[bot] Apr 19, 2021
3162e41
Merge pull request #1162 from apache/dependabot/github_actions/develo…
AlexStocks Apr 21, 2021
2138190
Merge branch 'develop' of https://github.com/apache/dubbo-go into dev…
AlexStocks Apr 27, 2021
bef8e95
Merge branch '3.0' into develop
AlexStocks Apr 27, 2021
a85b65b
Merge branch '3.0' into develop
AlexStocks May 9, 2021
9e38afe
Merge branch '3.0' into develop
AlexStocks May 10, 2021
f0ad730
Merge branch '3.0' into develop
AlexStocks May 10, 2021
4cb6e44
improve etcd version and change create to put (#1203)
ztelur May 15, 2021
56d9d71
Merge branch '3.0' into develop
AlexStocks May 15, 2021
cc74aa5
Merge branch '3.0' into develop
AlexStocks May 18, 2021
40082d4
Merge branch '3.0' into develop
AlexStocks May 21, 2021
bec69c5
up:remoting nacos
zhaoyunxing92 Jun 4, 2021
daff04e
add:nacos service discovery
zhaoyunxing92 Jun 4, 2021
0bb95d6
up:设置默认值
zhaoyunxing92 Jun 5, 2021
c032f19
up:nacos registroy client
zhaoyunxing92 Jun 5, 2021
e97320e
up:nacon config client
zhaoyunxing92 Jun 5, 2021
3fad0be
up:go fmt
zhaoyunxing92 Jun 5, 2021
441e0b4
up:nacos config client
zhaoyunxing92 Jun 5, 2021
3fa236e
Merge branch 'apache:master' into nacos-client
zhaoyunxing92 Jun 11, 2021
dd4ad37
up:test
zhaoyunxing92 Jun 11, 2021
70e5386
up:修改初æ测试方法
zhaoyunxing92 Jun 11, 2021
d1ed6a1
merge
zhaoyunxing92 Jun 11, 2021
03b0574
up:fmt
zhaoyunxing92 Jun 11, 2021
cad80ab
up:triple version
zhaoyunxing92 Jun 12, 2021
79ff40c
up:修改配置操作
zhaoyunxing92 Jun 12, 2021
654e491
rm:移除nacos客户端,解决冲突
zhaoyunxing92 Jun 13, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 24 additions & 88 deletions config_center/nacos/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,54 +18,51 @@
package nacos

import (
"strconv"
"strings"
"sync"
"time"
)

import (
"github.com/nacos-group/nacos-sdk-go/clients"
"github.com/nacos-group/nacos-sdk-go/clients/config_client"
nacosconst "github.com/nacos-group/nacos-sdk-go/common/constant"
nacosClient "github.com/dubbogo/gost/database/kv/nacos"
perrors "github.com/pkg/errors"
)

import (
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/logger"
"dubbo.apache.org/dubbo-go/v3/remoting/nacos"
)

// NacosClient Nacos client
// NacosClient Nacos configClient
type NacosClient struct {
name string
NacosAddrs []string
sync.Mutex // for Client
client *config_client.IConfigClient
exit chan struct{}
Timeout time.Duration
once sync.Once
onceClose func()
name string
NacosAddrs []string
sync.Mutex // for Client
configClient *nacosClient.NacosConfigClient
exit chan struct{}
Timeout time.Duration
once sync.Once
onceClose func()
}

// Client Get Client
func (n *NacosClient) Client() *config_client.IConfigClient {
return n.client
func (n *NacosClient) Client() *nacosClient.NacosConfigClient {
return n.configClient
}

// SetClient Set client
func (n *NacosClient) SetClient(client *config_client.IConfigClient) {
// SetClient Set configClient
func (n *NacosClient) SetClient(configClient *nacosClient.NacosConfigClient) {
n.Lock()
n.client = client
n.configClient = configClient
n.Unlock()
}

type option func(*options)

type options struct {
nacosName string
// client *NacosClient
// configClient *NacosClient
}

// WithNacosName Set nacos name
Expand All @@ -75,7 +72,7 @@ func WithNacosName(name string) option {
}
}

// ValidateNacosClient Validate nacos client , if null then create it
// ValidateNacosClient Validate nacos configClient , if null then create it
func ValidateNacosClient(container nacosClientFacade, opts ...option) error {
if container == nil {
return perrors.Errorf("container can not be null")
Expand All @@ -95,7 +92,7 @@ func ValidateNacosClient(container nacosClientFacade, opts ...option) error {
nacosAddresses := strings.Split(url.Location, ",")
if container.NacosClient() == nil {
// in dubbo ,every registry only connect one node ,so this is []string{r.Address}
newClient, err := newNacosClient(os.nacosName, nacosAddresses, timeout, url)
newClient, err := nacos.NewNacosConfigClientByUrl(url)
if err != nil {
logger.Errorf("newNacosClient(name{%s}, nacos address{%v}, timeout{%d}) = error{%v}",
os.nacosName, url.Location, timeout.String(), err)
Expand All @@ -105,79 +102,18 @@ func ValidateNacosClient(container nacosClientFacade, opts ...option) error {
}

if container.NacosClient().Client() == nil {
configClient, err := initNacosConfigClient(nacosAddresses, timeout, url)
configClient, err := nacos.NewNacosConfigClientByUrl(url)
if err != nil {
logger.Errorf("initNacosConfigClient(addr:%+v,timeout:%v,url:%v) = err %+v",
nacosAddresses, timeout.String(), url, err)
return perrors.WithMessagef(err, "newNacosClient(address:%+v)", url.Location)
}
container.NacosClient().SetClient(&configClient)

container.NacosClient().SetClient(configClient.Client())
}

return perrors.WithMessagef(nil, "newNacosClient(address:%+v)", url.PrimitiveURL)
}

func newNacosClient(name string, nacosAddrs []string, timeout time.Duration, url *common.URL) (*NacosClient, error) {
var (
err error
n *NacosClient
)

n = &NacosClient{
name: name,
NacosAddrs: nacosAddrs,
Timeout: timeout,
exit: make(chan struct{}),
onceClose: func() {
close(n.exit)
},
}

configClient, err := initNacosConfigClient(nacosAddrs, timeout, url)
if err != nil {
logger.Errorf("initNacosConfigClient(addr:%+v,timeout:%v,url:%v) = err %+v",
nacosAddrs, timeout.String(), url, err)
return n, perrors.WithMessagef(err, "newNacosClient(address:%+v)", url.Location)
}
n.SetClient(&configClient)

return n, nil
}

func initNacosConfigClient(nacosAddrs []string, timeout time.Duration, url *common.URL) (config_client.IConfigClient, error) {
var svrConfList []nacosconst.ServerConfig
for _, nacosAddr := range nacosAddrs {
split := strings.Split(nacosAddr, ":")
port, err := strconv.ParseUint(split[1], 10, 64)
if err != nil {
logger.Errorf("strconv.ParseUint(nacos addr port:%+v) = error %+v", split[1], err)
continue
}
svrconf := nacosconst.ServerConfig{
IpAddr: split[0],
Port: port,
}
svrConfList = append(svrConfList, svrconf)
}

return clients.CreateConfigClient(map[string]interface{}{
"serverConfigs": svrConfList,
"clientConfig": nacosconst.ClientConfig{
TimeoutMs: uint64(int32(timeout / time.Millisecond)),
ListenInterval: uint64(int32(timeout / time.Millisecond)),
NotLoadCacheAtStart: true,
LogDir: url.GetParam(constant.NACOS_LOG_DIR_KEY, ""),
CacheDir: url.GetParam(constant.NACOS_CACHE_DIR_KEY, ""),
Endpoint: url.GetParam(constant.NACOS_ENDPOINT, ""),
Username: url.GetParam(constant.NACOS_USERNAME, ""),
Password: url.GetParam(constant.NACOS_PASSWORD, ""),
NamespaceId: url.GetParam(constant.NACOS_NAMESPACE_ID, ""),
},
})
}

// Done Get nacos client exit signal
// Done Get nacos configClient exit signal
func (n *NacosClient) Done() <-chan struct{} {
return n.exit
}
Expand All @@ -193,7 +129,7 @@ func (n *NacosClient) stop() bool {
return false
}

// NacosClientValid Get nacos client valid status
// NacosClientValid Get nacos configClient valid status
func (n *NacosClient) NacosClientValid() bool {
select {
case <-n.exit:
Expand All @@ -211,7 +147,7 @@ func (n *NacosClient) NacosClientValid() bool {
return valid
}

// Close Close nacos client , then set null
// Close Close nacos configClient , then set null
func (n *NacosClient) Close() {
if n == nil {
return
Expand Down
49 changes: 19 additions & 30 deletions config_center/nacos/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package nacos

import (
"strings"
"testing"
"time"
)
Expand All @@ -32,9 +31,10 @@ import (
)

func TestNewNacosClient(t *testing.T) {
server := mockCommonNacosServer()
nacosURL := strings.ReplaceAll(server.URL, "http", "registry")

nacosURL := "registry://127.0.0.1:8848"
registryUrl, _ := common.NewURL(nacosURL)

c := &nacosDynamicConfiguration{
url: registryUrl,
done: make(chan struct{}),
Expand All @@ -44,52 +44,41 @@ func TestNewNacosClient(t *testing.T) {
c.wg.Add(1)
go HandleClientRestart(c)
go func() {
// c.client.Close() and <-c.client.Done() have order requirements.
// If c.client.Close() is called first.It is possible that "go HandleClientRestart(c)"
// sets c.client to nil before calling c.client.Done().
// c.configClient.Close() and <-c.configClient.Done() have order requirements.
// If c.configClient.Close() is called first.It is possible that "go HandleClientRestart(c)"
// sets c.configClient to nil before calling c.configClient.Done().
time.Sleep(time.Second)
c.client.Close()
}()
<-c.client.Done()
//<-c.client.Done()
c.Destroy()
}

func TestSetNacosClient(t *testing.T) {
server := mockCommonNacosServer()
nacosURL := "registry://" + server.Listener.Addr().String()
nacosURL := "registry://127.0.0.1:8848"
registryUrl, _ := common.NewURL(nacosURL)

c := &nacosDynamicConfiguration{
url: registryUrl,
done: make(chan struct{}),
}
var client *NacosClient
client = &NacosClient{
name: nacosClientName,
NacosAddrs: []string{nacosURL},
Timeout: 15 * time.Second,
exit: make(chan struct{}),
onceClose: func() {
close(client.exit)
},
}
c.SetNacosClient(client)

err := ValidateNacosClient(c, WithNacosName(nacosClientName))
assert.NoError(t, err)
c.wg.Add(1)
go HandleClientRestart(c)
go func() {
// c.client.Close() and <-c.client.Done() have order requirements.
// If c.client.Close() is called first.It is possible that "go HandleClientRestart(c)"
// sets c.client to nil before calling c.client.Done().
// c.configClient.Close() and <-c.configClient.Done() have order requirements.
// If c.configClient.Close() is called first.It is possible that "go HandleClientRestart(c)"
// sets c.configClient to nil before calling c.configClient.Done().
time.Sleep(time.Second)
c.client.Close()
}()
<-c.client.Done()
c.Destroy()
}

func TestNewNacosClient_connectError(t *testing.T) {
nacosURL := "registry://127.0.0.1:8888"
nacosURL := "registry://127.0.0.1:8848"
registryUrl, err := common.NewURL(nacosURL)
assert.NoError(t, err)
c := &nacosDynamicConfiguration{
Expand All @@ -101,14 +90,14 @@ func TestNewNacosClient_connectError(t *testing.T) {
c.wg.Add(1)
go HandleClientRestart(c)
go func() {
// c.client.Close() and <-c.client.Done() have order requirements.
// If c.client.Close() is called first.It is possible that "go HandleClientRestart(c)"
// sets c.client to nil before calling c.client.Done().
// c.configClient.Close() and <-c.configClient.Done() have order requirements.
// If c.configClient.Close() is called first.It is possible that "go HandleClientRestart(c)"
// sets c.configClient to nil before calling c.configClient.Done().
time.Sleep(time.Second)
c.client.Close()
}()
<-c.client.Done()
// let client do retry
// <-c.client.Done()
// let configClient do retry
time.Sleep(5 * time.Second)
c.Destroy()
}
54 changes: 7 additions & 47 deletions config_center/nacos/facade.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,75 +19,35 @@ package nacos

import (
"sync"
"time"
)

import (
"github.com/apache/dubbo-getty"
perrors "github.com/pkg/errors"
nacosClient "github.com/dubbogo/gost/database/kv/nacos"
)

import (
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/logger"
)

const (
connDelay = 3
maxFailTimes = 15
)

type nacosClientFacade interface {
NacosClient() *NacosClient
SetNacosClient(*NacosClient)
// WaitGroup for wait group control, zk client listener & zk client container
NacosClient() *nacosClient.NacosConfigClient
SetNacosClient(*nacosClient.NacosConfigClient)
// WaitGroup for wait group control, zk configClient listener & zk configClient container
WaitGroup() *sync.WaitGroup
// GetDone For nacos client control RestartCallBack() bool
// GetDone For nacos configClient control RestartCallBack() bool
GetDone() chan struct{}
common.Node
}

// HandleClientRestart Restart client handler
// HandleClientRestart Restart configClient handler
func HandleClientRestart(r nacosClientFacade) {
var (
err error
failTimes int
)

defer r.WaitGroup().Done()
LOOP:
for {
select {
case <-r.GetDone():
logger.Warnf("(NacosProviderRegistry)reconnectNacosRegistry goroutine exit now...")
break LOOP
// re-register all services
case <-r.NacosClient().Done():
r.NacosClient().Close()
nacosName := r.NacosClient().name
nacosAddress := r.NacosClient().NacosAddrs
r.SetNacosClient(nil)

// Connect nacos until success.
failTimes = 0
for {
select {
case <-r.GetDone():
logger.Warnf("(NacosProviderRegistry)reconnectZkRegistry goroutine exit now...")
break LOOP
case <-getty.GetTimeWheel().After(time.Duration(failTimes*connDelay) * time.Second): // Prevent crazy reconnection nacos.
}
err = ValidateNacosClient(r, WithNacosName(nacosName))
logger.Infof("NacosProviderRegistry.validateNacosClient(nacosAddr{%s}) = error{%#v}",
nacosAddress, perrors.WithStack(err))
if err == nil {
break
}
failTimes++
if maxFailTimes <= failTimes {
failTimes = maxFailTimes
}
}
return
}
}
}
Loading