Skip to content

Commit

Permalink
Dedup activity heartbeat timeout timer task creation (#3032)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt authored Feb 10, 2020
1 parent 873f32c commit 36d7dd3
Show file tree
Hide file tree
Showing 5 changed files with 143 additions and 7 deletions.
2 changes: 1 addition & 1 deletion service/history/historyEngine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5309,7 +5309,7 @@ func copyActivityInfo(sourceInfo *persistence.ActivityInfo) *persistence.Activit
LastWorkerIdentity: sourceInfo.LastWorkerIdentity,
LastFailureDetails: sourceInfo.LastFailureDetails,
//// Not written to database - This is used only for deduping heartbeat timer creation
// LastHeartbeatTimeoutVisibility: sourceInfo.LastHeartbeatTimeoutVisibility,
LastHeartbeatTimeoutVisibility: sourceInfo.LastHeartbeatTimeoutVisibility,
}
}

Expand Down
7 changes: 6 additions & 1 deletion service/history/timerQueueActiveProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,8 +374,13 @@ func (t *timerQueueActiveProcessorImpl) processActivityTimeout(
scheduleDecision := false

// need to clear activity heartbeat timer task mask for new activity timer task creation
// NOTE: LastHeartbeatTimeoutVisibility is for deduping heartbeat timer creation as it's possible
// one heartbeat task was persisted multiple times with different taskIDs due to the retry logic
// for updating workflow execution. In that case, only one new heartbeat timeout task should be
// created.
isHeartBeatTask := task.TimeoutType == int(workflow.TimeoutTypeHeartbeat)
if activityInfo, ok := mutableState.GetActivityInfo(task.EventID); isHeartBeatTask && ok {
activityInfo, ok := mutableState.GetActivityInfo(task.EventID)
if isHeartBeatTask && ok && activityInfo.LastHeartbeatTimeoutVisibility <= task.VisibilityTimestamp.Unix() {
activityInfo.TimerTaskStatus = activityInfo.TimerTaskStatus &^ timerTaskStatusCreatedHeartbeat
if err := mutableState.UpdateActivity(activityInfo); err != nil {
return err
Expand Down
93 changes: 88 additions & 5 deletions service/history/timerQueueActiveProcessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ func (s *timerQueueActiveProcessorSuite) TestProcessActivityTimeout_NoRetryPolic
TaskType: persistence.TaskTypeActivityTimeout,
TimeoutType: int(workflow.TimeoutTypeScheduleToClose),
VisibilityTimestamp: task.(*persistence.ActivityTimeoutTask).GetVisibilityTimestamp(),
EventID: di.ScheduleID,
EventID: scheduledEvent.GetEventId(),
}

persistenceMutableState := s.createPersistenceMutableState(mutableState, scheduledEvent.GetEventId(), scheduledEvent.GetVersion())
Expand Down Expand Up @@ -432,7 +432,7 @@ func (s *timerQueueActiveProcessorSuite) TestProcessActivityTimeout_NoRetryPolic
TaskType: persistence.TaskTypeActivityTimeout,
TimeoutType: int(workflow.TimeoutTypeScheduleToClose),
VisibilityTimestamp: task.(*persistence.ActivityTimeoutTask).GetVisibilityTimestamp(),
EventID: di.ScheduleID,
EventID: scheduledEvent.GetEventId(),
}

completeEvent := addActivityTaskCompletedEvent(mutableState, scheduledEvent.GetEventId(), startedEvent.GetEventId(), []byte(nil), identity)
Expand Down Expand Up @@ -517,7 +517,7 @@ func (s *timerQueueActiveProcessorSuite) TestProcessActivityTimeout_RetryPolicy_
TaskType: persistence.TaskTypeActivityTimeout,
TimeoutType: int(workflow.TimeoutTypeScheduleToClose),
VisibilityTimestamp: task.(*persistence.ActivityTimeoutTask).GetVisibilityTimestamp(),
EventID: di.ScheduleID,
EventID: scheduledEvent.GetEventId(),
}

persistenceMutableState := s.createPersistenceMutableState(mutableState, scheduledEvent.GetEventId(), scheduledEvent.GetVersion())
Expand Down Expand Up @@ -605,7 +605,7 @@ func (s *timerQueueActiveProcessorSuite) TestProcessActivityTimeout_RetryPolicy_
TaskType: persistence.TaskTypeActivityTimeout,
TimeoutType: int(workflow.TimeoutTypeScheduleToClose),
VisibilityTimestamp: task.(*persistence.ActivityTimeoutTask).GetVisibilityTimestamp(),
EventID: di.ScheduleID,
EventID: scheduledEvent.GetEventId(),
}

