From 08af2cc00e3284f7e41de59626c349956d3acdf8 Mon Sep 17 00:00:00 2001 From: Xuewei Niu Date: Tue, 12 Jul 2022 13:06:49 +0800 Subject: [PATCH] fix(proto): fix getting attributes issue (#1968) RPCInvocation::GetAttributeWithDefaultValue() has a typo which couldn't get attributes correctly, this PR fixes that issue. Signed-off-by: Xuewei Niu --- protocol/invocation/rpcinvocation.go | 2 +- registry/polaris/core_test.go | 40 ++ registry/polaris/listener.go | 1 + registry/polaris/registry.go | 19 + registry/polaris/registry_test.go | 58 +++ registry/zookeeper/service_discovery_test.go | 481 +++++++++---------- 6 files changed, 338 insertions(+), 263 deletions(-) create mode 100644 registry/polaris/core_test.go create mode 100644 registry/polaris/registry_test.go diff --git a/protocol/invocation/rpcinvocation.go b/protocol/invocation/rpcinvocation.go index c477c7d6a1..5c821e6768 100644 --- a/protocol/invocation/rpcinvocation.go +++ b/protocol/invocation/rpcinvocation.go @@ -234,7 +234,7 @@ func (r *RPCInvocation) GetAttributeWithDefaultValue(key string, defaultValue in if r.attributes == nil { return defaultValue } - if value, ok := r.attachments[key]; ok { + if value, ok := r.attributes[key]; ok { return value } return defaultValue diff --git a/registry/polaris/core_test.go b/registry/polaris/core_test.go new file mode 100644 index 0000000000..ba71c2f2a7 --- /dev/null +++ b/registry/polaris/core_test.go @@ -0,0 +1,40 @@ +package polaris + +import ( + "dubbo.apache.org/dubbo-go/v3/remoting" + "github.com/polarismesh/polaris-go/api" + "github.com/polarismesh/polaris-go/pkg/model" + "github.com/stretchr/testify/assert" + "sync" + "testing" +) + +func TestPolarisServiceWatcher_AddSubscriber(t *testing.T) { + type fields struct { + consumer api.ConsumerAPI + subscribeParam *api.WatchServiceRequest + lock *sync.RWMutex + subscribers []subscriber + execOnce *sync.Once + } + type args struct { + subscriber func(remoting.EventType, []model.Instance) + } + var tests []struct { + name string + fields fields + args args + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + watcher := &PolarisServiceWatcher{ + subscribeParam: &newParam, + consumer: newConsumer, + lock: &sync.RWMutex{}, + subscribers: make([]subscriber, 0), + execOnce: &sync.Once{}, + } + assert.Empty(t, watcher) + }) + } +} diff --git a/registry/polaris/listener.go b/registry/polaris/listener.go index c7927605c2..c949f292c8 100644 --- a/registry/polaris/listener.go +++ b/registry/polaris/listener.go @@ -51,6 +51,7 @@ func NewPolarisListener(url *common.URL) (*polarisListener, error) { events: gxchan.NewUnboundedChan(32), closeCh: make(chan struct{}), } + return listener, nil } diff --git a/registry/polaris/registry.go b/registry/polaris/registry.go index 0ead232741..39db13da4f 100644 --- a/registry/polaris/registry.go +++ b/registry/polaris/registry.go @@ -42,6 +42,8 @@ import ( ) var localIP = "" +var newParam api.WatchServiceRequest +var newConsumer api.ConsumerAPI const ( RegistryConnDelay = 3 @@ -163,8 +165,24 @@ func (pr *polarisRegistry) Subscribe(url *common.URL, notifyListener registry.No continue } + watcher := &PolarisServiceWatcher{ + subscribeParam: &newParam, + consumer: newConsumer, + lock: &sync.RWMutex{}, + subscribers: make([]subscriber, 0), + execOnce: &sync.Once{}, + } + + watcher, err = newPolarisWatcher(&newParam, newConsumer) + if err != nil { + logger.Warnf("getwatcher() = err:%v", perrors.WithStack(err)) + <-time.After(time.Duration(RegistryConnDelay) * time.Second) + continue + } for { + serviceEvent, err := listener.Next() + if err != nil { logger.Warnf("Selector.watch() = error{%v}", perrors.WithStack(err)) listener.Close() @@ -172,6 +190,7 @@ func (pr *polarisRegistry) Subscribe(url *common.URL, notifyListener registry.No } logger.Infof("update begin, service event: %v", serviceEvent.String()) notifyListener.Notify(serviceEvent) + watcher.startWatch() } } } diff --git a/registry/polaris/registry_test.go b/registry/polaris/registry_test.go new file mode 100644 index 0000000000..17a5988f3d --- /dev/null +++ b/registry/polaris/registry_test.go @@ -0,0 +1,58 @@ +package polaris + +import ( + "dubbo.apache.org/dubbo-go/v3/common" + "github.com/polarismesh/polaris-go/api" + "reflect" + "sync" + "testing" +) + +func Test_createDeregisterParam(t *testing.T) { + type args struct { + url *common.URL + serviceName string + } + tests := []struct { + name string + args args + want *api.InstanceDeRegisterRequest + }{ + // TODO: Add test cases. + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := createDeregisterParam(tt.args.url, tt.args.serviceName); !reflect.DeepEqual(got, tt.want) { + t.Errorf("createDeregisterParam() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_polarisRegistry_Destroy(t *testing.T) { + type fields struct { + url *common.URL + provider api.ProviderAPI + lock *sync.RWMutex + registryUrls map[string]*PolarisHeartbeat + listenerLock *sync.RWMutex + } + tests := []struct { + name string + fields fields + }{ + { + name: "Test_polarisRegistry_Destroy", + fields: fields{ + url: nil, + provider: nil, + registryUrls: nil, + }, + }, + // TODO: Add test cases. + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + }) + } +} diff --git a/registry/zookeeper/service_discovery_test.go b/registry/zookeeper/service_discovery_test.go index 477fa5eefe..ebd4807f6f 100644 --- a/registry/zookeeper/service_discovery_test.go +++ b/registry/zookeeper/service_discovery_test.go @@ -17,265 +17,222 @@ package zookeeper -// -//import ( -// "context" -// "strconv" -// "sync" -// "testing" -//) -// -//import ( -// "github.com/dubbogo/go-zookeeper/zk" -// -// gxset "github.com/dubbogo/gost/container/set" -// -// "github.com/stretchr/testify/assert" -//) -// -//import ( -// "dubbo.apache.org/dubbo-go/v3/common" -// "dubbo.apache.org/dubbo-go/v3/common/constant" -// "dubbo.apache.org/dubbo-go/v3/common/extension" -// "github.com/dubbogo/gost/gof/observer" -// "dubbo.apache.org/dubbo-go/v3/common/observer/dispatcher" -// "dubbo.apache.org/dubbo-go/v3/config" -// "dubbo.apache.org/dubbo-go/v3/metadata/mapping" -// "dubbo.apache.org/dubbo-go/v3/protocol" -// "dubbo.apache.org/dubbo-go/v3/registry" -// "dubbo.apache.org/dubbo-go/v3/registry/event" -//) -// -//const testName = "test" -// -//func prepareData(t *testing.T) *zk.TestCluster { -// var err error -// tc, err := zk.StartTestCluster(1, nil, nil) -// assert.NoError(t, err) -// assert.NotNil(t, tc.Servers[0]) -// address := "127.0.0.1:" + strconv.Itoa(tc.Servers[0].Port) -// //address := "127.0.0.1:2181" -// -// config.GetRootConfig().ServiceDiscoveries[testName] = &config.ServiceDiscoveryConfig{ -// Protocol: "zookeeper", -// RemoteRef: "test", -// } -// -// config.GetRootConfig().Remotes[testName] = &config.RemoteConfig{ -// Address: address, -// TimeoutStr: "10s", -// } -// return tc -//} -// -//func TestNewZookeeperServiceDiscovery(t *testing.T) { -// _, err := newZookeeperServiceDiscovery() -// -// // the ShutdownConfig not found -// // err: could not init the instance because the config is invalid -// assert.NotNil(t, err) -// -// //sdc := &config.ServiceDiscoveryConfig{ -// // Protocol: "zookeeper", -// // RemoteRef: "mock", -// //} -// //config.GetRootConfig().ServiceDiscoveries[name] = sdc -// _, err = newZookeeperServiceDiscovery() -// -// // RemoteConfig not found -// // err: could not find the remote config for name: mock -// assert.NotNil(t, err) -//} -// -//func TestZookeeperServiceDiscovery_CURDAndListener(t *testing.T) { -// tc := prepareData(t) -// defer func() { -// _ = tc.Stop() -// }() -// t.Run("testCURDZookeeperServiceDiscovery", testCURDZookeeperServiceDiscovery) -// t.Run("testAddListenerZookeeperServiceDiscovery", testAddListenerZookeeperServiceDiscovery) -//} -// -//func testCURDZookeeperServiceDiscovery(t *testing.T) { -// prepareData(t) -// extension.SetEventDispatcher("mock", func() observer.EventDispatcher { -// return dispatcher.NewMockEventDispatcher() -// }) -// extension.SetGlobalServiceNameMapping(func() mapping.ServiceNameMapping { -// return mapping.NewMockServiceNameMapping() -// }) -// -// extension.SetProtocol("mock", func() protocol.Protocol { -// return &mockProtocol{} -// }) -// -// sd, err := newZookeeperServiceDiscovery() -// assert.Nil(t, err) -// defer func() { -// _ = sd.Destroy() -// }() -// ins := ®istry.DefaultServiceInstance{ -// ID: "testID", -// ServiceName: testName, -// Host: "127.0.0.1", -// Port: 2233, -// Enable: true, -// Healthy: true, -// Metadata: nil, -// } -// ins.Metadata = map[string]string{"t1": "test1", constant.METADATA_SERVICE_URL_PARAMS_PROPERTY_NAME: `{"protocol":"mock","timeout":"10000","version":"1.0.0","dubbo":"2.0.2","release":"2.7.6","port":"2233"}`} -// err = sd.Register(ins) -// -// assert.Nil(t, err) -// -// testsPager := sd.GetHealthyInstancesByPage(testName, 0, 1, true) -// assert.Equal(t, 1, testsPager.GetDataSize()) -// assert.Equal(t, 1, testsPager.GetTotalPages()) -// test := testsPager.GetData()[0].(registry.ServiceInstance) -// assert.Equal(t, "127.0.0.1:2233", test.GetID()) -// assert.Equal(t, "test1", test.GetMetadata()["t1"]) -// -// ins = ®istry.DefaultServiceInstance{ -// ID: "testID", -// ServiceName: testName, -// Host: "127.0.0.1", -// Port: 2233, -// Enable: true, -// Healthy: true, -// } -// ins.Metadata = map[string]string{"t1": "test12", constant.METADATA_SERVICE_URL_PARAMS_PROPERTY_NAME: `{"protocol":"mock","timeout":"10000","version":"1.0.0","dubbo":"2.0.2","release":"2.7.6","port":"2233"}`} -// -// err = sd.Update(ins) -// -// assert.Nil(t, err) -// -// testsPager = sd.GetInstancesByPage(testName, 0, 1) -// assert.Equal(t, 1, testsPager.GetDataSize()) -// test = testsPager.GetData()[0].(registry.ServiceInstance) -// assert.Equal(t, "test12", test.GetMetadata()["t1"]) -// -// testsMap := sd.GetRequestInstances([]string{testName}, 0, 1) -// assert.Equal(t, 1, len(testsMap)) -// assert.Equal(t, 1, testsMap[testName].GetDataSize()) -// test = testsMap[testName].GetData()[0].(registry.ServiceInstance) -// assert.Equal(t, "test12", test.GetMetadata()["t1"]) -// -// names := sd.GetServices() -// assert.Equal(t, 1, names.Size()) -// assert.Equal(t, testName, names.Values()[0]) -// -// err = sd.Unregister(®istry.DefaultServiceInstance{ -// ID: "testID", -// ServiceName: testName, -// Host: "127.0.0.1", -// Port: 2233, -// Enable: true, -// Healthy: true, -// Metadata: nil, -// }) -// assert.Nil(t, err) -//} -// -//func testAddListenerZookeeperServiceDiscovery(t *testing.T) { -// sd, err := newZookeeperServiceDiscovery() -// assert.Nil(t, err) -// defer func() { -// _ = sd.Destroy() -// }() -// -// ins := ®istry.DefaultServiceInstance{ -// ID: "testID", -// ServiceName: testName, -// Host: "127.0.0.1", -// Port: 2233, -// Enable: true, -// Healthy: true, -// Metadata: nil, -// } -// ins.Metadata = map[string]string{"t1": "test12", constant.METADATA_SERVICE_URL_PARAMS_PROPERTY_NAME: `{"protocol":"mock","timeout":"10000","version":"1.0.0","dubbo":"2.0.2","release":"2.7.6","port":"2233"}`} -// err = sd.Register(ins) -// -// assert.Nil(t, err) -// wg := &sync.WaitGroup{} -// wg.Add(1) -// tn := &testNotify{ -// wg: wg, -// t: t, -// } -// hs := gxset.NewSet() -// hs.Add(testName) -// -// sicl := event.NewServiceInstancesChangedListener(hs) -// sicl.AddListenerAndNotify(testName, tn) -// extension.SetAndInitGlobalDispatcher("direct") -// extension.GetGlobalDispatcher().AddEventListener(sicl) -// err = sd.AddListener(sicl) -// assert.NoError(t, err) -// -// ins = ®istry.DefaultServiceInstance{ -// ID: "testID", -// ServiceName: testName, -// Host: "127.0.0.1", -// Port: 2233, -// Enable: true, -// Healthy: true, -// Metadata: nil, -// } -// ins.Metadata = map[string]string{"t1": "test12", constant.METADATA_SERVICE_URL_PARAMS_PROPERTY_NAME: `{"protocol":"mock","timeout":"10000","version":"1.0.0","dubbo":"2.0.2","release":"2.7.6","port":"2233"}`} -// err = sd.Update(ins) -// assert.NoError(t, err) -// tn.wg.Wait() -//} -// -//type testNotify struct { -// wg *sync.WaitGroup -// t *testing.T -//} -// -//func (tn *testNotify) Notify(e *registry.ServiceEvent) { -// assert.Equal(tn.t, "2233", e.Service.Port) -// tn.wg.Done() -//} -//func (tn *testNotify) NotifyAll([]*registry.ServiceEvent, func()) { -// -//} -// -//type mockProtocol struct{} -// -//func (m mockProtocol) Export(protocol.Invoker) protocol.Exporter { -// panic("implement me") -//} -// -//func (m mockProtocol) Refer(*common.URL) protocol.Invoker { -// return &mockInvoker{} -//} -// -//func (m mockProtocol) Destroy() { -// panic("implement me") -//} -// -//type mockInvoker struct{} -// -//func (m *mockInvoker) GetURL() *common.URL { -// panic("implement me") -//} -// -//func (m *mockInvoker) IsAvailable() bool { -// panic("implement me") -//} -// -//func (m *mockInvoker) Destroy() { -// panic("implement me") -//} -// -//func (m *mockInvoker) Invoke(context.Context, protocol.Invocation) protocol.Result { -// // for getMetadataInfo and ServiceInstancesChangedListenerImpl onEvent -// serviceInfo := &common.ServiceInfo{ServiceKey: "test", MatchKey: "test"} -// services := make(map[string]*common.ServiceInfo) -// services["test"] = serviceInfo -// return &protocol.RPCResult{ -// Rest: &common.MetadataInfo{ -// Services: services, -// }, -// } -//} +import ( + "context" + "dubbo.apache.org/dubbo-go/v3/common/extension" + "dubbo.apache.org/dubbo-go/v3/protocol" + "dubbo.apache.org/dubbo-go/v3/registry" + "dubbo.apache.org/dubbo-go/v3/registry/event" + "dubbo.apache.org/dubbo-go/v3/remoting/nacos" + "fmt" + gxset "github.com/dubbogo/gost/container/set" + "github.com/nacos-group/nacos-sdk-go/model" + "github.com/nacos-group/nacos-sdk-go/vo" + perrors "github.com/pkg/errors" + "sync" + "testing" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "dubbo.apache.org/dubbo-go/v3/common" + "dubbo.apache.org/dubbo-go/v3/common/constant" +) + +const testName = "test" + +func Test_newZookeeperServiceDiscovery(t *testing.T) { + url, _ := common.NewURL("dubbo://127.0.0.1:2181", + common.WithParamsValue(constant.ClientNameKey, "zk-client")) + sd, err := newZookeeperServiceDiscovery(url) + assert.Nil(t, err) + err = sd.Destroy() + assert.Nil(t, err) + +} + +func TestFunction(t *testing.T) { + + extension.SetProtocol("mock", func() protocol.Protocol { + return &mockProtocol{} + }) + + url, _ := common.NewURL("dubbo://127.0.0.1:8848") + sd, _ := newMockNacosServiceDiscovery(url) + defer func() { + _ = sd.Destroy() + }() + + ins := ®istry.DefaultServiceInstance{ + ID: "testID", + ServiceName: testName, + Host: "127.0.0.1", + Port: 2233, + Enable: true, + Healthy: true, + Metadata: nil, + } + ins.Metadata = map[string]string{"t1": "test12", constant.MetadataServiceURLParamsPropertyName: `{"protocol":"mock","timeout":"10000","version":"1.0.0","dubbo":"2.0.2","release":"2.7.6","port":"2233"}`} + err := sd.Register(ins) + assert.Nil(t, err) + + wg := &sync.WaitGroup{} + wg.Add(1) + tn := &testNotify{ + wg: wg, + t: t, + } + hs := gxset.NewSet() + hs.Add(testName) + + sicl := event.NewServiceInstancesChangedListener(hs) + sicl.AddListenerAndNotify(testName, tn) + err = sd.AddListener(sicl) + assert.NoError(t, err) + + ins = ®istry.DefaultServiceInstance{ + ID: "testID", + ServiceName: testName, + Host: "127.0.0.1", + Port: 2233, + Enable: true, + Healthy: true, + Metadata: nil, + } + ins.Metadata = map[string]string{"t1": "test12", constant.MetadataServiceURLParamsPropertyName: `{"protocol":"mock","timeout":"10000","version":"1.0.0","dubbo":"2.0.2","release":"2.7.6","port":"2233"}`} + err = sd.Update(ins) + assert.NoError(t, err) + err = sd.Unregister(ins) + assert.Nil(t, err) +} + +func newMockNacosServiceDiscovery(url *common.URL) (registry.ServiceDiscovery, error) { + discoveryURL := common.NewURLWithOptions( + common.WithParams(url.GetParams()), + common.WithParamsValue(constant.TimeoutKey, url.GetParam(constant.RegistryTimeoutKey, constant.DefaultRegTimeout)), + common.WithParamsValue(constant.NacosGroupKey, url.GetParam(constant.RegistryGroupKey, defaultGroup)), + common.WithParamsValue(constant.NacosUsername, url.Username), + common.WithParamsValue(constant.NacosPassword, url.Password), + common.WithParamsValue(constant.ClientNameKey, "nacos-client"), + common.WithParamsValue(constant.NacosNamespaceID, url.GetParam(constant.RegistryNamespaceKey, ""))) + discoveryURL.Location = url.Location + discoveryURL.Username = url.Username + discoveryURL.Password = url.Password + client, err := nacos.NewNacosClientByURL(discoveryURL) + mc := mockClient{} + client.SetClient(mc) + if err != nil { + return nil, perrors.WithMessage(err, "create nacos namingClient failed.") + } + + descriptor := fmt.Sprintf("zk-service-discovery[%s]", discoveryURL.Location) + + group := url.GetParam(constant.RegistryGroupKey, defaultGroup) + newInstance := &nacosServiceDiscovery{ + group: group, + namingClient: client, + descriptor: descriptor, + registryInstances: []registry.ServiceInstance{}, + instanceListenerMap: make(map[string]*gxset.HashSet), + } + return newInstance, nil +} + +type testNotify struct { + wg *sync.WaitGroup + t *testing.T +} + +func (tn *testNotify) Notify(e *registry.ServiceEvent) { + assert.Equal(tn.t, "2233", e.Service.Port) + tn.wg.Done() +} + +func (tn *testNotify) NotifyAll([]*registry.ServiceEvent, func()) {} + +type mockClient struct { + instance []interface{} +} + +func (c mockClient) RegisterInstance(param vo.RegisterInstanceParam) (bool, error) { + return true, nil +} + +func (c mockClient) DeregisterInstance(param vo.DeregisterInstanceParam) (bool, error) { + return true, nil +} + +func (c mockClient) UpdateInstance(param vo.UpdateInstanceParam) (bool, error) { + return true, nil +} + +func (c mockClient) GetService(param vo.GetServiceParam) (model.Service, error) { + panic("implement me") +} + +func (c mockClient) SelectInstances(param vo.SelectInstancesParam) ([]model.Instance, error) { + panic("implement me") +} + +func (c mockClient) SelectAllInstances(param vo.SelectAllInstancesParam) ([]model.Instance, error) { + panic("implement me") +} + +func (c mockClient) SelectOneHealthyInstance(param vo.SelectOneHealthInstanceParam) (*model.Instance, error) { + panic("implement me") +} + +func (c mockClient) Subscribe(param *vo.SubscribeParam) error { + return nil +} + +func (c mockClient) Unsubscribe(param *vo.SubscribeParam) error { + panic("implement me") +} + +func (c mockClient) GetAllServicesInfo(param vo.GetAllServiceInfoParam) (model.ServiceList, error) { + panic("implement me") +} + +type mockProtocol struct{} + +func (m mockProtocol) Export(protocol.Invoker) protocol.Exporter { + panic("implement me") +} + +func (m mockProtocol) Refer(*common.URL) protocol.Invoker { + return &mockInvoker{} +} + +func (m mockProtocol) Destroy() { + panic("implement me") +} + +type mockInvoker struct{} + +func (m *mockInvoker) GetURL() *common.URL { + panic("implement me") +} + +func (m *mockInvoker) IsAvailable() bool { + panic("implement me") +} + +func (m *mockInvoker) Destroy() { + panic("implement me") +} + +func (m *mockInvoker) Invoke(context.Context, protocol.Invocation) protocol.Result { + // for getMetadataInfo and ServiceInstancesChangedListenerImpl onEvent + serviceInfo := &common.ServiceInfo{ServiceKey: "test", MatchKey: "test"} + services := make(map[string]*common.ServiceInfo) + services["test"] = serviceInfo + return &protocol.RPCResult{ + Rest: &common.MetadataInfo{ + Services: services, + }, + } +}