Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a wait time for task dispatchers #6223

Merged
merged 1 commit into from
Aug 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -2859,6 +2859,13 @@ const (
// Allowed filters: domainName, taskListName, taskListType
LocalPollWaitTime

// LocalTaskWaitTime is the wait time for a task to wait before considering task forwarding
// KeyName: matching.localTaskWaitTime
// Value type: Duration
// Default value: 10ms
// Allowed filters: domainName, taskListName, taskListType
LocalTaskWaitTime

// LastDurationKey must be the last one in this const group
LastDurationKey
)
Expand Down Expand Up @@ -5148,6 +5155,12 @@ var DurationKeys = map[DurationKey]DynamicDuration{
Description: "LocalPollWaitTime is the time a poller waits before considering request forwarding.",
DefaultValue: time.Millisecond * 10,
},
LocalTaskWaitTime: {
KeyName: "matching.localTaskWaitTime",
Filters: []Filter{DomainName, TaskListName, TaskType},
Description: "LocalTaskWaitTime is the time a task waits for a poller to arrive before considering task forwarding",
DefaultValue: time.Millisecond * 10,
},
}

var MapKeys = map[MapKey]DynamicMap{
Expand Down
3 changes: 3 additions & 0 deletions service/matching/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type (
ForwarderMaxChildrenPerNode dynamicconfig.IntPropertyFnWithTaskListInfoFilters
AsyncTaskDispatchTimeout dynamicconfig.DurationPropertyFnWithTaskListInfoFilters
LocalPollWaitTime dynamicconfig.DurationPropertyFnWithTaskListInfoFilters
LocalTaskWaitTime dynamicconfig.DurationPropertyFnWithTaskListInfoFilters

// Time to hold a poll request before returning an empty response if there are no tasks
LongPollExpirationInterval dynamicconfig.DurationPropertyFnWithTaskListInfoFilters
Expand Down Expand Up @@ -104,6 +105,7 @@ type (
MaxTaskDeleteBatchSize func() int
AsyncTaskDispatchTimeout func() time.Duration
LocalPollWaitTime func() time.Duration
LocalTaskWaitTime func() time.Duration
// taskWriter configuration
OutstandingTaskAppendsThreshold func() int
MaxTaskBatchSize func() int
Expand Down Expand Up @@ -157,6 +159,7 @@ func NewConfig(dc *dynamicconfig.Collection, hostName string) *Config {
AllIsolationGroups: mapIGs(dc.GetListProperty(dynamicconfig.AllIsolationGroups)()),
AsyncTaskDispatchTimeout: dc.GetDurationPropertyFilteredByTaskListInfo(dynamicconfig.AsyncTaskDispatchTimeout),
LocalPollWaitTime: dc.GetDurationPropertyFilteredByTaskListInfo(dynamicconfig.LocalPollWaitTime),
LocalTaskWaitTime: dc.GetDurationPropertyFilteredByTaskListInfo(dynamicconfig.LocalTaskWaitTime),
HostName: hostName,
TaskDispatchRPS: 100000.0,
TaskDispatchRPSTTL: time.Minute,
Expand Down
1 change: 1 addition & 0 deletions service/matching/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func TestNewConfig(t *testing.T) {
"AllIsolationGroups": {dynamicconfig.AllIsolationGroups, []interface{}{"a", "b", "c"}},
"AsyncTaskDispatchTimeout": {dynamicconfig.AsyncTaskDispatchTimeout, time.Duration(25)},
"LocalPollWaitTime": {dynamicconfig.LocalPollWaitTime, time.Duration(10)},
"LocalTaskWaitTime": {dynamicconfig.LocalTaskWaitTime, time.Duration(10)},
"HostName": {nil, hostname},
"TaskDispatchRPS": {nil, 100000.0},
"TaskDispatchRPSTTL": {nil, time.Minute},
Expand Down
49 changes: 38 additions & 11 deletions service/matching/tasklist/matcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,36 @@
return false, err
}
}

// TODO: Pollers are aggressively forwarded to parent partitions so it's unlikely
// to have a poller available to pick up the task below for sub-partitions.
// Try adding some wait here.
e := event.E{
TaskListName: tm.tasklist.GetName(),
TaskListType: tm.tasklist.GetType(),
TaskListKind: tm.tasklistKind.Ptr(),
}
localWaitTime := tm.config.LocalTaskWaitTime()
if localWaitTime > 0 {
Shaddoll marked this conversation as resolved.
Show resolved Hide resolved
childCtx, cancel := context.WithTimeout(ctx, localWaitTime)
select {
case tm.getTaskC(task) <- task: // poller picked up the task
cancel()
if task.ResponseC != nil {
// if there is a response channel, block until resp is received
// and return error if the response contains error
err := <-task.ResponseC
tm.scope.RecordTimer(metrics.SyncMatchLocalPollLatencyPerTaskList, time.Since(startT))
if err == nil {
e.EventName = "Offer task due to local wait"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you add a new output section to run_matching_simulation.sh to show count of such events?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will do it in a separate PR

e.Payload = map[string]any{
"TaskIsForwarded": task.IsForwarded(),
}
event.Log(e)
}
return true, err
}
return false, nil

Check warning on line 164 in service/matching/tasklist/matcher.go

View check run for this annotation

Codecov / codecov/patch

service/matching/tasklist/matcher.go#L164

Added line #L164 was not covered by tests
case <-childCtx.Done():
cancel()
}
}
select {
case tm.getTaskC(task) <- task: // poller picked up the task
if task.ResponseC != nil {
Expand All @@ -155,12 +181,8 @@
// root partition if possible
select {
case token := <-tm.fwdrAddReqTokenC():
event.Log(event.E{
TaskListName: tm.tasklist.GetName(),
TaskListType: tm.tasklist.GetType(),
TaskListKind: tm.tasklistKind.Ptr(),
EventName: "Attempting to Forward Task",
})
e.EventName = "Attempting to Forward Task"
event.Log(e)
err := tm.fwdr.ForwardTask(ctx, task)
token.release("")
if err == nil {
Expand Down Expand Up @@ -259,18 +281,23 @@
// attempt a match with local poller first. When that
// doesn't succeed, try both local match and remote match
taskC := tm.getTaskC(task)
localWaitTime := tm.config.LocalTaskWaitTime()
childCtx, cancel := context.WithTimeout(ctx, localWaitTime)
select {
case taskC <- task: // poller picked up the task
cancel()
tm.scope.IncCounter(metrics.AsyncMatchLocalPollCounterPerTaskList)
tm.scope.RecordTimer(metrics.AsyncMatchLocalPollLatencyPerTaskList, time.Since(startT))
e.EventName = "Dispatched to Local Poller"
event.Log(e)
return nil
case <-ctx.Done():
cancel()
e.EventName = "Context Done While Dispatching to Local Poller"
event.Log(e)
return fmt.Errorf("context done when trying to forward local task: %w", ctx.Err())
default:
case <-childCtx.Done():
Shaddoll marked this conversation as resolved.
Show resolved Hide resolved
cancel()
}

attempt := 0
Expand Down
3 changes: 3 additions & 0 deletions service/matching/tasklist/task_list_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -724,6 +724,9 @@ func newTaskListConfig(id *Identifier, cfg *config.Config, domainName string) *c
LocalPollWaitTime: func() time.Duration {
return cfg.LocalPollWaitTime(domainName, taskListName, taskType)
},
LocalTaskWaitTime: func() time.Duration {
return cfg.LocalTaskWaitTime(domainName, taskListName, taskType)
},
ForwarderConfig: config.ForwarderConfig{
ForwarderMaxOutstandingPolls: func() int {
return cfg.ForwarderMaxOutstandingPolls(domainName, taskListName, taskType)
Expand Down
7 changes: 4 additions & 3 deletions service/matching/tasklist/task_list_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func defaultTestConfig() *config.Config {
config.AllIsolationGroups = []string{"datacenterA", "datacenterB"}
config.GetTasksBatchSize = dynamicconfig.GetIntPropertyFilteredByTaskListInfo(10)
config.AsyncTaskDispatchTimeout = dynamicconfig.GetDurationPropertyFnFilteredByTaskListInfo(10 * time.Millisecond)
config.LocalTaskWaitTime = dynamicconfig.GetDurationPropertyFnFilteredByTaskListInfo(time.Millisecond)
return config
}

Expand Down Expand Up @@ -595,7 +596,7 @@ func TestTaskListManagerGetTaskBatch(t *testing.T) {
RunID: "run1",
WorkflowID: "workflow1",
ScheduleID: scheduleID,
ScheduleToStartTimeout: 1,
ScheduleToStartTimeout: 100,
},
}
_, err = tlm.AddTask(context.Background(), addParams)
Expand Down Expand Up @@ -739,7 +740,7 @@ func TestTaskExpiryAndCompletion(t *testing.T) {
cfg.MaxTimeBetweenTaskDeletes = tc.maxTimeBtwnDeletes
// set idle timer check to a really small value to assert that we don't accidentally drop tasks while blocking
// on enqueuing a task to task buffer
cfg.IdleTasklistCheckInterval = dynamicconfig.GetDurationPropertyFnFilteredByTaskListInfo(10 * time.Millisecond)
cfg.IdleTasklistCheckInterval = dynamicconfig.GetDurationPropertyFnFilteredByTaskListInfo(20 * time.Millisecond)
tlMgr, err := NewManager(
mockDomainCache,
logger,
Expand Down Expand Up @@ -767,7 +768,7 @@ func TestTaskExpiryAndCompletion(t *testing.T) {
RunID: "run1",
WorkflowID: "workflow1",
ScheduleID: scheduleID,
ScheduleToStartTimeout: 5,
ScheduleToStartTimeout: 100,
},
}
if i%2 == 0 {
Expand Down
Loading