Skip to content

Commit

Permalink
Merge branch '3.0' of https://github.com/apache/dubbo-go into 3.0
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexStocks committed Apr 4, 2021
2 parents 6610eae + d1801c1 commit e53721e
Show file tree
Hide file tree
Showing 96 changed files with 5,902 additions and 6,076 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/github-actions.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
# If you want to matrix build , you can append the following list.
matrix:
go_version:
- 1.13
- 1.15
os:
- ubuntu-latest

Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ If you are willing to do some code contributions and document contributions to [

## Community

If u want to communicate with our community, pls scan the following dubbobo Ding-Ding QR code or search our commnity DingDing group code 31363295.
If u want to communicate with our community, pls scan the following dubbobo DingDing QR code or search our commnity DingDing group code 31363295.

<div>
<table>
Expand Down
38 changes: 0 additions & 38 deletions cluster/directory/base_directory.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ import (
"github.com/apache/dubbo-go/cluster/router/chain"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
)

// BaseDirectory Abstract implementation of Directory: Invoker list returned from this Directory's list method have been filtered by Routers
Expand Down Expand Up @@ -74,42 +72,6 @@ func (dir *BaseDirectory) GetDirectoryUrl() *common.URL {
return dir.url
}

// SetRouters Convert url to routers and add them into dir.routerChain
func (dir *BaseDirectory) SetRouters(urls []*common.URL) {
if len(urls) == 0 {
return
}

routers := make([]router.PriorityRouter, 0, len(urls))

rc := dir.routerChain

for _, url := range urls {
routerKey := url.GetParam(constant.ROUTER_KEY, "")

if len(routerKey) == 0 {
continue
}
if url.Protocol == constant.CONDITION_ROUTE_PROTOCOL {
if !dir.isProperRouter(url) {
continue
}
}
factory := extension.GetRouterFactory(url.Protocol)
r, err := factory.NewPriorityRouter(url, rc.GetNotifyChan())
if err != nil {
logger.Errorf("Create router fail. router key: %s, url:%s, error: %+v", routerKey, url.Service(), err)
return
}
routers = append(routers, r)
}

logger.Infof("Init file condition router success, size: %v", len(routers))
dir.mutex.Lock()
rc.AddRouters(routers)
dir.mutex.Unlock()
}

func (dir *BaseDirectory) isProperRouter(url *common.URL) bool {
app := url.GetParam(constant.APPLICATION_KEY, "")
dirApp := dir.GetUrl().GetParam(constant.APPLICATION_KEY, "")
Expand Down
13 changes: 1 addition & 12 deletions cluster/directory/base_directory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (

import (
"github.com/apache/dubbo-go/cluster/router/chain"
_ "github.com/apache/dubbo-go/cluster/router/condition"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
)
Expand All @@ -52,17 +51,7 @@ func TestBuildRouterChain(t *testing.T) {
directory := NewBaseDirectory(regURL)
var err error
directory.routerChain, err = chain.NewRouterChain(regURL)
assert.Nil(t, err)
localIP := common.GetLocalIp()
rule := base64.URLEncoding.EncodeToString([]byte("true => " + " host = " + localIP))
routeURL := getRouteURL(rule, anyURL)
routeURL.AddParam(constant.INTERFACE_KEY, "mock-app")
routerURLs := make([]*common.URL, 0)
routerURLs = append(routerURLs, routeURL)
directory.SetRouters(routerURLs)
chain := directory.RouterChain()

assert.NotNil(t, chain)
assert.Error(t, err)
}

func getRouteURL(rule string, u *common.URL) *common.URL {
Expand Down
135 changes: 14 additions & 121 deletions cluster/router/chain/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ import (
"github.com/apache/dubbo-go/protocol"
)

const (
timeInterval = 5 * time.Second
var (
virtualServiceConfigByte []byte
destinationRuleConfigByte []byte
)

// RouterChain Router chain
Expand Down Expand Up @@ -71,24 +72,10 @@ func (c *RouterChain) GetNotifyChan() chan struct{} {

// Route Loop routers in RouterChain and call Route method to determine the target invokers list.
func (c *RouterChain) Route(url *common.URL, invocation protocol.Invocation) []protocol.Invoker {
cache := c.loadCache()
if cache == nil {
c.mutex.RLock()
defer c.mutex.RUnlock()
return c.invokers
}

bitmap := cache.bitmap
finalInvokers := c.invokers
for _, r := range c.copyRouters() {
bitmap = r.Route(bitmap, cache, url, invocation)
}

indexes := bitmap.ToArray()
finalInvokers := make([]protocol.Invoker, len(indexes))
for i, index := range indexes {
finalInvokers[i] = cache.invokers[index]
finalInvokers = r.Route(c.invokers, url, invocation)
}

return finalInvokers
}

Expand Down Expand Up @@ -121,22 +108,6 @@ func (c *RouterChain) SetInvokers(invokers []protocol.Invoker) {
}()
}

// loop listens on events to update the address cache when it receives notification
// from address update,
func (c *RouterChain) loop() {
ticker := time.NewTicker(timeInterval)
for {
select {
case <-ticker.C:
if protocol.GetAndRefreshState() {
c.buildCache()
}
case <-c.notify:
c.buildCache()
}
}
}

// copyRouters make a snapshot copy from RouterChain's router list.
func (c *RouterChain) copyRouters() []router.PriorityRouter {
c.mutex.RLock()
Expand All @@ -158,66 +129,9 @@ func (c *RouterChain) copyInvokers() []protocol.Invoker {
return ret
}

// loadCache loads cache from sync.Value to guarantee the visibility
func (c *RouterChain) loadCache() *InvokerCache {
v := c.cache.Load()
if v == nil {
return nil
}

return v.(*InvokerCache)
}

// copyInvokerIfNecessary compares chain's invokers copy and cache's invokers copy, to avoid copy as much as possible
func (c *RouterChain) copyInvokerIfNecessary(cache *InvokerCache) []protocol.Invoker {
var invokers []protocol.Invoker
if cache != nil {
invokers = cache.invokers
}

c.mutex.RLock()
defer c.mutex.RUnlock()
if isInvokersChanged(invokers, c.invokers) {
invokers = c.copyInvokers()
}
return invokers
}

// buildCache builds address cache with the new invokers for all poolable routers.
func (c *RouterChain) buildCache() {
origin := c.loadCache()
invokers := c.copyInvokerIfNecessary(origin)
if len(invokers) == 0 {
return
}

var (
mutex sync.Mutex
wg sync.WaitGroup
)

cache := BuildCache(invokers)
for _, r := range c.copyRouters() {
if p, ok := r.(router.Poolable); ok {
wg.Add(1)
go func(p router.Poolable) {
defer wg.Done()
pool, info := poolRouter(p, origin, invokers)
mutex.Lock()
defer mutex.Unlock()
cache.pools[p.Name()] = pool
cache.metadatas[p.Name()] = info
}(p)
}
}
wg.Wait()

c.cache.Store(cache)
}

// URL Return URL in RouterChain
func (c *RouterChain) URL() *common.URL {
return c.url
func SetVSAndDRConfigByte(vs, dr []byte) {
virtualServiceConfigByte = vs
destinationRuleConfigByte = dr
}

// NewRouterChain Use url to init router chain
Expand All @@ -234,10 +148,14 @@ func NewRouterChain(url *common.URL) (*RouterChain, error) {
}

routers := make([]router.PriorityRouter, 0, len(routerFactories))

for key, routerFactory := range routerFactories {
r, err := routerFactory().NewPriorityRouter(url, chain.notify)
if virtualServiceConfigByte == nil || destinationRuleConfigByte == nil {
logger.Warnf("virtual Service Config or destinationRule Confi Byte may be empty, pls check your CONF_VIRTUAL_SERVICE_FILE_PATH and CONF_DEST_RULE_FILE_PATH env is correctly point to your yaml file\n")
}
r, err := routerFactory().NewPriorityRouter(virtualServiceConfigByte, destinationRuleConfigByte, chain.notify)
if r == nil || err != nil {
logger.Errorf("router chain build router fail! routerFactories key:%s error:%s", key, err.Error())
logger.Errorf("router chain build router fail! routerFactories key:%s error:%vv", key, err)
continue
}
routers = append(routers, r)
Expand All @@ -256,34 +174,9 @@ func NewRouterChain(url *common.URL) (*RouterChain, error) {
chain.url = url
}

go chain.loop()
return chain, nil
}

// poolRouter calls poolable router's Pool() to create new address pool and address metadata if necessary.
// If the corresponding cache entry exists, and the poolable router answers no need to re-pool (possibly because its
// rule doesn't change), and the address list doesn't change, then the existing data will be re-used.
func poolRouter(p router.Poolable, origin *InvokerCache, invokers []protocol.Invoker) (router.AddrPool, router.AddrMetadata) {
name := p.Name()
if isCacheMiss(origin, name) || p.ShouldPool() || &(origin.invokers) != &invokers {
logger.Debugf("build address cache for router %q", name)
return p.Pool(invokers)
}

logger.Debugf("reuse existing address cache for router %q", name)
return origin.pools[name], origin.metadatas[name]
}

// isCacheMiss checks if the corresponding cache entry for a poolable router has already existed.
// False returns when the cache is nil, or cache's pool is nil, or cache's invokers snapshot is nil, or the entry
// doesn't exist.
func isCacheMiss(cache *InvokerCache, key string) bool {
if cache == nil || cache.pools == nil || cache.invokers == nil || cache.pools[key] == nil {
return true
}
return false
}

// isInvokersChanged compares new invokers on the right changes, compared with the old invokers on the left.
func isInvokersChanged(left []protocol.Invoker, right []protocol.Invoker) bool {
if len(right) != len(left) {
Expand Down
Loading

0 comments on commit e53721e

Please sign in to comment.