Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: Replace assignment behavior with copy operation to avoid OOM pro… #2182

Merged
merged 1 commit into from
Jan 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 20 additions & 29 deletions common/url.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,9 @@ import (

gxset "github.com/dubbogo/gost/container/set"

"github.com/google/uuid"
"github.com/jinzhu/copier"

perrors "github.com/pkg/errors"

"github.com/satori/go.uuid"
)

import (
Expand Down Expand Up @@ -203,7 +201,7 @@ func WithToken(token string) Option {
if len(token) > 0 {
value := token
if strings.ToLower(token) == "true" || strings.ToLower(token) == "default" {
u, _ := uuid.NewV4()
u, _ := uuid.NewUUID()
value = u.String()
}
url.SetParam(constant.TokenKey, value)
Expand Down Expand Up @@ -684,7 +682,8 @@ func (c *URL) ToMap() map[string]string {
// will be added into result.
// for example, if serviceURL contains params (a1->v1, b1->v2) and referenceURL contains params(a2->v3, b1 -> v4)
// the params of result will be (a1->v1, b1->v2, a2->v3).
// You should notice that the value of b1 is v2, not v4.
// You should notice that the value of b1 is v2, not v4
// except constant.LoadbalanceKey, constant.ClusterKey, constant.RetriesKey, constant.TimeoutKey.
// due to URL is not thread-safe, so this method is not thread-safe
func MergeURL(serviceURL *URL, referenceURL *URL) *URL {
// After Clone, it is a new URL that there is no thread safe issue.
Expand All @@ -693,16 +692,15 @@ func MergeURL(serviceURL *URL, referenceURL *URL) *URL {
// iterator the referenceURL if serviceURL not have the key ,merge in
// referenceURL usually will not changed. so change RangeParams to GetParams to avoid the string value copy.// Group get group
for key, value := range referenceURL.GetParams() {
if v := mergedURL.GetParam(key, ""); len(v) == 0 {
if len(value) > 0 {
params[key] = value
if v := mergedURL.GetParam(key, ""); len(v) == 0 && len(value) > 0 {
if params == nil {
params = url.Values{}
}
params[key] = make([]string, len(value))
copy(params[key], value)
}
}

// loadBalance,cluster,retries strategy config
methodConfigMergeFcn := mergeNormalParam(params, referenceURL, []string{constant.LoadbalanceKey, constant.ClusterKey, constant.RetriesKey, constant.TimeoutKey})

// remote timestamp
if v := serviceURL.GetParam(constant.TimestampKey, ""); len(v) > 0 {
params[constant.RemoteTimestampKey] = []string{v}
Expand All @@ -711,8 +709,17 @@ func MergeURL(serviceURL *URL, referenceURL *URL) *URL {

// finally execute methodConfigMergeFcn
for _, method := range referenceURL.Methods {
for _, fcn := range methodConfigMergeFcn {
fcn("methods." + method)
for _, paramKey := range []string{constant.LoadbalanceKey, constant.ClusterKey, constant.RetriesKey, constant.TimeoutKey} {
if v := referenceURL.GetParam(paramKey, ""); len(v) > 0 {
params[paramKey] = []string{v}
}

methodsKey := "methods." + method + "." + paramKey
//if len(mergedURL.GetParam(methodsKey, "")) == 0 {
if v := referenceURL.GetParam(methodsKey, ""); len(v) > 0 {
params[methodsKey] = []string{v}
}
//}
}
}
// In this way, we will raise some performance.
Expand All @@ -732,7 +739,6 @@ func (c *URL) Clone() *URL {
newURL.SetParam(key, value)
return true
})

return newURL
}

Expand Down Expand Up @@ -818,21 +824,6 @@ func IsEquals(left *URL, right *URL, excludes ...string) bool {
return true
}

func mergeNormalParam(params url.Values, referenceURL *URL, paramKeys []string) []func(method string) {
methodConfigMergeFcn := make([]func(method string), 0, len(paramKeys))
for _, paramKey := range paramKeys {
if v := referenceURL.GetParam(paramKey, ""); len(v) > 0 {
params[paramKey] = []string{v}
}
methodConfigMergeFcn = append(methodConfigMergeFcn, func(method string) {
if v := referenceURL.GetParam(method+"."+paramKey, ""); len(v) > 0 {
params[method+"."+paramKey] = []string{v}
}
})
}
return methodConfigMergeFcn
}

// URLSlice will be used to sort URL instance
// Instances will be order by URL.String()
type URLSlice []*URL
Expand Down
2 changes: 1 addition & 1 deletion common/url_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ func TestMergeUrl(t *testing.T) {
assert.Equal(t, "1", mergedUrl.GetParam("test2", ""))
assert.Equal(t, "1", mergedUrl.GetParam("test3", ""))
assert.Equal(t, "1", mergedUrl.GetParam(constant.RetriesKey, ""))
assert.Equal(t, "2", mergedUrl.GetParam(constant.MethodKeys+".testMethod."+constant.RetriesKey, ""))
assert.Equal(t, "1", mergedUrl.GetParam(constant.MethodKeys+".testMethod."+constant.RetriesKey, ""))
}

func TestURLSetParams(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ require (
github.com/golang/mock v1.6.0
github.com/golang/protobuf v1.5.2
github.com/google/go-cmp v0.5.9
github.com/google/uuid v1.3.0
github.com/gopherjs/gopherjs v0.0.0-20190910122728-9d188e94fb99 // indirect
github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645
github.com/hashicorp/vault/sdk v0.6.2
Expand All @@ -43,7 +44,6 @@ require (
github.com/pkg/errors v0.9.1
github.com/polarismesh/polaris-go v1.3.0
github.com/prometheus/client_golang v1.12.2
github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b
github.com/stretchr/testify v1.8.1
go.etcd.io/etcd/api/v3 v3.5.6
go.etcd.io/etcd/client/v3 v3.5.6
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -705,8 +705,6 @@ github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb
github.com/ryanuber/columnize v2.1.0+incompatible/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
github.com/ryanuber/go-glob v1.0.0/go.mod h1:807d1WSdnB0XRJzKNil9Om6lcp/3a0v4qIHxIXzX/Yc=
github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E=
github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b h1:gQZ0qzfKHQIybLANtM3mBXNUtOfsCFXeTsnBqCsx1KM=
github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/shirou/gopsutil v3.20.11+incompatible h1:LJr4ZQK4mPpIV5gOa4jCOKOGb4ty4DZO54I4FGqIpto=
github.com/shirou/gopsutil v3.20.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
Expand Down
2 changes: 1 addition & 1 deletion registry/directory/directory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func Test_MergeProviderUrl(t *testing.T) {
time.Sleep(1e9)
assert.Len(t, registryDirectory.cacheInvokers, 1)
if len(registryDirectory.cacheInvokers) > 0 {
assert.Equal(t, "mock", registryDirectory.cacheInvokers[0].GetURL().GetParam(constant.ClusterKey, ""))
assert.Equal(t, "mock1", registryDirectory.cacheInvokers[0].GetURL().GetParam(constant.ClusterKey, ""))
}
}

Expand Down
2 changes: 1 addition & 1 deletion registry/zookeeper/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (l *RegistryDataListener) DataChange(event remoting.Event) bool {
listener.Process(
&config_center.ConfigChangeEvent{
Key: event.Path,
Value: serviceURL,
Value: serviceURL.Clone(),
ConfigType: event.Action,
},
)
Expand Down
27 changes: 4 additions & 23 deletions remoting/zookeeper/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,13 @@ import (
"sync"
"time"
)

import (
"github.com/dubbogo/go-zookeeper/zk"

gxzookeeper "github.com/dubbogo/gost/database/kv/zk"
"github.com/dubbogo/gost/log/logger"

perrors "github.com/pkg/errors"

uatomic "go.uber.org/atomic"
)

import (
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
Expand Down Expand Up @@ -65,6 +60,7 @@ func NewZkEventListener(client *gxzookeeper.ZookeeperClient) *ZkEventListener {
func (l *ZkEventListener) ListenServiceNodeEvent(zkPath string, listener remoting.DataListener) {
l.wg.Add(1)
go func(zkPath string, listener remoting.DataListener) {
defer l.wg.Done()
if l.listenServiceNodeEvent(zkPath, listener) {
listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeDel})
l.pathMapLock.Lock()
Expand Down Expand Up @@ -128,7 +124,6 @@ func (l *ZkEventListener) ListenConfigurationEvent(zkPath string, listener remot

// nolint
func (l *ZkEventListener) listenServiceNodeEvent(zkPath string, listener ...remoting.DataListener) bool {
defer l.wg.Done()

l.pathMapLock.Lock()
a, ok := l.pathMap[zkPath]
Expand All @@ -139,15 +134,13 @@ func (l *ZkEventListener) listenServiceNodeEvent(zkPath string, listener ...remo
a.Inc()
l.pathMapLock.Unlock()
defer a.Dec()

var zkEvent zk.Event
for {
keyEventCh, err := l.Client.ExistW(zkPath)
if err != nil {
logger.Warnf("existW{key:%s} = error{%v}", zkPath, err)
return false
}

select {
case zkEvent = <-keyEventCh:
logger.Warnf("get a zookeeper keyEventCh{type:%s, server:%s, path:%s, state:%d-%s, err:%s}",
Expand Down Expand Up @@ -194,34 +187,30 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
}
return false
}

newChildren, err := l.Client.GetChildren(zkPath)
if err != nil {
logger.Errorf("[ZkEventListener handleZkNodeEvent]Path{%s} child nodes changed, zk.Children() = error{%v}", zkPath, perrors.WithStack(err))
return
}

// a node was added -- listen the new node
var (
newNode string
)
for _, n := range newChildren {

newNode = path.Join(zkPath, n)
logger.Debugf("[Zookeeper Listener] add zkNode{%s}", newNode)
content, _, connErr := l.Client.Conn.Get(newNode)
if connErr != nil {
logger.Errorf("Get new node path {%v} 's content error,message is {%v}",
newNode, perrors.WithStack(connErr))
}

if !listener.DataChange(remoting.Event{Path: newNode, Action: remoting.EventTypeAdd, Content: string(content)}) {
continue
}
// listen l service node
l.wg.Add(1)
go func(node string, listener remoting.DataListener) {
// invoker l.wg.Done() in l.listenServiceNodeEvent
defer l.wg.Done()
if l.listenServiceNodeEvent(node, listener) {
logger.Warnf("delete zkNode{%s}", node)
listener.DataChange(remoting.Event{Path: node, Action: remoting.EventTypeDel})
Expand All @@ -239,16 +228,13 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
if contains(newChildren, n) {
continue
}

oldNode = path.Join(zkPath, n)
logger.Warnf("delete oldNode{%s}", oldNode)
listener.DataChange(remoting.Event{Path: oldNode, Action: remoting.EventTypeDel})
}
}

func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, listener remoting.DataListener) {
defer l.wg.Done()

var (
failTimes int
ttl time.Duration
Expand All @@ -271,7 +257,6 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, li
failTimes = MaxFailTimes
}
logger.Errorf("[Zookeeper EventListener][listenDirEvent] Get children of path {%s} with watcher failed, the error is %+v", zkRootPath, err)

// Maybe the provider does not ready yet, sleep failTimes * ConnDelay senconds to wait
after := time.After(timeSecondDuration(failTimes * ConnDelay))
select {
Expand All @@ -293,10 +278,8 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, li
continue
}
}

// Build the children path
zkNodePath := path.Join(zkRootPath, c)

// Save the path to avoid listen repeatedly
l.pathMapLock.Lock()
_, ok := l.pathMap[zkNodePath]
Expand All @@ -308,15 +291,13 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, li
logger.Warnf("[Zookeeper EventListener][listenDirEvent] The child with zk path {%s} has already been listened.", zkNodePath)
continue
}

// When Zk disconnected, the Conn will be set to nil, so here need check the value of Conn
l.Client.RLock()
if l.Client.Conn == nil {
l.Client.RUnlock()
break
}
content, _, err := l.Client.Conn.Get(zkNodePath)

l.Client.RUnlock()
if err != nil {
logger.Errorf("[Zookeeper EventListener][listenDirEvent] Get content of the child node {%v} failed, the error is %+v", zkNodePath, perrors.WithStack(err))
Expand All @@ -328,7 +309,7 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, li
logger.Debugf("[Zookeeper EventListener][listenDirEvent] listen dubbo service key{%s}", zkNodePath)
l.wg.Add(1)
go func(zkPath string, listener remoting.DataListener) {
// invoker l.wg.Done() in l.listenServiceNodeEvent
defer l.wg.Done()
if l.listenServiceNodeEvent(zkPath, listener) {
listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeDel})
l.pathMapLock.Lock()
Expand All @@ -340,6 +321,7 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, li
}
if l.startScheduleWatchTask(zkRootPath, children, ttl, listener, childEventCh) {
return

}
}
}
Expand Down Expand Up @@ -380,7 +362,6 @@ func (l *ZkEventListener) startScheduleWatchTask(
}
}
}

func timeSecondDuration(sec int) time.Duration {
return time.Duration(sec) * time.Second
}
Expand Down