Skip to content

Commit

Permalink
Merge pull request #117 from hxmhlt/config_params
Browse files Browse the repository at this point in the history
 New Ftr: allow user set custom params to register to registry
  • Loading branch information
AlexStocks authored Jul 3, 2019
2 parents bd297e8 + cc1f089 commit d59c087
Show file tree
Hide file tree
Showing 11 changed files with 94 additions and 56 deletions.
17 changes: 13 additions & 4 deletions common/config/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,32 +63,40 @@ func (env *Environment) UpdateExternalConfigMap(externalMap map[string]string) {
func (env *Environment) Configuration() *list.List {
list := list.New()
memConf := newInmemoryConfiguration()
memConf.setProperties(env.externalConfigMap)
memConf.setProperties(&(env.externalConfigMap))
list.PushBack(memConf)
return list
}

type InmemoryConfiguration struct {
store sync.Map
store *sync.Map
}

func newInmemoryConfiguration() *InmemoryConfiguration {
return &InmemoryConfiguration{}
}
func (conf *InmemoryConfiguration) setProperties(p sync.Map) {
func (conf *InmemoryConfiguration) setProperties(p *sync.Map) {
conf.store = p
}

func (conf *InmemoryConfiguration) GetProperty(key string) (bool, string) {
if conf.store == nil {
return false, ""
}

v, ok := conf.store.Load(key)
if ok {
return true, v.(string)
}
return false, ""

return false, ""
}

func (conf *InmemoryConfiguration) GetSubProperty(subKey string) map[string]struct{} {
if conf.store == nil {
return nil
}

properties := make(map[string]struct{})
conf.store.Range(func(key, value interface{}) bool {
if idx := strings.Index(key.(string), subKey); idx >= 0 {
Expand All @@ -100,5 +108,6 @@ func (conf *InmemoryConfiguration) GetSubProperty(subKey string) map[string]stru
}
return true
})

return properties
}
40 changes: 23 additions & 17 deletions config/base_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ func setFieldValue(val reflect.Value, id reflect.Value, config *config.InmemoryC
setBaseValue := func(f reflect.Value) {
ok, value := config.GetProperty(getKeyPrefix(val, id) + key)
if ok {
if f.Kind() == reflect.Int64 {
switch f.Kind() {
case reflect.Int64:
x, err := strconv.Atoi(value)
if err != nil {
logger.Errorf("Dynamic change the configuration in struct {%v} field {%v} error ,error message is {%v}",
Expand All @@ -120,21 +121,16 @@ func setFieldValue(val reflect.Value, id reflect.Value, config *config.InmemoryC
val.Type().Name(), val.Type().Field(i).Name, perrors.Errorf("the int64 value {%v} from config center is overflow", int64(x)))
}
}

}

if f.Kind() == reflect.String {
case reflect.String:
f.SetString(value)
}
if f.Kind() == reflect.Bool {
case reflect.Bool:
x, err := strconv.ParseBool(value)
if err != nil {
logger.Errorf("Dynamic change the configuration in struct {%v} field {%v} error ,error message is {%v}",
val.Type().Name(), val.Type().Field(i).Name, err)
}
f.SetBool(x)
}
if f.Kind() == reflect.Float64 {
case reflect.Float64:
x, err := strconv.ParseFloat(value, 64)
if err != nil {
logger.Errorf("Dynamic change the configuration in struct {%v} field {%v} error ,error message is {%v}",
Expand All @@ -147,7 +143,10 @@ func setFieldValue(val reflect.Value, id reflect.Value, config *config.InmemoryC
val.Type().Name(), val.Type().Field(i).Name, perrors.Errorf("the float64 value {%v} from config center is overflow", x))
}
}
default:
logger.Warnf("The kind of field {%v} is not supported ", f.Kind().String())
}

}

}
Expand Down Expand Up @@ -180,25 +179,32 @@ func setFieldValue(val reflect.Value, id reflect.Value, config *config.InmemoryC
}
if f.Kind() == reflect.Map {

//initiate config
s := reflect.New(f.Type().Elem().Elem())
prefix := s.MethodByName("Prefix").Call(nil)[0].String()
m := config.GetSubProperty(prefix)
for k := range m {
f.SetMapIndex(reflect.ValueOf(k), reflect.New(f.Type().Elem().Elem()))
if f.Type().Elem().Kind() == reflect.Ptr {
//initiate config
s := reflect.New(f.Type().Elem().Elem())
prefix := s.MethodByName("Prefix").Call(nil)[0].String()
m := config.GetSubProperty(prefix)
for k := range m {
f.SetMapIndex(reflect.ValueOf(k), reflect.New(f.Type().Elem().Elem()))
}
}

//iter := f.MapRange()

for _, k := range f.MapKeys() {
v := f.MapIndex(k)
if v.Kind() == reflect.Ptr {
switch v.Kind() {
case reflect.Ptr:
if v.Elem().Kind() == reflect.Struct {
setFieldValue(v.Elem(), k, config)
} else {
setBaseValue(v.Elem())
}
case reflect.Int64, reflect.String, reflect.Bool, reflect.Float64:
setBaseValue(v)
default:
logger.Warnf("The kind of field {%v} is not supported ", v.Kind().String())
}

}
}

Expand Down
1 change: 1 addition & 0 deletions config/config_loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func TestConfigLoader(t *testing.T) {
assert.NotEqual(t, ConsumerConfig{}, GetConsumerConfig())
assert.NotNil(t, providerConfig)
assert.NotEqual(t, ProviderConfig{}, GetProviderConfig())
assert.Equal(t, "soa.com.ikurento.user.UserProvider", GetConsumerConfig().References["UserProvider"].Params["serviceid"])
}

func TestLoad(t *testing.T) {
Expand Down
31 changes: 18 additions & 13 deletions config/reference_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,20 @@ import (
type ReferenceConfig struct {
context context.Context
pxy *proxy.Proxy
InterfaceName string `required:"true" yaml:"interface" json:"interface,omitempty" property:"interface"`
Check *bool `yaml:"check" json:"check,omitempty" property:"check"`
Url string `yaml:"url" json:"url,omitempty" property:"url"`
Filter string `yaml:"filter" json:"filter,omitempty" property:"filter"`
Protocol string `yaml:"protocol" json:"protocol,omitempty" property:"protocol"`
Registry string `yaml:"registry" json:"registry,omitempty" property:"registry"`
Cluster string `yaml:"cluster" json:"cluster,omitempty" property:"cluster"`
Loadbalance string `yaml:"loadbalance" json:"loadbalance,omitempty" property:"loadbalance"`
Retries int64 `yaml:"retries" json:"retries,omitempty" property:"retries"`
Group string `yaml:"group" json:"group,omitempty" property:"group"`
Version string `yaml:"version" json:"version,omitempty" property:"version"`
Methods []*MethodConfig `yaml:"methods" json:"methods,omitempty" property:"methods"`
async bool `yaml:"async" json:"async,omitempty" property:"async"`
InterfaceName string `required:"true" yaml:"interface" json:"interface,omitempty" property:"interface"`
Check *bool `yaml:"check" json:"check,omitempty" property:"check"`
Url string `yaml:"url" json:"url,omitempty" property:"url"`
Filter string `yaml:"filter" json:"filter,omitempty" property:"filter"`
Protocol string `yaml:"protocol" json:"protocol,omitempty" property:"protocol"`
Registry string `yaml:"registry" json:"registry,omitempty" property:"registry"`
Cluster string `yaml:"cluster" json:"cluster,omitempty" property:"cluster"`
Loadbalance string `yaml:"loadbalance" json:"loadbalance,omitempty" property:"loadbalance"`
Retries int64 `yaml:"retries" json:"retries,omitempty" property:"retries"`
Group string `yaml:"group" json:"group,omitempty" property:"group"`
Version string `yaml:"version" json:"version,omitempty" property:"version"`
Methods []*MethodConfig `yaml:"methods" json:"methods,omitempty" property:"methods"`
async bool `yaml:"async" json:"async,omitempty" property:"async"`
Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"`
invoker protocol.Invoker
urls []*common.URL
}
Expand Down Expand Up @@ -143,6 +144,10 @@ func (refconfig *ReferenceConfig) GetRPCService() common.RPCService {

func (refconfig *ReferenceConfig) getUrlMap() url.Values {
urlMap := url.Values{}
//first set user params
for k, v := range refconfig.Params {
urlMap.Set(k, v)
}
urlMap.Set(constant.INTERFACE_KEY, refconfig.InterfaceName)
urlMap.Set(constant.TIMESTAMP_KEY, strconv.FormatInt(time.Now().Unix(), 10))
urlMap.Set(constant.CLUSTER_KEY, refconfig.Cluster)
Expand Down
4 changes: 4 additions & 0 deletions config/reference_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ func doInit() {
},
References: map[string]*ReferenceConfig{
"MockService": {
Params: map[string]string{
"serviceid": "soa.mock",
},
Registry: "shanghai_reg1,shanghai_reg2,hangzhou_reg1,hangzhou_reg2",
InterfaceName: "MockService",
Protocol: "mock",
Expand Down Expand Up @@ -125,6 +128,7 @@ func Test_Refer(t *testing.T) {

for _, reference := range consumerConfig.References {
reference.Refer()
assert.Equal(t, "soa.mock", reference.Params["serviceid"])
assert.NotNil(t, reference.invoker)
assert.NotNil(t, reference.pxy)
}
Expand Down
4 changes: 2 additions & 2 deletions config/registry_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ type RegistryConfig struct {
Group string `yaml:"group" json:"group,omitempty" property:"group"`
//for registry
Address string `yaml:"address" json:"address,omitempty" property:"address"`
Username string `yaml:"username" json:"address,omitempty" property:"username"`
Password string `yaml:"password" json:"address,omitempty" property:"password"`
Username string `yaml:"username" json:"username,omitempty" property:"username"`
Password string `yaml:"password" json:"password,omitempty" property:"password"`
}

func (*RegistryConfig) Prefix() string {
Expand Down
27 changes: 16 additions & 11 deletions config/service_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,18 @@ import (

type ServiceConfig struct {
context context.Context
Filter string `yaml:"filter" json:"filter,omitempty" property:"filter"`
Protocol string `required:"true" yaml:"protocol" json:"protocol,omitempty" property:"protocol"` //multi protocol support, split by ','
InterfaceName string `required:"true" yaml:"interface" json:"interface,omitempty" property:"interface"`
Registry string `yaml:"registry" json:"registry,omitempty" property:"registry"`
Cluster string `default:"failover" yaml:"cluster" json:"cluster,omitempty" property:"cluster"`
Loadbalance string `default:"random" yaml:"loadbalance" json:"loadbalance,omitempty" property:"loadbalance"`
Group string `yaml:"group" json:"group,omitempty" property:"group"`
Version string `yaml:"version" json:"version,omitempty" property:"version" `
Methods []*MethodConfig `yaml:"methods" json:"methods,omitempty" property:"methods"`
Warmup string `yaml:"warmup" json:"warmup,omitempty" property:"warmup"`
Retries int64 `yaml:"retries" json:"retries,omitempty" property:"retries"`
Filter string `yaml:"filter" json:"filter,omitempty" property:"filter"`
Protocol string `required:"true" yaml:"protocol" json:"protocol,omitempty" property:"protocol"` //multi protocol support, split by ','
InterfaceName string `required:"true" yaml:"interface" json:"interface,omitempty" property:"interface"`
Registry string `yaml:"registry" json:"registry,omitempty" property:"registry"`
Cluster string `default:"failover" yaml:"cluster" json:"cluster,omitempty" property:"cluster"`
Loadbalance string `default:"random" yaml:"loadbalance" json:"loadbalance,omitempty" property:"loadbalance"`
Group string `yaml:"group" json:"group,omitempty" property:"group"`
Version string `yaml:"version" json:"version,omitempty" property:"version" `
Methods []*MethodConfig `yaml:"methods" json:"methods,omitempty" property:"methods"`
Warmup string `yaml:"warmup" json:"warmup,omitempty" property:"warmup"`
Retries int64 `yaml:"retries" json:"retries,omitempty" property:"retries"`
Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"`
unexported *atomic.Bool
exported *atomic.Bool
rpcService common.RPCService
Expand Down Expand Up @@ -148,6 +149,10 @@ func (srvconfig *ServiceConfig) Implement(s common.RPCService) {

func (srvconfig *ServiceConfig) getUrlMap() url.Values {
urlMap := url.Values{}
//first set user params
for k, v := range srvconfig.Params {
urlMap.Set(k, v)
}
urlMap.Set(constant.INTERFACE_KEY, srvconfig.InterfaceName)
urlMap.Set(constant.TIMESTAMP_KEY, strconv.FormatInt(time.Now().Unix(), 10))
urlMap.Set(constant.CLUSTER_KEY, srvconfig.Cluster)
Expand Down
3 changes: 3 additions & 0 deletions config/testdata/consumer_config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ references:
methods :
- name: "GetUser"
retries: 3
params:
"serviceid":
"soa.com.ikurento.user.UserProvider"

protocol_conf:
dubbo:
Expand Down
4 changes: 2 additions & 2 deletions registry/zookeeper/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,13 @@ import (

func Test_Register(t *testing.T) {
regurl, _ := common.NewURL(context.TODO(), "registry://127.0.0.1:1111", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)))
url, _ := common.NewURL(context.TODO(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"}))
url, _ := common.NewURL(context.TODO(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithParamsValue("serviceid", "soa.mock"), common.WithMethods([]string{"GetUser", "AddUser"}))

ts, reg, err := newMockZkRegistry(&regurl)
defer ts.Stop()
err = reg.Register(url)
children, _ := reg.client.GetChildren("/dubbo/com.ikurento.user.UserProvider/providers")
assert.Regexp(t, ".*dubbo%3A%2F%2F127.0.0.1%3A20000%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26category%3Dproviders%26cluster%3Dmock%26dubbo%3Ddubbo-provider-golang-2.6.0%26.*provider", children)
assert.Regexp(t, ".*dubbo%3A%2F%2F127.0.0.1%3A20000%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26category%3Dproviders%26cluster%3Dmock%26dubbo%3Ddubbo-provider-golang-2.6.0%26.*.serviceid%3Dsoa.mock%26.*provider", children)
assert.NoError(t, err)
}

Expand Down
2 changes: 1 addition & 1 deletion remoting/zookeeper/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
go func(node string) {
logger.Infof("delete zkNode{%s}", node)
if l.ListenServiceNodeEvent(node, listener) {
logger.Infof("delete content{%s}", n)
logger.Infof("delete content{%s}", node)
listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeDel})
}
logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath)
Expand Down
17 changes: 11 additions & 6 deletions remoting/zookeeper/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package zookeeper

import (
"fmt"
"sync"
"testing"
"time"
)
Expand All @@ -27,6 +27,7 @@ import (
"github.com/stretchr/testify/assert"
)
import (
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/remoting"
)

Expand Down Expand Up @@ -86,32 +87,36 @@ func TestListener(t *testing.T) {
dubbo.service.com.ikurento.user.UserProvider.warmup=100
dubbo.service.com.ikurento.user.UserProvider.cluster=failover
`

var wait sync.WaitGroup
ts, client, event := initZkData(t)
defer ts.Stop()
client.Wait.Add(1)
wait.Add(1)
go client.HandleZkEvent(event)
listener := NewZkEventListener(client)
dataListener := &mockDataListener{client: client, changedData: changedData}
dataListener := &mockDataListener{client: client, changedData: changedData, wait: &wait}
listener.ListenServiceEvent("/dubbo", dataListener)

_, err := client.Conn.Set("/dubbo/dubbo.properties", []byte(changedData), 1)
assert.NoError(t, err)
client.Wait.Wait()
wait.Wait()
assert.Equal(t, changedData, dataListener.eventList[1].Content)
client.Close()

}

type mockDataListener struct {
eventList []remoting.Event
client *ZookeeperClient
changedData string
wait *sync.WaitGroup
}

func (m *mockDataListener) DataChange(eventType remoting.Event) bool {
fmt.Println(eventType)
logger.Info(eventType)
m.eventList = append(m.eventList, eventType)
if eventType.Content == m.changedData {
m.client.Wait.Done()
m.wait.Done()
}
return true
}

0 comments on commit d59c087

Please sign in to comment.