Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/develop' into develop
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangshen023 committed Sep 5, 2020
2 parents 5ff6870 + 6388c4a commit 04d7ac9
Show file tree
Hide file tree
Showing 21 changed files with 124 additions and 108 deletions.
1 change: 0 additions & 1 deletion cluster/router/tag/tag_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,6 @@ func (c *tagRouter) Process(event *config_center.ConfigChangeEvent) {
defer c.mutex.Unlock()
c.tagRouterRule = routerRule
c.ruleChanged = true
return
}

// URL gets the url of tagRouter
Expand Down
4 changes: 2 additions & 2 deletions common/rpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func (sm *serviceMap) GetService(protocol, name string) *Service {
return nil
}

// GetInterface gets an interface defination by interface name
// GetInterface gets an interface definition by interface name
func (sm *serviceMap) GetInterface(interfaceName string) []*Service {
sm.mutex.RLock()
defer sm.mutex.RUnlock()
Expand Down Expand Up @@ -271,7 +271,7 @@ func (sm *serviceMap) UnRegister(interfaceName, protocol, serviceId string) erro
sm.mutex.Lock()
defer sm.mutex.Unlock()
sm.interfaceMap[interfaceName] = make([]*Service, 0, len(svrs))
for i, _ := range svrs {
for i := range svrs {
if i != index {
sm.interfaceMap[interfaceName] = append(sm.interfaceMap[interfaceName], svrs[i])
}
Expand Down
2 changes: 1 addition & 1 deletion common/url.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func NewURL(urlString string, opts ...option) (URL, error) {
}

// rawUrlString = "//" + rawUrlString
if strings.Index(rawUrlString, "//") < 0 {
if !strings.Contains(rawUrlString, "//") {
t := URL{baseUrl: baseUrl{}}
for _, opt := range opts {
opt(&t)
Expand Down
16 changes: 3 additions & 13 deletions config/base_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,15 +78,8 @@ func getKeyPrefix(val reflect.Value) []string {
} else {
prefix = val.MethodByName(configPrefixMethod).Call(nil)[0].String()
}
var retPrefixes []string

for _, pfx := range strings.Split(prefix, "|") {

retPrefixes = append(retPrefixes, pfx)

}
return retPrefixes

return strings.Split(prefix, "|")
}

func getPtrElement(v reflect.Value) reflect.Value {
Expand Down Expand Up @@ -216,12 +209,9 @@ func setFieldValue(val reflect.Value, id reflect.Value, config *config.InmemoryC
prefix := s.MethodByName("Prefix").Call(nil)[0].String()
for _, pfx := range strings.Split(prefix, "|") {
m := config.GetSubProperty(pfx)
if m != nil {
for k := range m {
f.SetMapIndex(reflect.ValueOf(k), reflect.New(f.Type().Elem().Elem()))
}
for k := range m {
f.SetMapIndex(reflect.ValueOf(k), reflect.New(f.Type().Elem().Elem()))
}

}

}
Expand Down
10 changes: 4 additions & 6 deletions config/config_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,34 +141,32 @@ func loadConsumerConfig() {

// wait for invoker is available, if wait over default 3s, then panic
var count int
checkok := true
for {
checkok := true
for _, refconfig := range consumerConfig.References {
if (refconfig.Check != nil && *refconfig.Check) ||
(refconfig.Check == nil && consumerConfig.Check != nil && *consumerConfig.Check) ||
(refconfig.Check == nil && consumerConfig.Check == nil) { // default to true

if refconfig.invoker != nil &&
!refconfig.invoker.IsAvailable() {
if refconfig.invoker != nil && !refconfig.invoker.IsAvailable() {
checkok = false
count++
if count > maxWait {
errMsg := fmt.Sprintf("Failed to check the status of the service %v . No provider available for the service to the consumer use dubbo version %v", refconfig.InterfaceName, constant.Version)
errMsg := fmt.Sprintf("Failed to check the status of the service %v. No provider available for the service to the consumer use dubbo version %v", refconfig.InterfaceName, constant.Version)
logger.Error(errMsg)
panic(errMsg)
}
time.Sleep(time.Second * 1)
break
}
if refconfig.invoker == nil {
logger.Warnf("The interface %s invoker not exist , may you should check your interface config.", refconfig.InterfaceName)
logger.Warnf("The interface %s invoker not exist, may you should check your interface config.", refconfig.InterfaceName)
}
}
}
if checkok {
break
}
checkok = true
}
}

Expand Down
4 changes: 2 additions & 2 deletions config/consumer_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ type ConsumerConfig struct {

References map[string]*ReferenceConfig `yaml:"references" json:"references,omitempty" property:"references"`
ProtocolConf interface{} `yaml:"protocol_conf" json:"protocol_conf,omitempty" property:"protocol_conf"`
FilterConf interface{} `yaml:"filter_conf" json:"filter_conf,omitempty" property:"filter_conf" `
ShutdownConfig *ShutdownConfig `yaml:"shutdown_conf" json:"shutdown_conf,omitempty" property:"shutdown_conf" `
FilterConf interface{} `yaml:"filter_conf" json:"filter_conf,omitempty" property:"filter_conf"`
ShutdownConfig *ShutdownConfig `yaml:"shutdown_conf" json:"shutdown_conf,omitempty" property:"shutdown_conf"`
ConfigType map[string]string `yaml:"config_type" json:"config_type,omitempty" property:"config_type"`
}

Expand Down
6 changes: 3 additions & 3 deletions config/provider_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ type ProviderConfig struct {
ProxyFactory string `yaml:"proxy_factory" default:"default" json:"proxy_factory,omitempty" property:"proxy_factory"`
Services map[string]*ServiceConfig `yaml:"services" json:"services,omitempty" property:"services"`
Protocols map[string]*ProtocolConfig `yaml:"protocols" json:"protocols,omitempty" property:"protocols"`
ProtocolConf interface{} `yaml:"protocol_conf" json:"protocol_conf,omitempty" property:"protocol_conf" `
FilterConf interface{} `yaml:"filter_conf" json:"filter_conf,omitempty" property:"filter_conf" `
ShutdownConfig *ShutdownConfig `yaml:"shutdown_conf" json:"shutdown_conf,omitempty" property:"shutdown_conf" `
ProtocolConf interface{} `yaml:"protocol_conf" json:"protocol_conf,omitempty" property:"protocol_conf"`
FilterConf interface{} `yaml:"filter_conf" json:"filter_conf,omitempty" property:"filter_conf"`
ShutdownConfig *ShutdownConfig `yaml:"shutdown_conf" json:"shutdown_conf,omitempty" property:"shutdown_conf"`
ConfigType map[string]string `yaml:"config_type" json:"config_type,omitempty" property:"config_type"`

Registry *RegistryConfig `yaml:"registry" json:"registry,omitempty" property:"registry"`
Expand Down
1 change: 0 additions & 1 deletion config/reference_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,5 +251,4 @@ func (c *ReferenceConfig) GenericLoad(id string) {
c.id = id
c.Refer(genericService)
c.Implement(genericService)
return
}
47 changes: 22 additions & 25 deletions config/service_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func getRandomPort(protocolConfigs []*ProtocolConfig) *list.List {

tcp, err := gxnet.ListenOnTCPRandomPort(proto.Ip)
if err != nil {
panic(perrors.New(fmt.Sprintf("Get tcp port error,err is {%v}", err)))
panic(perrors.New(fmt.Sprintf("Get tcp port error, err is {%v}", err)))
}
defer tcp.Close()
ports.PushBack(strings.Split(tcp.Addr().String(), ":")[1])
Expand All @@ -145,38 +145,38 @@ func getRandomPort(protocolConfigs []*ProtocolConfig) *list.List {
func (c *ServiceConfig) Export() error {
// TODO: config center start here

// TODO:delay export
// TODO: delay export
if c.unexported != nil && c.unexported.Load() {
err := perrors.Errorf("The service %v has already unexported! ", c.InterfaceName)
err := perrors.Errorf("The service %v has already unexported!", c.InterfaceName)
logger.Errorf(err.Error())
return err
}
if c.unexported != nil && c.exported.Load() {
logger.Warnf("The service %v has already exported! ", c.InterfaceName)
logger.Warnf("The service %v has already exported!", c.InterfaceName)
return nil
}

regUrls := loadRegistries(c.Registry, providerConfig.Registries, common.PROVIDER)
urlMap := c.getUrlMap()
protocolConfigs := loadProtocol(c.Protocol, c.Protocols)
if len(protocolConfigs) == 0 {
logger.Warnf("The service %v's '%v' protocols don't has right protocolConfigs ", c.InterfaceName, c.Protocol)
logger.Warnf("The service %v's '%v' protocols don't has right protocolConfigs", c.InterfaceName, c.Protocol)
return nil
}

ports := getRandomPort(protocolConfigs)
nextPort := ports.Front()
proxyFactory := extension.GetProxyFactory(providerConfig.ProxyFactory)
for _, proto := range protocolConfigs {
// registry the service reflect
methods, err := common.ServiceMap.Register(c.InterfaceName, proto.Name, c.rpcService)
if err != nil {
formatErr := perrors.Errorf("The service %v export the protocol %v error! Error message is %v .", c.InterfaceName, proto.Name, err.Error())
formatErr := perrors.Errorf("The service %v export the protocol %v error! Error message is %v.", c.InterfaceName, proto.Name, err.Error())
logger.Errorf(formatErr.Error())
return formatErr
}

port := proto.Port

if len(proto.Port) == 0 {
port = nextPort.Value.(string)
nextPort = nextPort.Next()
Expand All @@ -196,33 +196,31 @@ func (c *ServiceConfig) Export() error {
ivkURL.AddParam(constant.Tagkey, c.Tag)
}

var exporter protocol.Exporter

if len(regUrls) > 0 {
c.cacheMutex.Lock()
if c.cacheProtocol == nil {
logger.Infof(fmt.Sprintf("First load the registry protocol, url is {%v}!", ivkURL))
c.cacheProtocol = extension.GetProtocol("registry")
}
c.cacheMutex.Unlock()

for _, regUrl := range regUrls {
regUrl.SubURL = ivkURL

c.cacheMutex.Lock()
if c.cacheProtocol == nil {
logger.Infof(fmt.Sprintf("First load the registry protocol , url is {%v}!", ivkURL))
c.cacheProtocol = extension.GetProtocol("registry")
}
c.cacheMutex.Unlock()

invoker := extension.GetProxyFactory(providerConfig.ProxyFactory).GetInvoker(*regUrl)
exporter = c.cacheProtocol.Export(invoker)
invoker := proxyFactory.GetInvoker(*regUrl)
exporter := c.cacheProtocol.Export(invoker)
if exporter == nil {
panic(perrors.New(fmt.Sprintf("Registry protocol new exporter error,registry is {%v},url is {%v}", regUrl, ivkURL)))
return perrors.New(fmt.Sprintf("Registry protocol new exporter error, registry is {%v}, url is {%v}", regUrl, ivkURL))
}
c.exporters = append(c.exporters, exporter)
}
} else {
invoker := extension.GetProxyFactory(providerConfig.ProxyFactory).GetInvoker(*ivkURL)
exporter = extension.GetProtocol(protocolwrapper.FILTER).Export(invoker)
invoker := proxyFactory.GetInvoker(*ivkURL)
exporter := extension.GetProtocol(protocolwrapper.FILTER).Export(invoker)
if exporter == nil {
panic(perrors.New(fmt.Sprintf("Filter protocol without registry new exporter error,url is {%v}", ivkURL)))
return perrors.New(fmt.Sprintf("Filter protocol without registry new exporter error, url is {%v}", ivkURL))
}
c.exporters = append(c.exporters, exporter)
}
c.exporters = append(c.exporters, exporter)
}
c.exported.Store(true)
return nil
Expand Down Expand Up @@ -314,7 +312,6 @@ func (c *ServiceConfig) getUrlMap() url.Values {

urlMap.Set(constant.EXECUTE_LIMIT_KEY, v.ExecuteLimit)
urlMap.Set(constant.EXECUTE_REJECTED_EXECUTION_HANDLER_KEY, v.ExecuteLimitRejectedHandler)

}

return urlMap
Expand Down
2 changes: 1 addition & 1 deletion config_center/configurator/override.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (c *overrideConfigurator) configureIfMatchInternal(url *common.URL) {
func (c *overrideConfigurator) configureIfMatch(host string, url *common.URL) {
if constant.ANYHOST_VALUE == c.configuratorUrl.Ip || host == c.configuratorUrl.Ip {
providers := c.configuratorUrl.GetParam(constant.OVERRIDE_PROVIDERS_KEY, "")
if len(providers) == 0 || strings.Index(providers, url.Location) >= 0 || strings.Index(providers, constant.ANYHOST_VALUE) >= 0 {
if len(providers) == 0 || strings.Contains(providers, url.Location) || strings.Contains(providers, constant.ANYHOST_VALUE) {
c.configureIfMatchInternal(url)
}
}
Expand Down
56 changes: 46 additions & 10 deletions filter/filter_impl/hystrix_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,6 @@ var (
providerConfigOnce sync.Once
)

//The filter in the server end of dubbo-go can't get the invoke result for now,
//this filter ONLY works in CLIENT end (consumer side) temporarily
//Only after the callService logic is integrated into the filter chain of server end then the filter can be used,
//which will be done soon
func init() {
extension.SetFilter(HYSTRIX_CONSUMER, GetHystrixFilterConsumer)
extension.SetFilter(HYSTRIX_PROVIDER, GetHystrixFilterProvider)
Expand Down Expand Up @@ -85,7 +81,47 @@ func NewHystrixFilterError(err error, failByHystrix bool) error {
}
}

// nolint
/**
* HystrixFilter
* You should add hystrix related configuration in provider or consumer config or both, according to which side you are to apply HystrixFilter.
* For example:
* filter_conf:
* hystrix:
* configs:
* # =========== Define config here ============
* "Default":
* timeout : 1000
* max_concurrent_requests : 25
* sleep_window : 5000
* error_percent_threshold : 50
* request_volume_threshold: 20
* "userp":
* timeout: 2000
* max_concurrent_requests: 512
* sleep_window: 4000
* error_percent_threshold: 35
* request_volume_threshold: 6
* "userp_m":
* timeout : 1200
* max_concurrent_requests : 512
* sleep_window : 6000
* error_percent_threshold : 60
* request_volume_threshold: 16
* # =========== Define error whitelist which will be ignored by Hystrix counter ============
* error_whitelist: [".*exception.*"]
*
* # =========== Apply default config here ===========
* default: "Default"
*
* services:
* "com.ikurento.user.UserProvider":
* # =========== Apply service level config ===========
* service_config: "userp"
* # =========== Apply method level config ===========
* methods:
* "GetUser": "userp_m"
* "GetUser1": "userp_m"
*/
type HystrixFilter struct {
COrP bool //true for consumer
res map[string][]*regexp.Regexp
Expand Down Expand Up @@ -213,11 +249,11 @@ func getConfig(service string, method string, cOrP bool) CommandConfigWithError

func initHystrixConfigConsumer() error {
if config.GetConsumerConfig().FilterConf == nil {
return perrors.Errorf("no config for hystrix")
return perrors.Errorf("no config for hystrix_consumer")
}
filterConfig := config.GetConsumerConfig().FilterConf.(map[interface{}]interface{})[HYSTRIX]
if filterConfig == nil {
return perrors.Errorf("no config for hystrix")
return perrors.Errorf("no config for hystrix_consumer")
}
hystrixConfByte, err := yaml.Marshal(filterConfig)
if err != nil {
Expand All @@ -232,11 +268,11 @@ func initHystrixConfigConsumer() error {

func initHystrixConfigProvider() error {
if config.GetProviderConfig().FilterConf == nil {
return perrors.Errorf("no config for hystrix")
return perrors.Errorf("no config for hystrix_provider")
}
filterConfig := config.GetConsumerConfig().FilterConf.(map[interface{}]interface{})[HYSTRIX]
filterConfig := config.GetProviderConfig().FilterConf.(map[interface{}]interface{})[HYSTRIX]
if filterConfig == nil {
return perrors.Errorf("no config for hystrix")
return perrors.Errorf("no config for hystrix_provider")
}
hystrixConfByte, err := yaml.Marshal(filterConfig)
if err != nil {
Expand Down
1 change: 0 additions & 1 deletion metadata/service/exporter/configurable/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ func NewMetadataServiceExporter(metadataService service.MetadataService) exporte
// Export will export the metadataService
func (exporter *MetadataServiceExporter) Export() error {
if !exporter.IsExported() {

serviceConfig := config.NewServiceConfig(constant.SIMPLE_METADATA_SERVICE_NAME, context.Background())
serviceConfig.Protocol = constant.DEFAULT_PROTOCOL
serviceConfig.Protocols = map[string]*config.ProtocolConfig{
Expand Down
2 changes: 1 addition & 1 deletion protocol/jsonrpc/jsonrpc_protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func NewJsonrpcProtocol() *JsonrpcProtocol {
}
}

// Export JSON RPC service for remote invocation
// Export JSON RPC service for remote invocation
func (jp *JsonrpcProtocol) Export(invoker protocol.Invoker) protocol.Exporter {
url := invoker.GetUrl()
serviceKey := strings.TrimPrefix(url.Path, "/")
Expand Down
8 changes: 3 additions & 5 deletions protocol/jsonrpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,10 @@ func (s *Server) handlePkg(conn net.Conn) {
}

reqBody, err := ioutil.ReadAll(r.Body)
r.Body.Close()
if err != nil {
return
}
r.Body.Close()

reqHeader := make(map[string]string)
for k := range r.Header {
Expand Down Expand Up @@ -263,8 +263,7 @@ func (s *Server) Stop() {
})
}

func serveRequest(ctx context.Context,
header map[string]string, body []byte, conn net.Conn) error {
func serveRequest(ctx context.Context, header map[string]string, body []byte, conn net.Conn) error {
sendErrorResp := func(header map[string]string, body []byte) error {
rsp := &http.Response{
Header: make(http.Header),
Expand Down Expand Up @@ -324,13 +323,12 @@ func serveRequest(ctx context.Context,
if err == io.EOF || err == io.ErrUnexpectedEOF {
return perrors.WithStack(err)
}

return perrors.New("server cannot decode request: " + err.Error())
}

path := header["Path"]
methodName := codec.req.Method
if len(path) == 0 || len(methodName) == 0 {
codec.ReadBody(nil)
return perrors.New("service/method request ill-formed: " + path + "/" + methodName)
}

Expand Down
Loading

0 comments on commit 04d7ac9

Please sign in to comment.