Skip to content

Commit

Permalink
Fix: Replace assignment behavior with copy operation to avoid OOM pro…
Browse files Browse the repository at this point in the history
…blem (#2182)
  • Loading branch information
Lvnszn authored Jan 16, 2023
1 parent ecdff40 commit e504e79
Show file tree
Hide file tree
Showing 7 changed files with 28 additions and 58 deletions.
49 changes: 20 additions & 29 deletions common/url.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,9 @@ import (

gxset "github.com/dubbogo/gost/container/set"

"github.com/google/uuid"
"github.com/jinzhu/copier"

perrors "github.com/pkg/errors"

"github.com/satori/go.uuid"
)

import (
Expand Down Expand Up @@ -203,7 +201,7 @@ func WithToken(token string) Option {
if len(token) > 0 {
value := token
if strings.ToLower(token) == "true" || strings.ToLower(token) == "default" {
u, _ := uuid.NewV4()
u, _ := uuid.NewUUID()
value = u.String()
}
url.SetParam(constant.TokenKey, value)
Expand Down Expand Up @@ -684,7 +682,8 @@ func (c *URL) ToMap() map[string]string {
// will be added into result.
// for example, if serviceURL contains params (a1->v1, b1->v2) and referenceURL contains params(a2->v3, b1 -> v4)
// the params of result will be (a1->v1, b1->v2, a2->v3).
// You should notice that the value of b1 is v2, not v4.
// You should notice that the value of b1 is v2, not v4
// except constant.LoadbalanceKey, constant.ClusterKey, constant.RetriesKey, constant.TimeoutKey.
// due to URL is not thread-safe, so this method is not thread-safe
func MergeURL(serviceURL *URL, referenceURL *URL) *URL {
// After Clone, it is a new URL that there is no thread safe issue.
Expand All @@ -693,16 +692,15 @@ func MergeURL(serviceURL *URL, referenceURL *URL) *URL {
// iterator the referenceURL if serviceURL not have the key ,merge in
// referenceURL usually will not changed. so change RangeParams to GetParams to avoid the string value copy.// Group get group
for key, value := range referenceURL.GetParams() {
if v := mergedURL.GetParam(key, ""); len(v) == 0 {
if len(value) > 0 {
params[key] = value
if v := mergedURL.GetParam(key, ""); len(v) == 0 && len(value) > 0 {
if params == nil {
params = url.Values{}
}
params[key] = make([]string, len(value))
copy(params[key], value)
}
}

// loadBalance,cluster,retries strategy config
methodConfigMergeFcn := mergeNormalParam(params, referenceURL, []string{constant.LoadbalanceKey, constant.ClusterKey, constant.RetriesKey, constant.TimeoutKey})

// remote timestamp
if v := serviceURL.GetParam(constant.TimestampKey, ""); len(v) > 0 {
params[constant.RemoteTimestampKey] = []string{v}
Expand All @@ -711,8 +709,17 @@ func MergeURL(serviceURL *URL, referenceURL *URL) *URL {

// finally execute methodConfigMergeFcn
for _, method := range referenceURL.Methods {
for _, fcn := range methodConfigMergeFcn {
fcn("methods." + method)
for _, paramKey := range []string{constant.LoadbalanceKey, constant.ClusterKey, constant.RetriesKey, constant.TimeoutKey} {
if v := referenceURL.GetParam(paramKey, ""); len(v) > 0 {
params[paramKey] = []string{v}
}

methodsKey := "methods." + method + "." + paramKey
//if len(mergedURL.GetParam(methodsKey, "")) == 0 {
if v := referenceURL.GetParam(methodsKey, ""); len(v) > 0 {
params[methodsKey] = []string{v}
}
//}
}
}
// In this way, we will raise some performance.
Expand All @@ -732,7 +739,6 @@ func (c *URL) Clone() *URL {
newURL.SetParam(key, value)
return true
})

return newURL
}

Expand Down Expand Up @@ -818,21 +824,6 @@ func IsEquals(left *URL, right *URL, excludes ...string) bool {
return true
}

func mergeNormalParam(params url.Values, referenceURL *URL, paramKeys []string) []func(method string) {
methodConfigMergeFcn := make([]func(method string), 0, len(paramKeys))
for _, paramKey := range paramKeys {
if v := referenceURL.GetParam(paramKey, ""); len(v) > 0 {
params[paramKey] = []string{v}
}
methodConfigMergeFcn = append(methodConfigMergeFcn, func(method string) {
if v := referenceURL.GetParam(method+"."+paramKey, ""); len(v) > 0 {
params[method+"."+paramKey] = []string{v}
}
})
}
return methodConfigMergeFcn
}

// URLSlice will be used to sort URL instance
// Instances will be order by URL.String()
type URLSlice []*URL
Expand Down
2 changes: 1 addition & 1 deletion common/url_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ func TestMergeUrl(t *testing.T) {
assert.Equal(t, "1", mergedUrl.GetParam("test2", ""))
assert.Equal(t, "1", mergedUrl.GetParam("test3", ""))
assert.Equal(t, "1", mergedUrl.GetParam(constant.RetriesKey, ""))
assert.Equal(t, "2", mergedUrl.GetParam(constant.MethodKeys+".testMethod."+constant.RetriesKey, ""))
assert.Equal(t, "1", mergedUrl.GetParam(constant.MethodKeys+".testMethod."+constant.RetriesKey, ""))
}

func TestURLSetParams(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ require (
github.com/golang/mock v1.6.0
github.com/golang/protobuf v1.5.2
github.com/google/go-cmp v0.5.9
github.com/google/uuid v1.3.0
github.com/gopherjs/gopherjs v0.0.0-20190910122728-9d188e94fb99 // indirect
github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645
github.com/hashicorp/vault/sdk v0.6.2
Expand All @@ -43,7 +44,6 @@ require (
github.com/pkg/errors v0.9.1
github.com/polarismesh/polaris-go v1.3.0
github.com/prometheus/client_golang v1.12.2
github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b
github.com/stretchr/testify v1.8.1
go.etcd.io/etcd/api/v3 v3.5.6
go.etcd.io/etcd/client/v3 v3.5.6
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -705,8 +705,6 @@ github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb
github.com/ryanuber/columnize v2.1.0+incompatible/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
github.com/ryanuber/go-glob v1.0.0/go.mod h1:807d1WSdnB0XRJzKNil9Om6lcp/3a0v4qIHxIXzX/Yc=
github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E=
github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b h1:gQZ0qzfKHQIybLANtM3mBXNUtOfsCFXeTsnBqCsx1KM=
github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/shirou/gopsutil v3.20.11+incompatible h1:LJr4ZQK4mPpIV5gOa4jCOKOGb4ty4DZO54I4FGqIpto=
github.com/shirou/gopsutil v3.20.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
Expand Down
2 changes: 1 addition & 1 deletion registry/directory/directory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func Test_MergeProviderUrl(t *testing.T) {
time.Sleep(1e9)
assert.Len(t, registryDirectory.cacheInvokers, 1)
if len(registryDirectory.cacheInvokers) > 0 {
assert.Equal(t, "mock", registryDirectory.cacheInvokers[0].GetURL().GetParam(constant.ClusterKey, ""))
assert.Equal(t, "mock1", registryDirectory.cacheInvokers[0].GetURL().GetParam(constant.ClusterKey, ""))
}
}

Expand Down
2 changes: 1 addition & 1 deletion registry/zookeeper/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (l *RegistryDataListener) DataChange(event remoting.Event) bool {
listener.Process(
&config_center.ConfigChangeEvent{
Key: event.Path,
Value: serviceURL,
Value: serviceURL.Clone(),
ConfigType: event.Action,
},
)
Expand Down
27 changes: 4 additions & 23 deletions remoting/zookeeper/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,13 @@ import (
"sync"
"time"
)

import (
"github.com/dubbogo/go-zookeeper/zk"

gxzookeeper "github.com/dubbogo/gost/database/kv/zk"
"github.com/dubbogo/gost/log/logger"

perrors "github.com/pkg/errors"

uatomic "go.uber.org/atomic"
)

import (
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
Expand Down Expand Up @@ -65,6 +60,7 @@ func NewZkEventListener(client *gxzookeeper.ZookeeperClient) *ZkEventListener {
func (l *ZkEventListener) ListenServiceNodeEvent(zkPath string, listener remoting.DataListener) {
l.wg.Add(1)
go func(zkPath string, listener remoting.DataListener) {
defer l.wg.Done()
if l.listenServiceNodeEvent(zkPath, listener) {
listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeDel})
l.pathMapLock.Lock()
Expand Down Expand Up @@ -128,7 +124,6 @@ func (l *ZkEventListener) ListenConfigurationEvent(zkPath string, listener remot

// nolint
func (l *ZkEventListener) listenServiceNodeEvent(zkPath string, listener ...remoting.DataListener) bool {
defer l.wg.Done()

l.pathMapLock.Lock()
a, ok := l.pathMap[zkPath]
Expand All @@ -139,15 +134,13 @@ func (l *ZkEventListener) listenServiceNodeEvent(zkPath string, listener ...remo
a.Inc()
l.pathMapLock.Unlock()
defer a.Dec()

var zkEvent zk.Event
for {
keyEventCh, err := l.Client.ExistW(zkPath)
if err != nil {
logger.Warnf("existW{key:%s} = error{%v}", zkPath, err)
return false
}

select {
case zkEvent = <-keyEventCh:
logger.Warnf("get a zookeeper keyEventCh{type:%s, server:%s, path:%s, state:%d-%s, err:%s}",
Expand Down Expand Up @@ -194,34 +187,30 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
}
return false
}

newChildren, err := l.Client.GetChildren(zkPath)
if err != nil {
logger.Errorf("[ZkEventListener handleZkNodeEvent]Path{%s} child nodes changed, zk.Children() = error{%v}", zkPath, perrors.WithStack(err))
return
}

// a node was added -- listen the new node
var (
newNode string
)
for _, n := range newChildren {

newNode = path.Join(zkPath, n)
logger.Debugf("[Zookeeper Listener] add zkNode{%s}", newNode)
content, _, connErr := l.Client.Conn.Get(newNode)
if connErr != nil {
logger.Errorf("Get new node path {%v} 's content error,message is {%v}",
newNode, perrors.WithStack(connErr))
}

if !listener.DataChange(remoting.Event{Path: newNode, Action: remoting.EventTypeAdd, Content: string(content)}) {
continue
}
// listen l service node
l.wg.Add(1)
go func(node string, listener remoting.DataListener) {
// invoker l.wg.Done() in l.listenServiceNodeEvent
defer l.wg.Done()
if l.listenServiceNodeEvent(node, listener) {
logger.Warnf("delete zkNode{%s}", node)
listener.DataChange(remoting.Event{Path: node, Action: remoting.EventTypeDel})
Expand All @@ -239,16 +228,13 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
if contains(newChildren, n) {
continue
}

oldNode = path.Join(zkPath, n)
logger.Warnf("delete oldNode{%s}", oldNode)
listener.DataChange(remoting.Event{Path: oldNode, Action: remoting.EventTypeDel})
}
}

func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, listener remoting.DataListener) {
defer l.wg.Done()

var (
failTimes int
ttl time.Duration
Expand All @@ -271,7 +257,6 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, li
failTimes = MaxFailTimes
}
logger.Errorf("[Zookeeper EventListener][listenDirEvent] Get children of path {%s} with watcher failed, the error is %+v", zkRootPath, err)

// Maybe the provider does not ready yet, sleep failTimes * ConnDelay senconds to wait
after := time.After(timeSecondDuration(failTimes * ConnDelay))
select {
Expand All @@ -293,10 +278,8 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, li
continue
}
}

// Build the children path
zkNodePath := path.Join(zkRootPath, c)

// Save the path to avoid listen repeatedly
l.pathMapLock.Lock()
_, ok := l.pathMap[zkNodePath]
Expand All @@ -308,15 +291,13 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, li
logger.Warnf("[Zookeeper EventListener][listenDirEvent] The child with zk path {%s} has already been listened.", zkNodePath)
continue
}

// When Zk disconnected, the Conn will be set to nil, so here need check the value of Conn
l.Client.RLock()
if l.Client.Conn == nil {
l.Client.RUnlock()
break
}
content, _, err := l.Client.Conn.Get(zkNodePath)

l.Client.RUnlock()
if err != nil {
logger.Errorf("[Zookeeper EventListener][listenDirEvent] Get content of the child node {%v} failed, the error is %+v", zkNodePath, perrors.WithStack(err))
Expand All @@ -328,7 +309,7 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, li
logger.Debugf("[Zookeeper EventListener][listenDirEvent] listen dubbo service key{%s}", zkNodePath)
l.wg.Add(1)
go func(zkPath string, listener remoting.DataListener) {
// invoker l.wg.Done() in l.listenServiceNodeEvent
defer l.wg.Done()
if l.listenServiceNodeEvent(zkPath, listener) {
listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeDel})
l.pathMapLock.Lock()
Expand All @@ -340,6 +321,7 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, li
}
if l.startScheduleWatchTask(zkRootPath, children, ttl, listener, childEventCh) {
return

}
}
}
Expand Down Expand Up @@ -380,7 +362,6 @@ func (l *ZkEventListener) startScheduleWatchTask(
}
}
}

func timeSecondDuration(sec int) time.Duration {
return time.Duration(sec) * time.Second
}
Expand Down

0 comments on commit e504e79

Please sign in to comment.