Skip to content

Commit

Permalink
1. add zk mesh route dynamic configuration listener
Browse files Browse the repository at this point in the history
2. add base64Enable flag to zookeeperDynamicConfiguration
  • Loading branch information
dongjianhui03 committed Oct 17, 2021
1 parent f16ab55 commit 902c085
Show file tree
Hide file tree
Showing 18 changed files with 209 additions and 198 deletions.
17 changes: 2 additions & 15 deletions cluster/router/chain/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,6 @@ import (
"dubbo.apache.org/dubbo-go/v3/protocol"
)

var (
virtualServiceConfigByte []byte
destinationRuleConfigByte []byte
)

// RouterChain Router chain
type RouterChain struct {
// Full list of addresses from registry, classified by method name.
Expand Down Expand Up @@ -106,11 +101,6 @@ func (c *RouterChain) copyInvokers() []protocol.Invoker {
return ret
}

func SetVSAndDRConfigByte(vs, dr []byte) {
virtualServiceConfigByte = vs
destinationRuleConfigByte = dr
}

// NewRouterChain init router chain
// Loop routerFactories and call NewRouter method
func NewRouterChain() (*RouterChain, error) {
Expand All @@ -122,12 +112,9 @@ func NewRouterChain() (*RouterChain, error) {
routers := make([]router.PriorityRouter, 0, len(routerFactories))

for key, routerFactory := range routerFactories {
if virtualServiceConfigByte == nil || destinationRuleConfigByte == nil {
logger.Warnf("virtual Service ProtocolConfig or destinationRule Confi Byte may be empty, pls check your CONF_VIRTUAL_SERVICE_FILE_PATH and CONF_DEST_RULE_FILE_PATH env is correctly point to your yaml file\n")
}
r, err := routerFactory().NewPriorityRouter(virtualServiceConfigByte, destinationRuleConfigByte)
r, err := routerFactory().NewPriorityRouter()
if r == nil || err != nil {
logger.Errorf("router chain build router fail! routerFactories key:%s error:%vv", key, err)
logger.Errorf("router chain build router fail! routerFactories key:%s error:%v", key, err)
continue
}
routers = append(routers, r)
Expand Down
10 changes: 3 additions & 7 deletions cluster/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,17 @@ import (
// PriorityRouterFactory creates creates priority router with url
type PriorityRouterFactory interface {
// NewPriorityRouter creates router instance with URL
NewPriorityRouter([]byte, []byte) (PriorityRouter, error)
NewPriorityRouter() (PriorityRouter, error)
}

// Router
type router interface {
// PriorityRouter routes with priority
type PriorityRouter interface {
// Route Determine the target invokers list.
Route([]protocol.Invoker, *common.URL, protocol.Invocation) []protocol.Invoker

// URL Return URL in router
URL() *common.URL
}

// PriorityRouter routes with priority
type PriorityRouter interface {
router
// Priority Return Priority in router
// 0 to ^int(0) is better
Priority() int64
Expand Down
9 changes: 7 additions & 2 deletions cluster/router/v3router/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,13 @@ package v3router

import (
"dubbo.apache.org/dubbo-go/v3/cluster/router"
"dubbo.apache.org/dubbo-go/v3/common/extension"
)

func init() {
extension.SetRouterFactory("mesh", NewUniformRouterFactory)
}

// UniformRouteFactory is uniform router's factory
type UniformRouteFactory struct{}

Expand All @@ -30,6 +35,6 @@ func NewUniformRouterFactory() router.PriorityRouterFactory {
}

// NewPriorityRouter construct a new UniformRouteFactory as PriorityRouter
func (f *UniformRouteFactory) NewPriorityRouter(vsConfigBytes, distConfigBytes []byte) (router.PriorityRouter, error) {
return NewUniformRouterChain(vsConfigBytes, distConfigBytes)
func (f *UniformRouteFactory) NewPriorityRouter() (router.PriorityRouter, error) {
return NewUniformRouterChain()
}
6 changes: 3 additions & 3 deletions cluster/router/v3router/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ import (
"github.com/stretchr/testify/assert"
)

// TestUniformRouterFacotry created a new factory that can new uniform router
func TestUniformRouterFacotry(t *testing.T) {
// TestUniformRouterFactory created a new factory that can new uniform router
func TestUniformRouterFactory(t *testing.T) {
factory := NewUniformRouterFactory()
assert.NotNil(t, factory)
router, err := factory.NewPriorityRouter([]byte{}, []byte{})
router, err := factory.NewPriorityRouter()
assert.Nil(t, err)
assert.NotNil(t, router)
}
2 changes: 1 addition & 1 deletion cluster/router/v3router/k8s_crd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func NewK8sCRDClient(groupName, groupVersion, namespace string, handlers ...List
return newClient, nil
}

// func (c *Client) WatchResources() []cache.Store { can only be called once
// WatchResources can only be called once
func (c *Client) WatchResources() []cache.Store {
stores := make([]cache.Store, 0)
c.once.Do(
Expand Down
192 changes: 95 additions & 97 deletions cluster/router/v3router/router_chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package v3router

import (
"encoding/json"
"io"
"strings"
)
Expand All @@ -29,43 +28,47 @@ import (

import (
"dubbo.apache.org/dubbo-go/v3/cluster/router"
"dubbo.apache.org/dubbo-go/v3/cluster/router/v3router/k8s_api"
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/logger"
"dubbo.apache.org/dubbo-go/v3/config"
"dubbo.apache.org/dubbo-go/v3/config_center"
"dubbo.apache.org/dubbo-go/v3/protocol"
"dubbo.apache.org/dubbo-go/v3/remoting"
)

// RouterChain contains all uniform router logic
// it has UniformRouter list,
type RouterChain struct {
routers []*UniformRouter
virtualServiceConfigBytes []byte
destinationRuleConfigBytes []byte
notify chan struct{}
routers []*UniformRouter
notify chan struct{}
}

// NewUniformRouterChain return
func NewUniformRouterChain(virtualServiceConfig, destinationRuleConfig []byte) (router.PriorityRouter, error) {
fromFileConfig := true
uniformRouters, err := parseFromConfigToRouters(virtualServiceConfig, destinationRuleConfig)
// nolint
func NewUniformRouterChain() (router.PriorityRouter, error) {
// 1. add mesh route listener
r := &RouterChain{}
rootConfig := config.GetRootConfig()
dynamicConfiguration, err := rootConfig.ConfigCenter.GetDynamicConfiguration()
if err != nil {
fromFileConfig = false
logger.Warnf("parse router config form local file failed, error = %+v", err)
return nil, err
}
r := &RouterChain{
virtualServiceConfigBytes: virtualServiceConfig,
destinationRuleConfigBytes: destinationRuleConfig,
routers: uniformRouters,
dynamicConfiguration.AddListener(rootConfig.Application.Name, r)

// 2. try to get mesh route configuration, default key is "dubbo.io.MESHAPPRULE" with group "dubbo"
key := rootConfig.Application.Name + constant.MeshRouteSuffix
meshRouteValue, err := dynamicConfiguration.GetProperties(key, config_center.WithGroup(rootConfig.ConfigCenter.Group))
if err != nil {
// the mesh route may not be initialized now
logger.Warnf("Can not get mesh route for key=%s, error=%v", key, err)
return r, nil
}
if err := k8s_api.SetK8sEventListener(r); err != nil {
logger.Warnf("try listen K8s router config failed, error = %+v", err)
if !fromFileConfig {
panic("No config file from both local file and k8s")
}
logger.Debugf("Successfully get mesh route:%s", meshRouteValue)
routes, err := parseRoute(meshRouteValue)
if err != nil {
logger.Warnf("Parse mesh route failed, error=%v", err)
return nil, err
}
r.routers = routes
return r, nil
}

Expand All @@ -77,84 +80,15 @@ func (r *RouterChain) Route(invokers []protocol.Invoker, url *common.URL, invoca
return invokers
}

// Process process route config change event
func (r *RouterChain) Process(event *config_center.ConfigChangeEvent) {
logger.Debugf("on processed event = %+v\n", *event)
if event.ConfigType == remoting.EventTypeAdd || event.ConfigType == remoting.EventTypeUpdate {
switch event.Key {
case k8s_api.VirtualServiceEventKey:
logger.Debug("virtual service event")
newVSValue, ok := event.Value.(*config.VirtualServiceConfig)
if !ok {
logger.Error("event.Value assertion error")
return
}

newVSJsonValue, ok := newVSValue.ObjectMeta.Annotations["kubectl.kubernetes.io/last-applied-configuration"]
if !ok {
logger.Error("newVSValue.ObjectMeta.Annotations has no key named kubectl.kubernetes.io/last-applied-configuration")
return
}
logger.Debugf("new virtual service json value = \n%v\n", newVSJsonValue)
newVirtualServiceConfig := &config.VirtualServiceConfig{}
if err := json.Unmarshal([]byte(newVSJsonValue), newVirtualServiceConfig); err != nil {
logger.Error("on process json data unmarshal error = ", err)
return
}
newVirtualServiceConfig.YamlAPIVersion = newVirtualServiceConfig.APIVersion
newVirtualServiceConfig.YamlKind = newVirtualServiceConfig.Kind
newVirtualServiceConfig.MetaData.Name = newVirtualServiceConfig.ObjectMeta.Name
logger.Debugf("get event after assertion = %+v\n", newVirtualServiceConfig)
data, err := yaml.Marshal(newVirtualServiceConfig)
if err != nil {
logger.Error("Process change of virtual service: event.Value marshal error:", err)
return
}
r.routers, err = parseFromConfigToRouters(data, r.destinationRuleConfigBytes)
if err != nil {
logger.Error("Process change of virtual service: parseFromConfigToRouters:", err)
return
}
case k8s_api.DestinationRuleEventKey:
logger.Debug("handling dest rule event")
newDRValue, ok := event.Value.(*config.DestinationRuleConfig)
if !ok {
logger.Error("event.Value assertion error")
return
}

newDRJsonValue, ok := newDRValue.ObjectMeta.Annotations["kubectl.kubernetes.io/last-applied-configuration"]
if !ok {
logger.Error("newVSValue.ObjectMeta.Annotations has no key named kubectl.kubernetes.io/last-applied-configuration")
return
}
newDestRuleConfig := &config.DestinationRuleConfig{}
if err := json.Unmarshal([]byte(newDRJsonValue), newDestRuleConfig); err != nil {
logger.Error("on process json data unmarshal error = ", err)
return
}
newDestRuleConfig.YamlAPIVersion = newDestRuleConfig.APIVersion
newDestRuleConfig.YamlKind = newDestRuleConfig.Kind
newDestRuleConfig.MetaData.Name = newDestRuleConfig.ObjectMeta.Name
logger.Debugf("get event after asseration = %+v\n", newDestRuleConfig)
data, err := yaml.Marshal(newDestRuleConfig)
if err != nil {
logger.Error("Process change of dest rule: event.Value marshal error:", err)
return
}
r.routers, err = parseFromConfigToRouters(r.virtualServiceConfigBytes, data)
if err != nil {
logger.Error("Process change of dest rule: parseFromConfigToRouters:", err)
return
}
default:
logger.Error("unknown unsupported event key:", event.Key)
}
logger.Debugf("RouteChain process event:\n%+v", event)
routers, err := parseRoute(event.Value.(string))
if err != nil {
return
}

r.routers = routers
// todo delete router
//if event.ConfigType == remoting.EventTypeDel {
//
//}
}

// Name get name of ConnCheckerRouter
Expand Down Expand Up @@ -236,6 +170,70 @@ func parseFromConfigToRouters(virtualServiceConfig, destinationRuleConfig []byte
return routers, nil
}

func parseRoute(routeContent string) ([]*UniformRouter, error) {
var virtualServiceConfigList []*config.VirtualServiceConfig
destRuleConfigsMap := make(map[string]map[string]map[string]string)

meshRouteDecoder := yaml.NewDecoder(strings.NewReader(routeContent))
for {
meshRouteMetadata := &config.MeshRouteMetadata{}
err := meshRouteDecoder.Decode(meshRouteMetadata)
if err == io.EOF {
break
} else if err != nil {
logger.Error("parseRoute route metadata err = ", err)
return nil, err
}

bytes, err := yaml.Marshal(meshRouteMetadata.Spec)
if err != nil {
return nil, err
}
specDecoder := yaml.NewDecoder(strings.NewReader(string(bytes)))
switch meshRouteMetadata.YamlKind {
case "VirtualService":
meshRouteConfigSpec := &config.UniformRouterConfigSpec{}
err := specDecoder.Decode(meshRouteConfigSpec)
if err != nil {
return nil, err
}
virtualServiceConfigList = append(virtualServiceConfigList, &config.VirtualServiceConfig{
YamlAPIVersion: meshRouteMetadata.YamlAPIVersion,
YamlKind: meshRouteMetadata.YamlKind,
TypeMeta: meshRouteMetadata.TypeMeta,
ObjectMeta: meshRouteMetadata.ObjectMeta,
MetaData: meshRouteMetadata.MetaData,
Spec: *meshRouteConfigSpec,
})
case "DestinationRule":
meshRouteDestinationRuleSpec := &config.DestinationRuleSpec{}
err := specDecoder.Decode(meshRouteDestinationRuleSpec)
if err != nil {
return nil, err
}
destRuleCfgMap := make(map[string]map[string]string)
for _, v := range meshRouteDestinationRuleSpec.SubSets {
destRuleCfgMap[v.Name] = v.Labels
}

destRuleConfigsMap[meshRouteDestinationRuleSpec.Host] = destRuleCfgMap
}
}

routers := make([]*UniformRouter, 0)

for _, v := range virtualServiceConfigList {
tempServiceNeedsDescMap := make(map[string]map[string]string)
for _, host := range v.Spec.Hosts {
targetDestMap := destRuleConfigsMap[host]
mapCopy(tempServiceNeedsDescMap, targetDestMap)
}
routers = append(routers, NewUniformRouter(v.Spec.Dubbo, tempServiceNeedsDescMap))
}
logger.Debug("parsed successfully with router size = ", len(routers))
return routers, nil
}

func mapCopy(dist map[string]map[string]string, source map[string]map[string]string) {
for k, v := range source {
dist[k] = v
Expand Down
Loading

0 comments on commit 902c085

Please sign in to comment.