From e504e7982712455143fefe0bc1656010691327bf Mon Sep 17 00:00:00 2001 From: Jason Peng Date: Mon, 16 Jan 2023 14:39:46 +0800 Subject: [PATCH] Fix: Replace assignment behavior with copy operation to avoid OOM problem (#2182) --- common/url.go | 49 ++++++++++++---------------- common/url_test.go | 2 +- go.mod | 2 +- go.sum | 2 -- registry/directory/directory_test.go | 2 +- registry/zookeeper/listener.go | 2 +- remoting/zookeeper/listener.go | 27 +++------------ 7 files changed, 28 insertions(+), 58 deletions(-) diff --git a/common/url.go b/common/url.go index 75e207010a..e0ef856f5c 100644 --- a/common/url.go +++ b/common/url.go @@ -35,11 +35,9 @@ import ( gxset "github.com/dubbogo/gost/container/set" + "github.com/google/uuid" "github.com/jinzhu/copier" - perrors "github.com/pkg/errors" - - "github.com/satori/go.uuid" ) import ( @@ -203,7 +201,7 @@ func WithToken(token string) Option { if len(token) > 0 { value := token if strings.ToLower(token) == "true" || strings.ToLower(token) == "default" { - u, _ := uuid.NewV4() + u, _ := uuid.NewUUID() value = u.String() } url.SetParam(constant.TokenKey, value) @@ -684,7 +682,8 @@ func (c *URL) ToMap() map[string]string { // will be added into result. // for example, if serviceURL contains params (a1->v1, b1->v2) and referenceURL contains params(a2->v3, b1 -> v4) // the params of result will be (a1->v1, b1->v2, a2->v3). -// You should notice that the value of b1 is v2, not v4. +// You should notice that the value of b1 is v2, not v4 +// except constant.LoadbalanceKey, constant.ClusterKey, constant.RetriesKey, constant.TimeoutKey. // due to URL is not thread-safe, so this method is not thread-safe func MergeURL(serviceURL *URL, referenceURL *URL) *URL { // After Clone, it is a new URL that there is no thread safe issue. @@ -693,16 +692,15 @@ func MergeURL(serviceURL *URL, referenceURL *URL) *URL { // iterator the referenceURL if serviceURL not have the key ,merge in // referenceURL usually will not changed. so change RangeParams to GetParams to avoid the string value copy.// Group get group for key, value := range referenceURL.GetParams() { - if v := mergedURL.GetParam(key, ""); len(v) == 0 { - if len(value) > 0 { - params[key] = value + if v := mergedURL.GetParam(key, ""); len(v) == 0 && len(value) > 0 { + if params == nil { + params = url.Values{} } + params[key] = make([]string, len(value)) + copy(params[key], value) } } - // loadBalance,cluster,retries strategy config - methodConfigMergeFcn := mergeNormalParam(params, referenceURL, []string{constant.LoadbalanceKey, constant.ClusterKey, constant.RetriesKey, constant.TimeoutKey}) - // remote timestamp if v := serviceURL.GetParam(constant.TimestampKey, ""); len(v) > 0 { params[constant.RemoteTimestampKey] = []string{v} @@ -711,8 +709,17 @@ func MergeURL(serviceURL *URL, referenceURL *URL) *URL { // finally execute methodConfigMergeFcn for _, method := range referenceURL.Methods { - for _, fcn := range methodConfigMergeFcn { - fcn("methods." + method) + for _, paramKey := range []string{constant.LoadbalanceKey, constant.ClusterKey, constant.RetriesKey, constant.TimeoutKey} { + if v := referenceURL.GetParam(paramKey, ""); len(v) > 0 { + params[paramKey] = []string{v} + } + + methodsKey := "methods." + method + "." + paramKey + //if len(mergedURL.GetParam(methodsKey, "")) == 0 { + if v := referenceURL.GetParam(methodsKey, ""); len(v) > 0 { + params[methodsKey] = []string{v} + } + //} } } // In this way, we will raise some performance. @@ -732,7 +739,6 @@ func (c *URL) Clone() *URL { newURL.SetParam(key, value) return true }) - return newURL } @@ -818,21 +824,6 @@ func IsEquals(left *URL, right *URL, excludes ...string) bool { return true } -func mergeNormalParam(params url.Values, referenceURL *URL, paramKeys []string) []func(method string) { - methodConfigMergeFcn := make([]func(method string), 0, len(paramKeys)) - for _, paramKey := range paramKeys { - if v := referenceURL.GetParam(paramKey, ""); len(v) > 0 { - params[paramKey] = []string{v} - } - methodConfigMergeFcn = append(methodConfigMergeFcn, func(method string) { - if v := referenceURL.GetParam(method+"."+paramKey, ""); len(v) > 0 { - params[method+"."+paramKey] = []string{v} - } - }) - } - return methodConfigMergeFcn -} - // URLSlice will be used to sort URL instance // Instances will be order by URL.String() type URLSlice []*URL diff --git a/common/url_test.go b/common/url_test.go index 66f000364c..dcb2ce8237 100644 --- a/common/url_test.go +++ b/common/url_test.go @@ -323,7 +323,7 @@ func TestMergeUrl(t *testing.T) { assert.Equal(t, "1", mergedUrl.GetParam("test2", "")) assert.Equal(t, "1", mergedUrl.GetParam("test3", "")) assert.Equal(t, "1", mergedUrl.GetParam(constant.RetriesKey, "")) - assert.Equal(t, "2", mergedUrl.GetParam(constant.MethodKeys+".testMethod."+constant.RetriesKey, "")) + assert.Equal(t, "1", mergedUrl.GetParam(constant.MethodKeys+".testMethod."+constant.RetriesKey, "")) } func TestURLSetParams(t *testing.T) { diff --git a/go.mod b/go.mod index 5e1d25ed1b..227df9c70e 100644 --- a/go.mod +++ b/go.mod @@ -27,6 +27,7 @@ require ( github.com/golang/mock v1.6.0 github.com/golang/protobuf v1.5.2 github.com/google/go-cmp v0.5.9 + github.com/google/uuid v1.3.0 github.com/gopherjs/gopherjs v0.0.0-20190910122728-9d188e94fb99 // indirect github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645 github.com/hashicorp/vault/sdk v0.6.2 @@ -43,7 +44,6 @@ require ( github.com/pkg/errors v0.9.1 github.com/polarismesh/polaris-go v1.3.0 github.com/prometheus/client_golang v1.12.2 - github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b github.com/stretchr/testify v1.8.1 go.etcd.io/etcd/api/v3 v3.5.6 go.etcd.io/etcd/client/v3 v3.5.6 diff --git a/go.sum b/go.sum index e80538d648..fdff49893a 100644 --- a/go.sum +++ b/go.sum @@ -705,8 +705,6 @@ github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb github.com/ryanuber/columnize v2.1.0+incompatible/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= github.com/ryanuber/go-glob v1.0.0/go.mod h1:807d1WSdnB0XRJzKNil9Om6lcp/3a0v4qIHxIXzX/Yc= github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E= -github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b h1:gQZ0qzfKHQIybLANtM3mBXNUtOfsCFXeTsnBqCsx1KM= -github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= github.com/shirou/gopsutil v3.20.11+incompatible h1:LJr4ZQK4mPpIV5gOa4jCOKOGb4ty4DZO54I4FGqIpto= github.com/shirou/gopsutil v3.20.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= diff --git a/registry/directory/directory_test.go b/registry/directory/directory_test.go index 33814430ad..b26459d378 100644 --- a/registry/directory/directory_test.go +++ b/registry/directory/directory_test.go @@ -90,7 +90,7 @@ func Test_MergeProviderUrl(t *testing.T) { time.Sleep(1e9) assert.Len(t, registryDirectory.cacheInvokers, 1) if len(registryDirectory.cacheInvokers) > 0 { - assert.Equal(t, "mock", registryDirectory.cacheInvokers[0].GetURL().GetParam(constant.ClusterKey, "")) + assert.Equal(t, "mock1", registryDirectory.cacheInvokers[0].GetURL().GetParam(constant.ClusterKey, "")) } } diff --git a/registry/zookeeper/listener.go b/registry/zookeeper/listener.go index 860b48deca..65871adb9a 100644 --- a/registry/zookeeper/listener.go +++ b/registry/zookeeper/listener.go @@ -97,7 +97,7 @@ func (l *RegistryDataListener) DataChange(event remoting.Event) bool { listener.Process( &config_center.ConfigChangeEvent{ Key: event.Path, - Value: serviceURL, + Value: serviceURL.Clone(), ConfigType: event.Action, }, ) diff --git a/remoting/zookeeper/listener.go b/remoting/zookeeper/listener.go index d3257f9cea..332accdf4c 100644 --- a/remoting/zookeeper/listener.go +++ b/remoting/zookeeper/listener.go @@ -23,18 +23,13 @@ import ( "sync" "time" ) - import ( "github.com/dubbogo/go-zookeeper/zk" - gxzookeeper "github.com/dubbogo/gost/database/kv/zk" "github.com/dubbogo/gost/log/logger" - perrors "github.com/pkg/errors" - uatomic "go.uber.org/atomic" ) - import ( "dubbo.apache.org/dubbo-go/v3/common" "dubbo.apache.org/dubbo-go/v3/common/constant" @@ -65,6 +60,7 @@ func NewZkEventListener(client *gxzookeeper.ZookeeperClient) *ZkEventListener { func (l *ZkEventListener) ListenServiceNodeEvent(zkPath string, listener remoting.DataListener) { l.wg.Add(1) go func(zkPath string, listener remoting.DataListener) { + defer l.wg.Done() if l.listenServiceNodeEvent(zkPath, listener) { listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeDel}) l.pathMapLock.Lock() @@ -128,7 +124,6 @@ func (l *ZkEventListener) ListenConfigurationEvent(zkPath string, listener remot // nolint func (l *ZkEventListener) listenServiceNodeEvent(zkPath string, listener ...remoting.DataListener) bool { - defer l.wg.Done() l.pathMapLock.Lock() a, ok := l.pathMap[zkPath] @@ -139,7 +134,6 @@ func (l *ZkEventListener) listenServiceNodeEvent(zkPath string, listener ...remo a.Inc() l.pathMapLock.Unlock() defer a.Dec() - var zkEvent zk.Event for { keyEventCh, err := l.Client.ExistW(zkPath) @@ -147,7 +141,6 @@ func (l *ZkEventListener) listenServiceNodeEvent(zkPath string, listener ...remo logger.Warnf("existW{key:%s} = error{%v}", zkPath, err) return false } - select { case zkEvent = <-keyEventCh: logger.Warnf("get a zookeeper keyEventCh{type:%s, server:%s, path:%s, state:%d-%s, err:%s}", @@ -194,19 +187,16 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li } return false } - newChildren, err := l.Client.GetChildren(zkPath) if err != nil { logger.Errorf("[ZkEventListener handleZkNodeEvent]Path{%s} child nodes changed, zk.Children() = error{%v}", zkPath, perrors.WithStack(err)) return } - // a node was added -- listen the new node var ( newNode string ) for _, n := range newChildren { - newNode = path.Join(zkPath, n) logger.Debugf("[Zookeeper Listener] add zkNode{%s}", newNode) content, _, connErr := l.Client.Conn.Get(newNode) @@ -214,14 +204,13 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li logger.Errorf("Get new node path {%v} 's content error,message is {%v}", newNode, perrors.WithStack(connErr)) } - if !listener.DataChange(remoting.Event{Path: newNode, Action: remoting.EventTypeAdd, Content: string(content)}) { continue } // listen l service node l.wg.Add(1) go func(node string, listener remoting.DataListener) { - // invoker l.wg.Done() in l.listenServiceNodeEvent + defer l.wg.Done() if l.listenServiceNodeEvent(node, listener) { logger.Warnf("delete zkNode{%s}", node) listener.DataChange(remoting.Event{Path: node, Action: remoting.EventTypeDel}) @@ -239,16 +228,13 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li if contains(newChildren, n) { continue } - oldNode = path.Join(zkPath, n) logger.Warnf("delete oldNode{%s}", oldNode) listener.DataChange(remoting.Event{Path: oldNode, Action: remoting.EventTypeDel}) } } - func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, listener remoting.DataListener) { defer l.wg.Done() - var ( failTimes int ttl time.Duration @@ -271,7 +257,6 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, li failTimes = MaxFailTimes } logger.Errorf("[Zookeeper EventListener][listenDirEvent] Get children of path {%s} with watcher failed, the error is %+v", zkRootPath, err) - // Maybe the provider does not ready yet, sleep failTimes * ConnDelay senconds to wait after := time.After(timeSecondDuration(failTimes * ConnDelay)) select { @@ -293,10 +278,8 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, li continue } } - // Build the children path zkNodePath := path.Join(zkRootPath, c) - // Save the path to avoid listen repeatedly l.pathMapLock.Lock() _, ok := l.pathMap[zkNodePath] @@ -308,7 +291,6 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, li logger.Warnf("[Zookeeper EventListener][listenDirEvent] The child with zk path {%s} has already been listened.", zkNodePath) continue } - // When Zk disconnected, the Conn will be set to nil, so here need check the value of Conn l.Client.RLock() if l.Client.Conn == nil { @@ -316,7 +298,6 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, li break } content, _, err := l.Client.Conn.Get(zkNodePath) - l.Client.RUnlock() if err != nil { logger.Errorf("[Zookeeper EventListener][listenDirEvent] Get content of the child node {%v} failed, the error is %+v", zkNodePath, perrors.WithStack(err)) @@ -328,7 +309,7 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, li logger.Debugf("[Zookeeper EventListener][listenDirEvent] listen dubbo service key{%s}", zkNodePath) l.wg.Add(1) go func(zkPath string, listener remoting.DataListener) { - // invoker l.wg.Done() in l.listenServiceNodeEvent + defer l.wg.Done() if l.listenServiceNodeEvent(zkPath, listener) { listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeDel}) l.pathMapLock.Lock() @@ -340,6 +321,7 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, li } if l.startScheduleWatchTask(zkRootPath, children, ttl, listener, childEventCh) { return + } } } @@ -380,7 +362,6 @@ func (l *ZkEventListener) startScheduleWatchTask( } } } - func timeSecondDuration(sec int) time.Duration { return time.Duration(sec) * time.Second }