Skip to content

Commit

Permalink
Fix ZK BUG
Browse files Browse the repository at this point in the history
  • Loading branch information
flycash committed Jul 5, 2020
1 parent e90fac6 commit 9a5990d
Show file tree
Hide file tree
Showing 11 changed files with 50 additions and 38 deletions.
4 changes: 3 additions & 1 deletion common/extension/metadata_service_proxy_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@ var (
metadataServiceProxyFactoryMap = make(map[string]func() service.MetadataServiceProxyFactory)
)

type MetadataServiceProxyFactoryFunc func() service.MetadataServiceProxyFactory

// SetMetadataServiceProxyFactory store the name-creator pair
func SetMetadataServiceProxyFactory(name string, creator func() service.MetadataServiceProxyFactory) {
func SetMetadataServiceProxyFactory(name string, creator MetadataServiceProxyFactoryFunc) {
metadataServiceProxyFactoryMap[name] = creator
}

Expand Down
36 changes: 18 additions & 18 deletions config/base_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,12 +116,12 @@ func TestRefresh(t *testing.T) {
config.GetEnvInstance().UpdateExternalConfigMap(mockMap)

father := &ConsumerConfig{
Check: &[]bool{true}[0],
Check: &[]bool{true}[0],
BaseConfig: BaseConfig{
ApplicationConfig:baseAppConfig,
ApplicationConfig: baseAppConfig,
},
Registries: baseRegistries,
References: baseMockRef,
Registries: baseRegistries,
References: baseMockRef,
ShutdownConfig: &ShutdownConfig{
Timeout: "12s",
StepTimeout: "2s",
Expand Down Expand Up @@ -150,12 +150,12 @@ func TestAppExternalRefresh(t *testing.T) {
mockMap["dubbo.consumer.check"] = "true"
config.GetEnvInstance().UpdateExternalConfigMap(mockMap)
father := &ConsumerConfig{
Check: &[]bool{true}[0],
Check: &[]bool{true}[0],
BaseConfig: BaseConfig{
ApplicationConfig:baseAppConfig,
ApplicationConfig: baseAppConfig,
},
Registries: baseRegistries,
References: baseMockRef,
Registries: baseRegistries,
References: baseMockRef,
}

c.SetFatherConfig(father)
Expand All @@ -178,12 +178,12 @@ func TestAppExternalWithoutIDRefresh(t *testing.T) {
mockMap["dubbo.consumer.check"] = "true"
config.GetEnvInstance().UpdateExternalConfigMap(mockMap)
father := &ConsumerConfig{
Check: &[]bool{true}[0],
Check: &[]bool{true}[0],
BaseConfig: BaseConfig{
ApplicationConfig:baseAppConfig,
ApplicationConfig: baseAppConfig,
},
Registries: baseRegistries,
References: baseMockRef,
Registries: baseRegistries,
References: baseMockRef,
}

c.SetFatherConfig(father)
Expand All @@ -208,13 +208,13 @@ func TestRefreshSingleRegistry(t *testing.T) {
config.GetEnvInstance().UpdateExternalConfigMap(mockMap)

father := &ConsumerConfig{
Check: &[]bool{true}[0],
Check: &[]bool{true}[0],
BaseConfig: BaseConfig{
ApplicationConfig: baseAppConfig,
ApplicationConfig: baseAppConfig,
},
Registries: map[string]*RegistryConfig{},
Registry: &RegistryConfig{},
References: baseMockRef,
Registries: map[string]*RegistryConfig{},
Registry: &RegistryConfig{},
References: baseMockRef,
}

c.SetFatherConfig(father)
Expand Down Expand Up @@ -242,7 +242,7 @@ func TestRefreshProvider(t *testing.T) {
BaseConfig: BaseConfig{
ApplicationConfig: baseAppConfig,
},
Registries: baseRegistries,
Registries: baseRegistries,
Services: map[string]*ServiceConfig{
"MockService": {
InterfaceName: "com.MockService",
Expand Down
6 changes: 3 additions & 3 deletions config/config_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,9 @@ func loadConsumerConfig() {
checkok = false
count++
if count > maxWait {
errMsg := fmt.Sprintf("Failed to check the status of the service %v . No provider available for the service to the consumer use dubbo version %v", refconfig.InterfaceName, constant.Version)
logger.Error(errMsg)
panic(errMsg)
// errMsg := fmt.Sprintf("Failed to check the status of the service %v . No provider available for the service to the consumer use dubbo version %v", refconfig.InterfaceName, constant.Version)
// logger.Error(errMsg)
// panic(errMsg)
}
time.Sleep(time.Second * 1)
break
Expand Down
2 changes: 1 addition & 1 deletion registry/etcdv3/service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func (e *etcdV3ServiceDiscovery) GetInstances(serviceName string) []registry.Ser
}
return serviceInstances
}
perrors.New(fmt.Sprintf("could not getChildrenKVList the err is:%v", err))
logger.Infof("could not getChildrenKVList the err is:%v", err)
}

return make([]registry.ServiceInstance, 0, 0)
Expand Down
3 changes: 1 addition & 2 deletions registry/event/event_publishing_service_deiscovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ package event
import (
"reflect"
"testing"

"github.com/apache/dubbo-go/metadata/mapping"
)

import (
Expand All @@ -36,6 +34,7 @@ import (
"github.com/apache/dubbo-go/common/observer"
dispatcher2 "github.com/apache/dubbo-go/common/observer/dispatcher"
"github.com/apache/dubbo-go/config"
"github.com/apache/dubbo-go/metadata/mapping"
_ "github.com/apache/dubbo-go/metadata/service/inmemory"
"github.com/apache/dubbo-go/registry"
)
Expand Down
1 change: 1 addition & 0 deletions registry/zookeeper/service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,7 @@ func (zksd *zookeeperServiceDiscovery) DataChange(eventType remoting.Event) bool
err := zksd.DispatchEventByServiceName(serviceName)
if err != nil {
logger.Errorf("[zkServiceDiscovery] DispatchEventByServiceName{%s} error = err{%v}", serviceName, err)
return false
}
return true
}
Expand Down
4 changes: 2 additions & 2 deletions registry/zookeeper/service_discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
package zookeeper

import (
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/observer"
"strconv"
"sync"
"testing"
Expand All @@ -31,6 +29,8 @@ import (
)

import (
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/observer"
"github.com/apache/dubbo-go/config"
"github.com/apache/dubbo-go/registry"
)
Expand Down
8 changes: 1 addition & 7 deletions remoting/consul/test_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,5 @@ func (consulAgent *ConsulAgent) Close() error {
if err != nil {
return err
}

err = os.RemoveAll(consulAgent.dataDir)
if err != nil {
return err
}

return nil
return os.RemoveAll(consulAgent.dataDir)
}
6 changes: 5 additions & 1 deletion remoting/zookeeper/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func ValidateZookeeperClient(container ZkClientFacade, opts ...Option) error {
}

if connected {
logger.Info("Connect to zookeeper successfully, name{%s}, zk address{%v}", options.zkName, url.Location)
logger.Infof("Connect to zookeeper successfully, name{%s}, zk address{%v}", options.zkName, url.Location)
container.WaitGroup().Add(1) // zk client start successful, then registry wg +1
}

Expand Down Expand Up @@ -433,6 +433,7 @@ func (z *ZookeeperClient) CreateWithValue(basePath string, value []byte) error {

// CreateTempWithValue will create the node recursively, which means that if the parent node is absent,
// it will create parent node first,and set value in last child path
// If the path exist, it will update data
func (z *ZookeeperClient) CreateTempWithValue(basePath string, value []byte) error {
var (
err error
Expand All @@ -453,6 +454,9 @@ func (z *ZookeeperClient) CreateTempWithValue(basePath string, value []byte) err
// last child need be ephemeral
if i == length-1 {
_, err = conn.Create(tmpPath, value, zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
if err == zk.ErrNodeExists {
return err
}
} else {
_, err = conn.Create(tmpPath, []byte{}, 0, zk.WorldACL(zk.PermAll))
}
Expand Down
12 changes: 12 additions & 0 deletions remoting/zookeeper/curator_discovery/service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"path"
"strings"
"sync"

"github.com/dubbogo/go-zookeeper/zk"
)

import (
Expand Down Expand Up @@ -71,6 +73,16 @@ func (sd *ServiceDiscovery) registerService(instance *ServiceInstance) error {
return err
}
err = sd.client.CreateTempWithValue(path, data)
if err == zk.ErrNodeExists {
_, state, _ := sd.client.GetContent(path)
if state != nil {
_, err = sd.client.SetContent(path, data, state.Version+1)
if err != nil {
logger.Debugf("Try to update the node data failed. In most cases, it's not a problem. ")
}
}
return nil
}
if err != nil {
return err
}
Expand Down
6 changes: 3 additions & 3 deletions remoting/zookeeper/facade.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package zookeeper

import (
"github.com/apache/dubbo-go/common"
"sync"
)
import (
Expand All @@ -27,15 +26,16 @@ import (
)

import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/logger"
)

type ZkClientFacade interface {
ZkClient() *ZookeeperClient
SetZkClient(*ZookeeperClient)
ZkClientLock() *sync.Mutex
WaitGroup() *sync.WaitGroup //for wait group control, zk client listener & zk client container
Done() chan struct{} //for zk client control
WaitGroup() *sync.WaitGroup // for wait group control, zk client listener & zk client container
Done() chan struct{} // for zk client control
RestartCallBack() bool
GetUrl() common.URL
}
Expand Down

0 comments on commit 9a5990d

Please sign in to comment.