Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New Ftr: allow user set custom params to register to registry #117

Merged
merged 9 commits into from
Jul 3, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions common/config/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,32 +63,40 @@ func (env *Environment) UpdateExternalConfigMap(externalMap map[string]string) {
func (env *Environment) Configuration() *list.List {
list := list.New()
memConf := newInmemoryConfiguration()
memConf.setProperties(env.externalConfigMap)
memConf.setProperties(&(env.externalConfigMap))
list.PushBack(memConf)
return list
}

type InmemoryConfiguration struct {
store sync.Map
store *sync.Map
}

func newInmemoryConfiguration() *InmemoryConfiguration {
return &InmemoryConfiguration{}
}
func (conf *InmemoryConfiguration) setProperties(p sync.Map) {
func (conf *InmemoryConfiguration) setProperties(p *sync.Map) {
conf.store = p
}

func (conf *InmemoryConfiguration) GetProperty(key string) (bool, string) {
if conf.store == nil {
return false, ""
}

v, ok := conf.store.Load(key)
if ok {
return true, v.(string)
}
return false, ""

return false, ""
}

func (conf *InmemoryConfiguration) GetSubProperty(subKey string) map[string]struct{} {
if conf.store == nil {
return nil
}

properties := make(map[string]struct{})
conf.store.Range(func(key, value interface{}) bool {
if idx := strings.Index(key.(string), subKey); idx >= 0 {
Expand All @@ -100,5 +108,6 @@ func (conf *InmemoryConfiguration) GetSubProperty(subKey string) map[string]stru
}
return true
})

return properties
}
40 changes: 23 additions & 17 deletions config/base_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ func setFieldValue(val reflect.Value, id reflect.Value, config *config.InmemoryC
setBaseValue := func(f reflect.Value) {
ok, value := config.GetProperty(getKeyPrefix(val, id) + key)
if ok {
if f.Kind() == reflect.Int64 {
switch f.Kind() {
case reflect.Int64:
x, err := strconv.Atoi(value)
if err != nil {
logger.Errorf("Dynamic change the configuration in struct {%v} field {%v} error ,error message is {%v}",
Expand All @@ -120,21 +121,16 @@ func setFieldValue(val reflect.Value, id reflect.Value, config *config.InmemoryC
val.Type().Name(), val.Type().Field(i).Name, perrors.Errorf("the int64 value {%v} from config center is overflow", int64(x)))
}
}

}

if f.Kind() == reflect.String {
case reflect.String:
f.SetString(value)
}
if f.Kind() == reflect.Bool {
case reflect.Bool:
x, err := strconv.ParseBool(value)
if err != nil {
logger.Errorf("Dynamic change the configuration in struct {%v} field {%v} error ,error message is {%v}",
val.Type().Name(), val.Type().Field(i).Name, err)
}
f.SetBool(x)
}
if f.Kind() == reflect.Float64 {
case reflect.Float64:
x, err := strconv.ParseFloat(value, 64)
if err != nil {
logger.Errorf("Dynamic change the configuration in struct {%v} field {%v} error ,error message is {%v}",
Expand All @@ -147,7 +143,10 @@ func setFieldValue(val reflect.Value, id reflect.Value, config *config.InmemoryC
val.Type().Name(), val.Type().Field(i).Name, perrors.Errorf("the float64 value {%v} from config center is overflow", x))
}
}
default:
logger.Warnf("The kind of field {%v} is not supported ", f.Kind().String())
}

}

}
Expand Down Expand Up @@ -180,25 +179,32 @@ func setFieldValue(val reflect.Value, id reflect.Value, config *config.InmemoryC
}
if f.Kind() == reflect.Map {

//initiate config
s := reflect.New(f.Type().Elem().Elem())
prefix := s.MethodByName("Prefix").Call(nil)[0].String()
m := config.GetSubProperty(prefix)
for k := range m {
f.SetMapIndex(reflect.ValueOf(k), reflect.New(f.Type().Elem().Elem()))
if f.Type().Elem().Kind() == reflect.Ptr {
//initiate config
s := reflect.New(f.Type().Elem().Elem())
prefix := s.MethodByName("Prefix").Call(nil)[0].String()
m := config.GetSubProperty(prefix)
for k := range m {
f.SetMapIndex(reflect.ValueOf(k), reflect.New(f.Type().Elem().Elem()))
}
}

//iter := f.MapRange()

for _, k := range f.MapKeys() {
v := f.MapIndex(k)
if v.Kind() == reflect.Ptr {
switch v.Kind() {
case reflect.Ptr:
if v.Elem().Kind() == reflect.Struct {
setFieldValue(v.Elem(), k, config)
} else {
setBaseValue(v.Elem())
}
case reflect.Int64, reflect.String, reflect.Bool, reflect.Float64:
setBaseValue(v)
default:
logger.Warnf("The kind of field {%v} is not supported ", v.Kind().String())
}

}
}

Expand Down
1 change: 1 addition & 0 deletions config/config_loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func TestConfigLoader(t *testing.T) {
assert.NotEqual(t, ConsumerConfig{}, GetConsumerConfig())
assert.NotNil(t, providerConfig)
assert.NotEqual(t, ProviderConfig{}, GetProviderConfig())
assert.Equal(t, "soa.com.ikurento.user.UserProvider", GetConsumerConfig().References["UserProvider"].Params["serviceid"])
}

func TestLoad(t *testing.T) {
Expand Down
31 changes: 18 additions & 13 deletions config/reference_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,20 @@ import (
type ReferenceConfig struct {
context context.Context
pxy *proxy.Proxy
InterfaceName string `required:"true" yaml:"interface" json:"interface,omitempty" property:"interface"`
Check *bool `yaml:"check" json:"check,omitempty" property:"check"`
Url string `yaml:"url" json:"url,omitempty" property:"url"`
Filter string `yaml:"filter" json:"filter,omitempty" property:"filter"`
Protocol string `yaml:"protocol" json:"protocol,omitempty" property:"protocol"`
Registry string `yaml:"registry" json:"registry,omitempty" property:"registry"`
Cluster string `yaml:"cluster" json:"cluster,omitempty" property:"cluster"`
Loadbalance string `yaml:"loadbalance" json:"loadbalance,omitempty" property:"loadbalance"`
Retries int64 `yaml:"retries" json:"retries,omitempty" property:"retries"`
Group string `yaml:"group" json:"group,omitempty" property:"group"`
Version string `yaml:"version" json:"version,omitempty" property:"version"`
Methods []*MethodConfig `yaml:"methods" json:"methods,omitempty" property:"methods"`
async bool `yaml:"async" json:"async,omitempty" property:"async"`
InterfaceName string `required:"true" yaml:"interface" json:"interface,omitempty" property:"interface"`
Check *bool `yaml:"check" json:"check,omitempty" property:"check"`
Url string `yaml:"url" json:"url,omitempty" property:"url"`
Filter string `yaml:"filter" json:"filter,omitempty" property:"filter"`
Protocol string `yaml:"protocol" json:"protocol,omitempty" property:"protocol"`
Registry string `yaml:"registry" json:"registry,omitempty" property:"registry"`
Cluster string `yaml:"cluster" json:"cluster,omitempty" property:"cluster"`
Loadbalance string `yaml:"loadbalance" json:"loadbalance,omitempty" property:"loadbalance"`
Retries int64 `yaml:"retries" json:"retries,omitempty" property:"retries"`
Group string `yaml:"group" json:"group,omitempty" property:"group"`
Version string `yaml:"version" json:"version,omitempty" property:"version"`
Methods []*MethodConfig `yaml:"methods" json:"methods,omitempty" property:"methods"`
async bool `yaml:"async" json:"async,omitempty" property:"async"`
Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"`
invoker protocol.Invoker
urls []*common.URL
}
Expand Down Expand Up @@ -143,6 +144,10 @@ func (refconfig *ReferenceConfig) GetRPCService() common.RPCService {

func (refconfig *ReferenceConfig) getUrlMap() url.Values {
urlMap := url.Values{}
//first set user params
for k, v := range refconfig.Params {
urlMap.Set(k, v)
}
urlMap.Set(constant.INTERFACE_KEY, refconfig.InterfaceName)
urlMap.Set(constant.TIMESTAMP_KEY, strconv.FormatInt(time.Now().Unix(), 10))
urlMap.Set(constant.CLUSTER_KEY, refconfig.Cluster)
Expand Down
4 changes: 4 additions & 0 deletions config/reference_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ func doInit() {
},
References: map[string]*ReferenceConfig{
"MockService": {
Params: map[string]string{
"serviceid": "soa.mock",
},
Registry: "shanghai_reg1,shanghai_reg2,hangzhou_reg1,hangzhou_reg2",
InterfaceName: "MockService",
Protocol: "mock",
Expand Down Expand Up @@ -125,6 +128,7 @@ func Test_Refer(t *testing.T) {

for _, reference := range consumerConfig.References {
reference.Refer()
assert.Equal(t, "soa.mock", reference.Params["serviceid"])
assert.NotNil(t, reference.invoker)
assert.NotNil(t, reference.pxy)
}
Expand Down
4 changes: 2 additions & 2 deletions config/registry_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ type RegistryConfig struct {
Group string `yaml:"group" json:"group,omitempty" property:"group"`
//for registry
Address string `yaml:"address" json:"address,omitempty" property:"address"`
Username string `yaml:"username" json:"address,omitempty" property:"username"`
Password string `yaml:"password" json:"address,omitempty" property:"password"`
Username string `yaml:"username" json:"username,omitempty" property:"username"`
Password string `yaml:"password" json:"password,omitempty" property:"password"`
}

func (*RegistryConfig) Prefix() string {
Expand Down
27 changes: 16 additions & 11 deletions config/service_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,18 @@ import (

type ServiceConfig struct {
context context.Context
Filter string `yaml:"filter" json:"filter,omitempty" property:"filter"`
Protocol string `required:"true" yaml:"protocol" json:"protocol,omitempty" property:"protocol"` //multi protocol support, split by ','
InterfaceName string `required:"true" yaml:"interface" json:"interface,omitempty" property:"interface"`
Registry string `yaml:"registry" json:"registry,omitempty" property:"registry"`
Cluster string `default:"failover" yaml:"cluster" json:"cluster,omitempty" property:"cluster"`
Loadbalance string `default:"random" yaml:"loadbalance" json:"loadbalance,omitempty" property:"loadbalance"`
Group string `yaml:"group" json:"group,omitempty" property:"group"`
Version string `yaml:"version" json:"version,omitempty" property:"version" `
Methods []*MethodConfig `yaml:"methods" json:"methods,omitempty" property:"methods"`
Warmup string `yaml:"warmup" json:"warmup,omitempty" property:"warmup"`
Retries int64 `yaml:"retries" json:"retries,omitempty" property:"retries"`
Filter string `yaml:"filter" json:"filter,omitempty" property:"filter"`
Protocol string `required:"true" yaml:"protocol" json:"protocol,omitempty" property:"protocol"` //multi protocol support, split by ','
InterfaceName string `required:"true" yaml:"interface" json:"interface,omitempty" property:"interface"`
Registry string `yaml:"registry" json:"registry,omitempty" property:"registry"`
Cluster string `default:"failover" yaml:"cluster" json:"cluster,omitempty" property:"cluster"`
Loadbalance string `default:"random" yaml:"loadbalance" json:"loadbalance,omitempty" property:"loadbalance"`
Group string `yaml:"group" json:"group,omitempty" property:"group"`
Version string `yaml:"version" json:"version,omitempty" property:"version" `
Methods []*MethodConfig `yaml:"methods" json:"methods,omitempty" property:"methods"`
Warmup string `yaml:"warmup" json:"warmup,omitempty" property:"warmup"`
Retries int64 `yaml:"retries" json:"retries,omitempty" property:"retries"`
Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"`
unexported *atomic.Bool
exported *atomic.Bool
rpcService common.RPCService
Expand Down Expand Up @@ -148,6 +149,10 @@ func (srvconfig *ServiceConfig) Implement(s common.RPCService) {

func (srvconfig *ServiceConfig) getUrlMap() url.Values {
urlMap := url.Values{}
//first set user params
for k, v := range srvconfig.Params {
urlMap.Set(k, v)
}
urlMap.Set(constant.INTERFACE_KEY, srvconfig.InterfaceName)
urlMap.Set(constant.TIMESTAMP_KEY, strconv.FormatInt(time.Now().Unix(), 10))
urlMap.Set(constant.CLUSTER_KEY, srvconfig.Cluster)
Expand Down
3 changes: 3 additions & 0 deletions config/testdata/consumer_config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ references:
methods :
- name: "GetUser"
retries: 3
params:
"serviceid":
"soa.com.ikurento.user.UserProvider"

protocol_conf:
dubbo:
Expand Down
4 changes: 2 additions & 2 deletions registry/zookeeper/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,13 @@ import (

func Test_Register(t *testing.T) {
regurl, _ := common.NewURL(context.TODO(), "registry://127.0.0.1:1111", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)))
url, _ := common.NewURL(context.TODO(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"}))
url, _ := common.NewURL(context.TODO(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithParamsValue("serviceid", "soa.mock"), common.WithMethods([]string{"GetUser", "AddUser"}))

ts, reg, err := newMockZkRegistry(&regurl)
defer ts.Stop()
err = reg.Register(url)
children, _ := reg.client.GetChildren("/dubbo/com.ikurento.user.UserProvider/providers")
assert.Regexp(t, ".*dubbo%3A%2F%2F127.0.0.1%3A20000%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26category%3Dproviders%26cluster%3Dmock%26dubbo%3Ddubbo-provider-golang-2.6.0%26.*provider", children)
assert.Regexp(t, ".*dubbo%3A%2F%2F127.0.0.1%3A20000%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26category%3Dproviders%26cluster%3Dmock%26dubbo%3Ddubbo-provider-golang-2.6.0%26.*.serviceid%3Dsoa.mock%26.*provider", children)
assert.NoError(t, err)
}

Expand Down
2 changes: 1 addition & 1 deletion remoting/zookeeper/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
go func(node string) {
logger.Infof("delete zkNode{%s}", node)
if l.ListenServiceNodeEvent(node, listener) {
logger.Infof("delete content{%s}", n)
logger.Infof("delete content{%s}", node)
listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeDel})
}
logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath)
Expand Down
17 changes: 11 additions & 6 deletions remoting/zookeeper/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package zookeeper

import (
"fmt"
"sync"
"testing"
"time"
)
Expand All @@ -27,6 +27,7 @@ import (
"github.com/stretchr/testify/assert"
)
import (
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/remoting"
)

Expand Down Expand Up @@ -86,32 +87,36 @@ func TestListener(t *testing.T) {
dubbo.service.com.ikurento.user.UserProvider.warmup=100
dubbo.service.com.ikurento.user.UserProvider.cluster=failover
`

var wait sync.WaitGroup
ts, client, event := initZkData(t)
defer ts.Stop()
client.Wait.Add(1)
wait.Add(1)
go client.HandleZkEvent(event)
listener := NewZkEventListener(client)
dataListener := &mockDataListener{client: client, changedData: changedData}
dataListener := &mockDataListener{client: client, changedData: changedData, wait: &wait}
listener.ListenServiceEvent("/dubbo", dataListener)

_, err := client.Conn.Set("/dubbo/dubbo.properties", []byte(changedData), 1)
assert.NoError(t, err)
client.Wait.Wait()
wait.Wait()
assert.Equal(t, changedData, dataListener.eventList[1].Content)
client.Close()

}

type mockDataListener struct {
eventList []remoting.Event
client *ZookeeperClient
changedData string
wait *sync.WaitGroup
}

func (m *mockDataListener) DataChange(eventType remoting.Event) bool {
fmt.Println(eventType)
logger.Info(eventType)
m.eventList = append(m.eventList, eventType)
if eventType.Content == m.changedData {
m.client.Wait.Done()
hxmhlt marked this conversation as resolved.
Show resolved Hide resolved
m.wait.Done()
}
return true
}