Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix exporter append #722

Merged
merged 10 commits into from
Sep 5, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
format
  • Loading branch information
xg.gao committed Aug 20, 2020
commit 60f982288150392d860ed209de239c0425e9ec12
2 changes: 1 addition & 1 deletion config/service_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func getRandomPort(protocolConfigs []*ProtocolConfig) *list.List {

tcp, err := gxnet.ListenOnTCPRandomPort(proto.Ip)
if err != nil {
panic(perrors.New(fmt.Sprintf("Get tcp port error,err is {%v}", err)))
panic(perrors.New(fmt.Sprintf("Get tcp port error, err is {%v}", err)))
}
defer tcp.Close()
ports.PushBack(strings.Split(tcp.Addr().String(), ":")[1])
Expand Down
16 changes: 7 additions & 9 deletions registry/protocol/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"strings"
"sync"
)

import (
gxset "github.com/dubbogo/gost/container/set"
)
Expand Down Expand Up @@ -54,9 +55,10 @@ var (

type registryProtocol struct {
invokers []protocol.Invoker
// Registry Map<RegistryAddress, Registry>
// Registry Map<RegistryAddress, Registry>
registries *sync.Map
// To solve the problem of RMI repeated exposure port conflicts, the services that have been exposed are no longer exposed.
// To solve the problem of RMI repeated exposure port conflicts,
// the services that have been exposed are no longer exposed.
// providerurl <--> exporter
bounds *sync.Map
overrideListeners *sync.Map
Expand Down Expand Up @@ -100,7 +102,6 @@ func getUrlToRegistry(providerUrl *common.URL, registryUrl *common.URL) *common.

// filterHideKey filter the parameters that do not need to be output in url(Starting with .)
func filterHideKey(url *common.URL) *common.URL {

// be careful params maps in url is map type
removeSet := gxset.NewSet()
for k, _ := range url.GetParams() {
Expand All @@ -127,7 +128,6 @@ func (proto *registryProtocol) Refer(url common.URL) protocol.Invoker {
}

var reg registry.Registry

if regI, loaded := proto.registries.Load(registryUrl.Key()); !loaded {
reg = getRegistry(&registryUrl)
proto.registries.Store(registryUrl.Key(), reg)
Expand All @@ -138,7 +138,7 @@ func (proto *registryProtocol) Refer(url common.URL) protocol.Invoker {
// new registry directory for store service url from registry
directory, err := extension.GetDefaultRegistryDirectory(&registryUrl, reg)
if err != nil {
logger.Errorf("consumer service %v create registry directory error, error message is %s, and will return nil invoker!",
logger.Errorf("consumer service %v create registry directory error, error message is %s, and will return nil invoker!",
serviceUrl.String(), err.Error())
return nil
}
Expand All @@ -151,7 +151,6 @@ func (proto *registryProtocol) Refer(url common.URL) protocol.Invoker {

// new cluster invoker
cluster := extension.GetCluster(serviceUrl.GetParam(constant.CLUSTER_KEY, constant.DEFAULT_CLUSTER))

invoker := cluster.Join(directory)
proto.invokers = append(proto.invokers, invoker)
return invoker
Expand Down Expand Up @@ -192,7 +191,7 @@ func (proto *registryProtocol) Export(invoker protocol.Invoker) protocol.Exporte
}

key := getCacheKey(providerUrl)
logger.Infof("The cached exporter keys is %v !", key)
logger.Infof("The cached exporter keys is %v!", key)
cachedExporter, loaded := proto.bounds.Load(key)
if loaded {
logger.Infof("The exporter has been cached, and will return cached exporter!")
Expand All @@ -216,7 +215,6 @@ func (proto *registryProtocol) reExport(invoker protocol.Invoker, newUrl *common
proto.bounds.Delete(key)
proto.Export(wrappedNewInvoker)
// TODO: unregister & unsubscribe

}
}

Expand Down Expand Up @@ -353,7 +351,7 @@ func (proto *registryProtocol) Destroy() {
func getRegistryUrl(invoker protocol.Invoker) *common.URL {
// here add * for return a new url
url := invoker.GetUrl()
// if the protocol == registry ,set protocol the registry value in url.params
// if the protocol == registry, set protocol the registry value in url.params
if url.Protocol == constant.REGISTRY_PROTOCOL {
protocol := url.GetParam(constant.REGISTRY_KEY, "")
url.Protocol = protocol
Expand Down