Skip to content

Commit

Permalink
Improve poller detection for isolation (uber#5399)
Browse files Browse the repository at this point in the history
  • Loading branch information
Shaddoll authored Sep 15, 2023
1 parent c338eff commit a996d0d
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 16 deletions.
10 changes: 2 additions & 8 deletions service/matching/pollerHistory.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
package matching

import (
"sort"
"time"

"github.com/uber/cadence/common"
Expand Down Expand Up @@ -104,7 +103,7 @@ func (pollers *pollerHistory) getPollerInfo(earliestAccessTime time.Time) []*typ
return result
}

func (pollers *pollerHistory) getPollerIsolationGroups(earliestAccessTime time.Time) []string {
func (pollers *pollerHistory) getPollerIsolationGroups(earliestAccessTime time.Time) map[string]struct{} {
groupSet := make(map[string]struct{})
ite := pollers.history.Iterator()
defer ite.Close()
Expand All @@ -118,10 +117,5 @@ func (pollers *pollerHistory) getPollerIsolationGroups(earliestAccessTime time.T
}
}
}
result := make([]string, 0, len(groupSet))
for k := range groupSet {
result = append(result, k)
}
sort.Strings(result)
return result
return groupSet
}
37 changes: 29 additions & 8 deletions service/matching/taskListManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"context"
"errors"
"fmt"
"sort"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -92,6 +93,11 @@ type (
TaskListID() *taskListID
}

outstandingPollerInfo struct {
isolationGroup string
cancel context.CancelFunc
}

