Skip to content

Commit

Permalink
Fix: Replace assignment behavior with copy operation to avoid OOM pro…
Browse files Browse the repository at this point in the history
…blem
  • Loading branch information
Lvnszn committed Jan 15, 2023
1 parent ecdff40 commit 6c47b83
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 69 deletions.
49 changes: 20 additions & 29 deletions common/url.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,9 @@ import (

gxset "github.com/dubbogo/gost/container/set"

"github.com/google/uuid"
"github.com/jinzhu/copier"

perrors "github.com/pkg/errors"

"github.com/satori/go.uuid"
)

import (
Expand Down Expand Up @@ -203,7 +201,7 @@ func WithToken(token string) Option {
if len(token) > 0 {
value := token
if strings.ToLower(token) == "true" || strings.ToLower(token) == "default" {
u, _ := uuid.NewV4()
u, _ := uuid.NewUUID()
value = u.String()
}
url.SetParam(constant.TokenKey, value)
Expand Down Expand Up @@ -684,7 +682,8 @@ func (c *URL) ToMap() map[string]string {
// will be added into result.
// for example, if serviceURL contains params (a1->v1, b1->v2) and referenceURL contains params(a2->v3, b1 -> v4)
// the params of result will be (a1->v1, b1->v2, a2->v3).
// You should notice that the value of b1 is v2, not v4.
// You should notice that the value of b1 is v2, not v4
// except constant.LoadbalanceKey, constant.ClusterKey, constant.RetriesKey, constant.TimeoutKey.
// due to URL is not thread-safe, so this method is not thread-safe
func MergeURL(serviceURL *URL, referenceURL *URL) *URL {
// After Clone, it is a new URL that there is no thread safe issue.
Expand All @@ -693,16 +692,15 @@ func MergeURL(serviceURL *URL, referenceURL *URL) *URL {
// iterator the referenceURL if serviceURL not have the key ,merge in
// referenceURL usually will not changed. so change RangeParams to GetParams to avoid the string value copy.// Group get group
for key, value := range referenceURL.GetParams() {
if v := mergedURL.GetParam(key, ""); len(v) == 0 {
if len(value) > 0 {
params[key] = value
if v := mergedURL.GetParam(key, ""); len(v) == 0 && len(value) > 0 {
if params == nil {
params = url.Values{}
}
params[key] = make([]string, len(value))
copy(params[key], value)
}
}

// loadBalance,cluster,retries strategy config
methodConfigMergeFcn := mergeNormalParam(params, referenceURL, []string{constant.LoadbalanceKey, constant.ClusterKey, constant.RetriesKey, constant.TimeoutKey})

// remote timestamp
if v := serviceURL.GetParam(constant.TimestampKey, ""); len(v) > 0 {
params[constant.RemoteTimestampKey] = []string{v}
Expand All @@ -711,8 +709,17 @@ func MergeURL(serviceURL *URL, referenceURL *URL) *URL {

// finally execute methodConfigMergeFcn
for _, method := range referenceURL.Methods {
for _, fcn := range methodConfigMergeFcn {
fcn("methods." + method)
for _, paramKey := range []string{constant.LoadbalanceKey, constant.ClusterKey, constant.RetriesKey, constant.TimeoutKey} {
if v := referenceURL.GetParam(paramKey, ""); len(v) > 0 {
params[paramKey] = []string{v}
}

methodsKey := "methods." + method + "." + paramKey
//if len(mergedURL.GetParam(methodsKey, "")) == 0 {
if v := referenceURL.GetParam(methodsKey, ""); len(v) > 0 {
params[methodsKey] = []string{v}
}
//}
}
}
// In this way, we will raise some performance.
Expand All @@ -732,7 +739,6 @@ func (c *URL) Clone() *URL {
newURL.SetParam(key, value)
return true
})

return newURL
}

Expand Down Expand Up @@ -818,21 +824,6 @@ func IsEquals(left *URL, right *URL, excludes ...string) bool {
return true
}

func mergeNormalParam(params url.Values, referenceURL *URL, paramKeys []string) []func(method string) {
methodConfigMergeFcn := make([]func(method string), 0, len(paramKeys))
for _, paramKey := range paramKeys {
if v := referenceURL.GetParam(paramKey, ""); len(v) > 0 {
params[paramKey] = []string{v}
}
methodConfigMergeFcn = append(methodConfigMergeFcn, func(method string) {
if v := referenceURL.GetParam(method+"."+paramKey, ""); len(v) > 0 {
params[method+"."+paramKey] = []string{v}
}
})
}
return methodConfigMergeFcn
}

// URLSlice will be used to sort URL instance
// Instances will be order by URL.String()
type URLSlice []*URL
Expand Down
2 changes: 1 addition & 1 deletion common/url_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ func TestMergeUrl(t *testing.T) {
assert.Equal(t, "1", mergedUrl.GetParam("test2", ""))
assert.Equal(t, "1", mergedUrl.GetParam("test3", ""))
assert.Equal(t, "1", mergedUrl.GetParam(constant.RetriesKey, ""))
assert.Equal(t, "2", mergedUrl.GetParam(constant.MethodKeys+".testMethod."+constant.RetriesKey, ""))
assert.Equal(t, "1", mergedUrl.GetParam(constant.MethodKeys+".testMethod."+constant.RetriesKey, ""))
}

func TestURLSetParams(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ require (
github.com/golang/mock v1.6.0
github.com/golang/protobuf v1.5.2
github.com/google/go-cmp v0.5.9
github.com/google/uuid v1.3.0
github.com/gopherjs/gopherjs v0.0.0-20190910122728-9d188e94fb99 // indirect
github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645
github.com/hashicorp/vault/sdk v0.6.2
Expand All @@ -43,7 +44,6 @@ require (
github.com/pkg/errors v0.9.1
github.com/polarismesh/polaris-go v1.3.0
github.com/prometheus/client_golang v1.12.2
github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b
github.com/stretchr/testify v1.8.1
go.etcd.io/etcd/api/v3 v3.5.6
go.etcd.io/etcd/client/v3 v3.5.6
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -705,8 +705,6 @@ github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb
github.com/ryanuber/columnize v2.1.0+incompatible/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
github.com/ryanuber/go-glob v1.0.0/go.mod h1:807d1WSdnB0XRJzKNil9Om6lcp/3a0v4qIHxIXzX/Yc=
github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E=
github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b h1:gQZ0qzfKHQIybLANtM3mBXNUtOfsCFXeTsnBqCsx1KM=
github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/shirou/gopsutil v3.20.11+incompatible h1:LJr4ZQK4mPpIV5gOa4jCOKOGb4ty4DZO54I4FGqIpto=
github.com/shirou/gopsutil v3.20.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
Expand Down
2 changes: 1 addition & 1 deletion registry/directory/directory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func Test_MergeProviderUrl(t *testing.T) {
time.Sleep(1e9)
assert.Len(t, registryDirectory.cacheInvokers, 1)
if len(registryDirectory.cacheInvokers) > 0 {
assert.Equal(t, "mock", registryDirectory.cacheInvokers[0].GetURL().GetParam(constant.ClusterKey, ""))
assert.Equal(t, "mock1", registryDirectory.cacheInvokers[0].GetURL().GetParam(constant.ClusterKey, ""))
}
}

Expand Down
2 changes: 1 addition & 1 deletion registry/zookeeper/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (l *RegistryDataListener) DataChange(event remoting.Event) bool {
listener.Process(
&config_center.ConfigChangeEvent{
Key: event.Path,
Value: serviceURL,
Value: serviceURL.Clone(),
ConfigType: event.Action,
},
)
Expand Down
56 changes: 22 additions & 34 deletions remoting/zookeeper/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,13 @@ func NewZkEventListener(client *gxzookeeper.ZookeeperClient) *ZkEventListener {

// ListenServiceNodeEvent listen a path node event
func (l *ZkEventListener) ListenServiceNodeEvent(zkPath string, listener remoting.DataListener) {
l.wg.Add(1)
go func(zkPath string, listener remoting.DataListener) {
if l.listenServiceNodeEvent(zkPath, listener) {
listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeDel})
l.pathMapLock.Lock()
delete(l.pathMap, zkPath)
l.pathMapLock.Unlock()
}
logger.Warnf("ListenServiceNodeEvent->listenSelf(zk path{%s}) goroutine exit now", zkPath)
}(zkPath, listener)
if l.listenServiceNodeEvent(zkPath, listener) {
listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeDel})
l.pathMapLock.Lock()
delete(l.pathMap, zkPath)
l.pathMapLock.Unlock()
}
logger.Warnf("ListenServiceNodeEvent->listenSelf(zk path{%s}) goroutine exit now", zkPath)
}

// ListenConfigurationEvent listen a path node event
Expand Down Expand Up @@ -128,7 +125,6 @@ func (l *ZkEventListener) ListenConfigurationEvent(zkPath string, listener remot

// nolint
func (l *ZkEventListener) listenServiceNodeEvent(zkPath string, listener ...remoting.DataListener) bool {
defer l.wg.Done()

l.pathMapLock.Lock()
a, ok := l.pathMap[zkPath]
Expand Down Expand Up @@ -219,18 +215,14 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
continue
}
// listen l service node
l.wg.Add(1)
go func(node string, listener remoting.DataListener) {
// invoker l.wg.Done() in l.listenServiceNodeEvent
if l.listenServiceNodeEvent(node, listener) {
logger.Warnf("delete zkNode{%s}", node)
listener.DataChange(remoting.Event{Path: node, Action: remoting.EventTypeDel})
l.pathMapLock.Lock()
delete(l.pathMap, zkPath)
l.pathMapLock.Unlock()
}
logger.Debugf("handleZkNodeEvent->listenSelf(zk path{%s}) goroutine exit now", node)
}(newNode, listener)
if l.listenServiceNodeEvent(newNode, listener) {
logger.Warnf("delete zkNode{%s}", newNode)
listener.DataChange(remoting.Event{Path: newNode, Action: remoting.EventTypeDel})
l.pathMapLock.Lock()
delete(l.pathMap, zkPath)
l.pathMapLock.Unlock()
}
logger.Debugf("handleZkNodeEvent->listenSelf(zk path{%s}) goroutine exit now", newNode)
}

// old node was deleted
Expand Down Expand Up @@ -326,17 +318,13 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, li
continue
}
logger.Debugf("[Zookeeper EventListener][listenDirEvent] listen dubbo service key{%s}", zkNodePath)
l.wg.Add(1)
go func(zkPath string, listener remoting.DataListener) {
// invoker l.wg.Done() in l.listenServiceNodeEvent
if l.listenServiceNodeEvent(zkPath, listener) {
listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeDel})
l.pathMapLock.Lock()
delete(l.pathMap, zkPath)
l.pathMapLock.Unlock()
}
logger.Warnf("listenDirEvent->listenSelf(zk path{%s}) goroutine exit now", zkPath)
}(zkNodePath, listener)
if l.listenServiceNodeEvent(zkNodePath, listener) {
listener.DataChange(remoting.Event{Path: zkNodePath, Action: remoting.EventTypeDel})
l.pathMapLock.Lock()
delete(l.pathMap, zkNodePath)
l.pathMapLock.Unlock()
}
logger.Warnf("listenDirEvent->listenSelf(zk path{%s}) goroutine exit now", zkNodePath)
}
if l.startScheduleWatchTask(zkRootPath, children, ttl, listener, childEventCh) {
return
Expand Down

0 comments on commit 6c47b83

Please sign in to comment.