diff --git a/common/constant/key.go b/common/constant/key.go index d7fb47884c..7adafa75bb 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -264,18 +264,6 @@ const ( NacosUpdateCacheWhenEmpty = "nacos.updateCacheWhenEmpty" ) -const ( - PolarisKey = "polaris" - PolarisDefaultRoleType = 3 - PolarisServiceToken = "token" - PolarisServiceNameSeparator = ":" - PolarisDubboPath = "DUBBOPATH" - PolarisInstanceID = "polaris.instanceID" - PolarisDefaultNamespace = "default" - PolarisDubboGroup = "dubbo.group" - PolarisClientName = "polaris-client" -) - const ( FileKey = "file" ) diff --git a/common/constant/polaris_key.go b/common/constant/polaris_key.go new file mode 100644 index 0000000000..97d0ab552e --- /dev/null +++ b/common/constant/polaris_key.go @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package constant + +const ( + PolarisKey = "polaris" + PolarisDefaultRoleType = 3 + PolarisServiceToken = "token" + PolarisServiceNameSeparator = ":" + PolarisDubboPath = "DUBBOPATH" + PolarisInstanceID = "polaris.instanceID" + PolarisDefaultNamespace = "default" + PolarisDubboGroup = "dubbo.group" + PolarisClientName = "polaris-client" +) + +const ( + PluginPolarisTpsLimiter = "polaris-limit" +) diff --git a/filter/filter_impl/import.go b/filter/filter_impl/import.go index ca38303ca2..2e389b90aa 100644 --- a/filter/filter_impl/import.go +++ b/filter/filter_impl/import.go @@ -30,6 +30,7 @@ import ( _ "dubbo.apache.org/dubbo-go/v3/filter/graceful_shutdown" _ "dubbo.apache.org/dubbo-go/v3/filter/hystrix" _ "dubbo.apache.org/dubbo-go/v3/filter/metrics" + _ "dubbo.apache.org/dubbo-go/v3/filter/polaris/limit" _ "dubbo.apache.org/dubbo-go/v3/filter/seata" _ "dubbo.apache.org/dubbo-go/v3/filter/sentinel" _ "dubbo.apache.org/dubbo-go/v3/filter/token" diff --git a/filter/polaris/limit/default.go b/filter/polaris/limit/default.go new file mode 100644 index 0000000000..5f40295c4a --- /dev/null +++ b/filter/polaris/limit/default.go @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package limit + +import ( + "dubbo.apache.org/dubbo-go/v3/common/constant" + "dubbo.apache.org/dubbo-go/v3/common/extension" + "dubbo.apache.org/dubbo-go/v3/filter" +) + +func init() { + extension.SetTpsLimiter(constant.PluginPolarisTpsLimiter, func() filter.TpsLimiter { + return &polarisTpsLimiter{} + }) +} diff --git a/filter/polaris/limit/limiter.go b/filter/polaris/limit/limiter.go new file mode 100644 index 0000000000..64ecb6ef6b --- /dev/null +++ b/filter/polaris/limit/limiter.go @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package limit + +import ( + "fmt" + "time" +) + +import ( + "github.com/dubbogo/gost/log/logger" + + "github.com/polarismesh/polaris-go" + "github.com/polarismesh/polaris-go/pkg/flow/data" + "github.com/polarismesh/polaris-go/pkg/model" + v1 "github.com/polarismesh/polaris-go/pkg/model/pb/v1" +) + +import ( + "dubbo.apache.org/dubbo-go/v3/common" + "dubbo.apache.org/dubbo-go/v3/common/constant" + "dubbo.apache.org/dubbo-go/v3/config" + "dubbo.apache.org/dubbo-go/v3/protocol" + remotingpolaris "dubbo.apache.org/dubbo-go/v3/remoting/polaris" + "dubbo.apache.org/dubbo-go/v3/remoting/polaris/parser" +) + +type polarisTpsLimiter struct { + limitApi polaris.LimitAPI +} + +func (pl *polarisTpsLimiter) IsAllowable(url *common.URL, invocation protocol.Invocation) bool { + var err error + + pl.limitApi, err = remotingpolaris.GetLimiterAPI() + if err != nil { + logger.Error("[TpsLimiter][Polaris] create polaris LimitAPI fail : %+v", err) + return true + } + + req := pl.buildQuotaRequest(url, invocation) + if req == nil { + return true + } + logger.Debugf("[TpsLimiter][Polaris] quota req : %+v", req) + + resp, err := pl.limitApi.GetQuota(req) + if err != nil { + logger.Error("[TpsLimiter][Polaris] ns:%s svc:%s get quota fail : %+v", remotingpolaris.GetNamespace(), url.Service(), err) + return true + } + + return resp.Get().Code == model.QuotaResultOk +} + +func (pl *polarisTpsLimiter) buildQuotaRequest(url *common.URL, invoaction protocol.Invocation) polaris.QuotaRequest { + ns := remotingpolaris.GetNamespace() + applicationMode := false + for _, item := range config.GetRootConfig().Registries { + if item.Protocol == constant.PolarisKey { + applicationMode = item.RegistryType == constant.ServiceKey + } + } + + svc := "providers:" + url.Service() + method := invoaction.MethodName() + if applicationMode { + svc = config.GetApplicationConfig().Name + method = url.Interface() + "/" + invoaction.MethodName() + } + + req := polaris.NewQuotaRequest() + req.SetNamespace(ns) + req.SetService(svc) + req.SetMethod(method) + + matchs, ok := pl.buildArguments(req.(*model.QuotaRequestImpl)) + if !ok { + return nil + } + + attachement := invoaction.Attachments() + arguments := invoaction.Arguments() + + for i := range matchs { + item := matchs[i] + switch item.GetType() { + case v1.MatchArgument_HEADER: + if val, ok := attachement[item.GetKey()]; ok { + req.AddArgument(model.BuildHeaderArgument(item.GetKey(), fmt.Sprintf("%+v", val))) + } + case v1.MatchArgument_QUERY: + if val := parser.ParseArgumentsByExpression(item.GetKey(), arguments); val != nil { + req.AddArgument(model.BuildQueryArgument(item.GetKey(), fmt.Sprintf("%+v", val))) + } + case v1.MatchArgument_CALLER_IP: + callerIp := url.GetParam(constant.RemoteAddr, "") + if len(callerIp) != 0 { + req.AddArgument(model.BuildCallerIPArgument(callerIp)) + } + case model.ArgumentTypeCallerService: + } + } + + return req +} + +func (pl *polarisTpsLimiter) buildArguments(req *model.QuotaRequestImpl) ([]*v1.MatchArgument, bool) { + engine := pl.limitApi.SDKContext().GetEngine() + + getRuleReq := &data.CommonRateLimitRequest{ + DstService: model.ServiceKey{ + Namespace: req.GetNamespace(), + Service: req.GetService(), + }, + Trigger: model.NotifyTrigger{ + EnableDstRateLimit: true, + }, + ControlParam: model.ControlParam{ + Timeout: time.Millisecond * 500, + }, + } + + if err := engine.SyncGetResources(getRuleReq); err != nil { + logger.Error("[TpsLimiter][Polaris] ns:%s svc:%s get RateLimit Rule fail : %+v", req.GetNamespace(), req.GetService(), err) + return nil, false + } + + svcRule := getRuleReq.RateLimitRule + if svcRule == nil || svcRule.GetValue() == nil { + logger.Warnf("[TpsLimiter][Polaris] ns:%s svc:%s get RateLimit Rule is nil", req.GetNamespace(), req.GetService()) + return nil, false + } + + rules, ok := svcRule.GetValue().(*v1.RateLimit) + if !ok { + logger.Error("[TpsLimiter][Polaris] ns:%s svc:%s get RateLimit Rule invalid", req.GetNamespace(), req.GetService()) + return nil, false + } + + ret := make([]*v1.MatchArgument, 0, 4) + for i := range rules.GetRules() { + rule := rules.GetRules()[i] + if len(rule.GetArguments()) == 0 { + continue + } + + ret = append(ret, rule.Arguments...) + } + + return ret, true +} diff --git a/go.mod b/go.mod index b67c712bc4..19871490e0 100644 --- a/go.mod +++ b/go.mod @@ -38,6 +38,7 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd github.com/nacos-group/nacos-sdk-go/v2 v2.1.2 github.com/natefinch/lumberjack v2.0.0+incompatible + github.com/oliveagle/jsonpath v0.0.0-20180606110733-2e52cf6e6852 github.com/opentracing/opentracing-go v1.2.0 github.com/pkg/errors v0.9.1 github.com/polarismesh/polaris-go v1.2.0 diff --git a/go.sum b/go.sum index 47f43871e1..00fc9ce36d 100644 --- a/go.sum +++ b/go.sum @@ -598,6 +598,8 @@ github.com/oklog/oklog v0.3.2/go.mod h1:FCV+B7mhrz4o+ueLpx+KqkyXRGMWOYEvfiXtdGtb github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA= github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo= +github.com/oliveagle/jsonpath v0.0.0-20180606110733-2e52cf6e6852 h1:Yl0tPBa8QPjGmesFh1D0rDy+q1Twx6FyU7VWHi8wZbI= +github.com/oliveagle/jsonpath v0.0.0-20180606110733-2e52cf6e6852/go.mod h1:eqOVx5Vwu4gd2mmMZvVZsgIqNSaW3xxRThUJ0k/TPk4= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= diff --git a/imports/imports.go b/imports/imports.go index f818e4322c..f3ecaf225d 100644 --- a/imports/imports.go +++ b/imports/imports.go @@ -46,6 +46,7 @@ import ( _ "dubbo.apache.org/dubbo-go/v3/filter/hystrix" _ "dubbo.apache.org/dubbo-go/v3/filter/metrics" _ "dubbo.apache.org/dubbo-go/v3/filter/otel/trace" + _ "dubbo.apache.org/dubbo-go/v3/filter/polaris/limit" _ "dubbo.apache.org/dubbo-go/v3/filter/seata" _ "dubbo.apache.org/dubbo-go/v3/filter/sentinel" _ "dubbo.apache.org/dubbo-go/v3/filter/token" diff --git a/remoting/polaris/builder.go b/remoting/polaris/builder.go index a45fad7d9b..f384345e17 100644 --- a/remoting/polaris/builder.go +++ b/remoting/polaris/builder.go @@ -36,11 +36,13 @@ import ( import ( "dubbo.apache.org/dubbo-go/v3/common" + "dubbo.apache.org/dubbo-go/v3/common/constant" ) var ( - once sync.Once - sdkCtx api.SDKContext + once sync.Once + namesapce string + sdkCtx api.SDKContext ) var ( @@ -83,6 +85,11 @@ func GetLimiterAPI() (polaris.LimitAPI, error) { return polaris.NewLimitAPIByContext(sdkCtx), nil } +// GetNamespace gets user defined namespace info +func GetNamespace() string { + return namesapce +} + // InitSDKContext inits polaris SDKContext by URL func InitSDKContext(url *common.URL) error { if url == nil { @@ -91,7 +98,6 @@ func InitSDKContext(url *common.URL) error { var rerr error once.Do(func() { - addresses := strings.Split(url.Location, ",") serverConfigs := make([]string, 0, len(addresses)) for _, addr := range addresses { @@ -107,6 +113,7 @@ func InitSDKContext(url *common.URL) error { _sdkCtx, err := api.InitContextByConfig(polarisConf) rerr = err sdkCtx = _sdkCtx + namesapce = url.GetParam(constant.RegistryNamespaceKey, constant.PolarisDefaultNamespace) }) return rerr diff --git a/remoting/polaris/parser/parser.go b/remoting/polaris/parser/parser.go new file mode 100644 index 0000000000..42e42b57bd --- /dev/null +++ b/remoting/polaris/parser/parser.go @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package parser + +import ( + "encoding/json" + "regexp" + "strconv" + "strings" +) + +import ( + "github.com/dubbogo/gost/log/logger" + + "github.com/oliveagle/jsonpath" +) + +const ( + _pefixParam = "param" + _prefixParamArr = "param[" +) + +var ( + _arrayRegx, _ = regexp.Compile(`"^.+\\[[0-9]+\\]"`) +) + +// ParseArgumentsByExpression follow https://goessner.net/articles/JsonPath/ +// +// { +// "store":{ +// "book":[ +// { +// "category":"reference", +// "author":"Nigel Rees", +// "title":"Sayings of the Century", +// "price":8.95 +// }, +// { +// "category":"fiction", +// "author":"Evelyn Waugh", +// "title":"Sword of Honor", +// "price":12.99 +// }, +// { +// "category":"fiction", +// "author":"Herman Melville", +// "title":"Moby Dick", +// "isbn":"0-553-21311-3", +// "price":8.99 +// }, +// { +// "category":"fiction", +// "author":"J. R. R. Tolkien", +// "title":"The Lord of the Rings", +// "isbn":"0-395-19395-8", +// "price":22.99 +// } +// ], +// "bicycle":{ +// "color":"red", +// "price":19.95 +// } +// } +// } +// +// examples +// - case 1: param.$.store.book[*].author +func ParseArgumentsByExpression(key string, parameters []interface{}) interface{} { + index, key := resolveIndex(key) + if index == -1 || index >= len(parameters) { + logger.Errorf("[Parser][Polaris] invalid expression for : %s", key) + return nil + } + + data, err := json.Marshal(parameters[index]) + if err != nil { + logger.Errorf("[Parser][Polaris] marshal parameter %+v fail : %+v", parameters[index], err) + return nil + } + var searchVal interface{} + _ = json.Unmarshal(data, &searchVal) + res, err := jsonpath.JsonPathLookup(searchVal, key) + if err != nil { + logger.Errorf("[Parser][Polaris] invalid do json path lookup by key : %s, err : %+v", key, err) + } + + return res +} + +func resolveIndex(key string) (int, string) { + if strings.HasPrefix(key, _prefixParamArr) { + // param[0].$. + endIndex := strings.Index(key, "]") + indexStr := key[len(_prefixParamArr):endIndex] + index, err := strconv.ParseInt(indexStr, 10, 32) + if err != nil { + return -1, "" + } + startIndex := endIndex + 2 + if rune(key[endIndex+1]) != rune('.') { + startIndex = endIndex + 1 + } + return int(index), key[startIndex:] + } else if strings.HasPrefix(key, _pefixParam) { + key = strings.TrimPrefix(key, _pefixParam+".") + return 0, strings.TrimPrefix(key, _pefixParam+".") + } + + return -1, "" +} diff --git a/remoting/polaris/parser/parser_test.go b/remoting/polaris/parser/parser_test.go new file mode 100644 index 0000000000..1917b49051 --- /dev/null +++ b/remoting/polaris/parser/parser_test.go @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package parser + +import ( + "encoding/json" + "testing" +) + +import ( + "github.com/stretchr/testify/assert" +) + +var ( + testDataStore = ` + { + "book":[ + { + "category":"reference", + "author":"Nigel Rees", + "title":"Sayings of the Century", + "price":8.95 + }, + { + "category":"fiction", + "author":"Evelyn Waugh", + "title":"Sword of Honor", + "price":12.99 + }, + { + "category":"fiction", + "author":"Herman Melville", + "title":"Moby Dick", + "isbn":"0-553-21311-3", + "price":8.99 + }, + { + "category":"fiction", + "author":"J. R. R. Tolkien", + "title":"The Lord of the Rings", + "isbn":"0-395-19395-8", + "price":22.99 + } + ], + "bicycle":{ + "color":"red", + "price":19.95 + } + } + ` + + testDataBicyle = ` + { + "color":"red", + "price":19.95 + } + ` +) + +func TestParseArgumentsByExpression(t *testing.T) { + + var ( + argStore, argBicyle interface{} + ) + + json.Unmarshal([]byte(testDataStore), &argStore) + json.Unmarshal([]byte(testDataBicyle), &argBicyle) + + t.Run("test-case-1", func(t *testing.T) { + ret := ParseArgumentsByExpression("param.$.book[0].category", []interface{}{argStore}) + assert.Equal(t, "reference", ret) + }) + + t.Run("test-case-2", func(t *testing.T) { + ret := ParseArgumentsByExpression("param[0].$.book[0].category", []interface{}{argStore, argBicyle}) + assert.Equal(t, "reference", ret) + }) + + t.Run("test-case-2", func(t *testing.T) { + ret := ParseArgumentsByExpression("param[1].$.color", []interface{}{argStore, argBicyle}) + assert.Equal(t, "red", ret) + }) + + t.Run("test-case-3", func(t *testing.T) { + ret := ParseArgumentsByExpression("param.$.color", []interface{}{argBicyle}) + assert.Equal(t, "red", ret) + }) + +} + +func Test_resolveIndex(t *testing.T) { + type args struct { + key string + } + tests := []struct { + name string + args args + want int + want1 string + }{ + { + name: "case-1", + args: args{ + key: "param.$.key", + }, + want: 0, + want1: "$.key", + }, + { + name: "case-2", + args: args{ + key: "param[1].$.key", + }, + want: 1, + want1: "$.key", + }, + { + name: "case-3", + args: args{ + key: "param[10].$.key", + }, + want: 10, + want1: "$.key", + }, + { + name: "case-4", + args: args{ + key: "param[11]$.key", + }, + want: 11, + want1: "$.key", + }, + { + name: "case-5", + args: args{ + key: "param[11]key", + }, + want: 11, + want1: "key", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, got1 := resolveIndex(tt.args.key) + if got != tt.want { + t.Errorf("resolveIndex() got = %v, want %v", got, tt.want) + } + if got1 != tt.want1 { + t.Errorf("resolveIndex() got1 = %v, want %v", got1, tt.want1) + } + }) + } +}