persistenceMutableState := s.createPersistenceMutableState(mutableState, scheduledEvent.GetEventId(), scheduledEvent.GetVersion())
Expand Down Expand Up @@ -692,7 +692,7 @@ func (s *timerQueueActiveProcessorSuite) TestProcessActivityTimeout_RetryPolicy_
TaskType: persistence.TaskTypeActivityTimeout,
TimeoutType: int(workflow.TimeoutTypeScheduleToClose),
VisibilityTimestamp: task.(*persistence.ActivityTimeoutTask).GetVisibilityTimestamp(),
EventID: di.ScheduleID,
EventID: scheduledEvent.GetEventId(),
}

completeEvent := addActivityTaskCompletedEvent(mutableState, scheduledEvent.GetEventId(), common.TransientEventID, []byte(nil), identity)
Expand All @@ -705,6 +705,89 @@ func (s *timerQueueActiveProcessorSuite) TestProcessActivityTimeout_RetryPolicy_
s.NoError(err)
}

func (s *timerQueueActiveProcessorSuite) TestProcessActivityTimeout_Heartbeat_Noop() {
execution := workflow.WorkflowExecution{
WorkflowId: common.StringPtr("some random workflow ID"),
RunId: common.StringPtr(uuid.New()),
}
workflowType := "some random workflow type"
taskListName := "some random task list"

mutableState := newMutableStateBuilderWithReplicationStateWithEventV2(s.mockShard, s.mockShard.GetEventsCache(), s.logger, s.version, execution.GetRunId())
_, err := mutableState.AddWorkflowExecutionStartedEvent(
execution,
&history.StartWorkflowExecutionRequest{
DomainUUID: common.StringPtr(s.domainID),
StartRequest: &workflow.StartWorkflowExecutionRequest{
WorkflowType: &workflow.WorkflowType{Name: common.StringPtr(workflowType)},
TaskList: &workflow.TaskList{Name: common.StringPtr(taskListName)},
ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(2),
TaskStartToCloseTimeoutSeconds: common.Int32Ptr(1),
},
},
)
s.Nil(err)

di := addDecisionTaskScheduledEvent(mutableState)
event := addDecisionTaskStartedEvent(mutableState, di.ScheduleID, taskListName, uuid.New())
di.StartedID = event.GetEventId()
event = addDecisionTaskCompletedEvent(mutableState, di.ScheduleID, di.StartedID, nil, "some random identity")

identity := "identity"
tasklist := "tasklist"
activityID := "activity"
activityType := "activity type"
timerTimeout := 2 * time.Second
heartbeatTimerTimeout := time.Second
scheduledEvent, _ := addActivityTaskScheduledEventWithRetry(
mutableState,
event.GetEventId(),
activityID,
activityType,
tasklist,
[]byte(nil),
int32(timerTimeout.Seconds()),
int32(timerTimeout.Seconds()),
int32(timerTimeout.Seconds()),
int32(heartbeatTimerTimeout.Seconds()),
&workflow.RetryPolicy{
InitialIntervalInSeconds: common.Int32Ptr(1),
BackoffCoefficient: common.Float64Ptr(1.2),
MaximumIntervalInSeconds: common.Int32Ptr(5),
MaximumAttempts: common.Int32Ptr(5),
NonRetriableErrorReasons: []string{"(╯' - ')╯ ┻━┻ "},
ExpirationIntervalInSeconds: common.Int32Ptr(999),
},
)
startedEvent := addActivityTaskStartedEvent(mutableState, scheduledEvent.GetEventId(), identity)
s.Nil(startedEvent)

timerSequence := newTimerSequence(s.timeSource, mutableState)
mutableState.insertTimerTasks = nil
modified, err := timerSequence.createNextActivityTimer()
s.NoError(err)
s.True(modified)
task := mutableState.insertTimerTasks[0]
s.Equal(int(timerTypeHeartbeat), task.(*persistence.ActivityTimeoutTask).TimeoutType)
timerTask := &persistence.TimerTaskInfo{
Version: s.version,
DomainID: s.domainID,
WorkflowID: execution.GetWorkflowId(),
RunID: execution.GetRunId(),
TaskID: int64(100),
TaskType: persistence.TaskTypeActivityTimeout,
TimeoutType: int(workflow.TimeoutTypeHeartbeat),
VisibilityTimestamp: task.(*persistence.ActivityTimeoutTask).GetVisibilityTimestamp().Add(-time.Second),
EventID: scheduledEvent.GetEventId(),
}

persistenceMutableState := s.createPersistenceMutableState(mutableState, scheduledEvent.GetEventId(), scheduledEvent.GetVersion())
s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil)

