Skip to content

Commit

Permalink
Use domain rate limiter in workflow handler (uber#2243)
Browse files Browse the repository at this point in the history
  • Loading branch information
shreyassrivatsan authored Jul 21, 2019
1 parent f0de6d1 commit 18234b7
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 6 deletions.
2 changes: 1 addition & 1 deletion service/frontend/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func NewConfig(dc *dynamicconfig.Collection, numHistoryShards int, enableVisibil
ESIndexMaxResultWindow: dc.GetIntProperty(dynamicconfig.FrontendESIndexMaxResultWindow, 10000),
HistoryMaxPageSize: dc.GetIntPropertyFilteredByDomain(dynamicconfig.FrontendHistoryMaxPageSize, common.GetHistoryMaxPageSize),
RPS: dc.GetIntProperty(dynamicconfig.FrontendRPS, 1200),
DomainRPS: dc.GetIntProperty(dynamicconfig.FrontendDomainRPS, 400),
DomainRPS: dc.GetIntProperty(dynamicconfig.FrontendDomainRPS, 1200),
MaxIDLengthLimit: dc.GetIntProperty(dynamicconfig.MaxIDLengthLimit, 1000),
HistoryMgrNumConns: dc.GetIntProperty(dynamicconfig.FrontendHistoryMgrNumConns, 10),
MaxDecisionStartToCloseTimeout: dc.GetIntPropertyFilteredByDomain(dynamicconfig.MaxDecisionStartToCloseTimeout, 600),
Expand Down
19 changes: 14 additions & 5 deletions service/frontend/workflowHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ type (
tokenSerializer common.TaskTokenSerializer
metricsClient metrics.Client
startWG sync.WaitGroup
rateLimiter quotas.Limiter
rateLimiter quotas.Policy
config *Config
versionChecker *versionChecker
domainHandler *domainHandlerImpl
Expand Down Expand Up @@ -159,9 +159,14 @@ func NewWorkflowHandler(sVice service.Service, config *Config, metadataMgr persi
tokenSerializer: common.NewJSONTaskTokenSerializer(),
metricsClient: sVice.GetMetricsClient(),
domainCache: domainCache,
rateLimiter: quotas.NewDynamicRateLimiter(func() float64 {
return float64(config.RPS())
}),
rateLimiter: quotas.NewMultiStageRateLimiter(
func() float64 {
return float64(config.RPS())
},
func() float64 {
return float64(config.DomainRPS())
},
),
versionChecker: &versionChecker{checkVersion: config.EnableClientVersionCheck()},
domainHandler: newDomainHandler(
config,
Expand Down Expand Up @@ -3321,5 +3326,9 @@ func (wh *WorkflowHandler) isListRequestPageSizeTooLarge(pageSize int32, domain
}

func (wh *WorkflowHandler) allow(d domainGetter) bool {
return wh.rateLimiter.Allow()
domain := ""
if d != nil {
domain = d.GetDomain()
}
return wh.rateLimiter.Allow(quotas.Info{Domain: domain})
}

0 comments on commit 18234b7

Please sign in to comment.