Skip to content

Commit

Permalink
Fix zookeeper and nacos issues working as registry, metadata and conf…
Browse files Browse the repository at this point in the history
…igcenter (#2369)
  • Loading branch information
chickenlj authored Nov 3, 2023
1 parent 410e2f8 commit e8b4ab5
Show file tree
Hide file tree
Showing 23 changed files with 352 additions and 85 deletions.
5 changes: 5 additions & 0 deletions config/instance/metadata_report_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ func (m *mockMetadataReportFactory) CreateMetadataReport(*common.URL) report.Met

type mockMetadataReport struct{}

func (m mockMetadataReport) GetConfigKeysByGroup(group string) (*gxset.HashSet, error) {
//TODO implement me
panic("implement me")
}

func (m mockMetadataReport) RegisterServiceAppMapping(string, string, string) error {
panic("implement me")
}
Expand Down
12 changes: 6 additions & 6 deletions metadata/mapping/metadata/service_name_mapping.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import (
)

const (
defaultGroup = "mapping"
DefaultGroup = "mapping"
slash = "/"
)

Expand Down Expand Up @@ -68,7 +68,7 @@ func (d *MetadataServiceNameMapping) Map(url *common.URL) error {
if metadataReport == nil {
logger.Info("get metadata report instance is nil, metadata service will be enabled!")
} else {
err := metadataReport.RegisterServiceAppMapping(serviceInterface, defaultGroup, appName)
err := metadataReport.RegisterServiceAppMapping(serviceInterface, DefaultGroup, appName)
if err != nil {
return perrors.WithStack(err)
}
Expand All @@ -80,20 +80,20 @@ func (d *MetadataServiceNameMapping) Map(url *common.URL) error {
func (d *MetadataServiceNameMapping) Get(url *common.URL, listener registry.MappingListener) (*gxset.HashSet, error) {
serviceInterface := url.GetParam(constant.InterfaceKey, "")
metadataReport := instance.GetMetadataReportInstance()
return metadataReport.GetServiceAppMapping(serviceInterface, defaultGroup, listener)
return metadataReport.GetServiceAppMapping(serviceInterface, DefaultGroup, listener)
}

func (d *MetadataServiceNameMapping) Remove(url *common.URL) error {
serviceInterface := url.GetParam(constant.InterfaceKey, "")
metadataReport := instance.GetMetadataReportInstance()
return metadataReport.RemoveServiceAppMappingListener(serviceInterface, defaultGroup)
return metadataReport.RemoveServiceAppMappingListener(serviceInterface, DefaultGroup)
}

// buildMappingKey will return mapping key, it looks like defaultGroup/serviceInterface
// buildMappingKey will return mapping key, it looks like DefaultGroup/serviceInterface
func (d *MetadataServiceNameMapping) buildMappingKey(serviceInterface string) string {
// the issue : https://github.com/apache/dubbo/issues/4671
// so other params are ignored and remove, including group string, version string, protocol string
return defaultGroup + slash + serviceInterface
return DefaultGroup + slash + serviceInterface
}

var (
Expand Down
6 changes: 6 additions & 0 deletions metadata/report/delegate/delegate_report.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
)

import (
gxset "github.com/dubbogo/gost/container/set"
"github.com/dubbogo/gost/log/logger"

"github.com/go-co-op/gocron"
Expand Down Expand Up @@ -311,3 +312,8 @@ func (mr *MetadataReport) doHandlerMetadataCollection(metadataMap map[*identifie
}
return false
}

func (mr *MetadataReport) GetConfigKeysByGroup(group string) (*gxset.HashSet, error) {
//TODO implement me
panic("implement me")
}
5 changes: 5 additions & 0 deletions metadata/report/etcd/report.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ type etcdMetadataReport struct {
root string
}

func (e *etcdMetadataReport) GetConfigKeysByGroup(group string) (*gxset.HashSet, error) {
//TODO implement me
panic("implement me")
}

// GetAppMetadata get metadata info from etcd
func (e *etcdMetadataReport) GetAppMetadata(metadataIdentifier *identifier.SubscriberMetadataIdentifier) (*common.MetadataInfo, error) {
key := e.getNodeKey(metadataIdentifier)
Expand Down
62 changes: 52 additions & 10 deletions metadata/report/nacos/report.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,21 @@ import (
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/metadata/identifier"
"dubbo.apache.org/dubbo-go/v3/metadata/mapping/metadata"
"dubbo.apache.org/dubbo-go/v3/metadata/report"
"dubbo.apache.org/dubbo-go/v3/metadata/report/factory"
"dubbo.apache.org/dubbo-go/v3/registry"
"dubbo.apache.org/dubbo-go/v3/remoting/nacos"
)

const (
// the number is a little big tricky
// it will be used in query which looks up all keys with the target group
// now, one key represents one application
// so only a group has more than 9999 applications will failed
maxKeysNum = 9999
)

func init() {
mf := &nacosMetadataReportFactory{}
extension.SetMetadataReportFactory("nacos", func() factory.MetadataReportFactory {
Expand All @@ -55,13 +64,14 @@ func init() {
// of MetadataReport based on nacos.
type nacosMetadataReport struct {
client *nacosClient.NacosConfigClient
group string
}

// GetAppMetadata get metadata info from nacos
func (n *nacosMetadataReport) GetAppMetadata(metadataIdentifier *identifier.SubscriberMetadataIdentifier) (*common.MetadataInfo, error) {
data, err := n.getConfig(vo.ConfigParam{
DataId: metadataIdentifier.GetIdentifierKey(),
Group: metadataIdentifier.Group,
Group: n.group,
})
if err != nil {
return nil, err
Expand All @@ -84,7 +94,7 @@ func (n *nacosMetadataReport) PublishAppMetadata(metadataIdentifier *identifier.

return n.storeMetadata(vo.ConfigParam{
DataId: metadataIdentifier.GetIdentifierKey(),
Group: metadataIdentifier.Group,
Group: n.group,
Content: string(data),
})
}
Expand All @@ -93,7 +103,7 @@ func (n *nacosMetadataReport) PublishAppMetadata(metadataIdentifier *identifier.
func (n *nacosMetadataReport) StoreProviderMetadata(providerIdentifier *identifier.MetadataIdentifier, serviceDefinitions string) error {
return n.storeMetadata(vo.ConfigParam{
DataId: providerIdentifier.GetIdentifierKey(),
Group: providerIdentifier.Group,
Group: n.group,
Content: serviceDefinitions,
})
}
Expand All @@ -102,7 +112,7 @@ func (n *nacosMetadataReport) StoreProviderMetadata(providerIdentifier *identifi
func (n *nacosMetadataReport) StoreConsumerMetadata(consumerMetadataIdentifier *identifier.MetadataIdentifier, serviceParameterString string) error {
return n.storeMetadata(vo.ConfigParam{
DataId: consumerMetadataIdentifier.GetIdentifierKey(),
Group: consumerMetadataIdentifier.Group,
Group: n.group,
Content: serviceParameterString,
})
}
Expand All @@ -111,7 +121,7 @@ func (n *nacosMetadataReport) StoreConsumerMetadata(consumerMetadataIdentifier *
func (n *nacosMetadataReport) SaveServiceMetadata(metadataIdentifier *identifier.ServiceMetadataIdentifier, url *common.URL) error {
return n.storeMetadata(vo.ConfigParam{
DataId: metadataIdentifier.GetIdentifierKey(),
Group: metadataIdentifier.Group,
Group: n.group,
Content: url.String(),
})
}
Expand All @@ -120,15 +130,15 @@ func (n *nacosMetadataReport) SaveServiceMetadata(metadataIdentifier *identifier
func (n *nacosMetadataReport) RemoveServiceMetadata(metadataIdentifier *identifier.ServiceMetadataIdentifier) error {
return n.deleteMetadata(vo.ConfigParam{
DataId: metadataIdentifier.GetIdentifierKey(),
Group: metadataIdentifier.Group,
Group: n.group,
})
}

// GetExportedURLs gets the urls.
func (n *nacosMetadataReport) GetExportedURLs(metadataIdentifier *identifier.ServiceMetadataIdentifier) ([]string, error) {
return n.getConfigAsArray(vo.ConfigParam{
DataId: metadataIdentifier.GetIdentifierKey(),
Group: metadataIdentifier.Group,
Group: n.group,
})
}

Expand All @@ -151,7 +161,7 @@ func (n *nacosMetadataReport) GetSubscribedURLs(subscriberMetadataIdentifier *id
func (n *nacosMetadataReport) GetServiceDefinition(metadataIdentifier *identifier.MetadataIdentifier) (string, error) {
return n.getConfig(vo.ConfigParam{
DataId: metadataIdentifier.GetIdentifierKey(),
Group: metadataIdentifier.Group,
Group: n.group,
})
}

Expand Down Expand Up @@ -288,6 +298,37 @@ func (n *nacosMetadataReport) GetServiceAppMapping(key string, group string, lis
return set, nil
}

// GetConfigKeysByGroup will return all keys with the group
func (n *nacosMetadataReport) GetConfigKeysByGroup(group string) (*gxset.HashSet, error) {
group = n.resolvedGroup(group)
page, err := n.client.Client().SearchConfig(vo.SearchConfigParam{
Search: "accurate",
Group: group,
PageNo: 1,
// actually it's impossible for user to create 9999 application under one group
PageSize: maxKeysNum,
})

result := gxset.NewSet()
if err != nil {
return result, perrors.WithMessage(err, "can not find the configClient config")
}
for _, itm := range page.PageItems {
result.Add(itm.DataId)
}
return result, nil
}

// resolvedGroup will regular the group. Now, it will replace the '/' with '-'.
// '/' is a special character for nacos
func (n *nacosMetadataReport) resolvedGroup(group string) string {
if len(group) <= 0 {
group = metadata.DefaultGroup
return group
}
return strings.ReplaceAll(group, "/", "-")
}

// RemoveServiceAppMappingListener remove the serviceMapping listener from metadata center
func (n *nacosMetadataReport) RemoveServiceAppMappingListener(key string, group string) error {
return n.removeServiceMappingListener(key, group)
Expand All @@ -299,13 +340,14 @@ type nacosMetadataReportFactory struct{}
func (n *nacosMetadataReportFactory) CreateMetadataReport(url *common.URL) report.MetadataReport {
url.SetParam(constant.NacosNamespaceID, url.GetParam(constant.MetadataReportNamespaceKey, ""))
url.SetParam(constant.TimeoutKey, url.GetParam(constant.TimeoutKey, constant.DefaultRegTimeout))
url.SetParam(constant.NacosGroupKey, url.GetParam(constant.MetadataReportGroupKey, constant.ServiceDiscoveryDefaultGroup))
group := url.GetParam(constant.MetadataReportGroupKey, constant.ServiceDiscoveryDefaultGroup)
url.SetParam(constant.NacosGroupKey, group)
url.SetParam(constant.NacosUsername, url.Username)
url.SetParam(constant.NacosPassword, url.Password)
client, err := nacos.NewNacosConfigClientByUrl(url)
if err != nil {
logger.Errorf("Could not create nacos metadata report. URL: %s", url.String())
return nil
}
return &nacosMetadataReport{client: client}
return &nacosMetadataReport{client: client, group: group}
}
2 changes: 2 additions & 0 deletions metadata/report/report.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,6 @@ type MetadataReport interface {

// RemoveServiceAppMappingListener remove the serviceMapping listener by key and group
RemoveServiceAppMappingListener(string, string) error

GetConfigKeysByGroup(group string) (*gxset.HashSet, error)
}
100 changes: 100 additions & 0 deletions metadata/report/zookeeper/listener.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package zookeeper

import (
"strings"
"sync"
)

import (
gxset "github.com/dubbogo/gost/container/set"
"github.com/dubbogo/gost/log/logger"
)

import (
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/registry"
"dubbo.apache.org/dubbo-go/v3/remoting"
"dubbo.apache.org/dubbo-go/v3/remoting/zookeeper"
)

// CacheListener defines keyListeners and rootPath
type CacheListener struct {
// key is zkNode Path and value is set of listeners
keyListeners sync.Map
zkEventListener *zookeeper.ZkEventListener
rootPath string
}

// NewCacheListener creates a new CacheListener
func NewCacheListener(rootPath string, listener *zookeeper.ZkEventListener) *CacheListener {
return &CacheListener{zkEventListener: listener, rootPath: rootPath}
}

// AddListener will add a listener if loaded
func (l *CacheListener) AddListener(key string, listener registry.MappingListener) {
// FIXME do not use Client.ExistW, cause it has a bug(can not watch zk node that do not exist)
_, _, _, err := l.zkEventListener.Client.Conn.ExistsW(key)
// reference from https://stackoverflow.com/questions/34018908/golang-why-dont-we-have-a-set-datastructure
// make a map[your type]struct{} like set in java
if err != nil {
return
}
listeners, loaded := l.keyListeners.LoadOrStore(key, map[registry.MappingListener]struct{}{listener: {}})
if loaded {
listeners.(map[registry.MappingListener]struct{})[listener] = struct{}{}
l.keyListeners.Store(key, listeners)
}
}

// RemoveListener will delete a listener if loaded
func (l *CacheListener) RemoveListener(key string, listener registry.MappingListener) {
listeners, loaded := l.keyListeners.Load(key)
if loaded {
delete(listeners.(map[registry.MappingListener]struct{}), listener)
}
}

// DataChange changes all listeners' event
func (l *CacheListener) DataChange(event remoting.Event) bool {
if listeners, ok := l.keyListeners.Load(event.Path); ok {
for listener := range listeners.(map[registry.MappingListener]struct{}) {
appNames := strings.Split(event.Content, constant.CommaSeparator)
set := gxset.NewSet()
for _, e := range appNames {
set.Add(e)
}
err := listener.OnEvent(registry.NewServiceMappingChangedEvent(l.pathToKey(event.Path), set))
if err != nil {
logger.Error("Error notify mapping change event.", err)
return false
}
}
return true
}
return false
}

func (l *CacheListener) pathToKey(path string) string {
if len(path) == 0 {
return path
}
groupKey := strings.Replace(strings.Replace(path, l.rootPath+constant.PathSeparator, "", -1), constant.PathSeparator, constant.DotSeparator, -1)
return groupKey[strings.Index(groupKey, constant.DotSeparator)+1:]
}
Loading

0 comments on commit e8b4ab5

Please sign in to comment.