_, err = s.timerQueueActiveProcessor.process(newTaskInfo(nil, timerTask, s.logger))
s.NoError(err)
}

func (s *timerQueueActiveProcessorSuite) TestDecisionTimeout_Fire() {

execution := workflow.WorkflowExecution{
Expand Down
3 changes: 3 additions & 0 deletions service/history/timerSequence.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,9 @@ func (t *timerSequenceImpl) createNextActivityTimer() (bool, error) {
}
// mark timer task mask as indication that timer task is generated
activityInfo.TimerTaskStatus |= timerTypeToTimerMask(firstTimerTask.timerType)
if firstTimerTask.timerType == timerTypeHeartbeat {
activityInfo.LastHeartbeatTimeoutVisibility = firstTimerTask.timestamp.Unix()
}
if err := t.mutableState.UpdateActivity(activityInfo); err != nil {
return false, err
}
Expand Down
45 changes: 45 additions & 0 deletions service/history/timerSequence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,51 @@ func (s *timerSequenceSuite) TestCreateNextActivityTimer_NotCreated() {
s.True(modified)
}

func (s *timerSequenceSuite) TestCreateNextActivityTimer_HeartbeatTimer() {
now := time.Now()
currentVersion := int64(999)
activityInfo := &persistence.ActivityInfo{
Version: 123,
ScheduleID: 234,
ScheduledTime: now,
StartedID: 345,
StartedTime: now.Add(200 * time.Millisecond),
ActivityID: "some random activity ID",
ScheduleToStartTimeout: 10,
ScheduleToCloseTimeout: 1000,
StartToCloseTimeout: 100,
HeartbeatTimeout: 1,
LastHeartBeatUpdatedTime: time.Time{},
TimerTaskStatus: timerTaskStatusNone,
Attempt: 12,
}
activityInfos := map[int64]*persistence.ActivityInfo{activityInfo.ScheduleID: activityInfo}
s.mockMutableState.EXPECT().GetPendingActivityInfos().Return(activityInfos).Times(1)
s.mockMutableState.EXPECT().GetActivityInfo(activityInfo.ScheduleID).Return(activityInfo, true).Times(1)

taskVisibilityTimestamp := activityInfo.StartedTime.Add(
time.Duration(activityInfo.HeartbeatTimeout) * time.Second,
)

var activityInfoUpdated = *activityInfo // make a copy
activityInfoUpdated.TimerTaskStatus = timerTaskStatusCreatedHeartbeat
activityInfoUpdated.LastHeartbeatTimeoutVisibility = taskVisibilityTimestamp.Unix()
s.mockMutableState.EXPECT().UpdateActivity(&activityInfoUpdated).Return(nil).Times(1)
s.mockMutableState.EXPECT().GetCurrentVersion().Return(currentVersion).Times(1)
s.mockMutableState.EXPECT().AddTimerTasks(&persistence.ActivityTimeoutTask{
// TaskID is set by shard
VisibilityTimestamp: taskVisibilityTimestamp,
TimeoutType: int(shared.TimeoutTypeHeartbeat),
EventID: activityInfo.ScheduleID,
Attempt: int64(activityInfo.Attempt),
Version: currentVersion,
}).Times(1)

modified, err := s.timerSequence.createNextActivityTimer()
s.NoError(err)
s.True(modified)
}

func (s *timerSequenceSuite) TestLoadAndSortUserTimers_None() {
timerInfos := map[string]*persistence.TimerInfo{}
s.mockMutableState.EXPECT().GetPendingTimerInfos().Return(timerInfos).Times(1)
Expand Down

0 comments on commit 36d7dd3

Please sign in to comment.