Skip to content

Commit

Permalink
Increase number of forward tokens for isolation tasklists
Browse files Browse the repository at this point in the history
  • Loading branch information
Shaddoll committed Jun 2, 2023
1 parent d17aff4 commit a547562
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 12 deletions.
17 changes: 11 additions & 6 deletions service/matching/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ type (
// todo: implement a rate limiter that automatically
// adjusts rate based on ServiceBusy errors from API calls
limiter *quotas.DynamicRateLimiter

isolationGroups []string
}
// ForwarderReqToken is the token that must be acquired before
// making forwarder API calls. This type contains the state
Expand Down Expand Up @@ -91,19 +93,21 @@ func newForwarder(
taskListID *taskListID,
kind types.TaskListKind,
client matching.Client,
isolationGroups []string,
) *Forwarder {
rpsFunc := func() float64 { return float64(cfg.ForwarderMaxRatePerSecond()) }
fwdr := &Forwarder{
cfg: cfg,
client: client,
taskListID: taskListID,
taskListKind: kind,
outstandingTasksLimit: int32(cfg.ForwarderMaxOutstandingTasks()),
outstandingPollsLimit: int32(cfg.ForwarderMaxOutstandingPolls()),
outstandingTasksLimit: int32(cfg.ForwarderMaxOutstandingTasks() * (len(isolationGroups) + 1)),
outstandingPollsLimit: int32(cfg.ForwarderMaxOutstandingPolls() * (len(isolationGroups) + 1)),
limiter: quotas.NewDynamicRateLimiter(rpsFunc),
isolationGroups: isolationGroups,
}
fwdr.addReqToken.Store(newForwarderReqToken(cfg.ForwarderMaxOutstandingTasks()))
fwdr.pollReqToken.Store(newForwarderReqToken(cfg.ForwarderMaxOutstandingPolls()))
fwdr.addReqToken.Store(newForwarderReqToken(int(fwdr.outstandingTasksLimit)))
fwdr.pollReqToken.Store(newForwarderReqToken(int(fwdr.outstandingPollsLimit)))
return fwdr
}

Expand Down Expand Up @@ -250,15 +254,16 @@ func (fwdr *Forwarder) ForwardPoll(ctx context.Context) (*InternalTask, error) {
// that's necessary before making a ForwardTask or ForwardQueryTask API call.
// After the API call is invoked, token.release() must be invoked
func (fwdr *Forwarder) AddReqTokenC() <-chan *ForwarderReqToken {
fwdr.refreshTokenC(&fwdr.addReqToken, &fwdr.outstandingTasksLimit, int32(fwdr.cfg.ForwarderMaxOutstandingTasks()))
fwdr.refreshTokenC(&fwdr.addReqToken, &fwdr.outstandingTasksLimit, int32(fwdr.cfg.ForwarderMaxOutstandingTasks()*(len(fwdr.isolationGroups)+1)))
return fwdr.addReqToken.Load().(*ForwarderReqToken).ch
}

// PollReqTokenC returns a channel that can be used to wait for a token
// that's necessary before making a ForwardPoll API call. After the API
// call is invoked, token.release() must be invoked
// TODO: consider having separate token pools for different isolation groups
func (fwdr *Forwarder) PollReqTokenC() <-chan *ForwarderReqToken {
fwdr.refreshTokenC(&fwdr.pollReqToken, &fwdr.outstandingPollsLimit, int32(fwdr.cfg.ForwarderMaxOutstandingPolls()))
fwdr.refreshTokenC(&fwdr.pollReqToken, &fwdr.outstandingPollsLimit, int32(fwdr.cfg.ForwarderMaxOutstandingPolls()*(len(fwdr.isolationGroups)+1)))
return fwdr.pollReqToken.Load().(*ForwarderReqToken).ch
}

Expand Down
2 changes: 1 addition & 1 deletion service/matching/forwarder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (t *ForwarderTestSuite) SetupTest() {
ForwarderMaxOutstandingTasks: func() int { return 1 },
}
t.taskList = newTestTaskListID("fwdr", "tl0", persistence.TaskListTypeDecision)
t.fwdr = newForwarder(t.cfg, t.taskList, types.TaskListKindNormal, t.client)
t.fwdr = newForwarder(t.cfg, t.taskList, types.TaskListKindNormal, t.client, nil)
}

func (t *ForwarderTestSuite) TearDownTest() {
Expand Down
2 changes: 1 addition & 1 deletion service/matching/matcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (t *MatcherTestSuite) SetupTest() {
ForwarderMaxChildrenPerNode: func() int { return 20 },
}
t.cfg = tlCfg
t.fwdr = newForwarder(&t.cfg.forwarderConfig, t.taskList, types.TaskListKindNormal, t.client)
t.fwdr = newForwarder(&t.cfg.forwarderConfig, t.taskList, types.TaskListKindNormal, t.client, nil)
t.matcher = newTaskMatcher(tlCfg, t.fwdr, metrics.NoopScope(metrics.Matching), []string{"dca1", "dca2"})

rootTaskList := newTestTaskListID(t.taskList.domainID, t.taskList.Parent(20), persistence.TaskListTypeDecision)
Expand Down
8 changes: 4 additions & 4 deletions service/matching/taskListManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,14 +189,14 @@ func newTaskListManager(
tlMgr.liveness = newLiveness(clock.NewRealTimeSource(), taskListConfig.IdleTasklistCheckInterval(), tlMgr.Stop)
tlMgr.taskWriter = newTaskWriter(tlMgr)
tlMgr.taskReader = newTaskReader(tlMgr)
var fwdr *Forwarder
if tlMgr.isFowardingAllowed(taskList, *taskListKind) {
fwdr = newForwarder(&taskListConfig.forwarderConfig, taskList, *taskListKind, e.matchingClient)
}
var isolationGroups []string
if tlMgr.isIsolationMatcherEnabled() {
isolationGroups = config.AllIsolationGroups
}
var fwdr *Forwarder
if tlMgr.isFowardingAllowed(taskList, *taskListKind) {
fwdr = newForwarder(&taskListConfig.forwarderConfig, taskList, *taskListKind, e.matchingClient, isolationGroups)
}
tlMgr.matcher = newTaskMatcher(taskListConfig, fwdr, tlMgr.scope, isolationGroups)
tlMgr.startWG.Add(1)
return tlMgr, nil
Expand Down

0 comments on commit a547562

Please sign in to comment.