Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ftr: TpsLimitSupport #237

Merged
merged 12 commits into from
Oct 23, 2019
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion common/constant/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ const (
const (
DEFAULT_KEY = "default"
PREFIX_DEFAULT_KEY = "default."
DEFAULT_SERVICE_FILTERS = "echo,token,accesslog"
DEFAULT_SERVICE_FILTERS = "echo,token,accesslog,tps"
DEFAULT_REFERENCE_FILTERS = ""
GENERIC_REFERENCE_FILTERS = "generic"
GENERIC = "$invoke"
Expand Down
33 changes: 20 additions & 13 deletions common/constant/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,19 +48,26 @@ const (
)

const (
TIMESTAMP_KEY = "timestamp"
REMOTE_TIMESTAMP_KEY = "remote.timestamp"
CLUSTER_KEY = "cluster"
LOADBALANCE_KEY = "loadbalance"
WEIGHT_KEY = "weight"
WARMUP_KEY = "warmup"
RETRIES_KEY = "retries"
BEAN_NAME = "bean.name"
FAIL_BACK_TASKS_KEY = "failbacktasks"
FORKS_KEY = "forks"
DEFAULT_FORKS = 2
DEFAULT_TIMEOUT = 1000
ACCESS_LOG_KEY = "accesslog"
TIMESTAMP_KEY = "timestamp"
REMOTE_TIMESTAMP_KEY = "remote.timestamp"
CLUSTER_KEY = "cluster"
LOADBALANCE_KEY = "loadbalance"
WEIGHT_KEY = "weight"
WARMUP_KEY = "warmup"
RETRIES_KEY = "retries"
BEAN_NAME = "bean.name"
FAIL_BACK_TASKS_KEY = "failbacktasks"
FORKS_KEY = "forks"
DEFAULT_FORKS = 2
DEFAULT_TIMEOUT = 1000
ACCESS_LOG_KEY = "accesslog"
TPS_LIMITER_KEY = "tps.limiter"
TPS_REJECTED_EXECUTION_HANDLER_KEY = "tps.limit.rejected.handler"
TPS_LIMIT_RATE_KEY = "tps.limit.rate"
DEFAULT_TPS_LIMIT_RATE = "-1"
TPS_LIMIT_INTERVAL_KEY = "tps.limit.interval"
DEFAULT_TPS_LIMIT_INTERVAL = "60000"
TPS_LIMIT_STRATEGY_KEY = "tps.limit.strategy"
)

const (
Expand Down
65 changes: 65 additions & 0 deletions common/extension/tps_limit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package extension

import "github.com/apache/dubbo-go/filter"
flycash marked this conversation as resolved.
Show resolved Hide resolved

var (
tpsLimitStrategy = make(map[string]func(rate int, interval int) filter.TpsLimitStrategy)
tpsLimiter = make(map[string]func() filter.TpsLimiter)
tpsRejectedExecutionHandler = make(map[string]func() filter.RejectedExecutionHandler)
)

func SetTpsLimiter(name string, creator func() filter.TpsLimiter) {
tpsLimiter[name] = creator
}

func GetTpsLimiter(name string) filter.TpsLimiter {
var creator = tpsLimiter[name]
flycash marked this conversation as resolved.
Show resolved Hide resolved
if creator == nil {
panic("TpsLimiter for " + name + " is not existing, make sure you have import the package " +
"and you have register it by invoking extension.SetTpsLimiter.")
}
return creator()
}

func SetTpsLimitStrategy(name string, creator func(rate int, interval int) filter.TpsLimitStrategy) {
tpsLimitStrategy[name] = creator
}

func GetTpsLimitStrategyCreator(name string) func(rate int, interval int) filter.TpsLimitStrategy {
var creator = tpsLimitStrategy[name]
flycash marked this conversation as resolved.
Show resolved Hide resolved
if creator == nil {
flycash marked this conversation as resolved.
Show resolved Hide resolved
panic("TpsLimitStrategy for " + name + " is not existing, make sure you have import the package " +
"and you have register it by invoking extension.SetTpsLimitStrategy.")
}
return creator
}

func SetTpsRejectedExecutionHandler(name string, creator func() filter.RejectedExecutionHandler) {
tpsRejectedExecutionHandler[name] = creator
}

func GetTpsRejectedExecutionHandler(name string) filter.RejectedExecutionHandler {
var creator = tpsRejectedExecutionHandler[name]
flycash marked this conversation as resolved.
Show resolved Hide resolved
if creator() == nil {
panic("TpsRejectedExecutionHandler for " + name + " is not existing, make sure you have import the package " +
"and you have register it by invoking extension.SetTpsRejectedExecutionHandler.")
}
return creator()
}
15 changes: 9 additions & 6 deletions config/method_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,15 @@ import (
)

type MethodConfig struct {
InterfaceId string
InterfaceName string
Name string `yaml:"name" json:"name,omitempty" property:"name"`
Retries string `yaml:"retries" json:"retries,omitempty" property:"retries"`
Loadbalance string `yaml:"loadbalance" json:"loadbalance,omitempty" property:"loadbalance"`
Weight int64 `yaml:"weight" json:"weight,omitempty" property:"weight"`
InterfaceId string
InterfaceName string
Name string `yaml:"name" json:"name,omitempty" property:"name"`
Retries string `yaml:"retries" json:"retries,omitempty" property:"retries"`
Loadbalance string `yaml:"loadbalance" json:"loadbalance,omitempty" property:"loadbalance"`
Weight int64 `yaml:"weight" json:"weight,omitempty" property:"weight"`
TpsLimitInterval string `yaml:"tps.limit.interval" json:"tps.limit.interval,omitempty" property:"tps.limit.interval"`
TpsLimitRate string `yaml:"tps.limit.rate" json:"tps.limit.rate,omitempty" property:"tps.limit.rate"`
TpsLimitStrategy string `yaml:"tps.limit.strategy" json:"tps.limit.strategy,omitempty" property:"tps.limit.strategy"`
}

func (c *MethodConfig) Prefix() string {
Expand Down
78 changes: 47 additions & 31 deletions config/service_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,27 +43,32 @@ import (
)

type ServiceConfig struct {
context context.Context
id string
Filter string `yaml:"filter" json:"filter,omitempty" property:"filter"`
Protocol string `default:"dubbo" required:"true" yaml:"protocol" json:"protocol,omitempty" property:"protocol"` //multi protocol support, split by ','
InterfaceName string `required:"true" yaml:"interface" json:"interface,omitempty" property:"interface"`
Registry string `yaml:"registry" json:"registry,omitempty" property:"registry"`
Cluster string `default:"failover" yaml:"cluster" json:"cluster,omitempty" property:"cluster"`
Loadbalance string `default:"random" yaml:"loadbalance" json:"loadbalance,omitempty" property:"loadbalance"`
Group string `yaml:"group" json:"group,omitempty" property:"group"`
Version string `yaml:"version" json:"version,omitempty" property:"version" `
Methods []*MethodConfig `yaml:"methods" json:"methods,omitempty" property:"methods"`
Warmup string `yaml:"warmup" json:"warmup,omitempty" property:"warmup"`
Retries string `yaml:"retries" json:"retries,omitempty" property:"retries"`
Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"`
Token string `yaml:"token" json:"token,omitempty" property:"token"`
AccessLog string `yaml:"accesslog" json:"accesslog,omitempty" property:"accesslog"`
unexported *atomic.Bool
exported *atomic.Bool
rpcService common.RPCService
cacheProtocol protocol.Protocol
cacheMutex sync.Mutex
context context.Context
id string
Filter string `yaml:"filter" json:"filter,omitempty" property:"filter"`
Protocol string `default:"dubbo" required:"true" yaml:"protocol" json:"protocol,omitempty" property:"protocol"` // multi protocol support, split by ','
InterfaceName string `required:"true" yaml:"interface" json:"interface,omitempty" property:"interface"`
Registry string `yaml:"registry" json:"registry,omitempty" property:"registry"`
Cluster string `default:"failover" yaml:"cluster" json:"cluster,omitempty" property:"cluster"`
Loadbalance string `default:"random" yaml:"loadbalance" json:"loadbalance,omitempty" property:"loadbalance"`
Group string `yaml:"group" json:"group,omitempty" property:"group"`
Version string `yaml:"version" json:"version,omitempty" property:"version" `
Methods []*MethodConfig `yaml:"methods" json:"methods,omitempty" property:"methods"`
Warmup string `yaml:"warmup" json:"warmup,omitempty" property:"warmup"`
Retries string `yaml:"retries" json:"retries,omitempty" property:"retries"`
Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"`
Token string `yaml:"token" json:"token,omitempty" property:"token"`
AccessLog string `yaml:"accesslog" json:"accesslog,omitempty" property:"accesslog"`
TpsLimiter string `yaml:"tps.limiter" json:"tps.limiter,omitempty" property:"tps.limiter"`
flycash marked this conversation as resolved.
Show resolved Hide resolved
flycash marked this conversation as resolved.
Show resolved Hide resolved
TpsLimitInterval string `yaml:"tps.limit.interval" json:"tps.limit.interval,omitempty" property:"tps.limit.interval"`
TpsLimitRate string `yaml:"tps.limit.rate" json:"tps.limit.rate,omitempty" property:"tps.limit.rate"`
TpsLimitStrategy string `yaml:"tps.limit.strategy" json:"tps.limit.strategy,omitempty" property:"tps.limit.strategy"`
TpsLimitRejectedHandler string `yaml:"tps.limit.rejected.handler" json:"tps.limit.rejected.handler,omitempty" property:"tps.limit.rejected.handler"`
unexported *atomic.Bool
exported *atomic.Bool
rpcService common.RPCService
cacheProtocol protocol.Protocol
cacheMutex sync.Mutex
}

func (c *ServiceConfig) Prefix() string {
Expand Down Expand Up @@ -94,9 +99,9 @@ func NewServiceConfig(id string, context context.Context) *ServiceConfig {
}

func (srvconfig *ServiceConfig) Export() error {
//TODO: config center start here
// TODO: config center start here

//TODO:delay export
// TODO:delay export
if srvconfig.unexported != nil && srvconfig.unexported.Load() {
err := perrors.Errorf("The service %v has already unexported! ", srvconfig.InterfaceName)
logger.Errorf(err.Error())
Expand All @@ -111,7 +116,7 @@ func (srvconfig *ServiceConfig) Export() error {
urlMap := srvconfig.getUrlMap()

for _, proto := range loadProtocol(srvconfig.Protocol, providerConfig.Protocols) {
//registry the service reflect
// registry the service reflect
methods, err := common.ServiceMap.Register(proto.Name, srvconfig.rpcService)
if err != nil {
err := perrors.Errorf("The service %v export the protocol %v error! Error message is %v .", srvconfig.InterfaceName, proto.Name, err.Error())
Expand Down Expand Up @@ -164,7 +169,7 @@ func (srvconfig *ServiceConfig) Implement(s common.RPCService) {

func (srvconfig *ServiceConfig) getUrlMap() url.Values {
urlMap := url.Values{}
//first set user params
// first set user params
for k, v := range srvconfig.Params {
urlMap.Set(k, v)
flycash marked this conversation as resolved.
Show resolved Hide resolved
}
Expand All @@ -177,7 +182,7 @@ func (srvconfig *ServiceConfig) getUrlMap() url.Values {
urlMap.Set(constant.GROUP_KEY, srvconfig.Group)
urlMap.Set(constant.VERSION_KEY, srvconfig.Version)
urlMap.Set(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER))
//application info
// application info
urlMap.Set(constant.APPLICATION_KEY, providerConfig.ApplicationConfig.Name)
urlMap.Set(constant.ORGANIZATION_KEY, providerConfig.ApplicationConfig.Organization)
urlMap.Set(constant.NAME_KEY, providerConfig.ApplicationConfig.Name)
Expand All @@ -186,16 +191,27 @@ func (srvconfig *ServiceConfig) getUrlMap() url.Values {
urlMap.Set(constant.OWNER_KEY, providerConfig.ApplicationConfig.Owner)
urlMap.Set(constant.ENVIRONMENT_KEY, providerConfig.ApplicationConfig.Environment)

//filter
// filter
urlMap.Set(constant.SERVICE_FILTER_KEY, mergeValue(providerConfig.Filter, srvconfig.Filter, constant.DEFAULT_SERVICE_FILTERS))

//filter special config
// filter special config
urlMap.Set(constant.ACCESS_LOG_KEY, srvconfig.AccessLog)
// tps limiter
urlMap.Set(constant.TPS_LIMIT_STRATEGY_KEY, srvconfig.TpsLimitStrategy)
urlMap.Set(constant.TPS_LIMIT_INTERVAL_KEY, srvconfig.TpsLimitInterval)
urlMap.Set(constant.TPS_LIMIT_RATE_KEY, srvconfig.TpsLimitRate)
urlMap.Set(constant.TPS_LIMITER_KEY, srvconfig.TpsLimiter)
urlMap.Set(constant.TPS_REJECTED_EXECUTION_HANDLER_KEY, srvconfig.TpsLimitRejectedHandler)

for _, v := range srvconfig.Methods {
urlMap.Set("methods."+v.Name+"."+constant.LOADBALANCE_KEY, v.Loadbalance)
urlMap.Set("methods."+v.Name+"."+constant.RETRIES_KEY, v.Retries)
urlMap.Set("methods."+v.Name+"."+constant.WEIGHT_KEY, strconv.FormatInt(v.Weight, 10))
prefix := "methods." + v.Name + "."
urlMap.Set(prefix+constant.LOADBALANCE_KEY, v.Loadbalance)
urlMap.Set(prefix+constant.RETRIES_KEY, v.Retries)
urlMap.Set(prefix+constant.WEIGHT_KEY, strconv.FormatInt(v.Weight, 10))

urlMap.Set(prefix+constant.TPS_LIMIT_STRATEGY_KEY, v.TpsLimitStrategy)
urlMap.Set(prefix+constant.TPS_LIMIT_INTERVAL_KEY, v.TpsLimitInterval)
urlMap.Set(prefix+constant.TPS_LIMIT_RATE_KEY, v.TpsLimitRate)
}

return urlMap
Expand Down
2 changes: 1 addition & 1 deletion filter/impl/access_log_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (ef *AccessLogFilter) writeLogToFile(data AccessLogData) {

logFile, err := ef.openLogFile(accessLog)
if err != nil {
logger.Warnf("Can not open the access log file: %s, %v", accessLog, err)
logger.Warnf("Can not open the access log file: %s, %v", accessLog, err)
return
}
logger.Debugf("Append log to %s", accessLog)
Expand Down
63 changes: 63 additions & 0 deletions filter/impl/rejected_execution_handler_only_log_impl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package impl

import (
"sync"
)

import (
"github.com/prometheus/common/log"
)

import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/filter"
"github.com/apache/dubbo-go/protocol"
)

const HandlerName = "log"

func init() {
extension.SetTpsRejectedExecutionHandler(HandlerName, GetOnlyRejectedExecutionHandler)
extension.SetTpsRejectedExecutionHandler(constant.DEFAULT_KEY, GetOnlyRejectedExecutionHandler)
}

var onlyLogHandlerInstance *OnlyLogRejectedExecutionHandler
var onlyLogHandlerOnce sync.Once

/**
* This implementation only logs the invocation info.
* it always return en error inside the result.
*/
type OnlyLogRejectedExecutionHandler struct {
}

func (handler *OnlyLogRejectedExecutionHandler) RejectedExecution(url common.URL, invocation protocol.Invocation) protocol.Result {
log.Errorf("The invocation was rejected due to over rate limitation. url: %s", url.String())
return &protocol.RPCResult{}
}

func GetOnlyRejectedExecutionHandler() filter.RejectedExecutionHandler {
onlyLogHandlerOnce.Do(func() {
onlyLogHandlerInstance = &OnlyLogRejectedExecutionHandler{}
})
return onlyLogHandlerInstance
}
Loading