Skip to content

Commit

Permalink
add activity dispatch configs to matching (#4818)
Browse files Browse the repository at this point in the history
* update matching API ti dispatch activity task before generating the transfer task
  • Loading branch information
mkolodezny authored May 6, 2022
1 parent 8b10063 commit 361edb6
Show file tree
Hide file tree
Showing 9 changed files with 107 additions and 36 deletions.
7 changes: 7 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -663,6 +663,12 @@ const (
// Default value: false
// Allowed filters: DomainID
MatchingEnableTaskInfoLogByDomainID
// MatchingActivityTaskSyncMatchWaitTime is the amount of time activity task will wait to be sync matched
// KeyName: matching.activityTaskSyncMatchWaitTime
// Value type: Duration
// Default value: 100ms
// Allowed filters: DomainName
MatchingActivityTaskSyncMatchWaitTime

// key for history

Expand Down Expand Up @@ -2350,6 +2356,7 @@ var Keys = map[Key]string{
MatchingShutdownDrainDuration: "matching.shutdownDrainDuration",
MatchingErrorInjectionRate: "matching.errorInjectionRate",
MatchingEnableTaskInfoLogByDomainID: "matching.enableTaskInfoLogByDomainID",
MatchingActivityTaskSyncMatchWaitTime: "matching.activityTaskSyncMatchWaitTime",

// history settings
HistoryRPS: "history.rps",
Expand Down
8 changes: 4 additions & 4 deletions common/types/mapper/thrift/matching.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func FromAddActivityTaskRequest(t *types.AddActivityTaskRequest) *matching.AddAc
ScheduleToStartTimeoutSeconds: t.ScheduleToStartTimeoutSeconds,
Source: FromTaskSource(t.Source),
ForwardedFrom: &t.ForwardedFrom,
ActivityTaskDispatchInfo: FromSyncMatchActivityTaskInfo(t.ActivityTaskDispatchInfo),
ActivityTaskDispatchInfo: FromActivityTaskDispatchInfo(t.ActivityTaskDispatchInfo),
}
}

Expand All @@ -59,11 +59,11 @@ func ToAddActivityTaskRequest(t *matching.AddActivityTaskRequest) *types.AddActi
ScheduleToStartTimeoutSeconds: t.ScheduleToStartTimeoutSeconds,
Source: ToTaskSource(t.Source),
ForwardedFrom: t.GetForwardedFrom(),
ActivityTaskDispatchInfo: ToSyncMatchActivityTaskInfo(t.ActivityTaskDispatchInfo),
ActivityTaskDispatchInfo: ToActivityTaskDispatchInfo(t.ActivityTaskDispatchInfo),
}
}

