Skip to content

Commit

Permalink
add: tag route static config (#2304)
Browse files Browse the repository at this point in the history
* add subscribe any value

* fix nill error bug

* fix bug that commentted by niu

Signed-off-by: wudong <ustbwmd@163.com>

* add static

* use strings.Join

* fix test error

---------

Signed-off-by: wudong <ustbwmd@163.com>
Co-authored-by: Ken Liu <ken.lj.hz@gmail.com>
  • Loading branch information
wudong5 and chickenlj authored May 30, 2023
1 parent 66a152c commit afb8a61
Show file tree
Hide file tree
Showing 9 changed files with 47 additions and 25 deletions.
11 changes: 6 additions & 5 deletions cluster/router/tag/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ func (p *PriorityRouter) Route(invokers []protocol.Invoker, url *common.URL, inv
logger.Warnf("[tag router] invokers from previous router is empty")
return invokers
}
key := url.Service() + constant.TagRouterRuleSuffix
// tag is valid in application
key := strings.Join([]string{url.GetParam(constant.ApplicationKey, ""), constant.TagRouterRuleSuffix}, "")
value, ok := p.routerConfigs.Load(key)
if !ok {
return staticTag(invokers, url, invocation)
Expand All @@ -76,17 +77,17 @@ func (p *PriorityRouter) Notify(invokers []protocol.Invoker) {
if len(invokers) == 0 {
return
}
service := invokers[0].GetURL().Service()
if service == "" {
logger.Error("url service is empty")
application := invokers[0].GetURL().GetParam(constant.ApplicationKey, "")
if application == "" {
logger.Error("url application is empty")
return
}
dynamicConfiguration := conf.GetEnvInstance().GetDynamicConfiguration()
if dynamicConfiguration == nil {
logger.Warnf("config center does not start, please check if the configuration center has been properly configured in dubbogo.yml")
return
}
key := service + constant.TagRouterRuleSuffix
key := strings.Join([]string{application, constant.TagRouterRuleSuffix}, "")
dynamicConfiguration.AddListener(key, p)
value, err := dynamicConfiguration.GetRule(key)
if err != nil {
Expand Down
38 changes: 20 additions & 18 deletions cluster/router/tag/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,8 @@ func TestRouter(t *testing.T) {
t.Run("dynamicEmptyTag_requestEmptyTag", func(t *testing.T) {
p, err := NewTagPriorityRouter()
assert.Nil(t, err)
p.routerConfigs.Store(consumerUrl.Service()+constant.TagRouterRuleSuffix, config.RouterConfig{
Key: consumerUrl.Service() + constant.TagRouterRuleSuffix,
p.routerConfigs.Store(consumerUrl.GetParam(constant.ApplicationKey, "")+constant.TagRouterRuleSuffix, config.RouterConfig{
Key: consumerUrl.GetParam(constant.ApplicationKey, "") + constant.TagRouterRuleSuffix,
Force: false,
Enabled: true,
Valid: true,
Expand All @@ -149,8 +149,8 @@ func TestRouter(t *testing.T) {
t.Run("dynamicEmptyTag_requestHasTag", func(t *testing.T) {
p, err := NewTagPriorityRouter()
assert.Nil(t, err)
p.routerConfigs.Store(consumerUrl.Service()+constant.TagRouterRuleSuffix, config.RouterConfig{
Key: consumerUrl.Service() + constant.TagRouterRuleSuffix,
p.routerConfigs.Store(consumerUrl.GetParam(constant.ApplicationKey, "")+constant.TagRouterRuleSuffix, config.RouterConfig{
Key: consumerUrl.GetParam(constant.ApplicationKey, "") + constant.TagRouterRuleSuffix,
Force: false,
Enabled: true,
Valid: true,
Expand All @@ -170,8 +170,8 @@ func TestRouter(t *testing.T) {
t.Run("dynamicTag_requestEmptyTag", func(t *testing.T) {
p, err := NewTagPriorityRouter()
assert.Nil(t, err)
p.routerConfigs.Store(consumerUrl.Service()+constant.TagRouterRuleSuffix, config.RouterConfig{
Key: consumerUrl.Service() + constant.TagRouterRuleSuffix,
p.routerConfigs.Store(consumerUrl.GetParam(constant.ApplicationKey, "")+constant.TagRouterRuleSuffix, config.RouterConfig{
Key: consumerUrl.GetParam(constant.ApplicationKey, "") + constant.TagRouterRuleSuffix,
Force: false,
Enabled: true,
Valid: true,
Expand All @@ -194,8 +194,8 @@ func TestRouter(t *testing.T) {
t.Run("dynamicTag_emptyAddress_requestHasTag", func(t *testing.T) {
p, err := NewTagPriorityRouter()
assert.Nil(t, err)
p.routerConfigs.Store(consumerUrl.Service()+constant.TagRouterRuleSuffix, config.RouterConfig{
Key: consumerUrl.Service() + constant.TagRouterRuleSuffix,
p.routerConfigs.Store(consumerUrl.GetParam(constant.ApplicationKey, "")+constant.TagRouterRuleSuffix, config.RouterConfig{
Key: consumerUrl.GetParam(constant.ApplicationKey, "") + constant.TagRouterRuleSuffix,
Force: false,
Enabled: true,
Valid: true,
Expand All @@ -218,8 +218,8 @@ func TestRouter(t *testing.T) {
t.Run("dynamicTag_address_requestHasTag", func(t *testing.T) {
p, err := NewTagPriorityRouter()
assert.Nil(t, err)
p.routerConfigs.Store(consumerUrl.Service()+constant.TagRouterRuleSuffix, config.RouterConfig{
Key: consumerUrl.Service() + constant.TagRouterRuleSuffix,
p.routerConfigs.Store(consumerUrl.GetParam(constant.ApplicationKey, "")+constant.TagRouterRuleSuffix, config.RouterConfig{
Key: consumerUrl.GetParam(constant.ApplicationKey, "") + constant.TagRouterRuleSuffix,
Force: false,
Enabled: true,
Valid: true,
Expand All @@ -243,8 +243,8 @@ func TestRouter(t *testing.T) {
t.Run("dynamicTag_twoAddress_requestHasTag", func(t *testing.T) {
p, err := NewTagPriorityRouter()
assert.Nil(t, err)
p.routerConfigs.Store(consumerUrl.Service()+constant.TagRouterRuleSuffix, config.RouterConfig{
Key: consumerUrl.Service() + constant.TagRouterRuleSuffix,
p.routerConfigs.Store(consumerUrl.GetParam(constant.ApplicationKey, "")+constant.TagRouterRuleSuffix, config.RouterConfig{
Key: consumerUrl.GetParam(constant.ApplicationKey, "") + constant.TagRouterRuleSuffix,
Force: false,
Enabled: true,
Valid: true,
Expand All @@ -268,8 +268,8 @@ func TestRouter(t *testing.T) {
t.Run("dynamicTag_addressNotMatch_requestHasTag", func(t *testing.T) {
p, err := NewTagPriorityRouter()
assert.Nil(t, err)
p.routerConfigs.Store(consumerUrl.Service()+constant.TagRouterRuleSuffix, config.RouterConfig{
Key: consumerUrl.Service() + constant.TagRouterRuleSuffix,
p.routerConfigs.Store(consumerUrl.GetParam(constant.ApplicationKey, "")+constant.TagRouterRuleSuffix, config.RouterConfig{
Key: consumerUrl.GetParam(constant.ApplicationKey, "") + constant.TagRouterRuleSuffix,
Force: false,
Enabled: true,
Valid: true,
Expand All @@ -293,8 +293,8 @@ func TestRouter(t *testing.T) {
t.Run("dynamicTag_notValid", func(t *testing.T) {
p, err := NewTagPriorityRouter()
assert.Nil(t, err)
p.routerConfigs.Store(consumerUrl.Service()+constant.TagRouterRuleSuffix, config.RouterConfig{
Key: consumerUrl.Service() + constant.TagRouterRuleSuffix,
p.routerConfigs.Store(consumerUrl.GetParam(constant.ApplicationKey, "")+constant.TagRouterRuleSuffix, config.RouterConfig{
Key: consumerUrl.GetParam(constant.ApplicationKey, "") + constant.TagRouterRuleSuffix,
Force: false,
Enabled: true,
Valid: false,
Expand Down Expand Up @@ -338,6 +338,7 @@ func TestNotify(t *testing.T) {
ivk := protocol.NewBaseInvoker(url1)
ivk1 := protocol.NewBaseInvoker(url2)
ivk2 := protocol.NewBaseInvoker(url3)
ivk.GetURL().SetParam(constant.ApplicationKey, "org.apache.dubbo.UserProvider.Test")
invokerList := make([]protocol.Invoker, 0, 3)
invokerList = append(invokerList, ivk)
invokerList = append(invokerList, ivk1)
Expand All @@ -359,7 +360,7 @@ tags:
dc, _ := mockFactory.GetDynamicConfiguration(ccUrl)
common_cfg.GetEnvInstance().SetDynamicConfiguration(dc)
p.Notify(invokerList)
value, ok := p.routerConfigs.Load(url3.Service() + constant.TagRouterRuleSuffix)
value, ok := p.routerConfigs.Load(url1.GetParam(constant.ApplicationKey, "") + constant.TagRouterRuleSuffix)
assert.True(t, ok)
routerCfg := value.(config.RouterConfig)
assert.True(t, routerCfg.Key == "org.apache.dubbo.UserProvider.Test")
Expand All @@ -374,6 +375,7 @@ tags:
ivk := protocol.NewBaseInvoker(url1)
ivk1 := protocol.NewBaseInvoker(url2)
ivk2 := protocol.NewBaseInvoker(url3)
ivk.GetURL().SetParam(constant.ApplicationKey, "org.apache.dubbo.UserProvider.Test")
invokerList := make([]protocol.Invoker, 0, 3)
invokerList = append(invokerList, ivk)
invokerList = append(invokerList, ivk1)
Expand All @@ -386,7 +388,7 @@ tags:
dc, _ := mockFactory.GetDynamicConfiguration(ccUrl)
common_cfg.GetEnvInstance().SetDynamicConfiguration(dc)
p.Notify(invokerList)
value, ok := p.routerConfigs.Load(url3.Service() + constant.TagRouterRuleSuffix)
value, ok := p.routerConfigs.Load(url1.GetParam(constant.ApplicationKey, "") + constant.TagRouterRuleSuffix)
assert.True(t, ok == false)
assert.True(t, value == nil)
})
Expand Down
1 change: 1 addition & 0 deletions config/application_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type ApplicationConfig struct {
Environment string `yaml:"environment" json:"environment,omitempty" property:"environment"`
// the metadata type. remote or local
MetadataType string `default:"local" yaml:"metadata-type" json:"metadataType,omitempty" property:"metadataType"`
Tag string `yaml:"tag" json:"tag,omitempty" property:"tag"`
}

// Prefix dubbo.application
Expand Down
1 change: 1 addition & 0 deletions config/config_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ func createInstance(url *common.URL) (registry.ServiceInstance, error) {
Enable: true,
Healthy: true,
Metadata: metadata,
Tag: appConfig.Tag,
}

for _, cus := range extension.GetCustomizers() {
Expand Down
1 change: 1 addition & 0 deletions config_center/zookeeper/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ func newZookeeperDynamicConfiguration(url *common.URL) (*zookeeperDynamicConfigu
// AddListener add listener for key
// TODO this method should has a parameter 'group', and it does not now, so we should concat group and key with '/' manually
func (c *zookeeperDynamicConfiguration) AddListener(key string, listener config_center.ConfigurationListener, options ...config_center.Option) {
key = strings.Join([]string{c.GetURL().GetParam(constant.ConfigNamespaceKey, config_center.DefaultGroup), key}, "/")
qualifiedKey := buildPath(c.rootPath, key)
c.cacheListener.AddListener(qualifiedKey, listener)
}
Expand Down
1 change: 1 addition & 0 deletions imports/imports.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
_ "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance/roundrobin"
_ "dubbo.apache.org/dubbo-go/v3/cluster/router/meshrouter"
_ "dubbo.apache.org/dubbo-go/v3/cluster/router/polaris"
_ "dubbo.apache.org/dubbo-go/v3/cluster/router/tag"
_ "dubbo.apache.org/dubbo-go/v3/config_center/nacos"
_ "dubbo.apache.org/dubbo-go/v3/config_center/zookeeper"
_ "dubbo.apache.org/dubbo-go/v3/filter/accesslog"
Expand Down
16 changes: 14 additions & 2 deletions registry/service_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package registry

import (
"encoding/json"
url2 "net/url"
"strconv"
)

Expand Down Expand Up @@ -70,6 +71,9 @@ type ServiceInstance interface {

// SetServiceMetadata saves metadata in instance
SetServiceMetadata(info *common.MetadataInfo)

// GetTag will return the tag of the instance
GetTag() string
}

// nolint
Expand All @@ -92,6 +96,7 @@ type DefaultServiceInstance struct {
Address string
GroupName string
endpoints []*Endpoint `json:"-"`
Tag string
}

// GetID will return this instance's id. It should be unique.
Expand Down Expand Up @@ -142,6 +147,10 @@ func (d *DefaultServiceInstance) SetServiceMetadata(m *common.MetadataInfo) {
d.ServiceMetadata = m
}

func (d *DefaultServiceInstance) GetTag() string {
return d.Tag
}

// ToURLs return a list of url.
func (d *DefaultServiceInstance) ToURLs(service *common.ServiceInfo) []*common.URL {
urls := make([]*common.URL, 0, 8)
Expand All @@ -158,15 +167,17 @@ func (d *DefaultServiceInstance) ToURLs(service *common.ServiceInfo) []*common.U
url := common.NewURLWithOptions(common.WithProtocol(service.Protocol),
common.WithIp(d.Host), common.WithPort(strconv.Itoa(endpoint.Port)),
common.WithPath(service.Name), common.WithInterface(service.Name),
common.WithMethods(service.GetMethods()), common.WithParams(service.GetParams()))
common.WithMethods(service.GetMethods()), common.WithParams(service.GetParams()),
common.WithParams(url2.Values{constant.Tagkey: {d.Tag}}))
urls = append(urls, url)
}
}
} else {
url := common.NewURLWithOptions(common.WithProtocol(service.Protocol),
common.WithIp(d.Host), common.WithPort(strconv.Itoa(d.Port)),
common.WithPath(service.Name), common.WithInterface(service.Name),
common.WithMethods(service.GetMethods()), common.WithParams(service.GetParams()))
common.WithMethods(service.GetMethods()), common.WithParams(service.GetParams()),
common.WithParams(url2.Values{constant.Tagkey: {d.Tag}}))
urls = append(urls, url)
}
return urls
Expand Down Expand Up @@ -198,6 +209,7 @@ func (d *DefaultServiceInstance) Copy(endpoint *Endpoint) ServiceInstance {
Healthy: d.Healthy,
Metadata: d.Metadata,
ServiceMetadata: d.ServiceMetadata,
Tag: d.Tag,
}
dn.ID = d.GetAddress()
return dn
Expand Down
2 changes: 2 additions & 0 deletions registry/zookeeper/service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ func (zksd *zookeeperServiceDiscovery) toCuratorInstance(instance registry.Servi
Port: instance.GetPort(),
Payload: pl,
RegistrationTimeUTC: 0,
Tag: instance.GetTag(),
}
return cuis
}
Expand Down Expand Up @@ -327,5 +328,6 @@ func toZookeeperInstance(cris *curator_discovery.ServiceInstance) registry.Servi
Enable: true,
Healthy: true,
Metadata: md,
Tag: cris.Tag,
}
}
1 change: 1 addition & 0 deletions remoting/zookeeper/curator_discovery/service_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,5 @@ type ServiceInstance struct {
Port int `json:"port,omitempty"`
Payload interface{} `json:"payload,omitempty"`
RegistrationTimeUTC int64 `json:"registrationTimeUTC,omitempty"`
Tag string `json:"tag,omitempty"`
}

0 comments on commit afb8a61

Please sign in to comment.