Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Nacos client #1255

Merged
merged 26 commits into from
Jun 14, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
518be2b
build(deps): bump actions/cache from v2.1.4 to v2.1.5
dependabot[bot] Apr 19, 2021
3162e41
Merge pull request #1162 from apache/dependabot/github_actions/develo…
AlexStocks Apr 21, 2021
2138190
Merge branch 'develop' of https://github.com/apache/dubbo-go into dev…
AlexStocks Apr 27, 2021
bef8e95
Merge branch '3.0' into develop
AlexStocks Apr 27, 2021
a85b65b
Merge branch '3.0' into develop
AlexStocks May 9, 2021
9e38afe
Merge branch '3.0' into develop
AlexStocks May 10, 2021
f0ad730
Merge branch '3.0' into develop
AlexStocks May 10, 2021
4cb6e44
improve etcd version and change create to put (#1203)
ztelur May 15, 2021
56d9d71
Merge branch '3.0' into develop
AlexStocks May 15, 2021
cc74aa5
Merge branch '3.0' into develop
AlexStocks May 18, 2021
40082d4
Merge branch '3.0' into develop
AlexStocks May 21, 2021
bec69c5
up:remoting nacos
zhaoyunxing92 Jun 4, 2021
daff04e
add:nacos service discovery
zhaoyunxing92 Jun 4, 2021
0bb95d6
up:设置默认值
zhaoyunxing92 Jun 5, 2021
c032f19
up:nacos registroy client
zhaoyunxing92 Jun 5, 2021
e97320e
up:nacon config client
zhaoyunxing92 Jun 5, 2021
3fad0be
up:go fmt
zhaoyunxing92 Jun 5, 2021
441e0b4
up:nacos config client
zhaoyunxing92 Jun 5, 2021
3fa236e
Merge branch 'apache:master' into nacos-client
zhaoyunxing92 Jun 11, 2021
dd4ad37
up:test
zhaoyunxing92 Jun 11, 2021
70e5386
up:修改初æ测试方法
zhaoyunxing92 Jun 11, 2021
d1ed6a1
merge
zhaoyunxing92 Jun 11, 2021
03b0574
up:fmt
zhaoyunxing92 Jun 11, 2021
cad80ab
up:triple version
zhaoyunxing92 Jun 12, 2021
79ff40c
up:修改配置操作
zhaoyunxing92 Jun 12, 2021
654e491
rm:移除nacos客户端,解决冲突
zhaoyunxing92 Jun 13, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
up:nacos registroy client
  • Loading branch information
zhaoyunxing92 committed Jun 5, 2021
commit c032f19a59bcb66a7ba37c195b1bdef57a34a404
17 changes: 8 additions & 9 deletions registry/nacos/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
)

