Skip to content

Commit

Permalink
add activity dispatch configs (uber#4816)
Browse files Browse the repository at this point in the history
* dispatch activity task before generating the transfer task
  • Loading branch information
mkolodezny authored May 5, 2022
1 parent 2b0b03f commit e77b43d
Show file tree
Hide file tree
Showing 9 changed files with 90 additions and 21 deletions.
7 changes: 7 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -1620,6 +1620,12 @@ const (
// Default value: false
// Allowed filters: DomainName
EnableActivityLocalDispatchByDomain
// MaxActivityCountDispatchByDomain max # of activity tasks to dispatch to matching before creating transfer tasks. This is an performance optimization to skip activity scheduling efforts.
// KeyName: history.activityDispatchForSyncMatchCountByDomain
// Value type: Int
// Default value: 0
// Allowed filters: DomainName
MaxActivityCountDispatchByDomain
// HistoryErrorInjectionRate is rate for injecting random error in history client
// KeyName: history.errorInjectionRate
// Value type: Float64
Expand Down Expand Up @@ -2529,6 +2535,7 @@ var Keys = map[Key]string{
NotifyFailoverMarkerTimerJitterCoefficient: "history.NotifyFailoverMarkerTimerJitterCoefficient",
EnableDropStuckTaskByDomainID: "history.DropStuckTaskByDomain",
EnableActivityLocalDispatchByDomain: "history.enableActivityLocalDispatchByDomain",
MaxActivityCountDispatchByDomain: "history.maxActivityCountDispatchByDomain",
HistoryErrorInjectionRate: "history.errorInjectionRate",
HistoryEnableTaskInfoLogByDomainID: "history.enableTaskInfoLogByDomainID",
ActivityMaxScheduleToStartTimeoutForRetry: "history.activityMaxScheduleToStartTimeoutForRetry",
Expand Down
4 changes: 4 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1992,6 +1992,8 @@ const (
AckLevelUpdateCounter
AckLevelUpdateFailedCounter
DecisionTypeScheduleActivityCounter
DecisionTypeScheduleActivityDispatchSucceedCounter
DecisionTypeScheduleActivityDispatchCounter
DecisionTypeCompleteWorkflowCounter
DecisionTypeFailWorkflowCounter
DecisionTypeCancelWorkflowCounter
Expand Down Expand Up @@ -2534,6 +2536,8 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
AckLevelUpdateCounter: {metricName: "ack_level_update", metricType: Counter},
AckLevelUpdateFailedCounter: {metricName: "ack_level_update_failed", metricType: Counter},
DecisionTypeScheduleActivityCounter: {metricName: "schedule_activity_decision", metricType: Counter},
DecisionTypeScheduleActivityDispatchSucceedCounter: {metricName: "schedule_activity_decision_sync_match_succeed", metricType: Counter},
DecisionTypeScheduleActivityDispatchCounter: {metricName: "schedule_activity_decision_try_sync_match", metricType: Counter},
DecisionTypeCompleteWorkflowCounter: {metricName: "complete_workflow_decision", metricType: Counter},
DecisionTypeFailWorkflowCounter: {metricName: "fail_workflow_decision", metricType: Counter},
DecisionTypeCancelWorkflowCounter: {metricName: "cancel_workflow_decision", metricType: Counter},
Expand Down
4 changes: 4 additions & 0 deletions service/history/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,8 @@ type Config struct {

// Allows worker to dispatch activity tasks through local tunnel after decisions are made. This is an performance optimization to skip activity scheduling efforts.
EnableActivityLocalDispatchByDomain dynamicconfig.BoolPropertyFnWithDomainFilter
// Max # of activity tasks to dispatch to matching before creating transfer tasks. This is an performance optimization to skip activity scheduling efforts.
MaxActivityCountDispatchByDomain dynamicconfig.IntPropertyFnWithDomainFilter

ActivityMaxScheduleToStartTimeoutForRetry dynamicconfig.DurationPropertyFnWithDomainFilter

Expand Down Expand Up @@ -577,6 +579,7 @@ func New(dc *dynamicconfig.Collection, numberOfShards int, storeType string, isA
EnableGracefulFailover: dc.GetBoolProperty(dynamicconfig.EnableGracefulFailover, true),

EnableActivityLocalDispatchByDomain: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.EnableActivityLocalDispatchByDomain, true),
MaxActivityCountDispatchByDomain: dc.GetIntPropertyFilteredByDomain(dynamicconfig.MaxActivityCountDispatchByDomain, 0),

ActivityMaxScheduleToStartTimeoutForRetry: dc.GetDurationPropertyFilteredByDomain(dynamicconfig.ActivityMaxScheduleToStartTimeoutForRetry, 30*time.Minute),

Expand All @@ -603,6 +606,7 @@ func NewForTestByShardNumber(shardNumber int) *Config {
config.ReplicationTaskProcessorShardQPS = dc.GetFloat64Property(dynamicconfig.ReplicationTaskProcessorShardQPS, 10000)
config.ReplicationTaskProcessorStartWait = dc.GetDurationPropertyFilteredByShardID(dynamicconfig.ReplicationTaskProcessorStartWait, time.Nanosecond)
config.EnableActivityLocalDispatchByDomain = dc.GetBoolPropertyFilteredByDomain(dynamicconfig.EnableActivityLocalDispatchByDomain, true)
config.MaxActivityCountDispatchByDomain = dc.GetIntPropertyFilteredByDomain(dynamicconfig.MaxActivityCountDispatchByDomain, 0)
config.EnableCrossClusterOperations = dc.GetBoolPropertyFilteredByDomain(dynamicconfig.EnableCrossClusterOperations, true)
config.NormalDecisionScheduleToStartMaxAttempts = dc.GetIntPropertyFilteredByDomain(dynamicconfig.NormalDecisionScheduleToStartMaxAttempts, 3)
return config
Expand Down
15 changes: 13 additions & 2 deletions service/history/decision/task_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ type (
domainCache cache.DomainCache
metricsClient metrics.Client
config *config.Config

activityCountToDispatch int
}

decisionResult struct {
Expand Down Expand Up @@ -117,6 +119,8 @@ func newDecisionTaskHandler(
domainCache: domainCache,
metricsClient: metricsClient,
config: config,

activityCountToDispatch: config.MaxActivityCountDispatchByDomain(domainEntry.GetInfo().Name),
}
}

Expand Down Expand Up @@ -254,13 +258,20 @@ func (handler *taskHandlerImpl) handleDecisionScheduleActivity(
return nil, err
}

event, ai, activityDispatchInfo, err := handler.mutableState.AddActivityTaskScheduledEvent(handler.decisionTaskCompletedID, attr)
event, ai, activityDispatchInfo, dispatched, started, err := handler.mutableState.AddActivityTaskScheduledEvent(
handler.decisionTaskCompletedID, attr, ctx, handler.activityCountToDispatch > 0)
if dispatched {
handler.activityCountToDispatch--
}
switch err.(type) {
case nil:
if activityDispatchInfo != nil {
if activityDispatchInfo != nil || started {
if _, err1 := handler.mutableState.AddActivityTaskStartedEvent(ai, event.ID, uuid.New(), handler.identity); err1 != nil {
return nil, err1
}
if started {
return nil, nil
}
token := &common.TaskToken{
DomainID: executionInfo.DomainID,
WorkflowID: executionInfo.WorkflowID,
Expand Down
4 changes: 2 additions & 2 deletions service/history/execution/history_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1041,7 +1041,7 @@ func (s *historyBuilderSuite) addActivityTaskScheduledEvent(
retryPolicy *types.RetryPolicy,
requestLocalDispatch bool,
) (*types.HistoryEvent, *persistence.ActivityInfo, *types.ActivityLocalDispatchInfo) {
event, ai, activityDispatchInfo, err := s.msBuilder.AddActivityTaskScheduledEvent(decisionCompletedID,
event, ai, activityDispatchInfo, _, _, err := s.msBuilder.AddActivityTaskScheduledEvent(decisionCompletedID,
&types.ScheduleActivityTaskDecisionAttributes{
ActivityID: activityID,
ActivityType: &types.ActivityType{Name: activityType},
Expand All @@ -1054,7 +1054,7 @@ func (s *historyBuilderSuite) addActivityTaskScheduledEvent(
StartToCloseTimeoutSeconds: common.Int32Ptr(1),
RetryPolicy: retryPolicy,
RequestLocalDispatch: requestLocalDispatch,
},
}, nil, false,
)
s.Nil(err)
if domain == "" {
Expand Down
2 changes: 1 addition & 1 deletion service/history/execution/mutable_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ type (
AddActivityTaskCanceledEvent(int64, int64, int64, []uint8, string) (*types.HistoryEvent, error)
AddActivityTaskCompletedEvent(int64, int64, *types.RespondActivityTaskCompletedRequest) (*types.HistoryEvent, error)
AddActivityTaskFailedEvent(int64, int64, *types.RespondActivityTaskFailedRequest) (*types.HistoryEvent, error)
AddActivityTaskScheduledEvent(int64, *types.ScheduleActivityTaskDecisionAttributes) (*types.HistoryEvent, *persistence.ActivityInfo, *types.ActivityLocalDispatchInfo, error)
AddActivityTaskScheduledEvent(int64, *types.ScheduleActivityTaskDecisionAttributes, context.Context, bool) (*types.HistoryEvent, *persistence.ActivityInfo, *types.ActivityLocalDispatchInfo, bool, bool, error)
AddActivityTaskStartedEvent(*persistence.ActivityInfo, int64, string, string) (*types.HistoryEvent, error)
AddActivityTaskTimedOutEvent(int64, int64, types.TimeoutType, []uint8) (*types.HistoryEvent, error)
AddCancelTimerFailedEvent(int64, *types.CancelTimerDecisionAttributes, string) (*types.HistoryEvent, error)
Expand Down
55 changes: 48 additions & 7 deletions service/history/execution/mutable_state_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2096,19 +2096,21 @@ func (e *mutableStateBuilder) ReplicateDecisionTaskFailedEvent() error {
func (e *mutableStateBuilder) AddActivityTaskScheduledEvent(
decisionCompletedEventID int64,
attributes *types.ScheduleActivityTaskDecisionAttributes,
) (*types.HistoryEvent, *persistence.ActivityInfo, *types.ActivityLocalDispatchInfo, error) {
ctx context.Context,
dispatch bool,
) (*types.HistoryEvent, *persistence.ActivityInfo, *types.ActivityLocalDispatchInfo, bool, bool, error) {

opTag := tag.WorkflowActionActivityTaskScheduled
if err := e.checkMutability(opTag); err != nil {
return nil, nil, nil, err
return nil, nil, nil, false, false, err
}

_, ok := e.GetActivityByActivityID(attributes.GetActivityID())
if ok {
e.logger.Warn(mutableStateInvalidHistoryActionMsg, opTag,
tag.WorkflowEventID(e.GetNextEventID()),
tag.ErrorTypeInvalidHistoryAction)
return nil, nil, nil, e.createCallerError(opTag)
return nil, nil, nil, false, false, e.createCallerError(opTag)
}

event := e.hBuilder.AddActivityTaskScheduledEvent(decisionCompletedEventID, attributes)
Expand All @@ -2124,19 +2126,58 @@ func (e *mutableStateBuilder) AddActivityTaskScheduledEvent(

ai, err := e.ReplicateActivityTaskScheduledEvent(decisionCompletedEventID, event)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, false, false, err
}
if e.config.EnableActivityLocalDispatchByDomain(e.domainEntry.GetInfo().Name) && attributes.RequestLocalDispatch {
return event, ai, &types.ActivityLocalDispatchInfo{ActivityID: ai.ActivityID}, nil
return event, ai, &types.ActivityLocalDispatchInfo{ActivityID: ai.ActivityID}, false, false, nil
}
started := false
if dispatch {
started = e.tryDispatchActivityTask(event, ai, ctx)
}
if started {
return event, ai, nil, true, true, nil
}
// TODO merge active & passive task generation
if err := e.taskGenerator.GenerateActivityTransferTasks(
event,
); err != nil {
return nil, nil, nil, err
return nil, nil, nil, dispatch, false, err
}

return event, ai, nil, err
return event, ai, nil, dispatch, false, err
}

func (e *mutableStateBuilder) tryDispatchActivityTask(
scheduledEvent *types.HistoryEvent,
ai *persistence.ActivityInfo,
ctx context.Context,
) bool {
e.metricsClient.Scope(metrics.HistoryScheduleDecisionTaskScope).IncCounter(metrics.DecisionTypeScheduleActivityDispatchCounter)
err := e.shard.GetService().GetMatchingClient().AddActivityTask(ctx, &types.AddActivityTaskRequest{
DomainUUID: e.executionInfo.DomainID,
SourceDomainUUID: e.domainEntry.GetInfo().ID,
Execution: &types.WorkflowExecution{
WorkflowID: e.executionInfo.WorkflowID,
RunID: e.executionInfo.RunID,
},
TaskList: &types.TaskList{Name: ai.TaskList},
ScheduleID: scheduledEvent.ID,
ScheduleToStartTimeoutSeconds: common.Int32Ptr(ai.ScheduleToStartTimeout),
ActivityTaskDispatchInfo: &types.ActivityTaskDispatchInfo{
ScheduledEvent: scheduledEvent,
StartedTimestamp: common.Int64Ptr(e.timeSource.Now().UnixNano()),
WorkflowType: e.GetWorkflowType(),
WorkflowDomain: e.GetDomainEntry().GetInfo().Name,
ScheduledTimestampOfThisAttempt: common.Int64Ptr(ai.ScheduledTime.UnixNano()),
},
})

if err == nil {
e.metricsClient.Scope(metrics.HistoryScheduleDecisionTaskScope).IncCounter(metrics.DecisionTypeScheduleActivityDispatchSucceedCounter)
return true
}
return false
}

func (e *mutableStateBuilder) ReplicateActivityTaskScheduledEvent(
Expand Down
12 changes: 7 additions & 5 deletions service/history/execution/mutable_state_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions service/history/testing/events_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func AddActivityTaskScheduledEvent(
) (*types.HistoryEvent,
*persistence.ActivityInfo) {

event, ai, _, _ := builder.AddActivityTaskScheduledEvent(decisionCompletedID, &types.ScheduleActivityTaskDecisionAttributes{
event, ai, _, _, _, _ := builder.AddActivityTaskScheduledEvent(decisionCompletedID, &types.ScheduleActivityTaskDecisionAttributes{
ActivityID: activityID,
ActivityType: &types.ActivityType{Name: activityType},
TaskList: &types.TaskList{Name: taskList},
Expand All @@ -155,7 +155,7 @@ func AddActivityTaskScheduledEvent(
ScheduleToStartTimeoutSeconds: common.Int32Ptr(scheduleToStartTimeout),
StartToCloseTimeoutSeconds: common.Int32Ptr(startToCloseTimeout),
HeartbeatTimeoutSeconds: common.Int32Ptr(heartbeatTimeout),
},
}, nil, false,
)

return event, ai
Expand All @@ -176,7 +176,7 @@ func AddActivityTaskScheduledEventWithRetry(
retryPolicy *types.RetryPolicy,
) (*types.HistoryEvent, *persistence.ActivityInfo) {

event, ai, _, _ := builder.AddActivityTaskScheduledEvent(decisionCompletedID, &types.ScheduleActivityTaskDecisionAttributes{
event, ai, _, _, _, _ := builder.AddActivityTaskScheduledEvent(decisionCompletedID, &types.ScheduleActivityTaskDecisionAttributes{
ActivityID: activityID,
ActivityType: &types.ActivityType{Name: activityType},
TaskList: &types.TaskList{Name: taskList},
Expand All @@ -186,7 +186,7 @@ func AddActivityTaskScheduledEventWithRetry(
StartToCloseTimeoutSeconds: common.Int32Ptr(startToCloseTimeout),
HeartbeatTimeoutSeconds: common.Int32Ptr(heartbeatTimeout),
RetryPolicy: retryPolicy,
},
}, nil, false,
)

return event, ai
Expand Down

0 comments on commit e77b43d

Please sign in to comment.