// Single task list in memory state
taskListManagerImpl struct {
createTime time.Time
Expand Down Expand Up @@ -120,7 +126,7 @@ type (
// outstanding poller when the frontend detects client connection is closed to
// prevent tasks being dispatched to zombie pollers.
outstandingPollsLock sync.Mutex
outstandingPollsMap map[string]context.CancelFunc
outstandingPollsMap map[string]outstandingPollerInfo
startWG sync.WaitGroup // ensures that background processes do not start until setup is ready
stopped int32
closeCallback func(taskListManager)
Expand Down Expand Up @@ -173,7 +179,7 @@ func newTaskListManager(
taskAckManager: messaging.NewAckManager(e.logger),
taskGC: newTaskGC(db, taskListConfig),
config: taskListConfig,
outstandingPollsMap: make(map[string]context.CancelFunc),
outstandingPollsMap: make(map[string]outstandingPollerInfo),
domainName: domainName,
scope: scope,
closeCallback: e.removeTaskListManager,
Expand Down Expand Up @@ -371,7 +377,7 @@ func (c *taskListManagerImpl) getTask(ctx context.Context, maxDispatchPerSecond
// Found pollerID on context, add it to the map to allow it to be canceled in
// response to CancelPoller call
c.outstandingPollsLock.Lock()
c.outstandingPollsMap[pollerID] = cancel
c.outstandingPollsMap[pollerID] = outstandingPollerInfo{isolationGroup: isolationGroup, cancel: cancel}
c.outstandingPollsLock.Unlock()
defer func() {
c.outstandingPollsLock.Lock()
Expand Down Expand Up @@ -431,11 +437,11 @@ func (c *taskListManagerImpl) HasPollerAfter(accessTime time.Time) bool {

func (c *taskListManagerImpl) CancelPoller(pollerID string) {
c.outstandingPollsLock.Lock()
cancel, ok := c.outstandingPollsMap[pollerID]
info, ok := c.outstandingPollsMap[pollerID]
c.outstandingPollsLock.Unlock()

if ok && cancel != nil {
cancel()
if ok && info.cancel != nil {
info.cancel()
c.logger.Info("canceled outstanding poller", tag.WorkflowDomainName(c.domainName))
}
}
Expand Down Expand Up @@ -583,7 +589,7 @@ func (c *taskListManagerImpl) getIsolationGroupForTask(ctx context.Context, task
// pollers are available in all isolation groups to avoid the risk of leaking a task to another isolation group.
// Besides, for sticky and scalable tasklists, not all poller information are available, we also use all isolation group.
if time.Now().Sub(c.createTime) > time.Minute && c.taskListKind != types.TaskListKindSticky && c.taskListID.IsRoot() {
pollerIsolationGroups = c.pollerHistory.getPollerIsolationGroups(time.Time{}) // the lookback window must be larger than the timeout of poller requests (2 mins), otherwise we don't get all pollers
pollerIsolationGroups = c.getPollerIsolationGroups()
if len(pollerIsolationGroups) == 0 {
// we don't have any pollers, use all isolation groups and wait for pollers' arriving
pollerIsolationGroups = c.config.AllIsolationGroups
Expand All @@ -605,7 +611,7 @@ func (c *taskListManagerImpl) getIsolationGroupForTask(ctx context.Context, task
// to let the task to be re-enqueued to the non-sticky tasklist. If there is poller, just return an empty isolation group, because
// there is at most one isolation group for sticky tasklist and we could just use empty isolation group for matching.
if c.taskListKind == types.TaskListKindSticky {
pollerIsolationGroups = c.pollerHistory.getPollerIsolationGroups(time.Time{})
pollerIsolationGroups = c.getPollerIsolationGroups()
for _, pollerGroup := range pollerIsolationGroups {
if group == pollerGroup {
return "", nil
Expand All @@ -618,6 +624,21 @@ func (c *taskListManagerImpl) getIsolationGroupForTask(ctx context.Context, task
return defaultTaskBufferIsolationGroup, nil
}

func (c *taskListManagerImpl) getPollerIsolationGroups() []string {
groupSet := c.pollerHistory.getPollerIsolationGroups(time.Now().Add(-10 * time.Second))
c.outstandingPollsLock.Lock()
for _, poller := range c.outstandingPollsMap {
groupSet[poller.isolationGroup] = struct{}{}
}
c.outstandingPollsLock.Unlock()
result := make([]string, 0, len(groupSet))
for k := range groupSet {
result = append(result, k)
}
sort.Strings(result)
return result
}

func getTaskListTypeTag(taskListType int) metrics.Tag {
switch taskListType {
case persistence.TaskListTypeActivity:
Expand Down
45 changes: 45 additions & 0 deletions service/matching/taskListManager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,3 +343,48 @@ func TestAddTaskStandby(t *testing.T) {
require.Error(t, err) // should not persist the task
require.False(t, syncMatch)
}

func TestGetPollerIsolationGroup(t *testing.T) {
controller := gomock.NewController(t)
defer controller.Finish()

config := defaultTestConfig()
config.LongPollExpirationInterval = dynamicconfig.GetDurationPropertyFnFilteredByTaskListInfo(30 * time.Second)
tlm := createTestTaskListManagerWithConfig(controller, config)

bgCtx := context.WithValue(context.Background(), pollerIDKey, "poller0")
bgCtx = context.WithValue(bgCtx, identityKey, "id0")
bgCtx = context.WithValue(bgCtx, _isolationGroupKey, config.AllIsolationGroups[0])
ctx, cancel := context.WithTimeout(bgCtx, time.Second)
_, err := tlm.GetTask(ctx, nil)
cancel()
assert.Error(t, err)
assert.Contains(t, err.Error(), ErrNoTasks.Error())

// we should get isolation groups that showed up within last 10 seconds
groups := tlm.getPollerIsolationGroups()
assert.Equal(t, 1, len(groups))
assert.Equal(t, config.AllIsolationGroups[0], groups[0])

// after 10s, the poller from that isolation group are cleared from the poller history
time.Sleep(10 * time.Second)
groups = tlm.getPollerIsolationGroups()
assert.Equal(t, 0, len(groups))

// we should get isolation groups of outstanding pollers
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
ctx, cancel := context.WithTimeout(bgCtx, time.Second*20)
_, err := tlm.GetTask(ctx, nil)
cancel()
assert.Error(t, err)
assert.Contains(t, err.Error(), ErrNoTasks.Error())
}()
time.Sleep(11 * time.Second)
groups = tlm.getPollerIsolationGroups()
wg.Wait()
assert.Equal(t, 1, len(groups))
assert.Equal(t, config.AllIsolationGroups[0], groups[0])
}

0 comments on commit a996d0d

Please sign in to comment.