Skip to content

Commit

Permalink
Merge branch 'develop' of https://github.com/apache/dubbo-go into dev…
Browse files Browse the repository at this point in the history
…elop
  • Loading branch information
AlexStocks committed Dec 16, 2020
2 parents 0d9d7c0 + b5365b3 commit e2954fe
Show file tree
Hide file tree
Showing 46 changed files with 104 additions and 126 deletions.
9 changes: 9 additions & 0 deletions cluster/router/chain/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ type RouterChain struct {
notify chan struct{}
// Address cache
cache atomic.Value
// init
init sync.Once
}

// Route Loop routers in RouterChain and call Route method to determine the target invokers list.
Expand Down Expand Up @@ -111,6 +113,13 @@ func (c *RouterChain) SetInvokers(invokers []protocol.Invoker) {
c.invokers = invokers
c.mutex.Unlock()

// it should trigger init router for first call
c.init.Do(func() {
go func() {
c.notify <- struct{}{}
}()
})

c.count++
now := time.Now()
if c.count >= countThreshold && now.Sub(c.last) >= timeThreshold {
Expand Down
1 change: 0 additions & 1 deletion cluster/router/healthcheck/default_health_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ func (c *DefaultHealthChecker) getCircuitBreakerSleepWindowTime(status *protocol
return int64(sleepWindow)
}


// GetRequestSuccessiveFailureThreshold return the requestSuccessiveFailureThreshold bound to this DefaultHealthChecker
func (c *DefaultHealthChecker) GetRequestSuccessiveFailureThreshold() int32 {
return c.requestSuccessiveFailureThreshold
Expand Down
3 changes: 1 addition & 2 deletions common/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ func NewProxy(invoke protocol.Invoker, callBack interface{}, attachments map[str
// Yyy func(ctx context.Context, args []interface{}, rsp *Zzz) error
// }
func (p *Proxy) Implement(v common.RPCService) {

// check parameters, incoming interface must be a elem's pointer.
valueOf := reflect.ValueOf(v)
logger.Debugf("[Implement] reflect.TypeOf: %s", valueOf.String())
Expand Down Expand Up @@ -145,7 +144,7 @@ func (p *Proxy) Implement(v common.RPCService) {
inv.SetAttachments(k, value)
}

// add user setAttachment. It is compatibility with previous versions.
// add user setAttachment. It is compatibility with previous versions.
atm := invCtx.Value(constant.AttachmentKey)
if m, ok := atm.(map[string]string); ok {
for k, value := range m {
Expand Down
2 changes: 1 addition & 1 deletion common/proxy/proxy_factory/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (pi *ProxyInvoker) Invoke(ctx context.Context, invocation protocol.Invocati
args := invocation.Arguments()

// get service
svc := common.ServiceMap.GetService(proto, path)
svc := common.ServiceMap.GetServiceByServiceKey(proto, url.ServiceKey())
if svc == nil {
logger.Errorf("cannot find service [%s] in %s", path, proto)
result.SetError(perrors.Errorf("cannot find service [%s] in %s", path, proto))
Expand Down
28 changes: 17 additions & 11 deletions common/rpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,11 +157,17 @@ type serviceMap struct {
}

// GetService gets a service defination by protocol and name
func (sm *serviceMap) GetService(protocol, name string) *Service {
func (sm *serviceMap) GetService(protocol, interfaceName, group, version string) *Service {
serviceKey := ServiceKey(interfaceName, group, version)
return sm.GetServiceByServiceKey(protocol, serviceKey)
}

// GetService gets a service defination by protocol and service key
func (sm *serviceMap) GetServiceByServiceKey(protocol, serviceKey string) *Service {
sm.mutex.RLock()
defer sm.mutex.RUnlock()
if s, ok := sm.serviceMap[protocol]; ok {
if srv, ok := s[name]; ok {
if srv, ok := s[serviceKey]; ok {
return srv
}
return nil
Expand All @@ -180,7 +186,7 @@ func (sm *serviceMap) GetInterface(interfaceName string) []*Service {
}

// Register registers a service by @interfaceName and @protocol
func (sm *serviceMap) Register(interfaceName, protocol string, rcvr RPCService) (string, error) {
func (sm *serviceMap) Register(interfaceName, protocol, group, version string, rcvr RPCService) (string, error) {
if sm.serviceMap[protocol] == nil {
sm.serviceMap[protocol] = make(map[string]*Service)
}
Expand All @@ -203,8 +209,8 @@ func (sm *serviceMap) Register(interfaceName, protocol string, rcvr RPCService)
return "", perrors.New(s)
}

sname = rcvr.Reference()
if server := sm.GetService(protocol, sname); server != nil {
sname = ServiceKey(interfaceName, group, version)
if server := sm.GetService(protocol, interfaceName, group, version); server != nil {
return "", perrors.New("service already defined: " + sname)
}
s.name = sname
Expand All @@ -228,9 +234,9 @@ func (sm *serviceMap) Register(interfaceName, protocol string, rcvr RPCService)
}

// UnRegister cancels a service by @interfaceName, @protocol and @serviceId
func (sm *serviceMap) UnRegister(interfaceName, protocol, serviceId string) error {
if protocol == "" || serviceId == "" {
return perrors.New("protocol or serviceName is nil")
func (sm *serviceMap) UnRegister(interfaceName, protocol, serviceKey string) error {
if protocol == "" || serviceKey == "" {
return perrors.New("protocol or serviceKey is nil")
}

var (
Expand All @@ -248,9 +254,9 @@ func (sm *serviceMap) UnRegister(interfaceName, protocol, serviceId string) erro
if !ok {
return perrors.New("no services for " + protocol)
}
s, ok := svcs[serviceId]
s, ok := svcs[serviceKey]
if !ok {
return perrors.New("no service for " + serviceId)
return perrors.New("no service for " + serviceKey)
}
svrs, ok = sm.interfaceMap[interfaceName]
if !ok {
Expand All @@ -276,7 +282,7 @@ func (sm *serviceMap) UnRegister(interfaceName, protocol, serviceId string) erro
sm.interfaceMap[interfaceName] = append(sm.interfaceMap[interfaceName], svrs[i])
}
}
delete(svcs, serviceId)
delete(svcs, serviceKey)
if len(sm.serviceMap[protocol]) == 0 {
delete(sm.serviceMap, protocol)
}
Expand Down
28 changes: 14 additions & 14 deletions common/rpc_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,23 +85,23 @@ func TestServiceMapRegister(t *testing.T) {
// lowercase
s0 := &testService{}
// methods, err := ServiceMap.Register("testporotocol", s0)
_, err := ServiceMap.Register(testInterfaceName, "testporotocol", s0)
_, err := ServiceMap.Register(testInterfaceName, "testporotocol", "", "v0", s0)
assert.EqualError(t, err, "type testService is not exported")

// succ
s := &TestService{}
methods, err := ServiceMap.Register(testInterfaceName, "testporotocol", s)
methods, err := ServiceMap.Register(testInterfaceName, "testporotocol", "", "v1", s)
assert.NoError(t, err)
assert.Equal(t, "MethodOne,MethodThree,methodTwo", methods)

// repeat
_, err = ServiceMap.Register(testInterfaceName, "testporotocol", s)
assert.EqualError(t, err, "service already defined: com.test.Path")
_, err = ServiceMap.Register(testInterfaceName, "testporotocol", "", "v1", s)
assert.EqualError(t, err, "service already defined: testService:v1")

// no method
s1 := &TestService1{}
_, err = ServiceMap.Register(testInterfaceName, "testporotocol", s1)
assert.EqualError(t, err, "type com.test.Path1 has no exported methods of suitable type")
_, err = ServiceMap.Register(testInterfaceName, "testporotocol", "", "v2", s1)
assert.EqualError(t, err, "type testService:v2 has no exported methods of suitable type")

ServiceMap = &serviceMap{
serviceMap: make(map[string]map[string]*Service),
Expand All @@ -111,22 +111,22 @@ func TestServiceMapRegister(t *testing.T) {

func TestServiceMapUnRegister(t *testing.T) {
s := &TestService{}
_, err := ServiceMap.Register("TestService", testProtocol, s)
_, err := ServiceMap.Register("TestService", testProtocol, "", "v1", s)
assert.NoError(t, err)
assert.NotNil(t, ServiceMap.GetService(testProtocol, referenceTestPath))
assert.NotNil(t, ServiceMap.GetService(testProtocol, "TestService", "", "v1"))
assert.Equal(t, 1, len(ServiceMap.GetInterface("TestService")))

err = ServiceMap.UnRegister("", "", referenceTestPath)
assert.EqualError(t, err, "protocol or serviceName is nil")
err = ServiceMap.UnRegister("", "", ServiceKey("TestService", "", "v1"))
assert.EqualError(t, err, "protocol or serviceKey is nil")

err = ServiceMap.UnRegister("", "protocol", referenceTestPath)
err = ServiceMap.UnRegister("", "protocol", ServiceKey("TestService", "", "v1"))
assert.EqualError(t, err, "no services for protocol")

err = ServiceMap.UnRegister("", testProtocol, referenceTestPathDistinct)
assert.EqualError(t, err, "no service for com.test.Path1")
err = ServiceMap.UnRegister("", testProtocol, ServiceKey("TestService", "", "v0"))
assert.EqualError(t, err, "no service for TestService:v0")

// succ
err = ServiceMap.UnRegister("TestService", testProtocol, referenceTestPath)
err = ServiceMap.UnRegister("TestService", testProtocol, ServiceKey("TestService", "", "v1"))
assert.NoError(t, err)
}

Expand Down
2 changes: 1 addition & 1 deletion common/url.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ type URL struct {
noCopy noCopy

baseUrl
Path string // like /com.ikurento.dubbo.UserProvider3
Path string // like /com.ikurento.dubbo.UserProvider
Username string
Password string
Methods []string
Expand Down
5 changes: 1 addition & 4 deletions config/application_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,5 @@ func (c *ApplicationConfig) UnmarshalYAML(unmarshal func(interface{}) error) err
return err
}
type plain ApplicationConfig
if err := unmarshal((*plain)(c)); err != nil {
return err
}
return nil
return unmarshal((*plain)(c))
}
4 changes: 0 additions & 4 deletions config/base_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,6 @@ import (
"github.com/apache/dubbo-go/common/logger"
)

type multiConfiger interface {
Prefix() string
}

// BaseConfig is the common configuration for provider and consumer
type BaseConfig struct {
ConfigCenterConfig *ConfigCenterConfig `yaml:"config_center" json:"config_center,omitempty"`
Expand Down
7 changes: 2 additions & 5 deletions config/config_center_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

import (
"github.com/creasty/defaults"
perrors "github.com/pkg/errors"
)

import (
Expand All @@ -35,7 +36,6 @@ import (
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/config_center"
perrors "github.com/pkg/errors"
)

// ConfigCenterConfig is configuration for config center
Expand Down Expand Up @@ -69,10 +69,7 @@ func (c *ConfigCenterConfig) UnmarshalYAML(unmarshal func(interface{}) error) er
return err
}
type plain ConfigCenterConfig
if err := unmarshal((*plain)(c)); err != nil {
return err
}
return nil
return unmarshal((*plain)(c))
}

// GetUrlMap gets url map from ConfigCenterConfig
Expand Down
6 changes: 3 additions & 3 deletions config/config_loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func TestLoad(t *testing.T) {

conServices = map[string]common.RPCService{}
proServices = map[string]common.RPCService{}
err := common.ServiceMap.UnRegister("com.MockService", "mock", "MockService")
err := common.ServiceMap.UnRegister("com.MockService", "mock", common.ServiceKey("com.MockService", "huadong_idc", "1.0.0"))
assert.Nil(t, err)
consumerConfig = nil
providerConfig = nil
Expand Down Expand Up @@ -143,7 +143,7 @@ func TestLoadWithSingleReg(t *testing.T) {

conServices = map[string]common.RPCService{}
proServices = map[string]common.RPCService{}
common.ServiceMap.UnRegister("com.MockService", "mock", "MockService")
common.ServiceMap.UnRegister("com.MockService", "mock", common.ServiceKey("com.MockService", "huadong_idc", "1.0.0"))
consumerConfig = nil
providerConfig = nil
}
Expand Down Expand Up @@ -182,7 +182,7 @@ func TestWithNoRegLoad(t *testing.T) {

conServices = map[string]common.RPCService{}
proServices = map[string]common.RPCService{}
common.ServiceMap.UnRegister("com.MockService", "mock", "MockService")
common.ServiceMap.UnRegister("com.MockService", "mock", common.ServiceKey("com.MockService", "huadong_idc", "1.0.0"))
consumerConfig = nil
providerConfig = nil
}
Expand Down
5 changes: 1 addition & 4 deletions config/consumer_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,7 @@ func (c *ConsumerConfig) UnmarshalYAML(unmarshal func(interface{}) error) error
return err
}
type plain ConsumerConfig
if err := unmarshal((*plain)(c)); err != nil {
return err
}
return nil
return unmarshal((*plain)(c))
}

// nolint
Expand Down
5 changes: 1 addition & 4 deletions config/method_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,5 @@ func (c *MethodConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
return err
}
type plain MethodConfig
if err := unmarshal((*plain)(c)); err != nil {
return err
}
return nil
return unmarshal((*plain)(c))
}
5 changes: 1 addition & 4 deletions config/provider_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,7 @@ func (c *ProviderConfig) UnmarshalYAML(unmarshal func(interface{}) error) error
return err
}
type plain ProviderConfig
if err := unmarshal((*plain)(c)); err != nil {
return err
}
return nil
return unmarshal((*plain)(c))
}

// nolint
Expand Down
10 changes: 3 additions & 7 deletions config/reference_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,17 +86,13 @@ func (c *ReferenceConfig) UnmarshalYAML(unmarshal func(interface{}) error) error
}

*c = ReferenceConfig(raw)
if err := defaults.Set(c); err != nil {
return err
}

return nil
return defaults.Set(c)
}

// Refer ...
func (c *ReferenceConfig) Refer(_ interface{}) {
cfgURL := common.NewURLWithOptions(
common.WithPath(c.id),
common.WithPath(c.InterfaceName),
common.WithProtocol(c.Protocol),
common.WithParams(c.getUrlMap()),
common.WithParamsValue(constant.BEAN_NAME_KEY, c.id),
Expand All @@ -117,7 +113,7 @@ func (c *ReferenceConfig) Refer(_ interface{}) {
c.urls = append(c.urls, serviceUrl)
} else {
if serviceUrl.Path == "" {
serviceUrl.Path = "/" + c.id
serviceUrl.Path = "/" + c.InterfaceName
}
// merge url need to do
newUrl := common.MergeUrl(serviceUrl, cfgURL)
Expand Down
5 changes: 1 addition & 4 deletions config/registry_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,7 @@ func (c *RegistryConfig) UnmarshalYAML(unmarshal func(interface{}) error) error
return err
}
type plain RegistryConfig
if err := unmarshal((*plain)(c)); err != nil {
return err
}
return nil
return unmarshal((*plain)(c))
}

// nolint
Expand Down
4 changes: 2 additions & 2 deletions config/service_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func (c *ServiceConfig) Export() error {
proxyFactory := extension.GetProxyFactory(providerConfig.ProxyFactory)
for _, proto := range protocolConfigs {
// registry the service reflect
methods, err := common.ServiceMap.Register(c.InterfaceName, proto.Name, c.rpcService)
methods, err := common.ServiceMap.Register(c.InterfaceName, proto.Name, c.Group, c.Version, c.rpcService)
if err != nil {
formatErr := perrors.Errorf("The service %v export the protocol %v error! Error message is %v.", c.InterfaceName, proto.Name, err.Error())
logger.Errorf(formatErr.Error())
Expand All @@ -184,7 +184,7 @@ func (c *ServiceConfig) Export() error {
nextPort = nextPort.Next()
}
ivkURL := common.NewURLWithOptions(
common.WithPath(c.id),
common.WithPath(c.InterfaceName),
common.WithProtocol(proto.Name),
common.WithIp(proto.Ip),
common.WithPort(port),
Expand Down
8 changes: 6 additions & 2 deletions filter/filter_impl/access_log_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,12 @@ func (ef *AccessLogFilter) logIntoChannel(accessLogData AccessLogData) {
func (ef *AccessLogFilter) buildAccessLogData(_ protocol.Invoker, invocation protocol.Invocation) map[string]string {
dataMap := make(map[string]string, 16)
attachments := invocation.Attachments()
if v, ok := attachments[constant.INTERFACE_KEY]; ok && v != nil {
dataMap[constant.INTERFACE_KEY] = v.(string)
itf := attachments[constant.INTERFACE_KEY]
if itf == nil || len(itf.(string)) == 0 {
itf = attachments[constant.PATH_KEY]
}
if itf != nil {
dataMap[constant.INTERFACE_KEY] = itf.(string)
}
if v, ok := attachments[constant.METHOD_KEY]; ok && v != nil {
dataMap[constant.METHOD_KEY] = v.(string)
Expand Down
Loading

0 comments on commit e2954fe

Please sign in to comment.