Skip to content

Commit

Permalink
Merge pull request #358 from pantianying/addRlockForDubboInvoker
Browse files Browse the repository at this point in the history
Fix:deal the panic when invoker destroy
  • Loading branch information
flycash committed Feb 20, 2020
2 parents 2d4022d + 873c7d9 commit 2d70b6f
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 16 deletions.
35 changes: 30 additions & 5 deletions protocol/dubbo/dubbo_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"context"
"strconv"
"sync"
"sync/atomic"
"time"
)

import (
Expand All @@ -38,7 +40,8 @@ import (

var (
// ErrNoReply ...
ErrNoReply = perrors.New("request need @response")
ErrNoReply = perrors.New("request need @response")
ErrDestroyedInvoker = perrors.New("request Destroyed invoker")
)

var (
Expand All @@ -50,13 +53,16 @@ type DubboInvoker struct {
protocol.BaseInvoker
client *Client
quitOnce sync.Once
// Used to record the number of requests. -1 represent this DubboInvoker is destroyed
reqNum int64
}

// NewDubboInvoker ...
func NewDubboInvoker(url common.URL, client *Client) *DubboInvoker {
return &DubboInvoker{
BaseInvoker: *protocol.NewBaseInvoker(url),
client: client,
reqNum: 0,
}
}

Expand All @@ -66,6 +72,15 @@ func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocati
err error
result protocol.RPCResult
)
if di.reqNum < 0 {
// Generally, the case will not happen, because the invoker has been removed
// from the invoker list before destroy,so no new request will enter the destroyed invoker
logger.Warnf("this dubboInvoker is destroyed")
result.Err = ErrDestroyedInvoker
return &result
}
atomic.AddInt64(&(di.reqNum), 1)
defer atomic.AddInt64(&(di.reqNum), -1)

inv := invocation.(*invocation_impl.RPCInvocation)
for _, k := range attachmentKey {
Expand Down Expand Up @@ -110,11 +125,21 @@ func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocati
// Destroy ...
func (di *DubboInvoker) Destroy() {
di.quitOnce.Do(func() {
di.BaseInvoker.Destroy()

if di.client != nil {
di.client.Close()
for {
if di.reqNum == 0 {
di.reqNum = -1
logger.Info("dubboInvoker is destroyed,url:{%s}", di.GetUrl().Key())
di.BaseInvoker.Destroy()
if di.client != nil {
di.client.Close()
di.client = nil
}
break
}
logger.Warnf("DubboInvoker is to be destroyed, wait {%v} req end,url:{%s}", di.reqNum, di.GetUrl().Key())
time.Sleep(1 * time.Second)
}

})
}

Expand Down
39 changes: 28 additions & 11 deletions registry/directory/directory.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,10 @@ func (dir *registryDirectory) update(res *registry.ServiceEvent) {
}

func (dir *registryDirectory) refreshInvokers(res *registry.ServiceEvent) {
var url *common.URL
var (
url *common.URL
oldInvoker protocol.Invoker = nil
)
//judge is override or others
if res != nil {
url = &res.Service
Expand All @@ -126,10 +129,10 @@ func (dir *registryDirectory) refreshInvokers(res *registry.ServiceEvent) {
switch res.Action {
case remoting.EventTypeAdd, remoting.EventTypeUpdate:
//dir.cacheService.EventTypeAdd(res.Path, dir.serviceTTL)
dir.cacheInvoker(url)
oldInvoker = dir.cacheInvoker(url)
case remoting.EventTypeDel:
//dir.cacheService.EventTypeDel(res.Path, dir.serviceTTL)
dir.uncacheInvoker(url)
oldInvoker = dir.uncacheInvoker(url)
logger.Infof("selector delete service url{%s}", res.Service)
default:
return
Expand All @@ -138,8 +141,14 @@ func (dir *registryDirectory) refreshInvokers(res *registry.ServiceEvent) {

newInvokers := dir.toGroupInvokers()
dir.listenerLock.Lock()
defer dir.listenerLock.Unlock()
dir.cacheInvokers = newInvokers
dir.listenerLock.Unlock()
// After dir.cacheInvokers is updated,destroy the oldInvoker
// Ensure that no request will enter the oldInvoker
if oldInvoker != nil {
oldInvoker.Destroy()
}

}

func (dir *registryDirectory) toGroupInvokers() []protocol.Invoker {
Expand Down Expand Up @@ -177,12 +186,18 @@ func (dir *registryDirectory) toGroupInvokers() []protocol.Invoker {
return groupInvokersList
}

func (dir *registryDirectory) uncacheInvoker(url *common.URL) {
// uncacheInvoker return abandoned Invoker,if no Invoker to be abandoned,return nil
func (dir *registryDirectory) uncacheInvoker(url *common.URL) protocol.Invoker {
logger.Debugf("service will be deleted in cache invokers: invokers key is %s!", url.Key())
dir.cacheInvokersMap.Delete(url.Key())
if cacheInvoker, ok := dir.cacheInvokersMap.Load(url.Key()); ok {
dir.cacheInvokersMap.Delete(url.Key())
return cacheInvoker.(protocol.Invoker)
}
return nil
}

func (dir *registryDirectory) cacheInvoker(url *common.URL) {
// cacheInvoker return abandoned Invoker,if no Invoker to be abandoned,return nil
func (dir *registryDirectory) cacheInvoker(url *common.URL) protocol.Invoker {
dir.overrideUrl(dir.GetDirectoryUrl())
referenceUrl := dir.GetDirectoryUrl().SubURL

Expand All @@ -193,7 +208,7 @@ func (dir *registryDirectory) cacheInvoker(url *common.URL) {
}
if url == nil {
logger.Error("URL is nil ,pls check if service url is subscribe successfully!")
return
return nil
}
//check the url's protocol is equal to the protocol which is configured in reference config or referenceUrl is not care about protocol
if url.Protocol == referenceUrl.Protocol || referenceUrl.Protocol == "" {
Expand All @@ -210,10 +225,11 @@ func (dir *registryDirectory) cacheInvoker(url *common.URL) {
newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(*newUrl)
if newInvoker != nil {
dir.cacheInvokersMap.Store(newUrl.Key(), newInvoker)
cacheInvoker.(protocol.Invoker).Destroy()
return cacheInvoker.(protocol.Invoker)
}
}
}
return nil
}

//select the protocol invokers from the directory
Expand All @@ -239,10 +255,11 @@ func (dir *registryDirectory) IsAvailable() bool {
func (dir *registryDirectory) Destroy() {
//TODO:unregister & unsubscribe
dir.BaseDirectory.Destroy(func() {
for _, ivk := range dir.cacheInvokers {
invokers := dir.cacheInvokers
dir.cacheInvokers = []protocol.Invoker{}
for _, ivk := range invokers {
ivk.Destroy()
}
dir.cacheInvokers = []protocol.Invoker{}
})
}

Expand Down

0 comments on commit 2d70b6f

Please sign in to comment.