Skip to content

Commit

Permalink
Add a wait time for task dispatchers (#6223)
Browse files Browse the repository at this point in the history
  • Loading branch information
Shaddoll authored Aug 13, 2024
1 parent e7bd499 commit 32e3878
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 14 deletions.
13 changes: 13 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -2859,6 +2859,13 @@ const (
// Allowed filters: domainName, taskListName, taskListType
LocalPollWaitTime

// LocalTaskWaitTime is the wait time for a task to wait before considering task forwarding
// KeyName: matching.localTaskWaitTime
// Value type: Duration
// Default value: 10ms
// Allowed filters: domainName, taskListName, taskListType
LocalTaskWaitTime

// LastDurationKey must be the last one in this const group
LastDurationKey
)
Expand Down Expand Up @@ -5148,6 +5155,12 @@ var DurationKeys = map[DurationKey]DynamicDuration{
Description: "LocalPollWaitTime is the time a poller waits before considering request forwarding.",
DefaultValue: time.Millisecond * 10,
},
LocalTaskWaitTime: {
KeyName: "matching.localTaskWaitTime",
Filters: []Filter{DomainName, TaskListName, TaskType},
Description: "LocalTaskWaitTime is the time a task waits for a poller to arrive before considering task forwarding",
DefaultValue: time.Millisecond * 10,
},
}

