Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add: tag route static config #2304

Merged
merged 13 commits into from
May 30, 2023
11 changes: 6 additions & 5 deletions cluster/router/tag/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ func (p *PriorityRouter) Route(invokers []protocol.Invoker, url *common.URL, inv
logger.Warnf("[tag router] invokers from previous router is empty")
return invokers
}
key := url.Service() + constant.TagRouterRuleSuffix
// tag is valid in application
key := strings.Join([]string{url.GetParam(constant.ApplicationKey, ""), constant.TagRouterRuleSuffix}, "")
value, ok := p.routerConfigs.Load(key)
if !ok {
return staticTag(invokers, url, invocation)
Expand All @@ -76,17 +77,17 @@ func (p *PriorityRouter) Notify(invokers []protocol.Invoker) {
if len(invokers) == 0 {
return
}
service := invokers[0].GetURL().Service()
if service == "" {
logger.Error("url service is empty")
application := invokers[0].GetURL().GetParam(constant.ApplicationKey, "")
if application == "" {
logger.Error("url application is empty")
return
}
dynamicConfiguration := conf.GetEnvInstance().GetDynamicConfiguration()
if dynamicConfiguration == nil {
logger.Warnf("config center does not start, please check if the configuration center has been properly configured in dubbogo.yml")
return
}
key := service + constant.TagRouterRuleSuffix
key := strings.Join([]string{application, constant.TagRouterRuleSuffix}, "")
dynamicConfiguration.AddListener(key, p)
value, err := dynamicConfiguration.GetRule(key)
if err != nil {
Expand Down
38 changes: 20 additions & 18 deletions cluster/router/tag/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,8 @@ func TestRouter(t *testing.T) {
t.Run("dynamicEmptyTag_requestEmptyTag", func(t *testing.T) {
p, err := NewTagPriorityRouter()
assert.Nil(t, err)
p.routerConfigs.Store(consumerUrl.Service()+constant.TagRouterRuleSuffix, config.RouterConfig{
Key: consumerUrl.Service() + constant.TagRouterRuleSuffix,
p.routerConfigs.Store(consumerUrl.GetParam(constant.ApplicationKey, "")+constant.TagRouterRuleSuffix, config.RouterConfig{
Key: consumerUrl.GetParam(constant.ApplicationKey, "") + constant.TagRouterRuleSuffix,
Force: false,
Enabled: true,
Valid: true,
Expand All @@ -149,8 +149,8 @@ func TestRouter(t *testing.T) {
t.Run("dynamicEmptyTag_requestHasTag", func(t *testing.T) {
p, err := NewTagPriorityRouter()
assert.Nil(t, err)
p.routerConfigs.Store(consumerUrl.Service()+constant.TagRouterRuleSuffix, config.RouterConfig{
Key: consumerUrl.Service() + constant.TagRouterRuleSuffix,
p.routerConfigs.Store(consumerUrl.GetParam(constant.ApplicationKey, "")+constant.TagRouterRuleSuffix, config.RouterConfig{
Key: consumerUrl.GetParam(constant.ApplicationKey, "") + constant.TagRouterRuleSuffix,
Force: false,
Enabled: true,
Valid: true,
Expand All @@ -170,8 +170,8 @@ func TestRouter(t *testing.T) {
t.Run("dynamicTag_requestEmptyTag", func(t *testing.T) {
p, err := NewTagPriorityRouter()
assert.Nil(t, err)
p.routerConfigs.Store(consumerUrl.Service()+constant.TagRouterRuleSuffix, config.RouterConfig{
Key: consumerUrl.Service() + constant.TagRouterRuleSuffix,
p.routerConfigs.Store(consumerUrl.GetParam(constant.ApplicationKey, "")+constant.TagRouterRuleSuffix, config.RouterConfig{
Key: consumerUrl.GetParam(constant.ApplicationKey, "") + constant.TagRouterRuleSuffix,
Force: false,
Enabled: true,
Valid: true,
Expand All @@ -194,8 +194,8 @@ func TestRouter(t *testing.T) {
t.Run("dynamicTag_emptyAddress_requestHasTag", func(t *testing.T) {
p, err := NewTagPriorityRouter()
assert.Nil(t, err)
p.routerConfigs.Store(consumerUrl.Service()+constant.TagRouterRuleSuffix, config.RouterConfig{
Key: consumerUrl.Service() + constant.TagRouterRuleSuffix,
p.routerConfigs.Store(consumerUrl.GetParam(constant.ApplicationKey, "")+constant.TagRouterRuleSuffix, config.RouterConfig{
Key: consumerUrl.GetParam(constant.ApplicationKey, "") + constant.TagRouterRuleSuffix,
Force: false,
Enabled: true,
Valid: true,
Expand All @@ -218,8 +218,8 @@ func TestRouter(t *testing.T) {
t.Run("dynamicTag_address_requestHasTag", func(t *testing.T) {
p, err := NewTagPriorityRouter()
assert.Nil(t, err)
p.routerConfigs.Store(consumerUrl.Service()+constant.TagRouterRuleSuffix, config.RouterConfig{
Key: consumerUrl.Service() + constant.TagRouterRuleSuffix,
p.routerConfigs.Store(consumerUrl.GetParam(constant.ApplicationKey, "")+constant.TagRouterRuleSuffix, config.RouterConfig{
Key: consumerUrl.GetParam(constant.ApplicationKey, "") + constant.TagRouterRuleSuffix,
Force: false,
Enabled: true,
Valid: true,
Expand All @@ -243,8 +243,8 @@ func TestRouter(t *testing.T) {
t.Run("dynamicTag_twoAddress_requestHasTag", func(t *testing.T) {
p, err := NewTagPriorityRouter()
assert.Nil(t, err)
p.routerConfigs.Store(consumerUrl.Service()+constant.TagRouterRuleSuffix, config.RouterConfig{
Key: consumerUrl.Service() + constant.TagRouterRuleSuffix,
p.routerConfigs.Store(consumerUrl.GetParam(constant.ApplicationKey, "")+constant.TagRouterRuleSuffix, config.RouterConfig{
Key: consumerUrl.GetParam(constant.ApplicationKey, "") + constant.TagRouterRuleSuffix,
Force: false,
Enabled: true,
Valid: true,
Expand All @@ -268,8 +268,8 @@ func TestRouter(t *testing.T) {
t.Run("dynamicTag_addressNotMatch_requestHasTag", func(t *testing.T) {
p, err := NewTagPriorityRouter()
assert.Nil(t, err)
p.routerConfigs.Store(consumerUrl.Service()+constant.TagRouterRuleSuffix, config.RouterConfig{
Key: consumerUrl.Service() + constant.TagRouterRuleSuffix,
p.routerConfigs.Store(consumerUrl.GetParam(constant.ApplicationKey, "")+constant.TagRouterRuleSuffix, config.RouterConfig{
Key: consumerUrl.GetParam(constant.ApplicationKey, "") + constant.TagRouterRuleSuffix,
Force: false,
Enabled: true,
Valid: true,
Expand All @@ -293,8 +293,8 @@ func TestRouter(t *testing.T) {
t.Run("dynamicTag_notValid", func(t *testing.T) {
p, err := NewTagPriorityRouter()
assert.Nil(t, err)
p.routerConfigs.Store(consumerUrl.Service()+constant.TagRouterRuleSuffix, config.RouterConfig{
Key: consumerUrl.Service() + constant.TagRouterRuleSuffix,
p.routerConfigs.Store(consumerUrl.GetParam(constant.ApplicationKey, "")+constant.TagRouterRuleSuffix, config.RouterConfig{
Key: consumerUrl.GetParam(constant.ApplicationKey, "") + constant.TagRouterRuleSuffix,
Force: false,
Enabled: true,
Valid: false,
Expand Down Expand Up @@ -338,6 +338,7 @@ func TestNotify(t *testing.T) {
ivk := protocol.NewBaseInvoker(url1)
ivk1 := protocol.NewBaseInvoker(url2)
ivk2 := protocol.NewBaseInvoker(url3)
ivk.GetURL().SetParam(constant.ApplicationKey, "org.apache.dubbo.UserProvider.Test")
invokerList := make([]protocol.Invoker, 0, 3)
invokerList = append(invokerList, ivk)
invokerList = append(invokerList, ivk1)
Expand All @@ -359,7 +360,7 @@ tags:
dc, _ := mockFactory.GetDynamicConfiguration(ccUrl)
common_cfg.GetEnvInstance().SetDynamicConfiguration(dc)
p.Notify(invokerList)
value, ok := p.routerConfigs.Load(url3.Service() + constant.TagRouterRuleSuffix)
value, ok := p.routerConfigs.Load(url1.GetParam(constant.ApplicationKey, "") + constant.TagRouterRuleSuffix)
assert.True(t, ok)
routerCfg := value.(config.RouterConfig)
assert.True(t, routerCfg.Key == "org.apache.dubbo.UserProvider.Test")
Expand All @@ -374,6 +375,7 @@ tags:
ivk := protocol.NewBaseInvoker(url1)
ivk1 := protocol.NewBaseInvoker(url2)
ivk2 := protocol.NewBaseInvoker(url3)
ivk.GetURL().SetParam(constant.ApplicationKey, "org.apache.dubbo.UserProvider.Test")
invokerList := make([]protocol.Invoker, 0, 3)
invokerList = append(invokerList, ivk)
invokerList = append(invokerList, ivk1)
Expand All @@ -386,7 +388,7 @@ tags:
dc, _ := mockFactory.GetDynamicConfiguration(ccUrl)
common_cfg.GetEnvInstance().SetDynamicConfiguration(dc)
p.Notify(invokerList)
value, ok := p.routerConfigs.Load(url3.Service() + constant.TagRouterRuleSuffix)
value, ok := p.routerConfigs.Load(url1.GetParam(constant.ApplicationKey, "") + constant.TagRouterRuleSuffix)
assert.True(t, ok == false)
assert.True(t, value == nil)
})
Expand Down
1 change: 1 addition & 0 deletions config/application_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type ApplicationConfig struct {
Environment string `yaml:"environment" json:"environment,omitempty" property:"environment"`
// the metadata type. remote or local
MetadataType string `default:"local" yaml:"metadata-type" json:"metadataType,omitempty" property:"metadataType"`
Tag string `yaml:"tag" json:"tag,omitempty" property:"tag"`
}

// Prefix dubbo.application
Expand Down
1 change: 1 addition & 0 deletions config/config_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ func createInstance(url *common.URL) (registry.ServiceInstance, error) {
Enable: true,
Healthy: true,
Metadata: metadata,
Tag: appConfig.Tag,
}

for _, cus := range extension.GetCustomizers() {
Expand Down
1 change: 1 addition & 0 deletions config_center/zookeeper/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ func newZookeeperDynamicConfiguration(url *common.URL) (*zookeeperDynamicConfigu
// AddListener add listener for key
// TODO this method should has a parameter 'group', and it does not now, so we should concat group and key with '/' manually
func (c *zookeeperDynamicConfiguration) AddListener(key string, listener config_center.ConfigurationListener, options ...config_center.Option) {
key = strings.Join([]string{c.GetURL().GetParam(constant.ConfigNamespaceKey, config_center.DefaultGroup), key}, "/")
qualifiedKey := buildPath(c.rootPath, key)
c.cacheListener.AddListener(qualifiedKey, listener)
}
Expand Down
1 change: 1 addition & 0 deletions imports/imports.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
_ "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance/roundrobin"
_ "dubbo.apache.org/dubbo-go/v3/cluster/router/meshrouter"
_ "dubbo.apache.org/dubbo-go/v3/cluster/router/polaris"
_ "dubbo.apache.org/dubbo-go/v3/cluster/router/tag"
_ "dubbo.apache.org/dubbo-go/v3/config_center/nacos"
_ "dubbo.apache.org/dubbo-go/v3/config_center/zookeeper"
_ "dubbo.apache.org/dubbo-go/v3/filter/accesslog"
Expand Down
16 changes: 14 additions & 2 deletions registry/service_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package registry

import (
"encoding/json"
url2 "net/url"
"strconv"
)

Expand Down Expand Up @@ -70,6 +71,9 @@ type ServiceInstance interface {

// SetServiceMetadata saves metadata in instance
SetServiceMetadata(info *common.MetadataInfo)

// GetTag will return the tag of the instance
GetTag() string
}

// nolint
Expand All @@ -92,6 +96,7 @@ type DefaultServiceInstance struct {
Address string
GroupName string
endpoints []*Endpoint `json:"-"`
Tag string
}

// GetID will return this instance's id. It should be unique.
Expand Down Expand Up @@ -142,6 +147,10 @@ func (d *DefaultServiceInstance) SetServiceMetadata(m *common.MetadataInfo) {
d.ServiceMetadata = m
}

func (d *DefaultServiceInstance) GetTag() string {
return d.Tag
}

// ToURLs return a list of url.
func (d *DefaultServiceInstance) ToURLs(service *common.ServiceInfo) []*common.URL {
urls := make([]*common.URL, 0, 8)
Expand All @@ -158,15 +167,17 @@ func (d *DefaultServiceInstance) ToURLs(service *common.ServiceInfo) []*common.U
url := common.NewURLWithOptions(common.WithProtocol(service.Protocol),
common.WithIp(d.Host), common.WithPort(strconv.Itoa(endpoint.Port)),
common.WithPath(service.Name), common.WithInterface(service.Name),
common.WithMethods(service.GetMethods()), common.WithParams(service.GetParams()))
common.WithMethods(service.GetMethods()), common.WithParams(service.GetParams()),
common.WithParams(url2.Values{constant.Tagkey: {d.Tag}}))
urls = append(urls, url)
}
}
} else {
url := common.NewURLWithOptions(common.WithProtocol(service.Protocol),
common.WithIp(d.Host), common.WithPort(strconv.Itoa(d.Port)),
common.WithPath(service.Name), common.WithInterface(service.Name),
common.WithMethods(service.GetMethods()), common.WithParams(service.GetParams()))
common.WithMethods(service.GetMethods()), common.WithParams(service.GetParams()),
common.WithParams(url2.Values{constant.Tagkey: {d.Tag}}))
urls = append(urls, url)
}
return urls
Expand Down Expand Up @@ -198,6 +209,7 @@ func (d *DefaultServiceInstance) Copy(endpoint *Endpoint) ServiceInstance {
Healthy: d.Healthy,
Metadata: d.Metadata,
ServiceMetadata: d.ServiceMetadata,
Tag: d.Tag,
}
dn.ID = d.GetAddress()
return dn
Expand Down
2 changes: 2 additions & 0 deletions registry/zookeeper/service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ func (zksd *zookeeperServiceDiscovery) toCuratorInstance(instance registry.Servi
Port: instance.GetPort(),
Payload: pl,
RegistrationTimeUTC: 0,
Tag: instance.GetTag(),
}
return cuis
}
Expand Down Expand Up @@ -327,5 +328,6 @@ func toZookeeperInstance(cris *curator_discovery.ServiceInstance) registry.Servi
Enable: true,
Healthy: true,
Metadata: md,
Tag: cris.Tag,
}
}
1 change: 1 addition & 0 deletions remoting/zookeeper/curator_discovery/service_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,5 @@ type ServiceInstance struct {
Port int `json:"port,omitempty"`
Payload interface{} `json:"payload,omitempty"`
RegistrationTimeUTC int64 `json:"registrationTimeUTC,omitempty"`
Tag string `json:"tag,omitempty"`
}