Skip to content

Commit

Permalink
Merge pull request #364 from fangyincheng/1.3
Browse files Browse the repository at this point in the history
Merge pull request #358 from pantianying/addRlockForDubboInvoker
  • Loading branch information
fangyincheng authored Feb 22, 2020
2 parents 7109429 + def1159 commit 39e4158
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 17 deletions.
40 changes: 34 additions & 6 deletions protocol/dubbo/dubbo_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ package dubbo
import (
"strconv"
"sync"
"sync/atomic"
"time"
)

import (
Expand All @@ -34,7 +36,11 @@ import (
invocation_impl "github.com/apache/dubbo-go/protocol/invocation"
)

var Err_No_Reply = perrors.New("request need @response")
var (
// ErrNoReply ...
ErrNoReply = perrors.New("request need @response")
ErrDestroyedInvoker = perrors.New("request Destroyed invoker")
)

var (
attachmentKey = []string{constant.INTERFACE_KEY, constant.GROUP_KEY, constant.TOKEN_KEY, constant.TIMEOUT_KEY}
Expand All @@ -44,12 +50,15 @@ type DubboInvoker struct {
protocol.BaseInvoker
client *Client
quitOnce sync.Once
// Used to record the number of requests. -1 represent this DubboInvoker is destroyed
reqNum int64
}

func NewDubboInvoker(url common.URL, client *Client) *DubboInvoker {
return &DubboInvoker{
BaseInvoker: *protocol.NewBaseInvoker(url),
client: client,
reqNum: 0,
}
}

Expand All @@ -59,6 +68,15 @@ func (di *DubboInvoker) Invoke(invocation protocol.Invocation) protocol.Result {
err error
result protocol.RPCResult
)
if di.reqNum < 0 {
// Generally, the case will not happen, because the invoker has been removed
// from the invoker list before destroy,so no new request will enter the destroyed invoker
logger.Warnf("this dubboInvoker is destroyed")
result.Err = ErrDestroyedInvoker
return &result
}
atomic.AddInt64(&(di.reqNum), 1)
defer atomic.AddInt64(&(di.reqNum), -1)

inv := invocation.(*invocation_impl.RPCInvocation)
for _, k := range attachmentKey {
Expand All @@ -82,7 +100,7 @@ func (di *DubboInvoker) Invoke(invocation protocol.Invocation) protocol.Result {
}
} else {
if inv.Reply() == nil {
result.Err = Err_No_Reply
result.Err = ErrNoReply
} else {
result.Err = di.client.Call(NewRequest(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Attachments()), response)
}
Expand All @@ -98,10 +116,20 @@ func (di *DubboInvoker) Invoke(invocation protocol.Invocation) protocol.Result {

func (di *DubboInvoker) Destroy() {
di.quitOnce.Do(func() {
di.BaseInvoker.Destroy()

if di.client != nil {
di.client.Close()
for {
if di.reqNum == 0 {
di.reqNum = -1
logger.Info("dubboInvoker is destroyed,url:{%s}", di.GetUrl().Key())
di.BaseInvoker.Destroy()
if di.client != nil {
di.client.Close()
di.client = nil
}
break
}
logger.Warnf("DubboInvoker is to be destroyed, wait {%v} req end,url:{%s}", di.reqNum, di.GetUrl().Key())
time.Sleep(1 * time.Second)
}

})
}
39 changes: 28 additions & 11 deletions registry/directory/directory.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,10 @@ func (dir *registryDirectory) update(res *registry.ServiceEvent) {
}

func (dir *registryDirectory) refreshInvokers(res *registry.ServiceEvent) {
var url *common.URL
var (
url *common.URL
oldInvoker protocol.Invoker = nil
)
//judge is override or others
if res != nil {
url = &res.Service
Expand All @@ -123,10 +126,10 @@ func (dir *registryDirectory) refreshInvokers(res *registry.ServiceEvent) {
switch res.Action {
case remoting.EventTypeAdd, remoting.EventTypeUpdate:
//dir.cacheService.EventTypeAdd(res.Path, dir.serviceTTL)
dir.cacheInvoker(url)
oldInvoker = dir.cacheInvoker(url)
case remoting.EventTypeDel:
//dir.cacheService.EventTypeDel(res.Path, dir.serviceTTL)
dir.uncacheInvoker(url)
oldInvoker = dir.uncacheInvoker(url)
logger.Infof("selector delete service url{%s}", res.Service)
default:
return
Expand All @@ -135,8 +138,14 @@ func (dir *registryDirectory) refreshInvokers(res *registry.ServiceEvent) {

newInvokers := dir.toGroupInvokers()
dir.listenerLock.Lock()
defer dir.listenerLock.Unlock()
dir.cacheInvokers = newInvokers
dir.listenerLock.Unlock()
// After dir.cacheInvokers is updated,destroy the oldInvoker
// Ensure that no request will enter the oldInvoker
if oldInvoker != nil {
oldInvoker.Destroy()
}

}

func (dir *registryDirectory) toGroupInvokers() []protocol.Invoker {
Expand Down Expand Up @@ -174,12 +183,18 @@ func (dir *registryDirectory) toGroupInvokers() []protocol.Invoker {
return groupInvokersList
}

func (dir *registryDirectory) uncacheInvoker(url *common.URL) {
// uncacheInvoker return abandoned Invoker,if no Invoker to be abandoned,return nil
func (dir *registryDirectory) uncacheInvoker(url *common.URL) protocol.Invoker {
logger.Debugf("service will be deleted in cache invokers: invokers key is %s!", url.Key())
dir.cacheInvokersMap.Delete(url.Key())
if cacheInvoker, ok := dir.cacheInvokersMap.Load(url.Key()); ok {
dir.cacheInvokersMap.Delete(url.Key())
return cacheInvoker.(protocol.Invoker)
}
return nil
}

func (dir *registryDirectory) cacheInvoker(url *common.URL) {
// cacheInvoker return abandoned Invoker,if no Invoker to be abandoned,return nil
func (dir *registryDirectory) cacheInvoker(url *common.URL) protocol.Invoker {
dir.overrideUrl(dir.GetDirectoryUrl())
referenceUrl := dir.GetDirectoryUrl().SubURL

Expand All @@ -190,7 +205,7 @@ func (dir *registryDirectory) cacheInvoker(url *common.URL) {
}
if url == nil {
logger.Error("URL is nil ,pls check if service url is subscribe successfully!")
return
return nil
}
//check the url's protocol is equal to the protocol which is configured in reference config or referenceUrl is not care about protocol
if url.Protocol == referenceUrl.Protocol || referenceUrl.Protocol == "" {
Expand All @@ -207,10 +222,11 @@ func (dir *registryDirectory) cacheInvoker(url *common.URL) {
newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(*newUrl)
if newInvoker != nil {
dir.cacheInvokersMap.Store(newUrl.Key(), newInvoker)
cacheInvoker.(protocol.Invoker).Destroy()
return cacheInvoker.(protocol.Invoker)
}
}
}
return nil
}

//select the protocol invokers from the directory
Expand All @@ -235,10 +251,11 @@ func (dir *registryDirectory) IsAvailable() bool {
func (dir *registryDirectory) Destroy() {
//TODO:unregister & unsubscribe
dir.BaseDirectory.Destroy(func() {
for _, ivk := range dir.cacheInvokers {
invokers := dir.cacheInvokers
dir.cacheInvokers = []protocol.Invoker{}
for _, ivk := range invokers {
ivk.Destroy()
}
dir.cacheInvokers = []protocol.Invoker{}
})
}

Expand Down

0 comments on commit 39e4158

Please sign in to comment.