Skip to content

Commit

Permalink
Merge pull request #976 from LaurenceLiZhixin/fix/eventDrivenChainCache
Browse files Browse the repository at this point in the history
Fix: event driven chain cache
  • Loading branch information
cityiron authored Feb 7, 2021
2 parents 5fe10ee + 4135457 commit 02c853e
Show file tree
Hide file tree
Showing 39 changed files with 911 additions and 138 deletions.
58 changes: 38 additions & 20 deletions cluster/cluster_impl/base_cluster_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,41 +121,49 @@ func (invoker *baseClusterInvoker) doSelect(lb cluster.LoadBalance, invocation p

func (invoker *baseClusterInvoker) doSelectInvoker(lb cluster.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker, invoked []protocol.Invoker) protocol.Invoker {
if len(invokers) == 0 {
logger.Errorf("the invokers of %s is nil. ", invocation.Invoker().GetUrl().ServiceKey())
return nil
}
go protocol.TryRefreshBlackList()
if len(invokers) == 1 {
return invokers[0]
if invokers[0].IsAvailable() {
return invokers[0]
}
protocol.SetInvokerUnhealthyStatus(invokers[0])
logger.Errorf("the invokers of %s is nil. ", invokers[0].GetUrl().ServiceKey())
return nil
}

selectedInvoker := lb.Select(invokers, invocation)

//judge to if the selectedInvoker is invoked

//judge if the selected Invoker is invoked and available
if (!selectedInvoker.IsAvailable() && invoker.availablecheck) || isInvoked(selectedInvoker, invoked) {
protocol.SetInvokerUnhealthyStatus(selectedInvoker)
otherInvokers := getOtherInvokers(invokers, selectedInvoker)
// do reselect
var reslectInvokers []protocol.Invoker

for _, invoker := range invokers {
if !invoker.IsAvailable() {
for i := 0; i < 3; i++ {
if len(otherInvokers) == 0 {
// no other ivk to reselect, return to fallback
break
}
reselectedInvoker := lb.Select(otherInvokers, invocation)
if isInvoked(reselectedInvoker, invoked) {
otherInvokers = getOtherInvokers(otherInvokers, reselectedInvoker)
continue
}
if !reselectedInvoker.IsAvailable() {
logger.Infof("the invoker of %s is not available, maybe some network error happened or the server is shutdown.",
invoker.GetUrl().Ip)
protocol.SetInvokerUnhealthyStatus(reselectedInvoker)
otherInvokers = getOtherInvokers(otherInvokers, reselectedInvoker)
continue
}

if !isInvoked(invoker, invoked) {
reslectInvokers = append(reslectInvokers, invoker)
}
}

if len(reslectInvokers) > 0 {
selectedInvoker = lb.Select(reslectInvokers, invocation)
} else {
logger.Errorf("all %d invokers is unavailable for %s.", len(invokers), selectedInvoker.GetUrl().String())
return nil
return reselectedInvoker
}
} else {
return selectedInvoker
}
return selectedInvoker
logger.Errorf("all %d invokers is unavailable for %s.", len(invokers), selectedInvoker.GetUrl().String())
return nil
}

func (invoker *baseClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
Expand Down Expand Up @@ -194,3 +202,13 @@ func getLoadBalance(invoker protocol.Invoker, invocation protocol.Invocation) cl
}
return extension.GetLoadbalance(lb)
}

func getOtherInvokers(invokers []protocol.Invoker, invoker protocol.Invoker) []protocol.Invoker {
otherInvokers := make([]protocol.Invoker, 0)
for _, i := range invokers {
if i != invoker {
otherInvokers = append(otherInvokers, i)
}
}
return otherInvokers
}
7 changes: 7 additions & 0 deletions cluster/cluster_impl/failback_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ func TestFailbackSuceess(t *testing.T) {

invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes()

invoker.EXPECT().IsAvailable().Return(true)

mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}}
invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult)