func FromSyncMatchActivityTaskInfo(t *types.ActivityTaskDispatchInfo) *matching.ActivityTaskDispatchInfo {
func FromActivityTaskDispatchInfo(t *types.ActivityTaskDispatchInfo) *matching.ActivityTaskDispatchInfo {
if t == nil {
return nil
}
Expand All @@ -79,7 +79,7 @@ func FromSyncMatchActivityTaskInfo(t *types.ActivityTaskDispatchInfo) *matching.
}

// ToRecordActivityTaskStartedResponse converts thrift RecordActivityTaskStartedResponse type to internal
func ToSyncMatchActivityTaskInfo(t *matching.ActivityTaskDispatchInfo) *types.ActivityTaskDispatchInfo {
func ToActivityTaskDispatchInfo(t *matching.ActivityTaskDispatchInfo) *types.ActivityTaskDispatchInfo {
if t == nil {
return nil
}
Expand Down
3 changes: 3 additions & 0 deletions service/matching/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ type (
// debugging configuration
EnableDebugMode bool // note that this value is initialized once on service start
EnableTaskInfoLogByDomainID dynamicconfig.BoolPropertyFnWithDomainIDFilter

ActivityTaskSyncMatchWaitTime dynamicconfig.DurationPropertyFnWithDomainFilter
}

forwarderConfig struct {
Expand Down Expand Up @@ -126,6 +128,7 @@ func NewConfig(dc *dynamicconfig.Collection) *Config {
ShutdownDrainDuration: dc.GetDurationProperty(dynamicconfig.MatchingShutdownDrainDuration, 0),
EnableDebugMode: dc.GetBoolProperty(dynamicconfig.EnableDebugMode, false)(),
EnableTaskInfoLogByDomainID: dc.GetBoolPropertyFilteredByDomainID(dynamicconfig.MatchingEnableTaskInfoLogByDomainID, false),
ActivityTaskSyncMatchWaitTime: dc.GetDurationPropertyFilteredByDomain(dynamicconfig.MatchingActivityTaskSyncMatchWaitTime, 100*time.Millisecond),
}
}

Expand Down
8 changes: 4 additions & 4 deletions service/matching/forwarder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (t *ForwarderTestSuite) TearDownTest() {
}

func (t *ForwarderTestSuite) TestForwardTaskError() {
task := newInternalTask(&persistence.TaskInfo{}, nil, types.TaskSourceHistory, "", false)
task := newInternalTask(&persistence.TaskInfo{}, nil, types.TaskSourceHistory, "", false, nil)
t.Equal(errNoParent, t.fwdr.ForwardTask(context.Background(), task))

t.usingTasklistPartition(persistence.TaskListTypeActivity)
Expand All @@ -90,7 +90,7 @@ func (t *ForwarderTestSuite) TestForwardDecisionTask() {
).Return(nil).Times(1)

taskInfo := t.newTaskInfo()
task := newInternalTask(taskInfo, nil, types.TaskSourceHistory, "", false)
task := newInternalTask(taskInfo, nil, types.TaskSourceHistory, "", false, nil)
t.NoError(t.fwdr.ForwardTask(context.Background(), task))
t.NotNil(request)
t.Equal(t.taskList.Parent(20), request.TaskList.GetName())
Expand All @@ -114,7 +114,7 @@ func (t *ForwarderTestSuite) TestForwardActivityTask() {
).Return(nil).Times(1)

taskInfo := t.newTaskInfo()
task := newInternalTask(taskInfo, nil, types.TaskSourceHistory, "", false)
task := newInternalTask(taskInfo, nil, types.TaskSourceHistory, "", false, nil)
t.NoError(t.fwdr.ForwardTask(context.Background(), task))
t.NotNil(request)
t.Equal(t.taskList.Parent(20), request.TaskList.GetName())
Expand All @@ -134,7 +134,7 @@ func (t *ForwarderTestSuite) TestForwardTaskRateExceeded() {
rps := 2
t.client.EXPECT().AddActivityTask(gomock.Any(), gomock.Any()).Return(nil).Times(rps)
taskInfo := t.newTaskInfo()
task := newInternalTask(taskInfo, nil, types.TaskSourceHistory, "", false)
task := newInternalTask(taskInfo, nil, types.TaskSourceHistory, "", false, nil)
for i := 0; i < rps; i++ {
t.NoError(t.fwdr.ForwardTask(context.Background(), task))
}
Expand Down
12 changes: 6 additions & 6 deletions service/matching/matcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (t *MatcherTestSuite) TestLocalSyncMatch() {
}
})

task := newInternalTask(t.newTaskInfo(), nil, types.TaskSourceHistory, "", true)
task := newInternalTask(t.newTaskInfo(), nil, types.TaskSourceHistory, "", true, nil)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
syncMatch, err := t.matcher.Offer(ctx, task)
cancel()
Expand Down Expand Up @@ -143,7 +143,7 @@ func (t *MatcherTestSuite) testRemoteSyncMatch(taskSource types.TaskSource) {
},
).AnyTimes()

task := newInternalTask(t.newTaskInfo(), nil, taskSource, "", true)
task := newInternalTask(t.newTaskInfo(), nil, taskSource, "", true, nil)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)

var err error
Expand Down Expand Up @@ -176,7 +176,7 @@ func (t *MatcherTestSuite) testRemoteSyncMatch(taskSource types.TaskSource) {
}

func (t *MatcherTestSuite) TestSyncMatchFailure() {
task := newInternalTask(t.newTaskInfo(), nil, types.TaskSourceHistory, "", true)
task := newInternalTask(t.newTaskInfo(), nil, types.TaskSourceHistory, "", true, nil)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)

var req *types.AddDecisionTaskRequest
Expand Down Expand Up @@ -309,7 +309,7 @@ func (t *MatcherTestSuite) TestMustOfferLocalMatch() {
}
})

task := newInternalTask(t.newTaskInfo(), nil, types.TaskSourceHistory, "", false)
task := newInternalTask(t.newTaskInfo(), nil, types.TaskSourceHistory, "", false, nil)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
err := t.matcher.MustOffer(ctx, task)
cancel()
Expand Down Expand Up @@ -341,7 +341,7 @@ func (t *MatcherTestSuite) TestMustOfferRemoteMatch() {
taskCompleted = true
}

task := newInternalTask(t.newTaskInfo(), completionFunc, types.TaskSourceDbBacklog, "", false)
task := newInternalTask(t.newTaskInfo(), completionFunc, types.TaskSourceDbBacklog, "", false, nil)
ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second)

