diff --git a/protocol/dubbo/dubbo_invoker.go b/protocol/dubbo/dubbo_invoker.go index 67d1d1e7f1..ef5016e22e 100644 --- a/protocol/dubbo/dubbo_invoker.go +++ b/protocol/dubbo/dubbo_invoker.go @@ -21,6 +21,8 @@ import ( "context" "strconv" "sync" + "sync/atomic" + "time" ) import ( @@ -38,7 +40,8 @@ import ( var ( // ErrNoReply ... - ErrNoReply = perrors.New("request need @response") + ErrNoReply = perrors.New("request need @response") + ErrDestroyedInvoker = perrors.New("request Destroyed invoker") ) var ( @@ -50,6 +53,8 @@ type DubboInvoker struct { protocol.BaseInvoker client *Client quitOnce sync.Once + // Used to record the number of requests. -1 represent this DubboInvoker is destroyed + reqNum int64 } // NewDubboInvoker ... @@ -57,6 +62,7 @@ func NewDubboInvoker(url common.URL, client *Client) *DubboInvoker { return &DubboInvoker{ BaseInvoker: *protocol.NewBaseInvoker(url), client: client, + reqNum: 0, } } @@ -66,6 +72,15 @@ func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocati err error result protocol.RPCResult ) + if di.reqNum < 0 { + // Generally, the case will not happen, because the invoker has been removed + // from the invoker list before destroy,so no new request will enter the destroyed invoker + logger.Warnf("this dubboInvoker is destroyed") + result.Err = ErrDestroyedInvoker + return &result + } + atomic.AddInt64(&(di.reqNum), 1) + defer atomic.AddInt64(&(di.reqNum), -1) inv := invocation.(*invocation_impl.RPCInvocation) for _, k := range attachmentKey { @@ -110,11 +125,21 @@ func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocati // Destroy ... func (di *DubboInvoker) Destroy() { di.quitOnce.Do(func() { - di.BaseInvoker.Destroy() - - if di.client != nil { - di.client.Close() + for { + if di.reqNum == 0 { + di.reqNum = -1 + logger.Info("dubboInvoker is destroyed,url:{%s}", di.GetUrl().Key()) + di.BaseInvoker.Destroy() + if di.client != nil { + di.client.Close() + di.client = nil + } + break + } + logger.Warnf("DubboInvoker is to be destroyed, wait {%v} req end,url:{%s}", di.reqNum, di.GetUrl().Key()) + time.Sleep(1 * time.Second) } + }) } diff --git a/registry/directory/directory.go b/registry/directory/directory.go index f9670af7ea..42d03e40be 100644 --- a/registry/directory/directory.go +++ b/registry/directory/directory.go @@ -109,7 +109,10 @@ func (dir *registryDirectory) update(res *registry.ServiceEvent) { } func (dir *registryDirectory) refreshInvokers(res *registry.ServiceEvent) { - var url *common.URL + var ( + url *common.URL + oldInvoker protocol.Invoker = nil + ) //judge is override or others if res != nil { url = &res.Service @@ -126,10 +129,10 @@ func (dir *registryDirectory) refreshInvokers(res *registry.ServiceEvent) { switch res.Action { case remoting.EventTypeAdd, remoting.EventTypeUpdate: //dir.cacheService.EventTypeAdd(res.Path, dir.serviceTTL) - dir.cacheInvoker(url) + oldInvoker = dir.cacheInvoker(url) case remoting.EventTypeDel: //dir.cacheService.EventTypeDel(res.Path, dir.serviceTTL) - dir.uncacheInvoker(url) + oldInvoker = dir.uncacheInvoker(url) logger.Infof("selector delete service url{%s}", res.Service) default: return @@ -138,8 +141,14 @@ func (dir *registryDirectory) refreshInvokers(res *registry.ServiceEvent) { newInvokers := dir.toGroupInvokers() dir.listenerLock.Lock() - defer dir.listenerLock.Unlock() dir.cacheInvokers = newInvokers + dir.listenerLock.Unlock() + // After dir.cacheInvokers is updated,destroy the oldInvoker + // Ensure that no request will enter the oldInvoker + if oldInvoker != nil { + oldInvoker.Destroy() + } + } func (dir *registryDirectory) toGroupInvokers() []protocol.Invoker { @@ -177,12 +186,18 @@ func (dir *registryDirectory) toGroupInvokers() []protocol.Invoker { return groupInvokersList } -func (dir *registryDirectory) uncacheInvoker(url *common.URL) { +// uncacheInvoker return abandoned Invoker,if no Invoker to be abandoned,return nil +func (dir *registryDirectory) uncacheInvoker(url *common.URL) protocol.Invoker { logger.Debugf("service will be deleted in cache invokers: invokers key is %s!", url.Key()) - dir.cacheInvokersMap.Delete(url.Key()) + if cacheInvoker, ok := dir.cacheInvokersMap.Load(url.Key()); ok { + dir.cacheInvokersMap.Delete(url.Key()) + return cacheInvoker.(protocol.Invoker) + } + return nil } -func (dir *registryDirectory) cacheInvoker(url *common.URL) { +// cacheInvoker return abandoned Invoker,if no Invoker to be abandoned,return nil +func (dir *registryDirectory) cacheInvoker(url *common.URL) protocol.Invoker { dir.overrideUrl(dir.GetDirectoryUrl()) referenceUrl := dir.GetDirectoryUrl().SubURL @@ -193,7 +208,7 @@ func (dir *registryDirectory) cacheInvoker(url *common.URL) { } if url == nil { logger.Error("URL is nil ,pls check if service url is subscribe successfully!") - return + return nil } //check the url's protocol is equal to the protocol which is configured in reference config or referenceUrl is not care about protocol if url.Protocol == referenceUrl.Protocol || referenceUrl.Protocol == "" { @@ -210,10 +225,11 @@ func (dir *registryDirectory) cacheInvoker(url *common.URL) { newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(*newUrl) if newInvoker != nil { dir.cacheInvokersMap.Store(newUrl.Key(), newInvoker) - cacheInvoker.(protocol.Invoker).Destroy() + return cacheInvoker.(protocol.Invoker) } } } + return nil } //select the protocol invokers from the directory @@ -239,10 +255,11 @@ func (dir *registryDirectory) IsAvailable() bool { func (dir *registryDirectory) Destroy() { //TODO:unregister & unsubscribe dir.BaseDirectory.Destroy(func() { - for _, ivk := range dir.cacheInvokers { + invokers := dir.cacheInvokers + dir.cacheInvokers = []protocol.Invoker{} + for _, ivk := range invokers { ivk.Destroy() } - dir.cacheInvokers = []protocol.Invoker{} }) }