Skip to content

Commit

Permalink
[ISSUE apache#2124] feat:support polaris router ability (apache#2132)
Browse files Browse the repository at this point in the history
* feat:support polaris router ability

* feat:support polaris router ability

* chore:update polaris-go version to 1.3.0-alpha

* style:fix code style

* style:fix code style

* style:fix code style
  • Loading branch information
chuntaojun authored and binbin committed Dec 10, 2022
1 parent e06207f commit 8f279d9
Show file tree
Hide file tree
Showing 9 changed files with 399 additions and 8 deletions.
27 changes: 27 additions & 0 deletions cluster/router/polaris/default.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package polaris

import (
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
)

func init() {
extension.SetRouterFactory(constant.PluginPolarisRouterFactory, NewPolarisRouterFactory)
}
35 changes: 35 additions & 0 deletions cluster/router/polaris/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package polaris

import (
"dubbo.apache.org/dubbo-go/v3/cluster/router"
)

// RouteFactory router factory
type RouteFactory struct{}

// NewPolarisRouterFactory constructs a new PriorityRouterFactory
func NewPolarisRouterFactory() router.PriorityRouterFactory {
return &RouteFactory{}
}

// NewPriorityRouter construct a new PriorityRouter
func (f *RouteFactory) NewPriorityRouter() (router.PriorityRouter, error) {
return newPolarisRouter()
}
314 changes: 314 additions & 0 deletions cluster/router/polaris/router.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,314 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package polaris

import (
"context"
"errors"
"fmt"
"strings"
"sync"
"time"
)

import (
"github.com/dubbogo/gost/log/logger"

"github.com/polarismesh/polaris-go"
"github.com/polarismesh/polaris-go/pkg/model"
v1 "github.com/polarismesh/polaris-go/pkg/model/pb/v1"
)

import (
"dubbo.apache.org/dubbo-go/v3/cluster/router"
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/config"
"dubbo.apache.org/dubbo-go/v3/protocol"
remotingpolaris "dubbo.apache.org/dubbo-go/v3/remoting/polaris"
"dubbo.apache.org/dubbo-go/v3/remoting/polaris/parser"
)

var (
_ router.PriorityRouter = (*polarisRouter)(nil)
)

var (
ErrorPolarisServiceRouteRuleEmpty = errors.New("service route rule is empty")
)

func newPolarisRouter() (*polarisRouter, error) {
routerAPI, err := remotingpolaris.GetRouterAPI()
if err != nil {
return nil, err
}
consumerAPI, err := remotingpolaris.GetConsumerAPI()
if err != nil {
return nil, err
}

return &polarisRouter{
routerAPI: routerAPI,
consumerAPI: consumerAPI,
}, nil
}

type polarisRouter struct {
routerAPI polaris.RouterAPI
consumerAPI polaris.ConsumerAPI

cancel context.CancelFunc

lock sync.RWMutex
instances map[string]model.Instance
}

// Route Determine the target invokers list.
func (p *polarisRouter) Route(invokers []protocol.Invoker, url *common.URL,
invoaction protocol.Invocation) []protocol.Invoker {

if len(invokers) == 0 {
logger.Warnf("[tag router] invokers from previous router is empty")
return invokers
}

service := getService(url)
instanceMap := p.buildInstanceMap(service)
if len(instanceMap) == 0 {
return invokers
}

invokersMap := make(map[string]protocol.Invoker, len(invokers))
targetIns := make([]model.Instance, 0, len(invokers))
for i := range invokers {
invoker := invokers[i]
instanceID := invoker.GetURL().GetParam(constant.PolarisInstanceID, "")
if len(instanceID) == 0 {
continue
}
invokersMap[instanceID] = invoker
if val, ok := instanceMap[instanceID]; ok {
targetIns = append(targetIns, val)
}
}

req, err := p.buildRouteRequest(service, url, invoaction)
if err != nil {
return invokers
}
req.DstInstances = model.NewDefaultServiceInstances(model.ServiceInfo{
Service: service,
Namespace: remotingpolaris.GetNamespace(),
}, targetIns)

resp, err := p.routerAPI.ProcessRouters(&req)
if err != nil {
return invokers
}

ret := make([]protocol.Invoker, 0, len(resp.GetInstances()))
for i := range resp.GetInstances() {
if val, ok := invokersMap[resp.GetInstances()[i].GetId()]; ok {
ret = append(ret, val)
}
}

return ret
}

func getService(url *common.URL) string {
applicationMode := false
for _, item := range config.GetRootConfig().Registries {
if item.Protocol == constant.PolarisKey {
applicationMode = item.RegistryType == constant.ServiceKey
}
}

service := "providers:" + url.Service()
if applicationMode {
service = config.GetApplicationConfig().Name
}

return service
}

func (p *polarisRouter) buildRouteRequest(svc string, url *common.URL,
invocation protocol.Invocation) (polaris.ProcessRoutersRequest, error) {

routeReq := polaris.ProcessRoutersRequest{
ProcessRoutersRequest: model.ProcessRoutersRequest{
SourceService: model.ServiceInfo{
Metadata: map[string]string{},
},
},
}

attachement := invocation.Attachments()
arguments := invocation.Arguments()

labels, err := p.buildTrafficLabels(svc)
if err != nil {
return polaris.ProcessRoutersRequest{}, err
}

for i := range labels {
label := labels[i]
if strings.Compare(label, model.LabelKeyPath) == 0 {
routeReq.AddArguments(model.BuildPathArgument(getInvokeMethod(url, invocation)))
continue
}
if strings.HasPrefix(label, model.LabelKeyHeader) {
if val, ok := attachement[strings.TrimPrefix(label, model.LabelKeyHeader)]; ok {
routeReq.SourceService.Metadata[label] = fmt.Sprintf("%+v", val)
routeReq.AddArguments(model.BuildArgumentFromLabel(label, fmt.Sprintf("%+v", val)))
}
}
if strings.HasPrefix(label, model.LabelKeyQuery) {
if val := parser.ParseArgumentsByExpression(label, arguments); val != nil {
routeReq.AddArguments(model.BuildArgumentFromLabel(label, fmt.Sprintf("%+v", val)))
}
}
}

return routeReq, nil
}

func (p *polarisRouter) buildTrafficLabels(svc string) ([]string, error) {
req := &model.GetServiceRuleRequest{}
req.Namespace = remotingpolaris.GetNamespace()
req.Service = svc
req.SetTimeout(time.Second)
engine := p.routerAPI.SDKContext().GetEngine()
resp, err := engine.SyncGetServiceRule(model.EventRouting, req)
if err != nil {
logger.Errorf("[Router][Polaris] ns:%s svc:%s get route rule fail : %+v", req.GetNamespace(), req.GetService(), err)
return nil, err
}

if resp == nil || resp.GetValue() == nil {
logger.Errorf("[Router][Polaris] ns:%s svc:%s get route rule empty", req.GetNamespace(), req.GetService())
return nil, ErrorPolarisServiceRouteRuleEmpty
}

routeRule := resp.GetValue().(*v1.Routing)
labels := make([]string, 0, 4)
labels = append(labels, collectRouteLabels(routeRule.GetInbounds())...)
labels = append(labels, collectRouteLabels(routeRule.GetInbounds())...)

return labels, nil
}

func getInvokeMethod(url *common.URL, invoaction protocol.Invocation) string {
applicationMode := false
for _, item := range config.GetRootConfig().Registries {
if item.Protocol == constant.PolarisKey {
applicationMode = item.RegistryType == constant.ServiceKey
}
}

method := invoaction.MethodName()
if applicationMode {
method = url.Interface() + "/" + invoaction.MethodName()
}

return method
}

func collectRouteLabels(routings []*v1.Route) []string {
ret := make([]string, 0, 4)

for i := range routings {
route := routings[i]
sources := route.GetSources()
for p := range sources {
source := sources[p]
for k := range source.GetMetadata() {
ret = append(ret, k)
}
}
}

return ret
}

func (p *polarisRouter) buildInstanceMap(svc string) map[string]model.Instance {
resp, err := p.consumerAPI.GetAllInstances(&polaris.GetAllInstancesRequest{
GetAllInstancesRequest: model.GetAllInstancesRequest{
Service: svc,
Namespace: remotingpolaris.GetNamespace(),
},
})
if err != nil {
logger.Errorf("[Router][Polaris] ns:%s svc:%s get all instances fail : %+v", remotingpolaris.GetNamespace(), svc, err)
return nil
}

ret := make(map[string]model.Instance, len(resp.GetInstances()))

for i := range resp.GetInstances() {
ret[resp.GetInstances()[i].GetId()] = resp.GetInstances()[i]
}

return ret
}

// URL Return URL in router
func (p *polarisRouter) URL() *common.URL {
return nil
}

// Priority Return Priority in router
// 0 to ^int(0) is better
func (p *polarisRouter) Priority() int64 {
return 0
}

// Notify the router the invoker list
func (p *polarisRouter) Notify(invokers []protocol.Invoker) {
if len(invokers) == 0 {
return
}
service := getService(invokers[0].GetURL())
if service == "" {
logger.Error("url service is empty")
return
}

req := &model.GetServiceRuleRequest{}
req.Namespace = remotingpolaris.GetNamespace()
req.Service = service
req.SetTimeout(time.Second)

engine := p.routerAPI.SDKContext().GetEngine()
_, err := engine.SyncGetServiceRule(model.EventRouting, req)
if err != nil {
logger.Errorf("[Router][Polaris] ns:%s svc:%s get route rule fail : %+v", req.GetNamespace(), req.GetService(), err)
return
}

_, err = p.consumerAPI.GetAllInstances(&polaris.GetAllInstancesRequest{
GetAllInstancesRequest: model.GetAllInstancesRequest{
Service: service,
Namespace: remotingpolaris.GetNamespace(),
},
})
if err != nil {
logger.Errorf("[Router][Polaris] ns:%s svc:%s get all instances fail : %+v", req.GetNamespace(), req.GetService(), err)
return
}
}
10 changes: 9 additions & 1 deletion common/constant/polaris_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,13 @@ const (
)

const (
PluginPolarisTpsLimiter = "polaris-limit"
PolarisInstanceHealthStatus = "healthstatus"
PolarisInstanceIsolatedStatus = "isolated"
PolarisCIrcuirbreakerStatus = "circuitbreaker"
)

const (
PluginPolarisTpsLimiter = "polaris-limit"
PluginPolarisRouterFactory = "polaris-router"
PluginPolarisReportFilter = "polaris-report"
)
Loading

0 comments on commit 8f279d9

Please sign in to comment.