var err error
Expand All @@ -351,7 +351,7 @@ func (t *MatcherTestSuite) TestMustOfferRemoteMatch() {
t.client.EXPECT().AddDecisionTask(gomock.Any(), gomock.Any()).Do(
func(arg0 context.Context, arg1 *types.AddDecisionTaskRequest, option ...yarpc.CallOption) {
req = arg1
task := newInternalTask(task.event.TaskInfo, nil, types.TaskSourceDbBacklog, req.GetForwardedFrom(), true)
task := newInternalTask(task.event.TaskInfo, nil, types.TaskSourceDbBacklog, req.GetForwardedFrom(), true, nil)
close(pollSigC)
remoteSyncMatch, err = t.rootMatcher.Offer(ctx, task)
},
Expand Down
52 changes: 48 additions & 4 deletions service/matching/matchingEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,10 +344,11 @@ func (e *matchingEngineImpl) AddActivityTask(
CreatedTime: time.Now(),
}
return tlMgr.AddTask(hCtx.Context, addTaskParams{
execution: request.Execution,
taskInfo: taskInfo,
source: request.GetSource(),
forwardedFrom: request.GetForwardedFrom(),
execution: request.Execution,
taskInfo: taskInfo,
source: request.GetSource(),
forwardedFrom: request.GetForwardedFrom(),
activityTaskDispatchInfo: request.ActivityTaskDispatchInfo,
})
}

Expand Down Expand Up @@ -505,6 +506,10 @@ pollLoop:
// tasks received from remote are already started. So, simply forward the response
return task.pollForActivityResponse(), nil
}
if task.activityTaskDispatchInfo != nil {
task.finish(nil)
return e.createSyncMatchPollForActivityTaskResponse(task, task.activityTaskDispatchInfo), nil
}

resp, err := e.recordActivityTaskStarted(hCtx.Context, request, task)
if err != nil {
Expand Down Expand Up @@ -532,6 +537,45 @@ pollLoop:
}
}

func (e *matchingEngineImpl) createSyncMatchPollForActivityTaskResponse(
task *InternalTask,
activityTaskDispatchInfo *types.ActivityTaskDispatchInfo,
) *types.PollForActivityTaskResponse {

scheduledEvent := activityTaskDispatchInfo.ScheduledEvent
attributes := scheduledEvent.ActivityTaskScheduledEventAttributes
response := &types.PollForActivityTaskResponse{}
response.ActivityID = attributes.ActivityID
response.ActivityType = attributes.ActivityType
response.Header = attributes.Header
response.Input = attributes.Input
response.WorkflowExecution = task.workflowExecution()
response.ScheduledTimestampOfThisAttempt = activityTaskDispatchInfo.ScheduledTimestampOfThisAttempt
response.ScheduledTimestamp = common.Int64Ptr(*scheduledEvent.Timestamp)
response.ScheduleToCloseTimeoutSeconds = common.Int32Ptr(*attributes.ScheduleToCloseTimeoutSeconds)
response.StartedTimestamp = activityTaskDispatchInfo.StartedTimestamp
response.StartToCloseTimeoutSeconds = common.Int32Ptr(*attributes.StartToCloseTimeoutSeconds)
response.HeartbeatTimeoutSeconds = common.Int32Ptr(*attributes.HeartbeatTimeoutSeconds)

token := &common.TaskToken{
DomainID: task.event.DomainID,
WorkflowID: task.event.WorkflowID,
WorkflowType: activityTaskDispatchInfo.WorkflowType.GetName(),
RunID: task.event.RunID,
ScheduleID: task.event.ScheduleID,
ScheduleAttempt: common.Int64Default(activityTaskDispatchInfo.Attempt),
ActivityID: attributes.GetActivityID(),
ActivityType: attributes.GetActivityType().GetName(),
}

response.TaskToken, _ = e.tokenSerializer.Serialize(token)
response.Attempt = int32(token.ScheduleAttempt)
response.HeartbeatDetails = activityTaskDispatchInfo.HeartbeatDetails
response.WorkflowType = activityTaskDispatchInfo.WorkflowType
response.WorkflowDomain = activityTaskDispatchInfo.WorkflowDomain
return response
}