import (
"github.com/nacos-group/nacos-sdk-go/clients/naming_client"
nacosClient "github.com/dubbogo/gost/database/kv/nacos"
"github.com/nacos-group/nacos-sdk-go/model"
"github.com/nacos-group/nacos-sdk-go/vo"
perrors "github.com/pkg/errors"
Expand All @@ -42,7 +42,7 @@ import (
)

type nacosListener struct {
namingClient naming_client.INamingClient
namingClient *nacosClient.NacosNamingClient
listenUrl *common.URL
events chan *config_center.ConfigChangeEvent
instanceMap map[string]model.Instance
Expand All @@ -51,8 +51,8 @@ type nacosListener struct {
subscribeParam *vo.SubscribeParam
}

// NewRegistryDataListener creates a data listener for nacos
func NewNacosListener(url *common.URL, namingClient naming_client.INamingClient) (*nacosListener, error) {
// NewNacosListener creates a data listener for nacos
func NewNacosListener(url *common.URL, namingClient *nacosClient.NacosNamingClient) (*nacosListener, error) {
listener := &nacosListener{
namingClient: namingClient,
listenUrl: url, events: make(chan *config_center.ConfigChangeEvent, 32),
Expand Down Expand Up @@ -150,7 +150,6 @@ func (nl *nacosListener) Callback(services []model.SubscribeService, err error)
}

nl.instanceMap = newInstanceMap

for i := range addInstances {
newUrl := generateUrl(addInstances[i])
if newUrl != nil {
Expand Down Expand Up @@ -184,18 +183,18 @@ func getSubscribeName(url *common.URL) string {

func (nl *nacosListener) startListen() error {
if nl.namingClient == nil {
return perrors.New("nacos naming client stopped")
return perrors.New("nacos naming namingClient stopped")
}
serviceName := getSubscribeName(nl.listenUrl)
nl.subscribeParam = &vo.SubscribeParam{ServiceName: serviceName, SubscribeCallback: nl.Callback}
go func() {
_ = nl.namingClient.Subscribe(nl.subscribeParam)
_ = nl.namingClient.Client().Subscribe(nl.subscribeParam)
}()
return nil
}

func (nl *nacosListener) stopListen() error {
return nl.namingClient.Unsubscribe(nl.subscribeParam)
return nl.namingClient.Client().Unsubscribe(nl.subscribeParam)
}

func (nl *nacosListener) process(configType *config_center.ConfigChangeEvent) {
Expand All @@ -219,6 +218,6 @@ func (nl *nacosListener) Next() (*registry.ServiceEvent, error) {

// nolint
func (nl *nacosListener) Close() {
nl.stopListen()
_ = nl.stopListen()
close(nl.done)
}
73 changes: 7 additions & 66 deletions registry/nacos/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,14 @@ package nacos

import (
"bytes"
"net"
"dubbo.apache.org/dubbo-go/v3/remoting/nacos"
"strconv"
"strings"
"time"
)

import (
"github.com/nacos-group/nacos-sdk-go/clients"
"github.com/nacos-group/nacos-sdk-go/clients/naming_client"
nacosConstant "github.com/nacos-group/nacos-sdk-go/common/constant"
nacosClient "github.com/dubbogo/gost/database/kv/nacos"
"github.com/nacos-group/nacos-sdk-go/vo"
perrors "github.com/pkg/errors"
)
Expand All @@ -55,7 +53,7 @@ func init() {

type nacosRegistry struct {
*common.URL
namingClient naming_client.INamingClient
namingClient *nacosClient.NacosNamingClient
registryUrls []*common.URL
}

Expand Down Expand Up @@ -119,7 +117,7 @@ func createRegisterParam(url *common.URL, serviceName string) vo.RegisterInstanc
func (nr *nacosRegistry) Register(url *common.URL) error {
serviceName := getServiceName(url)
param := createRegisterParam(url, serviceName)
isRegistry, err := nr.namingClient.RegisterInstance(param)
isRegistry, err := nr.namingClient.Client().RegisterInstance(param)
if err != nil {
return err
}
Expand Down Expand Up @@ -149,7 +147,7 @@ func createDeregisterParam(url *common.URL, serviceName string) vo.DeregisterIns
func (nr *nacosRegistry) DeRegister(url *common.URL) error {
serviceName := getServiceName(url)
param := createDeregisterParam(url, serviceName)
isDeRegistry, err := nr.namingClient.DeregisterInstance(param)
isDeRegistry, err := nr.namingClient.Client().DeregisterInstance(param)
if err != nil {
return err
}
Expand Down Expand Up @@ -199,11 +197,9 @@ func (nr *nacosRegistry) Subscribe(url *common.URL, notifyListener registry.Noti
listener.Close()
return err
}

logger.Infof("update begin, service event: %v", serviceEvent.String())
notifyListener.Notify(serviceEvent)
}

}
}

Expand Down Expand Up @@ -237,69 +233,14 @@ func (nr *nacosRegistry) Destroy() {

// newNacosRegistry will create new instance
func newNacosRegistry(url *common.URL) (registry.Registry, error) {
nacosConfig, err := getNacosConfig(url)
if err != nil {
return &nacosRegistry{}, err
}
client, err := clients.CreateNamingClient(nacosConfig)
namingClient, err := nacos.NewNacosClientByUrl(url)
if err != nil {
return &nacosRegistry{}, err
}
tmpRegistry := &nacosRegistry{
URL: url,
namingClient: client,
namingClient: namingClient,
registryUrls: []*common.URL{},
}
return tmpRegistry, nil
}

// getNacosConfig will return the nacos config
// TODO support RemoteRef
func getNacosConfig(url *common.URL) (map[string]interface{}, error) {
if url == nil {
return nil, perrors.New("url is empty!")
}
if len(url.Location) == 0 {
return nil, perrors.New("url.location is empty!")
}
configMap := make(map[string]interface{}, 2)

addresses := strings.Split(url.Location, ",")
serverConfigs := make([]nacosConstant.ServerConfig, 0, len(addresses))
for _, addr := range addresses {
ip, portStr, err := net.SplitHostPort(addr)
if err != nil {
return nil, perrors.WithMessagef(err, "split [%s] ", addr)
}
port, _ := strconv.Atoi(portStr)
serverConfigs = append(serverConfigs, nacosConstant.ServerConfig{
IpAddr: ip,
Port: uint64(port),
})
}
configMap[nacosConstant.KEY_SERVER_CONFIGS] = serverConfigs

var clientConfig nacosConstant.ClientConfig
timeout, err := time.ParseDuration(url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT))
if err != nil {
return nil, err
}
clientConfig.TimeoutMs = uint64(timeout.Seconds() * 1000)
clientConfig.ListenInterval = 2 * clientConfig.TimeoutMs
clientConfig.CacheDir = url.GetParam(constant.NACOS_CACHE_DIR_KEY, "")
clientConfig.LogDir = url.GetParam(constant.NACOS_LOG_DIR_KEY, "")
clientConfig.Endpoint = url.GetParam(constant.NACOS_ENDPOINT, "")
clientConfig.NamespaceId = url.GetParam(constant.NACOS_NAMESPACE_ID, "")

// enable local cache when nacos can not connect.
notLoadCache, err := strconv.ParseBool(url.GetParam(constant.NACOS_NOT_LOAD_LOCAL_CACHE, "false"))
if err != nil {
logger.Errorf("ParseBool - error: %v", err)
notLoadCache = false
}
clientConfig.NotLoadCacheAtStart = notLoadCache

configMap[nacosConstant.KEY_CLIENT_CONFIG] = clientConfig

return configMap, nil
}
7 changes: 4 additions & 3 deletions registry/nacos/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import (
)

func TestNacosRegistry_Register(t *testing.T) {
t.Skip()
//t.Skip()
if !checkNacosServerAlive() {
return
}
Expand Down Expand Up @@ -66,8 +66,9 @@ func TestNacosRegistry_Register(t *testing.T) {
t.Errorf("register error:%s \n", err.Error())
return
}
time.Sleep(5 * time.Second)
nacosReg := reg.(*nacosRegistry)
service, _ := nacosReg.namingClient.GetService(vo.GetServiceParam{ServiceName: "providers:com.ikurento.user.UserProvider:1.0.0:guangzhou-idc"})
service, _ := nacosReg.namingClient.Client().GetService(vo.GetServiceParam{ServiceName: "providers:com.ikurento.user.UserProvider:1.0.0:guangzhou-idc"})
data, _ := json.Marshal(service)
t.Logf(string(data))
assert.Equal(t, 1, len(service.Hosts))
Expand Down Expand Up @@ -179,7 +180,7 @@ func TestNacosRegistry_Subscribe_del(t *testing.T) {

nacosReg := reg.(*nacosRegistry)
// deregister instance to mock instance offline
_, err = nacosReg.namingClient.DeregisterInstance(vo.DeregisterInstanceParam{
_, err = nacosReg.namingClient.Client().DeregisterInstance(vo.DeregisterInstanceParam{
Ip: "127.0.0.2", Port: 20000,
ServiceName: "providers:com.ikurento.user.UserProvider:2.0.0:guangzhou-idc",
})
Expand Down
40 changes: 17 additions & 23 deletions registry/nacos/service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,21 +51,21 @@ func init() {
}

// nacosServiceDiscovery is the implementation of service discovery based on nacos.
// There is a problem, the go client for nacos does not support the id field.
// There is a problem, the go namingClient for nacos does not support the id field.
// we will use the metadata to store the id of ServiceInstance
type nacosServiceDiscovery struct {
group string
// descriptor is a short string about the basic information of this instance
descriptor string

// client is the Nacos' client
client *nacosClient.NacosNamingClient
// namingClient is the Nacos' namingClient
namingClient *nacosClient.NacosNamingClient
// cache registry instances
registryInstances []registry.ServiceInstance
}

// Destroy will close the service discovery.
// Actually, it only marks the naming client as null and then return
// Actually, it only marks the naming namingClient as null and then return
func (n *nacosServiceDiscovery) Destroy() error {
for _, inst := range n.registryInstances {
err := n.Unregister(inst)
Expand All @@ -74,14 +74,14 @@ func (n *nacosServiceDiscovery) Destroy() error {
logger.Errorf("Unregister nacos instance:%+v, err:%+v", inst, err)
}
}
n.client.Close()
n.namingClient.Close()
return nil
}

// Register will register the service to nacos
func (n *nacosServiceDiscovery) Register(instance registry.ServiceInstance) error {
ins := n.toRegisterInstance(instance)
ok, err := n.client.Client().RegisterInstance(ins)
ok, err := n.namingClient.Client().RegisterInstance(ins)
if err != nil || !ok {
return perrors.WithMessage(err, "Could not register the instance. "+instance.GetServiceName())
}
Expand All @@ -90,20 +90,15 @@ func (n *nacosServiceDiscovery) Register(instance registry.ServiceInstance) erro
}

// Update will update the information
// However, because nacos client doesn't support the update API,
// However, because nacos namingClient doesn't support the update API,
// so we should repetition registration the instance
func (n *nacosServiceDiscovery) Update(instance registry.ServiceInstance) error {
// TODO(wait for nacos support)
//err := n.Unregister(instance)
//if err != nil {
// return perrors.WithStack(err)
//}
return n.Register(instance)
}

// Unregister will unregister the instance
func (n *nacosServiceDiscovery) Unregister(instance registry.ServiceInstance) error {
ok, err := n.client.Client().DeregisterInstance(n.toDeregisterInstance(instance))
ok, err := n.namingClient.Client().DeregisterInstance(n.toDeregisterInstance(instance))
if err != nil || !ok {
return perrors.WithMessage(err, "Could not unregister the instance. "+instance.GetServiceName())
}
Expand All @@ -117,7 +112,7 @@ func (n *nacosServiceDiscovery) GetDefaultPageSize() int {

// GetServices will return the all services
func (n *nacosServiceDiscovery) GetServices() *gxset.HashSet {
services, err := n.client.Client().GetAllServicesInfo(vo.GetAllServiceInfoParam{
services, err := n.namingClient.Client().GetAllServicesInfo(vo.GetAllServiceInfoParam{
GroupName: n.group,
})

Expand All @@ -135,7 +130,7 @@ func (n *nacosServiceDiscovery) GetServices() *gxset.HashSet {

// GetInstances will return the instances of serviceName and the group
func (n *nacosServiceDiscovery) GetInstances(serviceName string) []registry.ServiceInstance {
instances, err := n.client.Client().SelectAllInstances(vo.SelectAllInstancesParam{
instances, err := n.namingClient.Client().SelectAllInstances(vo.SelectAllInstancesParam{
ServiceName: serviceName,
GroupName: n.group,
})
Expand All @@ -161,12 +156,11 @@ func (n *nacosServiceDiscovery) GetInstances(serviceName string) []registry.Serv
Metadata: metadata,
})
}

return res
}

// GetInstancesByPage will return the instances
// Due to nacos client does not support pagination, so we have to query all instances and then return part of them
// Due to nacos namingClient does not support pagination, so we have to query all instances and then return part of them
func (n *nacosServiceDiscovery) GetInstancesByPage(serviceName string, offset int, pageSize int) gxpage.Pager {
all := n.GetInstances(serviceName)
res := make([]interface{}, 0, pageSize)
Expand All @@ -178,7 +172,7 @@ func (n *nacosServiceDiscovery) GetInstancesByPage(serviceName string, offset in
}

// GetHealthyInstancesByPage will return the instance
// The nacos client has an API SelectInstances, which has a parameter call HealthyOnly.
// The nacos namingClient has an API SelectInstances, which has a parameter call HealthyOnly.
// However, the healthy parameter in this method maybe false. So we can not use that API.
// Thus, we must query all instances and then do filter
func (n *nacosServiceDiscovery) GetHealthyInstancesByPage(serviceName string, offset int, pageSize int, healthy bool) gxpage.Pager {
Expand All @@ -201,7 +195,7 @@ func (n *nacosServiceDiscovery) GetHealthyInstancesByPage(serviceName string, of
}

// GetRequestInstances will return the instances
// The nacos client doesn't have batch API, so we should query those serviceNames one by one.
// The nacos namingClient doesn't have batch API, so we should query those serviceNames one by one.
func (n *nacosServiceDiscovery) GetRequestInstances(serviceNames []string, offset int, requestedSize int) map[string]gxpage.Pager {
res := make(map[string]gxpage.Pager, len(serviceNames))
for _, name := range serviceNames {
Expand All @@ -214,7 +208,7 @@ func (n *nacosServiceDiscovery) GetRequestInstances(serviceNames []string, offse
func (n *nacosServiceDiscovery) AddListener(listener registry.ServiceInstancesChangedListener) error {
for _, t := range listener.GetServiceNames().Values() {
serviceName := t.(string)
err := n.client.Client().Subscribe(&vo.SubscribeParam{
err := n.namingClient.Client().Subscribe(&vo.SubscribeParam{
ServiceName: serviceName,
SubscribeCallback: func(services []model.SubscribeService, err error) {
if err != nil {
Expand Down Expand Up @@ -282,7 +276,7 @@ func (n *nacosServiceDiscovery) toRegisterInstance(instance registry.ServiceInst
Ip: instance.GetHost(),
Port: uint64(instance.GetPort()),
Metadata: metadata,
// We must specify the weight since Java nacos client will ignore the instance whose weight is 0
// We must specify the weight since Java nacos namingClient will ignore the instance whose weight is 0
Weight: 1,
Enable: instance.IsEnable(),
Healthy: instance.IsHealthy(),
Expand Down Expand Up @@ -337,14 +331,14 @@ func newNacosServiceDiscovery(name string) (registry.ServiceDiscovery, error) {

client, err := nacos.NewNacosClient(remoteConfig)
if err != nil {
return nil, perrors.WithMessage(err, "create nacos client failed.")
return nil, perrors.WithMessage(err, "create nacos namingClient failed.")
}

descriptor := fmt.Sprintf("nacos-service-discovery[%s]", remoteConfig.Address)

newInstance := &nacosServiceDiscovery{
group: group,
client: client,
namingClient: client,
descriptor: descriptor,
registryInstances: []registry.ServiceInstance{},
}
Expand Down
2 changes: 1 addition & 1 deletion registry/nacos/service_discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func TestNacosServiceDiscovery_Destroy(t *testing.T) {
assert.NotNil(t, serviceDiscovery)
err = serviceDiscovery.Destroy()
assert.Nil(t, err)
assert.Nil(t, serviceDiscovery.(*nacosServiceDiscovery).client)
assert.Nil(t, serviceDiscovery.(*nacosServiceDiscovery).namingClient)
}

func prepareData() {
Expand Down