diff --git a/service/frontend/api/handler.go b/service/frontend/api/handler.go index 5ce426a28b2..d49f01ad6eb 100644 --- a/service/frontend/api/handler.go +++ b/service/frontend/api/handler.go @@ -44,7 +44,6 @@ import ( "github.com/uber/cadence/common/partition" "github.com/uber/cadence/common/persistence" persistenceutils "github.com/uber/cadence/common/persistence/persistence-utils" - "github.com/uber/cadence/common/quotas" "github.com/uber/cadence/common/resource" "github.com/uber/cadence/common/service" "github.com/uber/cadence/common/types" @@ -63,15 +62,6 @@ const ( var _ Handler = (*WorkflowHandler)(nil) -// ratelimitType differentiates between the three categories of ratelimiters -type ratelimitType int - -const ( - ratelimitTypeUser ratelimitType = iota + 1 - ratelimitTypeWorker - ratelimitTypeVisibility -) - type ( // WorkflowHandler - Thrift handler interface for workflow service WorkflowHandler struct { @@ -80,9 +70,6 @@ type ( shuttingDown int32 healthStatus int32 tokenSerializer common.TaskTokenSerializer - userRateLimiter quotas.Policy - workerRateLimiter quotas.Policy - visibilityRateLimiter quotas.Policy config *config.Config versionChecker client.VersionChecker domainHandler domain.Handler @@ -125,35 +112,8 @@ func NewWorkflowHandler( config: config, healthStatus: int32(HealthStatusWarmingUp), tokenSerializer: common.NewJSONTaskTokenSerializer(), - userRateLimiter: quotas.NewMultiStageRateLimiter( - quotas.NewDynamicRateLimiter(config.UserRPS.AsFloat64()), - quotas.NewCollection(quotas.NewPerMemberDynamicRateLimiterFactory( - service.Frontend, - config.GlobalDomainUserRPS, - config.MaxDomainUserRPSPerInstance, - resource.GetMembershipResolver(), - )), - ), - workerRateLimiter: quotas.NewMultiStageRateLimiter( - quotas.NewDynamicRateLimiter(config.WorkerRPS.AsFloat64()), - quotas.NewCollection(quotas.NewPerMemberDynamicRateLimiterFactory( - service.Frontend, - config.GlobalDomainWorkerRPS, - config.MaxDomainWorkerRPSPerInstance, - resource.GetMembershipResolver(), - )), - ), - visibilityRateLimiter: quotas.NewMultiStageRateLimiter( - quotas.NewDynamicRateLimiter(config.VisibilityRPS.AsFloat64()), - quotas.NewCollection(quotas.NewPerMemberDynamicRateLimiterFactory( - service.Frontend, - config.GlobalDomainVisibilityRPS, - config.MaxDomainVisibilityRPSPerInstance, - resource.GetMembershipResolver(), - )), - ), - versionChecker: versionChecker, - domainHandler: domainHandler, + versionChecker: versionChecker, + domainHandler: domainHandler, visibilityQueryValidator: validator.NewQueryValidator( config.ValidSearchAttributes, config.EnableQueryAttributeValidation, @@ -495,11 +455,6 @@ func (wh *WorkflowHandler) PollForActivityTask( return nil, validate.ErrIdentityTooLong } - if ok := wh.allow(ratelimitTypeWorker, pollRequest); !ok { - // pollers exponentially back off up to 10s - return nil, createServiceBusyError() - } - domainID, err := wh.GetDomainCache().GetDomainID(domainName) if err != nil { return nil, err @@ -625,11 +580,6 @@ func (wh *WorkflowHandler) PollForDecisionTask( return nil, err } - if ok := wh.allow(ratelimitTypeWorker, pollRequest); !ok { - // pollers exponentially back off up to 10s - return nil, createServiceBusyError() - } - isolationGroup := wh.getIsolationGroup(ctx, domainName) if !wh.waitUntilIsolationGroupHealthy(ctx, domainName, isolationGroup) { return &types.PollForDecisionTaskResponse{}, nil @@ -808,10 +758,6 @@ func (wh *WorkflowHandler) RecordActivityTaskHeartbeat( dw := domainWrapper{ domain: domainName, } - // Count the request in the host RPS, - // but we still accept it even if RPS is exceeded - wh.allow(ratelimitTypeWorker, dw) - scope := getMetricsScopeWithDomain(metrics.FrontendRecordActivityTaskHeartbeatScope, dw, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...) sizeLimitError := wh.config.BlobSizeLimitError(domainName) sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName) @@ -877,10 +823,6 @@ func (wh *WorkflowHandler) RecordActivityTaskHeartbeatByID( return nil, validate.ErrDomainNotSet } - // Count the request in the host RPS, - // but we still accept it even if RPS is exceeded - wh.allow(ratelimitTypeWorker, heartbeatRequest) - wh.GetLogger().Debug("Received RecordActivityTaskHeartbeatByID") domainID, err := wh.GetDomainCache().GetDomainID(domainName) if err != nil { @@ -997,11 +939,6 @@ func (wh *WorkflowHandler) RespondActivityTaskCompleted( dw := domainWrapper{ domain: domainName, } - - // Count the request in the host RPS, - // but we still accept it even if RPS is exceeded - wh.allow(ratelimitTypeWorker, dw) - scope := getMetricsScopeWithDomain(metrics.FrontendRespondActivityTaskCompletedScope, dw, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...) if !common.IsValidIDLength( completeRequest.GetIdentity(), @@ -1077,11 +1014,6 @@ func (wh *WorkflowHandler) RespondActivityTaskCompletedByID( if domainName == "" { return validate.ErrDomainNotSet } - - // Count the request in the host RPS, - // but we still accept it even if RPS is exceeded - wh.allow(ratelimitTypeWorker, completeRequest) - domainID, err := wh.GetDomainCache().GetDomainID(domainName) if err != nil { return err @@ -1208,11 +1140,6 @@ func (wh *WorkflowHandler) RespondActivityTaskFailed( dw := domainWrapper{ domain: domainName, } - - // Count the request in the host RPS, - // but we still accept it even if RPS is exceeded - wh.allow(ratelimitTypeWorker, dw) - scope := getMetricsScopeWithDomain(metrics.FrontendRespondActivityTaskFailedScope, dw, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...) if !common.IsValidIDLength( failedRequest.GetIdentity(), @@ -1277,11 +1204,6 @@ func (wh *WorkflowHandler) RespondActivityTaskFailedByID( if domainName == "" { return validate.ErrDomainNotSet } - - // Count the request in the host RPS, - // but we still accept it even if RPS is exceeded - wh.allow(ratelimitTypeWorker, failedRequest) - domainID, err := wh.GetDomainCache().GetDomainID(domainName) if err != nil { return err @@ -1399,11 +1321,6 @@ func (wh *WorkflowHandler) RespondActivityTaskCanceled( dw := domainWrapper{ domain: domainName, } - - // Count the request in the host RPS, - // but we still accept it even if RPS is exceeded - wh.allow(ratelimitTypeWorker, dw) - scope := getMetricsScopeWithDomain(metrics.FrontendRespondActivityTaskCanceledScope, dw, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...) if !common.IsValidIDLength( cancelRequest.GetIdentity(), @@ -1479,11 +1396,6 @@ func (wh *WorkflowHandler) RespondActivityTaskCanceledByID( if domainName == "" { return validate.ErrDomainNotSet } - - // Count the request in the host RPS, - // but we still accept it even if RPS is exceeded - wh.allow(ratelimitTypeWorker, cancelRequest) - domainID, err := wh.GetDomainCache().GetDomainID(domainName) if err != nil { return err @@ -1610,11 +1522,6 @@ func (wh *WorkflowHandler) RespondDecisionTaskCompleted( dw := domainWrapper{ domain: domainName, } - - // Count the request in the host RPS, - // but we still accept it even if RPS is exceeded - wh.allow(ratelimitTypeWorker, dw) - scope := getMetricsScopeWithDomain(metrics.FrontendRespondDecisionTaskCompletedScope, dw, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...) if !common.IsValidIDLength( completeRequest.GetIdentity(), @@ -1706,11 +1613,6 @@ func (wh *WorkflowHandler) RespondDecisionTaskFailed( dw := domainWrapper{ domain: domainName, } - - // Count the request in the host RPS, - // but we still accept it even if RPS is exceeded - wh.allow(ratelimitTypeWorker, dw) - scope := getMetricsScopeWithDomain(metrics.FrontendRespondDecisionTaskFailedScope, dw, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...) if !common.IsValidIDLength( failedRequest.GetIdentity(), @@ -1788,11 +1690,6 @@ func (wh *WorkflowHandler) RespondQueryTaskCompleted( dw := domainWrapper{ domain: domainName, } - - // Count the request in the host RPS, - // but we still accept it even if RPS is exceeded - wh.allow(ratelimitTypeWorker, dw) - sizeLimitError := wh.config.BlobSizeLimitError(domainName) sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName) @@ -1875,11 +1772,6 @@ func (wh *WorkflowHandler) StartWorkflowExecution( if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return nil, err } - - if ok := wh.allow(ratelimitTypeUser, startRequest); !ok { - return nil, createServiceBusyError() - } - scope := getMetricsScopeWithDomain(metrics.FrontendStartWorkflowExecutionScope, startRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...) idLengthWarnLimit := wh.config.MaxIDLengthWarnLimit() if !common.IsValidIDLength( @@ -2053,10 +1945,6 @@ func (wh *WorkflowHandler) GetWorkflowExecutionHistory( return nil, validate.ErrDomainNotSet } - if ok := wh.allow(ratelimitTypeUser, getRequest); !ok { - return nil, createServiceBusyError() - } - if err := validate.CheckExecution(wfExecution); err != nil { return nil, err } @@ -2310,11 +2198,6 @@ func (wh *WorkflowHandler) SignalWorkflowExecution( if domainName == "" { return validate.ErrDomainNotSet } - - if ok := wh.allow(ratelimitTypeUser, signalRequest); !ok { - return createServiceBusyError() - } - if err := validate.CheckExecution(wfExecution); err != nil { return err } @@ -2423,11 +2306,6 @@ func (wh *WorkflowHandler) SignalWithStartWorkflowExecution( if domainName == "" { return nil, validate.ErrDomainNotSet } - - if ok := wh.allow(ratelimitTypeUser, signalWithStartRequest); !ok { - return nil, createServiceBusyError() - } - if signalWithStartRequest.GetWorkflowID() == "" { return nil, validate.ErrWorkflowIDNotSet } @@ -2603,11 +2481,6 @@ func (wh *WorkflowHandler) TerminateWorkflowExecution( if terminateRequest.GetDomain() == "" { return validate.ErrDomainNotSet } - - if ok := wh.allow(ratelimitTypeUser, terminateRequest); !ok { - return createServiceBusyError() - } - if err := validate.CheckExecution(wfExecution); err != nil { return err } @@ -2651,11 +2524,6 @@ func (wh *WorkflowHandler) ResetWorkflowExecution( if domainName == "" { return nil, validate.ErrDomainNotSet } - - if ok := wh.allow(ratelimitTypeUser, resetRequest); !ok { - return nil, createServiceBusyError() - } - if err := validate.CheckExecution(wfExecution); err != nil { return nil, err } @@ -2698,11 +2566,6 @@ func (wh *WorkflowHandler) RequestCancelWorkflowExecution( if domainName == "" { return validate.ErrDomainNotSet } - - if ok := wh.allow(ratelimitTypeUser, cancelRequest); !ok { - return createServiceBusyError() - } - if err := validate.CheckExecution(wfExecution); err != nil { return err } @@ -2744,10 +2607,6 @@ func (wh *WorkflowHandler) ListOpenWorkflowExecutions( return nil, validate.ErrDomainNotSet } - if ok := wh.allow(ratelimitTypeVisibility, listRequest); !ok { - return nil, createServiceBusyError() - } - if listRequest.StartTimeFilter == nil { return nil, &types.BadRequestError{Message: "StartTimeFilter is required"} } @@ -2856,10 +2715,6 @@ func (wh *WorkflowHandler) ListArchivedWorkflowExecutions( return nil, validate.ErrDomainNotSet } - if ok := wh.allow(ratelimitTypeVisibility, listRequest); !ok { - return nil, createServiceBusyError() - } - if listRequest.GetPageSize() <= 0 { listRequest.PageSize = int32(wh.config.VisibilityMaxPageSize(listRequest.GetDomain())) } @@ -2943,10 +2798,6 @@ func (wh *WorkflowHandler) ListClosedWorkflowExecutions( return nil, validate.ErrDomainNotSet } - if ok := wh.allow(ratelimitTypeVisibility, listRequest); !ok { - return nil, createServiceBusyError() - } - if listRequest.StartTimeFilter == nil { return nil, &types.BadRequestError{Message: "StartTimeFilter is required"} } @@ -3078,10 +2929,6 @@ func (wh *WorkflowHandler) ListWorkflowExecutions( return nil, validate.ErrDomainNotSet } - if ok := wh.allow(ratelimitTypeVisibility, listRequest); !ok { - return nil, createServiceBusyError() - } - if listRequest.GetPageSize() <= 0 { listRequest.PageSize = int32(wh.config.VisibilityMaxPageSize(listRequest.GetDomain())) } @@ -3141,10 +2988,6 @@ func (wh *WorkflowHandler) RestartWorkflowExecution(ctx context.Context, request return nil, validate.ErrDomainNotSet } - if ok := wh.allow(ratelimitTypeUser, request); !ok { - return nil, createServiceBusyError() - } - if err := validate.CheckExecution(wfExecution); err != nil { return nil, err } @@ -3208,10 +3051,6 @@ func (wh *WorkflowHandler) ScanWorkflowExecutions( return nil, validate.ErrDomainNotSet } - if ok := wh.allow(ratelimitTypeUser, listRequest); !ok { - return nil, createServiceBusyError() - } - if listRequest.GetPageSize() <= 0 { listRequest.PageSize = int32(wh.config.VisibilityMaxPageSize(listRequest.GetDomain())) } @@ -3271,10 +3110,6 @@ func (wh *WorkflowHandler) CountWorkflowExecutions( return nil, validate.ErrDomainNotSet } - if ok := wh.allow(ratelimitTypeUser, countRequest); !ok { - return nil, createServiceBusyError() - } - validatedQuery, err := wh.visibilityQueryValidator.ValidateQuery(countRequest.GetQuery()) if err != nil { return nil, err @@ -3343,10 +3178,6 @@ func (wh *WorkflowHandler) ResetStickyTaskList( return nil, validate.ErrDomainNotSet } - // Count the request in the host RPS, - // but we still accept it even if RPS is exceeded - wh.allow(ratelimitTypeWorker, resetRequest) - if err := validate.CheckExecution(wfExecution); err != nil { return nil, err } @@ -3390,10 +3221,6 @@ func (wh *WorkflowHandler) QueryWorkflow( return nil, validate.ErrDomainNotSet } - if ok := wh.allow(ratelimitTypeUser, queryRequest); !ok { - return nil, createServiceBusyError() - } - if err := validate.CheckExecution(wfExecution); err != nil { return nil, err } @@ -3466,10 +3293,6 @@ func (wh *WorkflowHandler) DescribeWorkflowExecution( return nil, validate.ErrDomainNotSet } - if ok := wh.allow(ratelimitTypeUser, request); !ok { - return nil, createServiceBusyError() - } - if err := validate.CheckExecution(wfExecution); err != nil { return nil, err } @@ -3514,10 +3337,6 @@ func (wh *WorkflowHandler) DescribeTaskList( return nil, validate.ErrDomainNotSet } - if ok := wh.allow(ratelimitTypeUser, request); !ok { - return nil, createServiceBusyError() - } - domainID, err := wh.GetDomainCache().GetDomainID(request.GetDomain()) if err != nil { return nil, err @@ -3560,10 +3379,6 @@ func (wh *WorkflowHandler) ListTaskListPartitions( return nil, validate.ErrDomainNotSet } - if ok := wh.allow(ratelimitTypeUser, request); !ok { - return nil, createServiceBusyError() - } - scope := getMetricsScopeWithDomain(metrics.FrontendListTaskListPartitionsScope, request, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...) if err := wh.validateTaskList(request.TaskList, scope, request.GetDomain()); err != nil { return nil, err @@ -3593,10 +3408,6 @@ func (wh *WorkflowHandler) GetTaskListsByDomain( return nil, validate.ErrDomainNotSet } - if ok := wh.allow(ratelimitTypeUser, request); !ok { - return nil, createServiceBusyError() - } - resp, err := wh.GetMatchingClient().GetTaskListsByDomain(ctx, &types.GetTaskListsByDomainRequest{ Domain: request.Domain, }) @@ -3980,12 +3791,6 @@ func serializeHistoryToken(token *getHistoryContinuationToken) ([]byte, error) { return bytes, err } -func createServiceBusyError() *types.ServiceBusyError { - err := &types.ServiceBusyError{} - err.Message = "Too many outstanding requests to the cadence service" - return err -} - func isFailoverRequest(updateRequest *types.UpdateDomainRequest) bool { return updateRequest.ActiveClusterName != nil } @@ -4118,34 +3923,10 @@ func (wh *WorkflowHandler) isListRequestPageSizeTooLarge(pageSize int32, domain pageSize > int32(wh.config.ESIndexMaxResultWindow()) } -func (wh *WorkflowHandler) allow(requestType ratelimitType, d domainGetter) bool { - domain := "" - if d != nil { - domain = d.GetDomain() - } - switch requestType { - case ratelimitTypeUser: - return wh.userRateLimiter.Allow(quotas.Info{Domain: domain}) - case ratelimitTypeWorker: - return wh.workerRateLimiter.Allow(quotas.Info{Domain: domain}) - case ratelimitTypeVisibility: - return wh.visibilityRateLimiter.Allow(quotas.Info{Domain: domain}) - default: - wh.GetLogger().Fatal("coding error, unrecognized request ratelimit type value", tag.Value(requestType)) - panic("unreachable") - } -} - // GetClusterInfo return information about cadence deployment func (wh *WorkflowHandler) GetClusterInfo( ctx context.Context, ) (resp *types.ClusterInfo, err error) { - defer func() { log.CapturePanic(recover(), wh.GetLogger(), &err) }() - - if ok := wh.allow(ratelimitTypeUser, nil); !ok { - return nil, createServiceBusyError() - } - return &types.ClusterInfo{ SupportedClientVersions: &types.SupportedClientVersions{ GoSdk: client.SupportedGoSDKVersion, diff --git a/service/frontend/api/interface.go b/service/frontend/api/interface.go index c77728237e6..ef69eb86a44 100644 --- a/service/frontend/api/interface.go +++ b/service/frontend/api/interface.go @@ -22,6 +22,7 @@ //go:generate gowrap gen -g -p . -i Handler -t ../templates/accesscontrolled.tmpl -o ../wrappers/accesscontrolled/api_generated.go -v handler=API //go:generate gowrap gen -g -p . -i Handler -t ../templates/clusterredirection.tmpl -o ../wrappers/clusterredirection/api_generated.go //go:generate gowrap gen -g -p . -i Handler -t ../templates/metered.tmpl -o ../wrappers/metered/api_generated.go -v handler=API +//go:generate gowrap gen -g -p . -i Handler -t ../templates/ratelimited.tmpl -o ../wrappers/ratelimited/api_generated.go -v handler=API //go:generate gowrap gen -g -p . -i Handler -t ../../templates/grpc.tmpl -o ../wrappers/grpc/api_generated.go -v handler=API -v package=apiv1 -v path=github.com/uber/cadence-idl/go/proto/api/v1 -v prefix= //go:generate gowrap gen -g -p ../../../.gen/go/cadence/workflowserviceserver -i Interface -t ../../templates/thrift.tmpl -o ../wrappers/thrift/api_generated.go -v handler=API -v prefix= diff --git a/service/frontend/service.go b/service/frontend/service.go index 5d9bf9b9ec7..a5824e2a32e 100644 --- a/service/frontend/service.go +++ b/service/frontend/service.go @@ -28,6 +28,7 @@ import ( "github.com/uber/cadence/common/client" "github.com/uber/cadence/common/domain" "github.com/uber/cadence/common/dynamicconfig" + "github.com/uber/cadence/common/quotas" "github.com/uber/cadence/common/resource" "github.com/uber/cadence/common/service" "github.com/uber/cadence/service/frontend/admin" @@ -37,6 +38,7 @@ import ( "github.com/uber/cadence/service/frontend/wrappers/clusterredirection" "github.com/uber/cadence/service/frontend/wrappers/grpc" "github.com/uber/cadence/service/frontend/wrappers/metered" + "github.com/uber/cadence/service/frontend/wrappers/ratelimited" "github.com/uber/cadence/service/frontend/wrappers/thrift" ) @@ -131,8 +133,36 @@ func (s *Service) Start() { // Base handler s.handler = api.NewWorkflowHandler(s, s.config, client.NewVersionChecker(), dh) + userRateLimiter := quotas.NewMultiStageRateLimiter( + quotas.NewDynamicRateLimiter(s.config.UserRPS.AsFloat64()), + quotas.NewCollection(quotas.NewPerMemberDynamicRateLimiterFactory( + service.Frontend, + s.config.GlobalDomainUserRPS, + s.config.MaxDomainUserRPSPerInstance, + s.GetMembershipResolver(), + )), + ) + workerRateLimiter := quotas.NewMultiStageRateLimiter( + quotas.NewDynamicRateLimiter(s.config.WorkerRPS.AsFloat64()), + quotas.NewCollection(quotas.NewPerMemberDynamicRateLimiterFactory( + service.Frontend, + s.config.GlobalDomainWorkerRPS, + s.config.MaxDomainWorkerRPSPerInstance, + s.GetMembershipResolver(), + )), + ) + visibilityRateLimiter := quotas.NewMultiStageRateLimiter( + quotas.NewDynamicRateLimiter(s.config.VisibilityRPS.AsFloat64()), + quotas.NewCollection(quotas.NewPerMemberDynamicRateLimiterFactory( + service.Frontend, + s.config.GlobalDomainVisibilityRPS, + s.config.MaxDomainVisibilityRPSPerInstance, + s.GetMembershipResolver(), + )), + ) // Additional decorations var handler api.Handler = s.handler + handler = ratelimited.NewAPIHandler(handler, s.GetDomainCache(), userRateLimiter, workerRateLimiter, visibilityRateLimiter) handler = metered.NewAPIHandler(handler, s.GetLogger(), s.GetMetricsClient(), s.GetDomainCache(), s.config) if s.params.ClusterRedirectionPolicy != nil { handler = clusterredirection.NewAPIHandler(handler, s, s.config, *s.params.ClusterRedirectionPolicy) diff --git a/service/frontend/templates/ratelimited.tmpl b/service/frontend/templates/ratelimited.tmpl new file mode 100644 index 00000000000..9bb2c75be71 --- /dev/null +++ b/service/frontend/templates/ratelimited.tmpl @@ -0,0 +1,119 @@ +import ( + "context" + + "github.com/uber/cadence/common/cache" + "github.com/uber/cadence/common/quotas" + "github.com/uber/cadence/common/types" + "github.com/uber/cadence/service/frontend/api" +) + +{{$ratelimitTypeMap := dict "PollForActivityTask" "ratelimitTypeWorker"}} +{{$ratelimitTypeMap = set $ratelimitTypeMap "PollForDecisionTask" "ratelimitTypeWorker"}} +{{$ratelimitTypeMap = set $ratelimitTypeMap "RecordActivityTaskHeartbeat" "ratelimitTypeWorker"}} +{{$ratelimitTypeMap = set $ratelimitTypeMap "RecordActivityTaskHeartbeatByID" "ratelimitTypeWorker"}} +{{$ratelimitTypeMap = set $ratelimitTypeMap "ResetStickyTaskList" "ratelimitTypeWorker"}} +{{$ratelimitTypeMap = set $ratelimitTypeMap "RespondActivityTaskCanceled" "ratelimitTypeWorker"}} +{{$ratelimitTypeMap = set $ratelimitTypeMap "RespondActivityTaskCanceledByID" "ratelimitTypeWorker"}} +{{$ratelimitTypeMap = set $ratelimitTypeMap "RespondActivityTaskCompleted" "ratelimitTypeWorker"}} +{{$ratelimitTypeMap = set $ratelimitTypeMap "RespondActivityTaskCompletedByID" "ratelimitTypeWorker"}} +{{$ratelimitTypeMap = set $ratelimitTypeMap "RespondActivityTaskFailed" "ratelimitTypeWorker"}} +{{$ratelimitTypeMap = set $ratelimitTypeMap "RespondActivityTaskFailedByID" "ratelimitTypeWorker"}} +{{$ratelimitTypeMap = set $ratelimitTypeMap "RespondDecisionTaskCompleted" "ratelimitTypeWorker"}} +{{$ratelimitTypeMap = set $ratelimitTypeMap "RespondDecisionTaskFailed" "ratelimitTypeWorker"}} +{{$ratelimitTypeMap = set $ratelimitTypeMap "RespondQueryTaskCompleted" "ratelimitTypeWorker"}} + +{{$ratelimitTypeMap = set $ratelimitTypeMap "DescribeTaskList" "ratelimitTypeUser"}} +{{$ratelimitTypeMap = set $ratelimitTypeMap "DescribeWorkflowExecution" "ratelimitTypeUser"}} +{{$ratelimitTypeMap = set $ratelimitTypeMap "GetTaskListsByDomain" "ratelimitTypeUser"}} +{{$ratelimitTypeMap = set $ratelimitTypeMap "GetWorkflowExecutionHistory" "ratelimitTypeUser"}} +{{$ratelimitTypeMap = set $ratelimitTypeMap "ListTaskListPartitions" "ratelimitTypeUser"}} +{{$ratelimitTypeMap = set $ratelimitTypeMap "QueryWorkflow" "ratelimitTypeUser"}} +{{$ratelimitTypeMap = set $ratelimitTypeMap "RefreshWorkflowTasks" "ratelimitTypeUser"}} +{{$ratelimitTypeMap = set $ratelimitTypeMap "RequestCancelWorkflowExecution" "ratelimitTypeUser"}} +{{$ratelimitTypeMap = set $ratelimitTypeMap "ResetWorkflowExecution" "ratelimitTypeUser"}} +{{$ratelimitTypeMap = set $ratelimitTypeMap "RestartWorkflowExecution" "ratelimitTypeUser"}} +{{$ratelimitTypeMap = set $ratelimitTypeMap "SignalWorkflowExecution" "ratelimitTypeUser"}} +{{$ratelimitTypeMap = set $ratelimitTypeMap "SignalWithStartWorkflowExecution" "ratelimitTypeUser"}} +{{$ratelimitTypeMap = set $ratelimitTypeMap "StartWorkflowExecution" "ratelimitTypeUser"}} +{{$ratelimitTypeMap = set $ratelimitTypeMap "TerminateWorkflowExecution" "ratelimitTypeUser"}} + +{{$ratelimitTypeMap = set $ratelimitTypeMap "CountWorkflowExecutions" "ratelimitTypeVisibility"}} +{{$ratelimitTypeMap = set $ratelimitTypeMap "ListArchivedWorkflowExecutions" "ratelimitTypeVisibility"}} +{{$ratelimitTypeMap = set $ratelimitTypeMap "ListClosedWorkflowExecutions" "ratelimitTypeVisibility"}} +{{$ratelimitTypeMap = set $ratelimitTypeMap "ListOpenWorkflowExecutions" "ratelimitTypeVisibility"}} +{{$ratelimitTypeMap = set $ratelimitTypeMap "ListWorkflowExecutions" "ratelimitTypeVisibility"}} +{{$ratelimitTypeMap = set $ratelimitTypeMap "ScanWorkflowExecutions" "ratelimitTypeVisibility"}} + +{{$domainIDAPIs := list "RecordActivityTaskHeartbeat" "RespondActivityTaskCanceled" "RespondActivityTaskCompleted" "RespondActivityTaskFailed" "RespondDecisionTaskCompleted" "RespondDecisionTaskFailed" "RespondQueryTaskCompleted"}} +{{$queryTaskTokenAPIs := list "RespondQueryTaskCompleted"}} +{{$nonBlockingAPIs := list "RecordActivityTaskHeartbeat" "RecordActivityTaskHeartbeatByID" "RespondActivityTaskCompleted" "RespondActivityTaskCompletedByID" "RespondActivityTaskFailed" "RespondActivityTaskFailedByID" "RespondActivityTaskCanceled" "RespondActivityTaskCanceledByID" "RespondDecisionTaskCompleted" "RespondDecisionTaskFailed" "RespondQueryTaskCompleted" "ResetStickyTaskList"}} + +{{$interfaceName := .Interface.Name}} +{{$handlerName := (index .Vars "handler")}} +{{ $decorator := (printf "%s%s" (down $handlerName) $interfaceName) }} +{{ $Decorator := (printf "%s%s" $handlerName $interfaceName) }} + +// {{$decorator}} implements {{.Interface.Type}} interface instrumented with rate limiter. +type {{$decorator}} struct { + wrapped {{.Interface.Type}} + tokenSerializer common.TaskTokenSerializer + domainCache cache.DomainCache + userRateLimiter quotas.Policy + workerRateLimiter quotas.Policy + visibilityRateLimiter quotas.Policy +} + +// New{{$Decorator}} creates a new instance of {{$interfaceName}} with ratelimiter. +func New{{$Decorator}}( + wrapped {{.Interface.Type}}, + domainCache cache.DomainCache, + userRateLimiter quotas.Policy, + workerRateLimiter quotas.Policy, + visibilityRateLimiter quotas.Policy, +) {{.Interface.Type}} { + return &{{$decorator}}{ + wrapped: wrapped, + tokenSerializer: common.NewJSONTaskTokenSerializer(), + domainCache: domainCache, + userRateLimiter: userRateLimiter, + workerRateLimiter: workerRateLimiter, + visibilityRateLimiter: visibilityRateLimiter, + } +} + +{{range $method := .Interface.Methods}} +func (h *{{$decorator}}) {{$method.Declaration}} { + {{- if hasKey $ratelimitTypeMap $method.Name}} + {{- $ratelimitType := get $ratelimitTypeMap $method.Name}} + {{- $domain := printf "%s.GetDomain()" (index $method.Params 1).Name}} + {{- $rateLimitHelper := "allowDomain"}} + {{- if has $method.Name $domainIDAPIs}} + {{- $domain = "domainName"}} + {{- $rateLimitHelper := "allowDomainID"}} + {{- if has $method.Name $queryTaskTokenAPIs}} + token, err := h.tokenSerializer.DeserializeQueryTaskToken({{(index $method.Params 1).Name}}.TaskToken) + {{- else}} + token, err := h.tokenSerializer.Deserialize({{(index $method.Params 1).Name}}.TaskToken) + {{- end}} + if err != nil { + return + } + domainName, err := h.domainCache.GetDomainName(token.DomainID) + if err != nil { + return + } + {{- end}} + {{- if has $method.Name $nonBlockingAPIs}} + // Count the request in the host RPS, + // but we still accept it even if RPS is exceeded + h.{{$rateLimitHelper}}({{$ratelimitType}}, {{$domain}}) + {{- else}} + if ok := h.{{$rateLimitHelper}}({{$ratelimitType}}, {{$domain}}); !ok { + err = &types.ServiceBusyError{Message: "Too many outstanding requests to the cadence service"} + return + } + {{- end}} + {{- end}} + {{$method.Pass "h.wrapped."}} +} +{{end}} diff --git a/service/frontend/wrappers/ratelimited/api_generated.go b/service/frontend/wrappers/ratelimited/api_generated.go new file mode 100644 index 00000000000..361b0b0dca4 --- /dev/null +++ b/service/frontend/wrappers/ratelimited/api_generated.go @@ -0,0 +1,417 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package ratelimited + +// Code generated by gowrap. DO NOT EDIT. +// template: ../../templates/ratelimited.tmpl +// gowrap: http://github.com/hexdigest/gowrap + +import ( + "context" + + "github.com/uber/cadence/common" + "github.com/uber/cadence/common/cache" + "github.com/uber/cadence/common/quotas" + "github.com/uber/cadence/common/types" + "github.com/uber/cadence/service/frontend/api" +) + +// apiHandler implements api.Handler interface instrumented with rate limiter. +type apiHandler struct { + wrapped api.Handler + tokenSerializer common.TaskTokenSerializer + domainCache cache.DomainCache + userRateLimiter quotas.Policy + workerRateLimiter quotas.Policy + visibilityRateLimiter quotas.Policy +} + +// NewAPIHandler creates a new instance of Handler with ratelimiter. +func NewAPIHandler( + wrapped api.Handler, + domainCache cache.DomainCache, + userRateLimiter quotas.Policy, + workerRateLimiter quotas.Policy, + visibilityRateLimiter quotas.Policy, +) api.Handler { + return &apiHandler{ + wrapped: wrapped, + tokenSerializer: common.NewJSONTaskTokenSerializer(), + domainCache: domainCache, + userRateLimiter: userRateLimiter, + workerRateLimiter: workerRateLimiter, + visibilityRateLimiter: visibilityRateLimiter, + } +} + +func (h *apiHandler) CountWorkflowExecutions(ctx context.Context, cp1 *types.CountWorkflowExecutionsRequest) (cp2 *types.CountWorkflowExecutionsResponse, err error) { + if ok := h.allowDomain(ratelimitTypeVisibility, cp1.GetDomain()); !ok { + err = &types.ServiceBusyError{Message: "Too many outstanding requests to the cadence service"} + return + } + return h.wrapped.CountWorkflowExecutions(ctx, cp1) +} + +func (h *apiHandler) DeprecateDomain(ctx context.Context, dp1 *types.DeprecateDomainRequest) (err error) { + return h.wrapped.DeprecateDomain(ctx, dp1) +} + +func (h *apiHandler) DescribeDomain(ctx context.Context, dp1 *types.DescribeDomainRequest) (dp2 *types.DescribeDomainResponse, err error) { + return h.wrapped.DescribeDomain(ctx, dp1) +} + +func (h *apiHandler) DescribeTaskList(ctx context.Context, dp1 *types.DescribeTaskListRequest) (dp2 *types.DescribeTaskListResponse, err error) { + if ok := h.allowDomain(ratelimitTypeUser, dp1.GetDomain()); !ok { + err = &types.ServiceBusyError{Message: "Too many outstanding requests to the cadence service"} + return + } + return h.wrapped.DescribeTaskList(ctx, dp1) +} + +func (h *apiHandler) DescribeWorkflowExecution(ctx context.Context, dp1 *types.DescribeWorkflowExecutionRequest) (dp2 *types.DescribeWorkflowExecutionResponse, err error) { + if ok := h.allowDomain(ratelimitTypeUser, dp1.GetDomain()); !ok { + err = &types.ServiceBusyError{Message: "Too many outstanding requests to the cadence service"} + return + } + return h.wrapped.DescribeWorkflowExecution(ctx, dp1) +} + +func (h *apiHandler) GetClusterInfo(ctx context.Context) (cp1 *types.ClusterInfo, err error) { + return h.wrapped.GetClusterInfo(ctx) +} + +func (h *apiHandler) GetSearchAttributes(ctx context.Context) (gp1 *types.GetSearchAttributesResponse, err error) { + return h.wrapped.GetSearchAttributes(ctx) +} + +func (h *apiHandler) GetTaskListsByDomain(ctx context.Context, gp1 *types.GetTaskListsByDomainRequest) (gp2 *types.GetTaskListsByDomainResponse, err error) { + if ok := h.allowDomain(ratelimitTypeUser, gp1.GetDomain()); !ok { + err = &types.ServiceBusyError{Message: "Too many outstanding requests to the cadence service"} + return + } + return h.wrapped.GetTaskListsByDomain(ctx, gp1) +} + +func (h *apiHandler) GetWorkflowExecutionHistory(ctx context.Context, gp1 *types.GetWorkflowExecutionHistoryRequest) (gp2 *types.GetWorkflowExecutionHistoryResponse, err error) { + if ok := h.allowDomain(ratelimitTypeUser, gp1.GetDomain()); !ok { + err = &types.ServiceBusyError{Message: "Too many outstanding requests to the cadence service"} + return + } + return h.wrapped.GetWorkflowExecutionHistory(ctx, gp1) +} + +func (h *apiHandler) Health(ctx context.Context) (hp1 *types.HealthStatus, err error) { + return h.wrapped.Health(ctx) +} + +func (h *apiHandler) ListArchivedWorkflowExecutions(ctx context.Context, lp1 *types.ListArchivedWorkflowExecutionsRequest) (lp2 *types.ListArchivedWorkflowExecutionsResponse, err error) { + if ok := h.allowDomain(ratelimitTypeVisibility, lp1.GetDomain()); !ok { + err = &types.ServiceBusyError{Message: "Too many outstanding requests to the cadence service"} + return + } + return h.wrapped.ListArchivedWorkflowExecutions(ctx, lp1) +} + +func (h *apiHandler) ListClosedWorkflowExecutions(ctx context.Context, lp1 *types.ListClosedWorkflowExecutionsRequest) (lp2 *types.ListClosedWorkflowExecutionsResponse, err error) { + if ok := h.allowDomain(ratelimitTypeVisibility, lp1.GetDomain()); !ok { + err = &types.ServiceBusyError{Message: "Too many outstanding requests to the cadence service"} + return + } + return h.wrapped.ListClosedWorkflowExecutions(ctx, lp1) +} + +func (h *apiHandler) ListDomains(ctx context.Context, lp1 *types.ListDomainsRequest) (lp2 *types.ListDomainsResponse, err error) { + return h.wrapped.ListDomains(ctx, lp1) +} + +func (h *apiHandler) ListOpenWorkflowExecutions(ctx context.Context, lp1 *types.ListOpenWorkflowExecutionsRequest) (lp2 *types.ListOpenWorkflowExecutionsResponse, err error) { + if ok := h.allowDomain(ratelimitTypeVisibility, lp1.GetDomain()); !ok { + err = &types.ServiceBusyError{Message: "Too many outstanding requests to the cadence service"} + return + } + return h.wrapped.ListOpenWorkflowExecutions(ctx, lp1) +} + +func (h *apiHandler) ListTaskListPartitions(ctx context.Context, lp1 *types.ListTaskListPartitionsRequest) (lp2 *types.ListTaskListPartitionsResponse, err error) { + if ok := h.allowDomain(ratelimitTypeUser, lp1.GetDomain()); !ok { + err = &types.ServiceBusyError{Message: "Too many outstanding requests to the cadence service"} + return + } + return h.wrapped.ListTaskListPartitions(ctx, lp1) +} + +func (h *apiHandler) ListWorkflowExecutions(ctx context.Context, lp1 *types.ListWorkflowExecutionsRequest) (lp2 *types.ListWorkflowExecutionsResponse, err error) { + if ok := h.allowDomain(ratelimitTypeVisibility, lp1.GetDomain()); !ok { + err = &types.ServiceBusyError{Message: "Too many outstanding requests to the cadence service"} + return + } + return h.wrapped.ListWorkflowExecutions(ctx, lp1) +} + +func (h *apiHandler) PollForActivityTask(ctx context.Context, pp1 *types.PollForActivityTaskRequest) (pp2 *types.PollForActivityTaskResponse, err error) { + if ok := h.allowDomain(ratelimitTypeWorker, pp1.GetDomain()); !ok { + err = &types.ServiceBusyError{Message: "Too many outstanding requests to the cadence service"} + return + } + return h.wrapped.PollForActivityTask(ctx, pp1) +} + +func (h *apiHandler) PollForDecisionTask(ctx context.Context, pp1 *types.PollForDecisionTaskRequest) (pp2 *types.PollForDecisionTaskResponse, err error) { + if ok := h.allowDomain(ratelimitTypeWorker, pp1.GetDomain()); !ok { + err = &types.ServiceBusyError{Message: "Too many outstanding requests to the cadence service"} + return + } + return h.wrapped.PollForDecisionTask(ctx, pp1) +} + +func (h *apiHandler) QueryWorkflow(ctx context.Context, qp1 *types.QueryWorkflowRequest) (qp2 *types.QueryWorkflowResponse, err error) { + if ok := h.allowDomain(ratelimitTypeUser, qp1.GetDomain()); !ok { + err = &types.ServiceBusyError{Message: "Too many outstanding requests to the cadence service"} + return + } + return h.wrapped.QueryWorkflow(ctx, qp1) +} + +func (h *apiHandler) RecordActivityTaskHeartbeat(ctx context.Context, rp1 *types.RecordActivityTaskHeartbeatRequest) (rp2 *types.RecordActivityTaskHeartbeatResponse, err error) { + token, err := h.tokenSerializer.Deserialize(rp1.TaskToken) + if err != nil { + return + } + domainName, err := h.domainCache.GetDomainName(token.DomainID) + if err != nil { + return + } + // Count the request in the host RPS, + // but we still accept it even if RPS is exceeded + h.allowDomain(ratelimitTypeWorker, domainName) + return h.wrapped.RecordActivityTaskHeartbeat(ctx, rp1) +} + +func (h *apiHandler) RecordActivityTaskHeartbeatByID(ctx context.Context, rp1 *types.RecordActivityTaskHeartbeatByIDRequest) (rp2 *types.RecordActivityTaskHeartbeatResponse, err error) { + // Count the request in the host RPS, + // but we still accept it even if RPS is exceeded + h.allowDomain(ratelimitTypeWorker, rp1.GetDomain()) + return h.wrapped.RecordActivityTaskHeartbeatByID(ctx, rp1) +} + +func (h *apiHandler) RefreshWorkflowTasks(ctx context.Context, rp1 *types.RefreshWorkflowTasksRequest) (err error) { + if ok := h.allowDomain(ratelimitTypeUser, rp1.GetDomain()); !ok { + err = &types.ServiceBusyError{Message: "Too many outstanding requests to the cadence service"} + return + } + return h.wrapped.RefreshWorkflowTasks(ctx, rp1) +} + +func (h *apiHandler) RegisterDomain(ctx context.Context, rp1 *types.RegisterDomainRequest) (err error) { + return h.wrapped.RegisterDomain(ctx, rp1) +} + +func (h *apiHandler) RequestCancelWorkflowExecution(ctx context.Context, rp1 *types.RequestCancelWorkflowExecutionRequest) (err error) { + if ok := h.allowDomain(ratelimitTypeUser, rp1.GetDomain()); !ok { + err = &types.ServiceBusyError{Message: "Too many outstanding requests to the cadence service"} + return + } + return h.wrapped.RequestCancelWorkflowExecution(ctx, rp1) +} + +func (h *apiHandler) ResetStickyTaskList(ctx context.Context, rp1 *types.ResetStickyTaskListRequest) (rp2 *types.ResetStickyTaskListResponse, err error) { + // Count the request in the host RPS, + // but we still accept it even if RPS is exceeded + h.allowDomain(ratelimitTypeWorker, rp1.GetDomain()) + return h.wrapped.ResetStickyTaskList(ctx, rp1) +} + +func (h *apiHandler) ResetWorkflowExecution(ctx context.Context, rp1 *types.ResetWorkflowExecutionRequest) (rp2 *types.ResetWorkflowExecutionResponse, err error) { + if ok := h.allowDomain(ratelimitTypeUser, rp1.GetDomain()); !ok { + err = &types.ServiceBusyError{Message: "Too many outstanding requests to the cadence service"} + return + } + return h.wrapped.ResetWorkflowExecution(ctx, rp1) +} + +func (h *apiHandler) RespondActivityTaskCanceled(ctx context.Context, rp1 *types.RespondActivityTaskCanceledRequest) (err error) { + token, err := h.tokenSerializer.Deserialize(rp1.TaskToken) + if err != nil { + return + } + domainName, err := h.domainCache.GetDomainName(token.DomainID) + if err != nil { + return + } + // Count the request in the host RPS, + // but we still accept it even if RPS is exceeded + h.allowDomain(ratelimitTypeWorker, domainName) + return h.wrapped.RespondActivityTaskCanceled(ctx, rp1) +} + +func (h *apiHandler) RespondActivityTaskCanceledByID(ctx context.Context, rp1 *types.RespondActivityTaskCanceledByIDRequest) (err error) { + // Count the request in the host RPS, + // but we still accept it even if RPS is exceeded + h.allowDomain(ratelimitTypeWorker, rp1.GetDomain()) + return h.wrapped.RespondActivityTaskCanceledByID(ctx, rp1) +} + +func (h *apiHandler) RespondActivityTaskCompleted(ctx context.Context, rp1 *types.RespondActivityTaskCompletedRequest) (err error) { + token, err := h.tokenSerializer.Deserialize(rp1.TaskToken) + if err != nil { + return + } + domainName, err := h.domainCache.GetDomainName(token.DomainID) + if err != nil { + return + } + // Count the request in the host RPS, + // but we still accept it even if RPS is exceeded + h.allowDomain(ratelimitTypeWorker, domainName) + return h.wrapped.RespondActivityTaskCompleted(ctx, rp1) +} + +func (h *apiHandler) RespondActivityTaskCompletedByID(ctx context.Context, rp1 *types.RespondActivityTaskCompletedByIDRequest) (err error) { + // Count the request in the host RPS, + // but we still accept it even if RPS is exceeded + h.allowDomain(ratelimitTypeWorker, rp1.GetDomain()) + return h.wrapped.RespondActivityTaskCompletedByID(ctx, rp1) +} + +func (h *apiHandler) RespondActivityTaskFailed(ctx context.Context, rp1 *types.RespondActivityTaskFailedRequest) (err error) { + token, err := h.tokenSerializer.Deserialize(rp1.TaskToken) + if err != nil { + return + } + domainName, err := h.domainCache.GetDomainName(token.DomainID) + if err != nil { + return + } + // Count the request in the host RPS, + // but we still accept it even if RPS is exceeded + h.allowDomain(ratelimitTypeWorker, domainName) + return h.wrapped.RespondActivityTaskFailed(ctx, rp1) +} + +func (h *apiHandler) RespondActivityTaskFailedByID(ctx context.Context, rp1 *types.RespondActivityTaskFailedByIDRequest) (err error) { + // Count the request in the host RPS, + // but we still accept it even if RPS is exceeded + h.allowDomain(ratelimitTypeWorker, rp1.GetDomain()) + return h.wrapped.RespondActivityTaskFailedByID(ctx, rp1) +} + +func (h *apiHandler) RespondDecisionTaskCompleted(ctx context.Context, rp1 *types.RespondDecisionTaskCompletedRequest) (rp2 *types.RespondDecisionTaskCompletedResponse, err error) { + token, err := h.tokenSerializer.Deserialize(rp1.TaskToken) + if err != nil { + return + } + domainName, err := h.domainCache.GetDomainName(token.DomainID) + if err != nil { + return + } + // Count the request in the host RPS, + // but we still accept it even if RPS is exceeded + h.allowDomain(ratelimitTypeWorker, domainName) + return h.wrapped.RespondDecisionTaskCompleted(ctx, rp1) +} + +func (h *apiHandler) RespondDecisionTaskFailed(ctx context.Context, rp1 *types.RespondDecisionTaskFailedRequest) (err error) { + token, err := h.tokenSerializer.Deserialize(rp1.TaskToken) + if err != nil { + return + } + domainName, err := h.domainCache.GetDomainName(token.DomainID) + if err != nil { + return + } + // Count the request in the host RPS, + // but we still accept it even if RPS is exceeded + h.allowDomain(ratelimitTypeWorker, domainName) + return h.wrapped.RespondDecisionTaskFailed(ctx, rp1) +} + +func (h *apiHandler) RespondQueryTaskCompleted(ctx context.Context, rp1 *types.RespondQueryTaskCompletedRequest) (err error) { + token, err := h.tokenSerializer.DeserializeQueryTaskToken(rp1.TaskToken) + if err != nil { + return + } + domainName, err := h.domainCache.GetDomainName(token.DomainID) + if err != nil { + return + } + // Count the request in the host RPS, + // but we still accept it even if RPS is exceeded + h.allowDomain(ratelimitTypeWorker, domainName) + return h.wrapped.RespondQueryTaskCompleted(ctx, rp1) +} + +func (h *apiHandler) RestartWorkflowExecution(ctx context.Context, rp1 *types.RestartWorkflowExecutionRequest) (rp2 *types.RestartWorkflowExecutionResponse, err error) { + if ok := h.allowDomain(ratelimitTypeUser, rp1.GetDomain()); !ok { + err = &types.ServiceBusyError{Message: "Too many outstanding requests to the cadence service"} + return + } + return h.wrapped.RestartWorkflowExecution(ctx, rp1) +} + +func (h *apiHandler) ScanWorkflowExecutions(ctx context.Context, lp1 *types.ListWorkflowExecutionsRequest) (lp2 *types.ListWorkflowExecutionsResponse, err error) { + if ok := h.allowDomain(ratelimitTypeVisibility, lp1.GetDomain()); !ok { + err = &types.ServiceBusyError{Message: "Too many outstanding requests to the cadence service"} + return + } + return h.wrapped.ScanWorkflowExecutions(ctx, lp1) +} + +func (h *apiHandler) SignalWithStartWorkflowExecution(ctx context.Context, sp1 *types.SignalWithStartWorkflowExecutionRequest) (sp2 *types.StartWorkflowExecutionResponse, err error) { + if ok := h.allowDomain(ratelimitTypeUser, sp1.GetDomain()); !ok { + err = &types.ServiceBusyError{Message: "Too many outstanding requests to the cadence service"} + return + } + return h.wrapped.SignalWithStartWorkflowExecution(ctx, sp1) +} + +func (h *apiHandler) SignalWorkflowExecution(ctx context.Context, sp1 *types.SignalWorkflowExecutionRequest) (err error) { + if ok := h.allowDomain(ratelimitTypeUser, sp1.GetDomain()); !ok { + err = &types.ServiceBusyError{Message: "Too many outstanding requests to the cadence service"} + return + } + return h.wrapped.SignalWorkflowExecution(ctx, sp1) +} + +func (h *apiHandler) StartWorkflowExecution(ctx context.Context, sp1 *types.StartWorkflowExecutionRequest) (sp2 *types.StartWorkflowExecutionResponse, err error) { + if ok := h.allowDomain(ratelimitTypeUser, sp1.GetDomain()); !ok { + err = &types.ServiceBusyError{Message: "Too many outstanding requests to the cadence service"} + return + } + return h.wrapped.StartWorkflowExecution(ctx, sp1) +} + +func (h *apiHandler) StartWorkflowExecutionAsync(ctx context.Context, sp1 *types.StartWorkflowExecutionAsyncRequest) (sp2 *types.StartWorkflowExecutionAsyncResponse, err error) { + return h.wrapped.StartWorkflowExecutionAsync(ctx, sp1) +} + +func (h *apiHandler) TerminateWorkflowExecution(ctx context.Context, tp1 *types.TerminateWorkflowExecutionRequest) (err error) { + if ok := h.allowDomain(ratelimitTypeUser, tp1.GetDomain()); !ok { + err = &types.ServiceBusyError{Message: "Too many outstanding requests to the cadence service"} + return + } + return h.wrapped.TerminateWorkflowExecution(ctx, tp1) +} + +func (h *apiHandler) UpdateDomain(ctx context.Context, up1 *types.UpdateDomainRequest) (up2 *types.UpdateDomainResponse, err error) { + return h.wrapped.UpdateDomain(ctx, up1) +} diff --git a/service/frontend/wrappers/ratelimited/ratelimit.go b/service/frontend/wrappers/ratelimited/ratelimit.go new file mode 100644 index 00000000000..97fd3b406dc --- /dev/null +++ b/service/frontend/wrappers/ratelimited/ratelimit.go @@ -0,0 +1,49 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package ratelimited + +import ( + "github.com/uber/cadence/common/quotas" +) + +// ratelimitType differentiates between the three categories of ratelimiters +type ratelimitType int + +const ( + ratelimitTypeUser ratelimitType = iota + 1 + ratelimitTypeWorker + ratelimitTypeVisibility +) + +func (h *apiHandler) allowDomain(requestType ratelimitType, domain string) bool { + switch requestType { + case ratelimitTypeUser: + return h.userRateLimiter.Allow(quotas.Info{Domain: domain}) + case ratelimitTypeWorker: + return h.workerRateLimiter.Allow(quotas.Info{Domain: domain}) + case ratelimitTypeVisibility: + return h.visibilityRateLimiter.Allow(quotas.Info{Domain: domain}) + default: + panic("coding error, unrecognized request ratelimit type value") + } +}