Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix exporter append #722

Merged
merged 10 commits into from
Sep 5, 2020
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion common/rpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func (sm *serviceMap) GetService(protocol, name string) *Service {
return nil
}

// GetInterface gets an interface defination by interface name
// GetInterface gets an interface definition by interface name
func (sm *serviceMap) GetInterface(interfaceName string) []*Service {
sm.mutex.RLock()
defer sm.mutex.RUnlock()
Expand Down
10 changes: 4 additions & 6 deletions config/config_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,34 +141,32 @@ func loadConsumerConfig() {

// wait for invoker is available, if wait over default 3s, then panic
var count int
checkok := true
for {
checkok := true
for _, refconfig := range consumerConfig.References {
if (refconfig.Check != nil && *refconfig.Check) ||
(refconfig.Check == nil && consumerConfig.Check != nil && *consumerConfig.Check) ||
(refconfig.Check == nil && consumerConfig.Check == nil) { // default to true

if refconfig.invoker != nil &&
!refconfig.invoker.IsAvailable() {
if refconfig.invoker != nil && !refconfig.invoker.IsAvailable() {
checkok = false
count++
if count > maxWait {
errMsg := fmt.Sprintf("Failed to check the status of the service %v . No provider available for the service to the consumer use dubbo version %v", refconfig.InterfaceName, constant.Version)
errMsg := fmt.Sprintf("Failed to check the status of the service %v. No provider available for the service to the consumer use dubbo version %v", refconfig.InterfaceName, constant.Version)
logger.Error(errMsg)
panic(errMsg)
}
time.Sleep(time.Second * 1)
break
}
if refconfig.invoker == nil {
logger.Warnf("The interface %s invoker not exist , may you should check your interface config.", refconfig.InterfaceName)
logger.Warnf("The interface %s invoker not exist, may you should check your interface config.", refconfig.InterfaceName)
}
}
}
if checkok {
break
}
checkok = true
}
}

Expand Down
4 changes: 2 additions & 2 deletions config/consumer_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ type ConsumerConfig struct {

References map[string]*ReferenceConfig `yaml:"references" json:"references,omitempty" property:"references"`
ProtocolConf interface{} `yaml:"protocol_conf" json:"protocol_conf,omitempty" property:"protocol_conf"`
FilterConf interface{} `yaml:"filter_conf" json:"filter_conf,omitempty" property:"filter_conf" `
ShutdownConfig *ShutdownConfig `yaml:"shutdown_conf" json:"shutdown_conf,omitempty" property:"shutdown_conf" `
FilterConf interface{} `yaml:"filter_conf" json:"filter_conf,omitempty" property:"filter_conf"`
ShutdownConfig *ShutdownConfig `yaml:"shutdown_conf" json:"shutdown_conf,omitempty" property:"shutdown_conf"`
ConfigType map[string]string `yaml:"config_type" json:"config_type,omitempty" property:"config_type"`
}

Expand Down
6 changes: 3 additions & 3 deletions config/provider_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ type ProviderConfig struct {
ProxyFactory string `yaml:"proxy_factory" default:"default" json:"proxy_factory,omitempty" property:"proxy_factory"`
Services map[string]*ServiceConfig `yaml:"services" json:"services,omitempty" property:"services"`
Protocols map[string]*ProtocolConfig `yaml:"protocols" json:"protocols,omitempty" property:"protocols"`
ProtocolConf interface{} `yaml:"protocol_conf" json:"protocol_conf,omitempty" property:"protocol_conf" `
FilterConf interface{} `yaml:"filter_conf" json:"filter_conf,omitempty" property:"filter_conf" `
ShutdownConfig *ShutdownConfig `yaml:"shutdown_conf" json:"shutdown_conf,omitempty" property:"shutdown_conf" `
ProtocolConf interface{} `yaml:"protocol_conf" json:"protocol_conf,omitempty" property:"protocol_conf"`
FilterConf interface{} `yaml:"filter_conf" json:"filter_conf,omitempty" property:"filter_conf"`
ShutdownConfig *ShutdownConfig `yaml:"shutdown_conf" json:"shutdown_conf,omitempty" property:"shutdown_conf"`
ConfigType map[string]string `yaml:"config_type" json:"config_type,omitempty" property:"config_type"`

Registry *RegistryConfig `yaml:"registry" json:"registry,omitempty" property:"registry"`
Expand Down
47 changes: 22 additions & 25 deletions config/service_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func getRandomPort(protocolConfigs []*ProtocolConfig) *list.List {

tcp, err := gxnet.ListenOnTCPRandomPort(proto.Ip)
if err != nil {
panic(perrors.New(fmt.Sprintf("Get tcp port error,err is {%v}", err)))
panic(perrors.New(fmt.Sprintf("Get tcp port error, err is {%v}", err)))
}
defer tcp.Close()
ports.PushBack(strings.Split(tcp.Addr().String(), ":")[1])
Expand All @@ -145,38 +145,38 @@ func getRandomPort(protocolConfigs []*ProtocolConfig) *list.List {
func (c *ServiceConfig) Export() error {
// TODO: config center start here

// TODO:delay export
// TODO: delay export
if c.unexported != nil && c.unexported.Load() {
err := perrors.Errorf("The service %v has already unexported! ", c.InterfaceName)
err := perrors.Errorf("The service %v has already unexported!", c.InterfaceName)
logger.Errorf(err.Error())
return err
}
if c.unexported != nil && c.exported.Load() {
logger.Warnf("The service %v has already exported! ", c.InterfaceName)
logger.Warnf("The service %v has already exported!", c.InterfaceName)
return nil
}

regUrls := loadRegistries(c.Registry, providerConfig.Registries, common.PROVIDER)
urlMap := c.getUrlMap()
protocolConfigs := loadProtocol(c.Protocol, c.Protocols)
if len(protocolConfigs) == 0 {
logger.Warnf("The service %v's '%v' protocols don't has right protocolConfigs ", c.InterfaceName, c.Protocol)
logger.Warnf("The service %v's '%v' protocols don't has right protocolConfigs", c.InterfaceName, c.Protocol)
return nil
}

ports := getRandomPort(protocolConfigs)
nextPort := ports.Front()
proxyFactory := extension.GetProxyFactory(providerConfig.ProxyFactory)
for _, proto := range protocolConfigs {
// registry the service reflect
methods, err := common.ServiceMap.Register(c.InterfaceName, proto.Name, c.rpcService)
if err != nil {
formatErr := perrors.Errorf("The service %v export the protocol %v error! Error message is %v .", c.InterfaceName, proto.Name, err.Error())
formatErr := perrors.Errorf("The service %v export the protocol %v error! Error message is %v.", c.InterfaceName, proto.Name, err.Error())
logger.Errorf(formatErr.Error())
return formatErr
}

port := proto.Port

if len(proto.Port) == 0 {
port = nextPort.Value.(string)
nextPort = nextPort.Next()
Expand All @@ -196,33 +196,31 @@ func (c *ServiceConfig) Export() error {
ivkURL.AddParam(constant.Tagkey, c.Tag)
}

var exporter protocol.Exporter

if len(regUrls) > 0 {
c.cacheMutex.Lock()
if c.cacheProtocol == nil {
logger.Infof(fmt.Sprintf("First load the registry protocol, url is {%v}!", ivkURL))
c.cacheProtocol = extension.GetProtocol("registry")
}
c.cacheMutex.Unlock()

for _, regUrl := range regUrls {
regUrl.SubURL = ivkURL

c.cacheMutex.Lock()
if c.cacheProtocol == nil {
logger.Infof(fmt.Sprintf("First load the registry protocol , url is {%v}!", ivkURL))
c.cacheProtocol = extension.GetProtocol("registry")
}
c.cacheMutex.Unlock()

invoker := extension.GetProxyFactory(providerConfig.ProxyFactory).GetInvoker(*regUrl)
exporter = c.cacheProtocol.Export(invoker)
invoker := proxyFactory.GetInvoker(*regUrl)
exporter := c.cacheProtocol.Export(invoker)
if exporter == nil {
panic(perrors.New(fmt.Sprintf("Registry protocol new exporter error,registry is {%v},url is {%v}", regUrl, ivkURL)))
return perrors.New(fmt.Sprintf("Registry protocol new exporter error, registry is {%v}, url is {%v}", regUrl, ivkURL))
fangyincheng marked this conversation as resolved.
Show resolved Hide resolved
}
c.exporters = append(c.exporters, exporter)
}
} else {
invoker := extension.GetProxyFactory(providerConfig.ProxyFactory).GetInvoker(*ivkURL)
exporter = extension.GetProtocol(protocolwrapper.FILTER).Export(invoker)
invoker := proxyFactory.GetInvoker(*ivkURL)
exporter := extension.GetProtocol(protocolwrapper.FILTER).Export(invoker)
if exporter == nil {
panic(perrors.New(fmt.Sprintf("Filter protocol without registry new exporter error,url is {%v}", ivkURL)))
return perrors.New(fmt.Sprintf("Filter protocol without registry new exporter error, url is {%v}", ivkURL))
}
c.exporters = append(c.exporters, exporter)
}
c.exporters = append(c.exporters, exporter)
}
c.exported.Store(true)
return nil
Expand Down Expand Up @@ -314,7 +312,6 @@ func (c *ServiceConfig) getUrlMap() url.Values {

urlMap.Set(constant.EXECUTE_LIMIT_KEY, v.ExecuteLimit)
urlMap.Set(constant.EXECUTE_REJECTED_EXECUTION_HANDLER_KEY, v.ExecuteLimitRejectedHandler)

}

return urlMap
Expand Down
1 change: 0 additions & 1 deletion metadata/service/exporter/configurable/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ func NewMetadataServiceExporter(metadataService service.MetadataService) exporte
// Export will export the metadataService
func (exporter *MetadataServiceExporter) Export() error {
if !exporter.IsExported() {

serviceConfig := config.NewServiceConfig(constant.SIMPLE_METADATA_SERVICE_NAME, context.Background())
serviceConfig.Protocol = constant.DEFAULT_PROTOCOL
serviceConfig.Protocols = map[string]*config.ProtocolConfig{
Expand Down
2 changes: 1 addition & 1 deletion protocol/jsonrpc/jsonrpc_protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func NewJsonrpcProtocol() *JsonrpcProtocol {
}
}

// Export JSON RPC service for remote invocation
// Export JSON RPC service for remote invocation
func (jp *JsonrpcProtocol) Export(invoker protocol.Invoker) protocol.Exporter {
url := invoker.GetUrl()
serviceKey := strings.TrimPrefix(url.Path, "/")
Expand Down
8 changes: 3 additions & 5 deletions protocol/jsonrpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,10 @@ func (s *Server) handlePkg(conn net.Conn) {
}

reqBody, err := ioutil.ReadAll(r.Body)
r.Body.Close()
if err != nil {
return
}
r.Body.Close()

reqHeader := make(map[string]string)
for k := range r.Header {
Expand Down Expand Up @@ -263,8 +263,7 @@ func (s *Server) Stop() {
})
}

func serveRequest(ctx context.Context,
header map[string]string, body []byte, conn net.Conn) error {
func serveRequest(ctx context.Context, header map[string]string, body []byte, conn net.Conn) error {
sendErrorResp := func(header map[string]string, body []byte) error {
rsp := &http.Response{
Header: make(http.Header),
Expand Down Expand Up @@ -324,13 +323,12 @@ func serveRequest(ctx context.Context,
if err == io.EOF || err == io.ErrUnexpectedEOF {
return perrors.WithStack(err)
}

return perrors.New("server cannot decode request: " + err.Error())
}

path := header["Path"]
methodName := codec.req.Method
if len(path) == 0 || len(methodName) == 0 {
codec.ReadBody(nil)
return perrors.New("service/method request ill-formed: " + path + "/" + methodName)
}

Expand Down
15 changes: 5 additions & 10 deletions protocol/protocolwrapper/protocol_filter_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,21 +68,16 @@ func (pfw *ProtocolFilterWrapper) Destroy() {
}

func buildInvokerChain(invoker protocol.Invoker, key string) protocol.Invoker {
filtName := invoker.GetUrl().GetParam(key, "")
if filtName == "" {
gaoxinge marked this conversation as resolved.
Show resolved Hide resolved
return invoker
}
filtNames := strings.Split(filtName, ",")
next := invoker
filterName := invoker.GetUrl().GetParam(key, "")
filterNames := strings.Split(filterName, ",")

// The order of filters is from left to right, so loading from right to left

for i := len(filtNames) - 1; i >= 0; i-- {
flt := extension.GetFilter(filtNames[i])
next := invoker
for i := len(filterNames) - 1; i >= 0; i-- {
gaoxinge marked this conversation as resolved.
Show resolved Hide resolved
flt := extension.GetFilter(filterNames[i])
fi := &FilterInvoker{next: next, invoker: invoker, filter: flt}
next = fi
}

return next
}

Expand Down
17 changes: 7 additions & 10 deletions registry/protocol/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"strings"
"sync"
)

import (
gxset "github.com/dubbogo/gost/container/set"
)
Expand Down Expand Up @@ -54,9 +55,10 @@ var (

type registryProtocol struct {
invokers []protocol.Invoker
// Registry Map<RegistryAddress, Registry>
// Registry Map<RegistryAddress, Registry>
registries *sync.Map
// To solve the problem of RMI repeated exposure port conflicts, the services that have been exposed are no longer exposed.
// To solve the problem of RMI repeated exposure port conflicts,
// the services that have been exposed are no longer exposed.
// providerurl <--> exporter
bounds *sync.Map
overrideListeners *sync.Map
Expand Down Expand Up @@ -100,7 +102,6 @@ func getUrlToRegistry(providerUrl *common.URL, registryUrl *common.URL) *common.

// filterHideKey filter the parameters that do not need to be output in url(Starting with .)
func filterHideKey(url *common.URL) *common.URL {

// be careful params maps in url is map type
removeSet := gxset.NewSet()
for k := range url.GetParams() {
Expand Down Expand Up @@ -139,7 +140,6 @@ func (proto *registryProtocol) Refer(url common.URL) protocol.Invoker {
}

var reg registry.Registry

if regI, loaded := proto.registries.Load(registryUrl.Key()); !loaded {
reg = getRegistry(&registryUrl)
proto.registries.Store(registryUrl.Key(), reg)
Expand All @@ -150,7 +150,7 @@ func (proto *registryProtocol) Refer(url common.URL) protocol.Invoker {
// new registry directory for store service url from registry
directory, err := extension.GetDefaultRegistryDirectory(&registryUrl, reg)
if err != nil {
logger.Errorf("consumer service %v create registry directory error, error message is %s, and will return nil invoker!",
logger.Errorf("consumer service %v create registry directory error, error message is %s, and will return nil invoker!",
serviceUrl.String(), err.Error())
return nil
}
Expand All @@ -163,7 +163,6 @@ func (proto *registryProtocol) Refer(url common.URL) protocol.Invoker {

// new cluster invoker
cluster := extension.GetCluster(serviceUrl.GetParam(constant.CLUSTER_KEY, constant.DEFAULT_CLUSTER))

invoker := cluster.Join(directory)
proto.invokers = append(proto.invokers, invoker)
return invoker
Expand Down Expand Up @@ -204,7 +203,7 @@ func (proto *registryProtocol) Export(invoker protocol.Invoker) protocol.Exporte
}

key := getCacheKey(providerUrl)
logger.Infof("The cached exporter keys is %v !", key)
logger.Infof("The cached exporter keys is %v!", key)
cachedExporter, loaded := proto.bounds.Load(key)
if loaded {
logger.Infof("The exporter has been cached, and will return cached exporter!")
Expand All @@ -217,7 +216,6 @@ func (proto *registryProtocol) Export(invoker protocol.Invoker) protocol.Exporte

go reg.Subscribe(overriderUrl, overrideSubscribeListener)
return cachedExporter.(protocol.Exporter)

}

func (proto *registryProtocol) reExport(invoker protocol.Invoker, newUrl *common.URL) {
Expand All @@ -229,7 +227,6 @@ func (proto *registryProtocol) reExport(invoker protocol.Invoker, newUrl *common
proto.bounds.Delete(key)
proto.Export(wrappedNewInvoker)
// TODO: unregister & unsubscribe

}
}

Expand Down Expand Up @@ -366,7 +363,7 @@ func (proto *registryProtocol) Destroy() {
func getRegistryUrl(invoker protocol.Invoker) *common.URL {
// here add * for return a new url
url := invoker.GetUrl()
// if the protocol == registry ,set protocol the registry value in url.params
// if the protocol == registry, set protocol the registry value in url.params
if url.Protocol == constant.REGISTRY_PROTOCOL {
protocol := url.GetParam(constant.REGISTRY_KEY, "")
url.Protocol = protocol
Expand Down
Loading