// QueryWorkflow creates a DecisionTask with query data, send it through sync match channel, wait for that DecisionTask
// to be processed by worker, and then return the query result.
func (e *matchingEngineImpl) QueryWorkflow(
Expand Down
23 changes: 13 additions & 10 deletions service/matching/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,15 @@ type (
// this struct is more like a union and only one of [ query, event, forwarded ] is
// non-nil for any given task
InternalTask struct {
event *genericTaskInfo // non-nil for activity or decision task that's locally generated
query *queryTaskInfo // non-nil for a query task that's locally sync matched
started *startedTaskInfo // non-nil for a task received from a parent partition which is already started
domainName string
source types.TaskSource
forwardedFrom string // name of the child partition this task is forwarded from (empty if not forwarded)
responseC chan error // non-nil only where there is a caller waiting for response (sync-match)
backlogCountHint int64
event *genericTaskInfo // non-nil for activity or decision task that's locally generated
query *queryTaskInfo // non-nil for a query task that's locally sync matched
started *startedTaskInfo // non-nil for a task received from a parent partition which is already started
domainName string
source types.TaskSource
forwardedFrom string // name of the child partition this task is forwarded from (empty if not forwarded)
responseC chan error // non-nil only where there is a caller waiting for response (sync-match)
backlogCountHint int64
activityTaskDispatchInfo *types.ActivityTaskDispatchInfo
}
)

Expand All @@ -63,14 +64,16 @@ func newInternalTask(
source types.TaskSource,
forwardedFrom string,
forSyncMatch bool,
activityTaskDispatchInfo *types.ActivityTaskDispatchInfo,
) *InternalTask {
task := &InternalTask{
event: &genericTaskInfo{
TaskInfo: info,
completionFunc: completionFunc,
},
source: source,
forwardedFrom: forwardedFrom,
source: source,
forwardedFrom: forwardedFrom,
activityTaskDispatchInfo: activityTaskDispatchInfo,
}
if forSyncMatch {
task.responseC = make(chan error, 1)
Expand Down
28 changes: 21 additions & 7 deletions service/matching/taskListManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,11 @@ var (

type (
addTaskParams struct {
execution *types.WorkflowExecution
taskInfo *persistence.TaskInfo
source types.TaskSource
forwardedFrom string
execution *types.WorkflowExecution
taskInfo *persistence.TaskInfo
source types.TaskSource
forwardedFrom string
activityTaskDispatchInfo *types.ActivityTaskDispatchInfo
}

taskListManager interface {
Expand Down Expand Up @@ -254,6 +255,9 @@ func (c *taskListManagerImpl) AddTask(ctx context.Context, params addTaskParams)
if syncMatch {
return &persistence.CreateTasksResponse{}, err
}
if params.activityTaskDispatchInfo != nil {
return false, errRemoteSyncMatchFailed
}

if isForwarded {
// forwarded from child partition - only do sync match
Expand Down Expand Up @@ -528,15 +532,25 @@ func (c *taskListManagerImpl) executeWithRetry(
}

func (c *taskListManagerImpl) trySyncMatch(ctx context.Context, params addTaskParams) (bool, error) {
task := newInternalTask(params.taskInfo, c.completeTask, params.source, params.forwardedFrom, true)
task := newInternalTask(params.taskInfo, c.completeTask, params.source, params.forwardedFrom, true, params.activityTaskDispatchInfo)
childCtx := ctx
cancel := func() {}
waitTime := maxSyncMatchWaitTime
if params.activityTaskDispatchInfo != nil {
waitTime = c.engine.config.ActivityTaskSyncMatchWaitTime(params.activityTaskDispatchInfo.WorkflowDomain)
}
if !task.isForwarded() {
// when task is forwarded from another matching host, we trust the context as is
// otherwise, we override to limit the amount of time we can block on sync match
childCtx, cancel = c.newChildContext(ctx, maxSyncMatchWaitTime, time.Second)
childCtx, cancel = c.newChildContext(ctx, waitTime, time.Second)
}
var matched bool
var err error
if params.activityTaskDispatchInfo != nil {
matched, err = c.matcher.offerOrTimeout(childCtx, task)
} else {
matched, err = c.matcher.Offer(childCtx, task)
}
matched, err := c.matcher.Offer(childCtx, task)
cancel()
return matched, err
}
Expand Down
2 changes: 1 addition & 1 deletion service/matching/taskReader.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ dispatchLoop:
if !ok { // Task list getTasks pump is shutdown
break dispatchLoop
}
task := newInternalTask(taskInfo, tr.tlMgr.completeTask, types.TaskSourceDbBacklog, "", false)
task := newInternalTask(taskInfo, tr.tlMgr.completeTask, types.TaskSourceDbBacklog, "", false, nil)
for {
err := tr.tlMgr.DispatchTask(tr.cancelCtx, task)
if err == nil {
Expand Down

0 comments on commit 361edb6

Please sign in to comment.