var MapKeys = map[MapKey]DynamicMap{
Expand Down
3 changes: 3 additions & 0 deletions service/matching/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type (
ForwarderMaxChildrenPerNode dynamicconfig.IntPropertyFnWithTaskListInfoFilters
AsyncTaskDispatchTimeout dynamicconfig.DurationPropertyFnWithTaskListInfoFilters
LocalPollWaitTime dynamicconfig.DurationPropertyFnWithTaskListInfoFilters
LocalTaskWaitTime dynamicconfig.DurationPropertyFnWithTaskListInfoFilters

// Time to hold a poll request before returning an empty response if there are no tasks
LongPollExpirationInterval dynamicconfig.DurationPropertyFnWithTaskListInfoFilters
Expand Down Expand Up @@ -104,6 +105,7 @@ type (
MaxTaskDeleteBatchSize func() int
AsyncTaskDispatchTimeout func() time.Duration
LocalPollWaitTime func() time.Duration
LocalTaskWaitTime func() time.Duration
// taskWriter configuration
OutstandingTaskAppendsThreshold func() int
MaxTaskBatchSize func() int
Expand Down Expand Up @@ -157,6 +159,7 @@ func NewConfig(dc *dynamicconfig.Collection, hostName string) *Config {
AllIsolationGroups: mapIGs(dc.GetListProperty(dynamicconfig.AllIsolationGroups)()),
AsyncTaskDispatchTimeout: dc.GetDurationPropertyFilteredByTaskListInfo(dynamicconfig.AsyncTaskDispatchTimeout),
LocalPollWaitTime: dc.GetDurationPropertyFilteredByTaskListInfo(dynamicconfig.LocalPollWaitTime),
LocalTaskWaitTime: dc.GetDurationPropertyFilteredByTaskListInfo(dynamicconfig.LocalTaskWaitTime),
HostName: hostName,
TaskDispatchRPS: 100000.0,
TaskDispatchRPSTTL: time.Minute,
Expand Down
1 change: 1 addition & 0 deletions service/matching/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func TestNewConfig(t *testing.T) {
"AllIsolationGroups": {dynamicconfig.AllIsolationGroups, []interface{}{"a", "b", "c"}},
"AsyncTaskDispatchTimeout": {dynamicconfig.AsyncTaskDispatchTimeout, time.Duration(25)},
"LocalPollWaitTime": {dynamicconfig.LocalPollWaitTime, time.Duration(10)},
"LocalTaskWaitTime": {dynamicconfig.LocalTaskWaitTime, time.Duration(10)},
"HostName": {nil, hostname},
"TaskDispatchRPS": {nil, 100000.0},
"TaskDispatchRPSTTL": {nil, time.Minute},
Expand Down
49 changes: 38 additions & 11 deletions service/matching/tasklist/matcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,36 @@ func (tm *TaskMatcher) Offer(ctx context.Context, task *InternalTask) (bool, err
return false, err
}
}

// TODO: Pollers are aggressively forwarded to parent partitions so it's unlikely
// to have a poller available to pick up the task below for sub-partitions.
// Try adding some wait here.
e := event.E{
TaskListName: tm.tasklist.GetName(),
TaskListType: tm.tasklist.GetType(),
TaskListKind: tm.tasklistKind.Ptr(),
}
localWaitTime := tm.config.LocalTaskWaitTime()
if localWaitTime > 0 {
childCtx, cancel := context.WithTimeout(ctx, localWaitTime)
select {
case tm.getTaskC(task) <- task: // poller picked up the task
cancel()
if task.ResponseC != nil {
// if there is a response channel, block until resp is received
// and return error if the response contains error
err := <-task.ResponseC
tm.scope.RecordTimer(metrics.SyncMatchLocalPollLatencyPerTaskList, time.Since(startT))
if err == nil {
e.EventName = "Offer task due to local wait"
e.Payload = map[string]any{
"TaskIsForwarded": task.IsForwarded(),
}
event.Log(e)
}
return true, err
}
return false, nil
case <-childCtx.Done():
cancel()
}
}
select {
case tm.getTaskC(task) <- task: // poller picked up the task
if task.ResponseC != nil {
Expand All @@ -155,12 +181,8 @@ func (tm *TaskMatcher) Offer(ctx context.Context, task *InternalTask) (bool, err
// root partition if possible
select {
case token := <-tm.fwdrAddReqTokenC():
event.Log(event.E{
TaskListName: tm.tasklist.GetName(),
TaskListType: tm.tasklist.GetType(),
TaskListKind: tm.tasklistKind.Ptr(),
EventName: "Attempting to Forward Task",
})
e.EventName = "Attempting to Forward Task"
event.Log(e)
err := tm.fwdr.ForwardTask(ctx, task)
token.release("")
if err == nil {
Expand Down Expand Up @@ -259,18 +281,23 @@ func (tm *TaskMatcher) MustOffer(ctx context.Context, task *InternalTask) error
// attempt a match with local poller first. When that
// doesn't succeed, try both local match and remote match
taskC := tm.getTaskC(task)
localWaitTime := tm.config.LocalTaskWaitTime()
childCtx, cancel := context.WithTimeout(ctx, localWaitTime)
select {
case taskC <- task: // poller picked up the task
cancel()
tm.scope.IncCounter(metrics.AsyncMatchLocalPollCounterPerTaskList)
tm.scope.RecordTimer(metrics.AsyncMatchLocalPollLatencyPerTaskList, time.Since(startT))
e.EventName = "Dispatched to Local Poller"
event.Log(e)
return nil
case <-ctx.Done():
cancel()
e.EventName = "Context Done While Dispatching to Local Poller"
event.Log(e)
return fmt.Errorf("context done when trying to forward local task: %w", ctx.Err())
default:
case <-childCtx.Done():
cancel()
}

attempt := 0
Expand Down
3 changes: 3 additions & 0 deletions service/matching/tasklist/task_list_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -724,6 +724,9 @@ func newTaskListConfig(id *Identifier, cfg *config.Config, domainName string) *c
LocalPollWaitTime: func() time.Duration {
return cfg.LocalPollWaitTime(domainName, taskListName, taskType)
},
LocalTaskWaitTime: func() time.Duration {
return cfg.LocalTaskWaitTime(domainName, taskListName, taskType)
},
ForwarderConfig: config.ForwarderConfig{
ForwarderMaxOutstandingPolls: func() int {
return cfg.ForwarderMaxOutstandingPolls(domainName, taskListName, taskType)
Expand Down
7 changes: 4 additions & 3 deletions service/matching/tasklist/task_list_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func defaultTestConfig() *config.Config {
config.AllIsolationGroups = []string{"datacenterA", "datacenterB"}
config.GetTasksBatchSize = dynamicconfig.GetIntPropertyFilteredByTaskListInfo(10)
config.AsyncTaskDispatchTimeout = dynamicconfig.GetDurationPropertyFnFilteredByTaskListInfo(10 * time.Millisecond)
config.LocalTaskWaitTime = dynamicconfig.GetDurationPropertyFnFilteredByTaskListInfo(time.Millisecond)
return config
}

Expand Down Expand Up @@ -595,7 +596,7 @@ func TestTaskListManagerGetTaskBatch(t *testing.T) {
RunID: "run1",
WorkflowID: "workflow1",
ScheduleID: scheduleID,
ScheduleToStartTimeout: 1,
ScheduleToStartTimeout: 100,
},
}
_, err = tlm.AddTask(context.Background(), addParams)
Expand Down Expand Up @@ -739,7 +740,7 @@ func TestTaskExpiryAndCompletion(t *testing.T) {
cfg.MaxTimeBetweenTaskDeletes = tc.maxTimeBtwnDeletes
// set idle timer check to a really small value to assert that we don't accidentally drop tasks while blocking
// on enqueuing a task to task buffer
cfg.IdleTasklistCheckInterval = dynamicconfig.GetDurationPropertyFnFilteredByTaskListInfo(10 * time.Millisecond)
cfg.IdleTasklistCheckInterval = dynamicconfig.GetDurationPropertyFnFilteredByTaskListInfo(20 * time.Millisecond)
tlMgr, err := NewManager(
mockDomainCache,
logger,
Expand Down Expand Up @@ -767,7 +768,7 @@ func TestTaskExpiryAndCompletion(t *testing.T) {
RunID: "run1",
WorkflowID: "workflow1",
ScheduleID: scheduleID,
ScheduleToStartTimeout: 5,
ScheduleToStartTimeout: 100,
},
}
if i%2 == 0 {
Expand Down

0 comments on commit 32e3878

Please sign in to comment.