From 32e3878e8edb700d754c2431012d4b13686387e9 Mon Sep 17 00:00:00 2001 From: Shaddoll Date: Tue, 13 Aug 2024 13:02:13 -0700 Subject: [PATCH] Add a wait time for task dispatchers (#6223) --- common/dynamicconfig/constants.go | 13 +++++ service/matching/config/config.go | 3 ++ service/matching/config/config_test.go | 1 + service/matching/tasklist/matcher.go | 49 ++++++++++++++----- .../matching/tasklist/task_list_manager.go | 3 ++ .../tasklist/task_list_manager_test.go | 7 +-- 6 files changed, 62 insertions(+), 14 deletions(-) diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index ef1e21c98bb..57b9eb63a09 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -2859,6 +2859,13 @@ const ( // Allowed filters: domainName, taskListName, taskListType LocalPollWaitTime + // LocalTaskWaitTime is the time a task waits for a poller to arrive before considering task forwarding + // KeyName: matching.localTaskWaitTime + // Value type: Duration + // Default value: 10ms + // Allowed filters: domainName, taskListName, taskListType + LocalTaskWaitTime + // LastDurationKey must be the last one in this const group LastDurationKey ) @@ -5148,6 +5155,12 @@ var DurationKeys = map[DurationKey]DynamicDuration{ Description: "LocalPollWaitTime is the time a poller waits before considering request forwarding.", DefaultValue: time.Millisecond * 10, }, + LocalTaskWaitTime: { + KeyName: "matching.localTaskWaitTime", + Filters: []Filter{DomainName, TaskListName, TaskType}, + Description: "LocalTaskWaitTime is the time a task waits for a poller to arrive before considering task forwarding", + DefaultValue: time.Millisecond * 10, + }, } var MapKeys = map[MapKey]DynamicMap{ diff --git a/service/matching/config/config.go b/service/matching/config/config.go index cc14d869176..1e1e02184ab 100644 --- a/service/matching/config/config.go +++ b/service/matching/config/config.go @@ -52,6 +52,7 @@ type ( ForwarderMaxChildrenPerNode 
dynamicconfig.IntPropertyFnWithTaskListInfoFilters AsyncTaskDispatchTimeout dynamicconfig.DurationPropertyFnWithTaskListInfoFilters LocalPollWaitTime dynamicconfig.DurationPropertyFnWithTaskListInfoFilters + LocalTaskWaitTime dynamicconfig.DurationPropertyFnWithTaskListInfoFilters // Time to hold a poll request before returning an empty response if there are no tasks LongPollExpirationInterval dynamicconfig.DurationPropertyFnWithTaskListInfoFilters @@ -104,6 +105,7 @@ type ( MaxTaskDeleteBatchSize func() int AsyncTaskDispatchTimeout func() time.Duration LocalPollWaitTime func() time.Duration + LocalTaskWaitTime func() time.Duration // taskWriter configuration OutstandingTaskAppendsThreshold func() int MaxTaskBatchSize func() int @@ -157,6 +159,7 @@ func NewConfig(dc *dynamicconfig.Collection, hostName string) *Config { AllIsolationGroups: mapIGs(dc.GetListProperty(dynamicconfig.AllIsolationGroups)()), AsyncTaskDispatchTimeout: dc.GetDurationPropertyFilteredByTaskListInfo(dynamicconfig.AsyncTaskDispatchTimeout), LocalPollWaitTime: dc.GetDurationPropertyFilteredByTaskListInfo(dynamicconfig.LocalPollWaitTime), + LocalTaskWaitTime: dc.GetDurationPropertyFilteredByTaskListInfo(dynamicconfig.LocalTaskWaitTime), HostName: hostName, TaskDispatchRPS: 100000.0, TaskDispatchRPSTTL: time.Minute, diff --git a/service/matching/config/config_test.go b/service/matching/config/config_test.go index 1c7738518ff..d6073776930 100644 --- a/service/matching/config/config_test.go +++ b/service/matching/config/config_test.go @@ -74,6 +74,7 @@ func TestNewConfig(t *testing.T) { "AllIsolationGroups": {dynamicconfig.AllIsolationGroups, []interface{}{"a", "b", "c"}}, "AsyncTaskDispatchTimeout": {dynamicconfig.AsyncTaskDispatchTimeout, time.Duration(25)}, "LocalPollWaitTime": {dynamicconfig.LocalPollWaitTime, time.Duration(10)}, + "LocalTaskWaitTime": {dynamicconfig.LocalTaskWaitTime, time.Duration(10)}, "HostName": {nil, hostname}, "TaskDispatchRPS": {nil, 100000.0}, "TaskDispatchRPSTTL": 
{nil, time.Minute}, diff --git a/service/matching/tasklist/matcher.go b/service/matching/tasklist/matcher.go index c8af219ce5b..14b7250834e 100644 --- a/service/matching/tasklist/matcher.go +++ b/service/matching/tasklist/matcher.go @@ -136,10 +136,36 @@ func (tm *TaskMatcher) Offer(ctx context.Context, task *InternalTask) (bool, err return false, err } } - - // TODO: Pollers are aggressively forwarded to parent partitions so it's unlikely - // to have a poller available to pick up the task below for sub-partitions. - // Try adding some wait here. + e := event.E{ + TaskListName: tm.tasklist.GetName(), + TaskListType: tm.tasklist.GetType(), + TaskListKind: tm.tasklistKind.Ptr(), + } + localWaitTime := tm.config.LocalTaskWaitTime() + if localWaitTime > 0 { + childCtx, cancel := context.WithTimeout(ctx, localWaitTime) + select { + case tm.getTaskC(task) <- task: // poller picked up the task + cancel() + if task.ResponseC != nil { + // if there is a response channel, block until resp is received + // and return error if the response contains error + err := <-task.ResponseC + tm.scope.RecordTimer(metrics.SyncMatchLocalPollLatencyPerTaskList, time.Since(startT)) + if err == nil { + e.EventName = "Offer task due to local wait" + e.Payload = map[string]any{ + "TaskIsForwarded": task.IsForwarded(), + } + event.Log(e) + } + return true, err + } + return false, nil + case <-childCtx.Done(): + cancel() + } + } select { case tm.getTaskC(task) <- task: // poller picked up the task if task.ResponseC != nil { @@ -155,12 +181,8 @@ func (tm *TaskMatcher) Offer(ctx context.Context, task *InternalTask) (bool, err // root partition if possible select { case token := <-tm.fwdrAddReqTokenC(): - event.Log(event.E{ - TaskListName: tm.tasklist.GetName(), - TaskListType: tm.tasklist.GetType(), - TaskListKind: tm.tasklistKind.Ptr(), - EventName: "Attempting to Forward Task", - }) + e.EventName = "Attempting to Forward Task" + event.Log(e) err := tm.fwdr.ForwardTask(ctx, task) 
token.release("") if err == nil { @@ -259,18 +281,23 @@ func (tm *TaskMatcher) MustOffer(ctx context.Context, task *InternalTask) error // attempt a match with local poller first. When that // doesn't succeed, try both local match and remote match taskC := tm.getTaskC(task) + localWaitTime := tm.config.LocalTaskWaitTime() + childCtx, cancel := context.WithTimeout(ctx, localWaitTime) select { case taskC <- task: // poller picked up the task + cancel() tm.scope.IncCounter(metrics.AsyncMatchLocalPollCounterPerTaskList) tm.scope.RecordTimer(metrics.AsyncMatchLocalPollLatencyPerTaskList, time.Since(startT)) e.EventName = "Dispatched to Local Poller" event.Log(e) return nil case <-ctx.Done(): + cancel() e.EventName = "Context Done While Dispatching to Local Poller" event.Log(e) return fmt.Errorf("context done when trying to forward local task: %w", ctx.Err()) - default: + case <-childCtx.Done(): + cancel() } attempt := 0 diff --git a/service/matching/tasklist/task_list_manager.go b/service/matching/tasklist/task_list_manager.go index c663d1504b1..27fa626ebaf 100644 --- a/service/matching/tasklist/task_list_manager.go +++ b/service/matching/tasklist/task_list_manager.go @@ -724,6 +724,9 @@ func newTaskListConfig(id *Identifier, cfg *config.Config, domainName string) *c LocalPollWaitTime: func() time.Duration { return cfg.LocalPollWaitTime(domainName, taskListName, taskType) }, + LocalTaskWaitTime: func() time.Duration { + return cfg.LocalTaskWaitTime(domainName, taskListName, taskType) + }, ForwarderConfig: config.ForwarderConfig{ ForwarderMaxOutstandingPolls: func() int { return cfg.ForwarderMaxOutstandingPolls(domainName, taskListName, taskType) diff --git a/service/matching/tasklist/task_list_manager_test.go b/service/matching/tasklist/task_list_manager_test.go index 187f827f257..f35de5e4ef9 100644 --- a/service/matching/tasklist/task_list_manager_test.go +++ b/service/matching/tasklist/task_list_manager_test.go @@ -57,6 +57,7 @@ func defaultTestConfig() *config.Config 
{ config.AllIsolationGroups = []string{"datacenterA", "datacenterB"} config.GetTasksBatchSize = dynamicconfig.GetIntPropertyFilteredByTaskListInfo(10) config.AsyncTaskDispatchTimeout = dynamicconfig.GetDurationPropertyFnFilteredByTaskListInfo(10 * time.Millisecond) + config.LocalTaskWaitTime = dynamicconfig.GetDurationPropertyFnFilteredByTaskListInfo(time.Millisecond) return config } @@ -595,7 +596,7 @@ func TestTaskListManagerGetTaskBatch(t *testing.T) { RunID: "run1", WorkflowID: "workflow1", ScheduleID: scheduleID, - ScheduleToStartTimeout: 1, + ScheduleToStartTimeout: 100, }, } _, err = tlm.AddTask(context.Background(), addParams) @@ -739,7 +740,7 @@ func TestTaskExpiryAndCompletion(t *testing.T) { cfg.MaxTimeBetweenTaskDeletes = tc.maxTimeBtwnDeletes // set idle timer check to a really small value to assert that we don't accidentally drop tasks while blocking // on enqueuing a task to task buffer - cfg.IdleTasklistCheckInterval = dynamicconfig.GetDurationPropertyFnFilteredByTaskListInfo(10 * time.Millisecond) + cfg.IdleTasklistCheckInterval = dynamicconfig.GetDurationPropertyFnFilteredByTaskListInfo(20 * time.Millisecond) tlMgr, err := NewManager( mockDomainCache, logger, @@ -767,7 +768,7 @@ func TestTaskExpiryAndCompletion(t *testing.T) { RunID: "run1", WorkflowID: "workflow1", ScheduleID: scheduleID, - ScheduleToStartTimeout: 5, + ScheduleToStartTimeout: 100, }, } if i%2 == 0 {