Expand All @@ -88,6 +90,7 @@ func TestFailbackRetryOneSuccess(t *testing.T) {
clusterInvoker := registerFailback(invoker).(*failbackClusterInvoker)

invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes()
invoker.EXPECT().IsAvailable().Return(true)

// failed at first
mockFailedResult := &protocol.RPCResult{Err: perrors.New("error")}
Expand All @@ -98,6 +101,7 @@ func TestFailbackRetryOneSuccess(t *testing.T) {
wg.Add(1)
now := time.Now()
mockSuccResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}}
invoker.EXPECT().IsAvailable().Return(true)
invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(func(protocol.Invocation) protocol.Result {
delta := time.Since(now).Nanoseconds() / int64(time.Second)
assert.True(t, delta >= 5)
Expand Down Expand Up @@ -131,6 +135,7 @@ func TestFailbackRetryFailed(t *testing.T) {
clusterInvoker := registerFailback(invoker).(*failbackClusterInvoker)

invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes()
invoker.EXPECT().IsAvailable().Return(true).AnyTimes()

mockFailedResult := &protocol.RPCResult{Err: perrors.New("error")}
invoker.EXPECT().Invoke(gomock.Any()).Return(mockFailedResult)
Expand Down Expand Up @@ -177,6 +182,7 @@ func TestFailbackRetryFailed10Times(t *testing.T) {
clusterInvoker := registerFailback(invoker).(*failbackClusterInvoker)
clusterInvoker.maxRetries = 10

invoker.EXPECT().IsAvailable().Return(true).AnyTimes()
invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes()

// 10 task should failed firstly.
Expand Down Expand Up @@ -220,6 +226,7 @@ func TestFailbackOutOfLimit(t *testing.T) {
clusterInvoker.failbackTasks = 1

invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes()
invoker.EXPECT().IsAvailable().Return(true).AnyTimes()

mockFailedResult := &protocol.RPCResult{Err: perrors.New("error")}
invoker.EXPECT().Invoke(gomock.Any()).Return(mockFailedResult).Times(11)
Expand Down
3 changes: 3 additions & 0 deletions cluster/cluster_impl/failfast_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func registerFailfast(invoker *mock.MockInvoker) protocol.Invoker {
invokers := []protocol.Invoker{}
invokers = append(invokers, invoker)

invoker.EXPECT().IsAvailable().Return(true).AnyTimes()
invoker.EXPECT().GetUrl().Return(failfastUrl)

staticDir := directory.NewStaticDirectory(invokers)
Expand All @@ -67,6 +68,7 @@ func TestFailfastInvokeSuccess(t *testing.T) {
invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := registerFailfast(invoker)

invoker.EXPECT().IsAvailable().Return(true).AnyTimes()
invoker.EXPECT().GetUrl().Return(failfastUrl).AnyTimes()

mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}}
Expand All @@ -87,6 +89,7 @@ func TestFailfastInvokeFail(t *testing.T) {
invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := registerFailfast(invoker)

invoker.EXPECT().IsAvailable().Return(true).AnyTimes()
invoker.EXPECT().GetUrl().Return(failfastUrl).AnyTimes()

mockResult := &protocol.RPCResult{Err: perrors.New("error")}
Expand Down
4 changes: 4 additions & 0 deletions cluster/cluster_impl/failsafe_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func registerFailsafe(invoker *mock.MockInvoker) protocol.Invoker {

invokers := []protocol.Invoker{}
invokers = append(invokers, invoker)
invoker.EXPECT().IsAvailable().Return(true).AnyTimes()

invoker.EXPECT().GetUrl().Return(failbackUrl)

Expand All @@ -67,6 +68,8 @@ func TestFailSafeInvokeSuccess(t *testing.T) {
invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := registerFailsafe(invoker)

invoker.EXPECT().IsAvailable().Return(true).AnyTimes()

invoker.EXPECT().GetUrl().Return(failsafeUrl).AnyTimes()

mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}}
Expand All @@ -85,6 +88,7 @@ func TestFailSafeInvokeFail(t *testing.T) {

invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := registerFailsafe(invoker)
invoker.EXPECT().IsAvailable().Return(true).AnyTimes()

invoker.EXPECT().GetUrl().Return(failsafeUrl).AnyTimes()

Expand Down
8 changes: 4 additions & 4 deletions cluster/directory/base_directory.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ func (dir *BaseDirectory) SetRouters(urls []*common.URL) {

routers := make([]router.PriorityRouter, 0, len(urls))

rc := dir.routerChain

for _, url := range urls {
routerKey := url.GetParam(constant.ROUTER_KEY, "")

Expand All @@ -94,7 +96,7 @@ func (dir *BaseDirectory) SetRouters(urls []*common.URL) {
}
}
factory := extension.GetRouterFactory(url.Protocol)
r, err := factory.NewPriorityRouter(url)
r, err := factory.NewPriorityRouter(url, rc.GetNotifyChan())
if err != nil {
logger.Errorf("Create router fail. router key: %s, url:%s, error: %+v", routerKey, url.Service(), err)
return
Expand All @@ -104,10 +106,8 @@ func (dir *BaseDirectory) SetRouters(urls []*common.URL) {

logger.Infof("Init file condition router success, size: %v", len(routers))
dir.mutex.Lock()
rc := dir.routerChain
dir.mutex.Unlock()

rc.AddRouters(routers)
dir.mutex.Unlock()
}

func (dir *BaseDirectory) isProperRouter(url *common.URL) bool {
Expand Down
5 changes: 4 additions & 1 deletion cluster/directory/base_directory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
)

import (
"github.com/apache/dubbo-go/cluster/router/chain"
_ "github.com/apache/dubbo-go/cluster/router/condition"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
Expand All @@ -50,7 +51,9 @@ func TestBuildRouterChain(t *testing.T) {
regURL := url
regURL.AddParam(constant.INTERFACE_KEY, "mock-app")
directory := NewBaseDirectory(regURL)

var err error
directory.routerChain, err = chain.NewRouterChain(regURL)
assert.Nil(t, err)
localIP := common.GetLocalIp()
rule := base64.URLEncoding.EncodeToString([]byte("true => " + " host = " + localIP))
routeURL := getRouteURL(rule, anyURL)
Expand Down
Empty file removed cluster/router/.gitkeep
Empty file.
2 changes: 2 additions & 0 deletions cluster/router/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,6 @@ type Chain interface {
SetInvokers([]protocol.Invoker)
// AddRouters Add routers
AddRouters([]PriorityRouter)
// GetNotifyChan get notify channel of this chain
GetNotifyChan() chan struct{}
}
60 changes: 28 additions & 32 deletions cluster/router/chain/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ package chain
import (
"sort"
"sync"
"sync/atomic"
"time"
)

import (
perrors "github.com/pkg/errors"
"go.uber.org/atomic"
)

import (
Expand All @@ -38,9 +38,7 @@ import (
)

const (
timeInterval = 5 * time.Second
timeThreshold = 2 * time.Second
countThreshold = 5
timeInterval = 5 * time.Second
)

// RouterChain Router chain
Expand All @@ -65,8 +63,10 @@ type RouterChain struct {
notify chan struct{}
// Address cache
cache atomic.Value
// init
init sync.Once
}

func (c *RouterChain) GetNotifyChan() chan struct{} {
return c.notify
}

// Route Loop routers in RouterChain and call Route method to determine the target invokers list.
Expand Down Expand Up @@ -104,6 +104,9 @@ func (c *RouterChain) AddRouters(routers []router.PriorityRouter) {
c.mutex.Lock()
defer c.mutex.Unlock()
c.routers = newRouters
go func() {
c.notify <- struct{}{}
}()
}

// SetInvokers receives updated invokers from registry center. If the times of notification exceeds countThreshold and
Expand All @@ -113,32 +116,21 @@ func (c *RouterChain) SetInvokers(invokers []protocol.Invoker) {
c.invokers = invokers
c.mutex.Unlock()

// it should trigger init router for first call
c.init.Do(func() {
go func() {
c.notify <- struct{}{}
}()
})

c.count++
now := time.Now()
if c.count >= countThreshold && now.Sub(c.last) >= timeThreshold {
c.last = now
c.count = 0
go func() {
c.notify <- struct{}{}
}()
}
go func() {
c.notify <- struct{}{}
}()
}

// loop listens on events to update the address cache when it's necessary, either when it receives notification
// from address update, or when timeInterval exceeds.
// loop listens on events to update the address cache when it receives notification
// from address update,
func (c *RouterChain) loop() {
ticker := time.NewTicker(timeInterval)
for {
select {
case <-ticker.C:
c.buildCache()
if protocol.GetAndRefreshState() {
c.buildCache()
}
case <-c.notify:
c.buildCache()
}
Expand Down Expand Up @@ -235,9 +227,15 @@ func NewRouterChain(url *common.URL) (*RouterChain, error) {
if len(routerFactories) == 0 {
return nil, perrors.Errorf("No routerFactory exits , create one please")
}

chain := &RouterChain{
last: time.Now(),
notify: make(chan struct{}),
}

routers := make([]router.PriorityRouter, 0, len(routerFactories))
for key, routerFactory := range routerFactories {
r, err := routerFactory().NewPriorityRouter(url)
r, err := routerFactory().NewPriorityRouter(url, chain.notify)
if r == nil || err != nil {
logger.Errorf("router chain build router fail! routerFactories key:%s error:%s", key, err.Error())
continue
Expand All @@ -250,12 +248,10 @@ func NewRouterChain(url *common.URL) (*RouterChain, error) {

sortRouter(newRouters)

chain := &RouterChain{
builtinRouters: routers,
routers: newRouters,
last: time.Now(),
notify: make(chan struct{}),
}
routerNeedsUpdateInit := atomic.Bool{}
routerNeedsUpdateInit.Store(false)
chain.routers = newRouters
chain.builtinRouters = routers
if url != nil {
chain.url = url
}
Expand Down
7 changes: 6 additions & 1 deletion cluster/router/chain/chain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,12 @@ conditions:
url := getConditionRouteUrl(applicationKey)
assert.NotNil(t, url)
factory := extension.GetRouterFactory(url.Protocol)
r, err := factory.NewPriorityRouter(url)
notify := make(chan struct{})
go func() {
for range notify {
}
}()
r, err := factory.NewPriorityRouter(url, notify)
assert.Nil(t, err)
assert.NotNil(t, r)

Expand Down
5 changes: 3 additions & 2 deletions cluster/router/condition/app_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,15 @@ const (
// AppRouter For listen application router with config center
type AppRouter struct {
listenableRouter
notify interface{}
}

// NewAppRouter Init AppRouter by url
func NewAppRouter(url *common.URL) (*AppRouter, error) {
func NewAppRouter(url *common.URL, notify chan struct{}) (*AppRouter, error) {
if url == nil {
return nil, perrors.Errorf("No route URL for create app router!")
}
appRouter, err := newListenableRouter(url, url.GetParam(constant.APPLICATION_KEY, ""))
appRouter, err := newListenableRouter(url, url.GetParam(constant.APPLICATION_KEY, ""), notify)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 02c853e

Please sign in to comment.