Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add dedup logic for standby activity heartbeat timer creation #3131

Merged
merged 3 commits into from
Mar 24, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion service/history/historyEngine2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -709,7 +709,7 @@ func (s *engine2Suite) TestRecordActivityTaskStartedSuccess() {
msBuilder := s.createExecutionStartedState(workflowExecution, tl, identity, true)
decisionCompletedEvent := addDecisionTaskCompletedEvent(msBuilder, int64(2), int64(3), nil, identity)
scheduledEvent, _ := addActivityTaskScheduledEvent(msBuilder, *decisionCompletedEvent.EventId, activityID,
activityType, tl, activityInput, 100, 10, 5)
activityType, tl, activityInput, 100, 10, 1, 5)

ms1 := createMutableState(msBuilder)
gwmsResponse1 := &p.GetWorkflowExecutionResponse{State: ms1}
Expand Down
90 changes: 49 additions & 41 deletions service/history/historyEngine_test.go

Large diffs are not rendered by default.

8 changes: 5 additions & 3 deletions service/history/timerQueueActiveTaskExecutor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func (s *timerQueueActiveTaskExecutorSuite) TestProcessUserTimerTimeout_Fire() {
TaskType: persistence.TaskTypeUserTimer,
TimeoutType: int(workflow.TimeoutTypeStartToClose),
VisibilityTimestamp: task.(*persistence.UserTimerTask).GetVisibilityTimestamp(),
EventID: di.ScheduleID,
EventID: event.GetEventId(),
}

persistenceMutableState := s.createPersistenceMutableState(mutableState, event.GetEventId(), event.GetVersion())
Expand Down Expand Up @@ -272,7 +272,7 @@ func (s *timerQueueActiveTaskExecutorSuite) TestProcessUserTimerTimeout_Noop() {

timerID := "timer"
timerTimeout := 2 * time.Second
_, _ = addTimerStartedEvent(mutableState, event.GetEventId(), timerID, int64(timerTimeout.Seconds()))
event, _ = addTimerStartedEvent(mutableState, event.GetEventId(), timerID, int64(timerTimeout.Seconds()))

timerSequence := newTimerSequence(s.timeSource, mutableState)
mutableState.insertTimerTasks = nil
Expand All @@ -289,7 +289,7 @@ func (s *timerQueueActiveTaskExecutorSuite) TestProcessUserTimerTimeout_Noop() {
TaskType: persistence.TaskTypeUserTimer,
TimeoutType: int(workflow.TimeoutTypeStartToClose),
VisibilityTimestamp: task.(*persistence.UserTimerTask).GetVisibilityTimestamp(),
EventID: di.ScheduleID,
EventID: event.GetEventId(),
}

event = addTimerFiredEvent(mutableState, timerID)
Expand Down Expand Up @@ -345,6 +345,7 @@ func (s *timerQueueActiveTaskExecutorSuite) TestProcessActivityTimeout_NoRetryPo
int32(timerTimeout.Seconds()),
int32(timerTimeout.Seconds()),
int32(timerTimeout.Seconds()),
int32(timerTimeout.Seconds()),
)

timerSequence := newTimerSequence(s.timeSource, mutableState)
Expand Down Expand Up @@ -422,6 +423,7 @@ func (s *timerQueueActiveTaskExecutorSuite) TestProcessActivityTimeout_NoRetryPo
int32(timerTimeout.Seconds()),
int32(timerTimeout.Seconds()),
int32(timerTimeout.Seconds()),
int32(timerTimeout.Seconds()),
)
startedEvent := addActivityTaskStartedEvent(mutableState, scheduledEvent.GetEventId(), identity)

Expand Down
8 changes: 7 additions & 1 deletion service/history/timerQueueStandbyTaskExecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,14 @@ func (t *timerQueueStandbyTaskExecutor) executeActivityTimeoutTask(
if err != nil {
return nil, err
}

// NOTE: LastHeartbeatTimeoutVisibility is for deduping heartbeat timer creation as it's possible
// one heartbeat task was persisted multiple times with different taskIDs due to the retry logic
// for updating workflow execution. In that case, only one new heartbeat timeout task should be
// created.
isHeartBeatTask := timerTask.TimeoutType == int(workflow.TimeoutTypeHeartbeat)
if activityInfo, ok := mutableState.GetActivityInfo(timerTask.EventID); isHeartBeatTask && ok {
activityInfo, ok := mutableState.GetActivityInfo(timerTask.EventID)
if isHeartBeatTask && ok && activityInfo.LastHeartbeatTimeoutVisibility <= timerTask.VisibilityTimestamp.Unix() {
yycptt marked this conversation as resolved.
Show resolved Hide resolved
activityInfo.TimerTaskStatus = activityInfo.TimerTaskStatus &^ timerTaskStatusCreatedHeartbeat
if err := mutableState.UpdateActivity(activityInfo); err != nil {
return nil, err
Expand Down
107 changes: 86 additions & 21 deletions service/history/timerQueueStandbyTaskExecutor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestProcessUserTimerTimeout_Pending
TaskType: persistence.TaskTypeUserTimer,
TimeoutType: int(workflow.TimeoutTypeStartToClose),
VisibilityTimestamp: task.(*persistence.UserTimerTask).GetVisibilityTimestamp(),
EventID: di.ScheduleID,
EventID: event.GetEventId(),
}

persistenceMutableState := s.createPersistenceMutableState(mutableState, event.GetEventId(), event.GetVersion())
Expand Down Expand Up @@ -278,7 +278,7 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestProcessUserTimerTimeout_Success

timerID := "timer"
timerTimeout := 2 * time.Second
_, _ = addTimerStartedEvent(mutableState, event.GetEventId(), timerID, int64(timerTimeout.Seconds()))
event, _ = addTimerStartedEvent(mutableState, event.GetEventId(), timerID, int64(timerTimeout.Seconds()))

timerSequence := newTimerSequence(s.timeSource, mutableState)
mutableState.insertTimerTasks = nil
Expand All @@ -295,7 +295,7 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestProcessUserTimerTimeout_Success
TaskType: persistence.TaskTypeUserTimer,
TimeoutType: int(workflow.TimeoutTypeStartToClose),
VisibilityTimestamp: task.(*persistence.UserTimerTask).GetVisibilityTimestamp(),
EventID: di.ScheduleID,
EventID: event.GetEventId(),
}

event = addTimerFiredEvent(mutableState, timerID)
Expand Down Expand Up @@ -339,7 +339,7 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestProcessUserTimerTimeout_Multipl

timerID1 := "timer-1"
timerTimeout1 := 2 * time.Second
_, _ = addTimerStartedEvent(mutableState, event.GetEventId(), timerID1, int64(timerTimeout1.Seconds()))
event, _ = addTimerStartedEvent(mutableState, event.GetEventId(), timerID1, int64(timerTimeout1.Seconds()))

timerID2 := "timer-2"
timerTimeout2 := 50 * time.Second
Expand All @@ -360,7 +360,7 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestProcessUserTimerTimeout_Multipl
TaskType: persistence.TaskTypeUserTimer,
TimeoutType: int(workflow.TimeoutTypeStartToClose),
VisibilityTimestamp: task.(*persistence.UserTimerTask).GetVisibilityTimestamp(),
EventID: di.ScheduleID,
EventID: event.GetEventId(),
}

event = addTimerFiredEvent(mutableState, timerID1)
Expand Down Expand Up @@ -407,7 +407,7 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestProcessActivityTimeout_Pending(
activityType := "activity type"
timerTimeout := 2 * time.Second
scheduledEvent, _ := addActivityTaskScheduledEvent(mutableState, event.GetEventId(), activityID, activityType, tasklist, []byte(nil),
int32(timerTimeout.Seconds()), int32(timerTimeout.Seconds()), int32(timerTimeout.Seconds()))
int32(timerTimeout.Seconds()), int32(timerTimeout.Seconds()), int32(timerTimeout.Seconds()), int32(timerTimeout.Seconds()))
nextEventID := scheduledEvent.GetEventId() + 1

timerSequence := newTimerSequence(s.timeSource, mutableState)
Expand All @@ -425,7 +425,7 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestProcessActivityTimeout_Pending(
TaskType: persistence.TaskTypeActivityTimeout,
TimeoutType: int(workflow.TimeoutTypeScheduleToClose),
VisibilityTimestamp: task.(*persistence.ActivityTimeoutTask).GetVisibilityTimestamp(),
EventID: di.ScheduleID,
EventID: scheduledEvent.GetEventId(),
}

persistenceMutableState := s.createPersistenceMutableState(mutableState, scheduledEvent.GetEventId(), scheduledEvent.GetVersion())
Expand Down Expand Up @@ -483,9 +483,9 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestProcessActivityTimeout_Success(
activityID := "activity"
activityType := "activity type"
timerTimeout := 2 * time.Second
scheduleEvent, _ := addActivityTaskScheduledEvent(mutableState, event.GetEventId(), activityID, activityType, tasklist, []byte(nil),
int32(timerTimeout.Seconds()), int32(timerTimeout.Seconds()), int32(timerTimeout.Seconds()))
startedEvent := addActivityTaskStartedEvent(mutableState, scheduleEvent.GetEventId(), identity)
scheduledEvent, _ := addActivityTaskScheduledEvent(mutableState, event.GetEventId(), activityID, activityType, tasklist, []byte(nil),
int32(timerTimeout.Seconds()), int32(timerTimeout.Seconds()), int32(timerTimeout.Seconds()), int32(timerTimeout.Seconds()))
startedEvent := addActivityTaskStartedEvent(mutableState, scheduledEvent.GetEventId(), identity)

timerSequence := newTimerSequence(s.timeSource, mutableState)
mutableState.insertTimerTasks = nil
Expand All @@ -502,10 +502,10 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestProcessActivityTimeout_Success(
TaskType: persistence.TaskTypeActivityTimeout,
TimeoutType: int(workflow.TimeoutTypeScheduleToClose),
VisibilityTimestamp: task.(*persistence.ActivityTimeoutTask).GetVisibilityTimestamp(),
EventID: di.ScheduleID,
EventID: scheduledEvent.GetEventId(),
}

completeEvent := addActivityTaskCompletedEvent(mutableState, scheduleEvent.GetEventId(), startedEvent.GetEventId(), []byte(nil), identity)
completeEvent := addActivityTaskCompletedEvent(mutableState, scheduledEvent.GetEventId(), startedEvent.GetEventId(), []byte(nil), identity)

persistenceMutableState := s.createPersistenceMutableState(mutableState, completeEvent.GetEventId(), completeEvent.GetVersion())
s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil).Once()
Expand All @@ -515,6 +515,71 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestProcessActivityTimeout_Success(
s.Nil(err)
}

func (s *timerQueueStandbyTaskExecutorSuite) TestProcessActivityTimeout_Heartbeat_Noop() {
execution := workflow.WorkflowExecution{
WorkflowId: common.StringPtr("some random workflow ID"),
RunId: common.StringPtr(uuid.New()),
}
workflowType := "some random workflow type"
taskListName := "some random task list"

mutableState := newMutableStateBuilderWithReplicationStateWithEventV2(s.mockShard, s.mockShard.GetEventsCache(), s.logger, s.version, execution.GetRunId())
_, err := mutableState.AddWorkflowExecutionStartedEvent(
execution,
&history.StartWorkflowExecutionRequest{
DomainUUID: common.StringPtr(s.domainID),
StartRequest: &workflow.StartWorkflowExecutionRequest{
WorkflowType: &workflow.WorkflowType{Name: common.StringPtr(workflowType)},
TaskList: &workflow.TaskList{Name: common.StringPtr(taskListName)},
ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(2),
TaskStartToCloseTimeoutSeconds: common.Int32Ptr(1),
},
},
)
s.Nil(err)

di := addDecisionTaskScheduledEvent(mutableState)
event := addDecisionTaskStartedEvent(mutableState, di.ScheduleID, taskListName, uuid.New())
di.StartedID = event.GetEventId()
event = addDecisionTaskCompletedEvent(mutableState, di.ScheduleID, di.StartedID, nil, "some random identity")

identity := "identity"
tasklist := "tasklist"
activityID := "activity"
activityType := "activity type"
timerTimeout := 2 * time.Second
heartbeatTimerTimeout := time.Second
scheduledEvent, _ := addActivityTaskScheduledEvent(mutableState, event.GetEventId(), activityID, activityType, tasklist, []byte(nil),
int32(timerTimeout.Seconds()), int32(timerTimeout.Seconds()), int32(timerTimeout.Seconds()), int32(heartbeatTimerTimeout.Seconds()))
startedEvent := addActivityTaskStartedEvent(mutableState, scheduledEvent.GetEventId(), identity)

timerSequence := newTimerSequence(s.timeSource, mutableState)
mutableState.insertTimerTasks = nil
modified, err := timerSequence.createNextActivityTimer()
s.NoError(err)
s.True(modified)
task := mutableState.insertTimerTasks[0]
s.Equal(int(timerTypeHeartbeat), task.(*persistence.ActivityTimeoutTask).TimeoutType)
timerTask := &persistence.TimerTaskInfo{
Version: s.version,
DomainID: s.domainID,
WorkflowID: execution.GetWorkflowId(),
RunID: execution.GetRunId(),
TaskID: int64(100),
TaskType: persistence.TaskTypeActivityTimeout,
TimeoutType: int(workflow.TimeoutTypeHeartbeat),
VisibilityTimestamp: task.(*persistence.ActivityTimeoutTask).GetVisibilityTimestamp().Add(-time.Second),
EventID: scheduledEvent.GetEventId(),
}

persistenceMutableState := s.createPersistenceMutableState(mutableState, startedEvent.GetEventId(), startedEvent.GetVersion())
s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil).Once()

s.mockShard.SetCurrentTime(s.clusterName, s.now)
err = s.timerQueueStandbyTaskExecutor.execute(timerTask, true)
s.Nil(err)
}

func (s *timerQueueStandbyTaskExecutorSuite) TestProcessActivityTimeout_Multiple_CanUpdate() {

execution := workflow.WorkflowExecution{
Expand Down Expand Up @@ -549,17 +614,17 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestProcessActivityTimeout_Multiple
activityID1 := "activity 1"
activityType1 := "activity type 1"
timerTimeout1 := 2 * time.Second
scheduleEvent1, _ := addActivityTaskScheduledEvent(mutableState, event.GetEventId(), activityID1, activityType1, tasklist, []byte(nil),
int32(timerTimeout1.Seconds()), int32(timerTimeout1.Seconds()), int32(timerTimeout1.Seconds()))
startedEvent1 := addActivityTaskStartedEvent(mutableState, scheduleEvent1.GetEventId(), identity)
scheduledEvent1, _ := addActivityTaskScheduledEvent(mutableState, event.GetEventId(), activityID1, activityType1, tasklist, []byte(nil),
int32(timerTimeout1.Seconds()), int32(timerTimeout1.Seconds()), int32(timerTimeout1.Seconds()), int32(timerTimeout1.Seconds()))
startedEvent1 := addActivityTaskStartedEvent(mutableState, scheduledEvent1.GetEventId(), identity)

activityID2 := "activity 2"
activityType2 := "activity type 2"
timerTimeout2 := 20 * time.Second
scheduleEvent2, _ := addActivityTaskScheduledEvent(mutableState, event.GetEventId(), activityID2, activityType2, tasklist, []byte(nil),
int32(timerTimeout2.Seconds()), int32(timerTimeout2.Seconds()), int32(timerTimeout2.Seconds()))
addActivityTaskStartedEvent(mutableState, scheduleEvent2.GetEventId(), identity)
activityInfo2 := mutableState.pendingActivityInfoIDs[scheduleEvent2.GetEventId()]
scheduledEvent2, _ := addActivityTaskScheduledEvent(mutableState, event.GetEventId(), activityID2, activityType2, tasklist, []byte(nil),
int32(timerTimeout2.Seconds()), int32(timerTimeout2.Seconds()), int32(timerTimeout2.Seconds()), int32(timerTimeout2.Seconds()))
addActivityTaskStartedEvent(mutableState, scheduledEvent2.GetEventId(), identity)
activityInfo2 := mutableState.pendingActivityInfoIDs[scheduledEvent2.GetEventId()]
activityInfo2.TimerTaskStatus |= timerTaskStatusCreatedHeartbeat
activityInfo2.LastHeartBeatUpdatedTime = time.Now()

Expand All @@ -577,10 +642,10 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestProcessActivityTimeout_Multiple
TaskType: persistence.TaskTypeActivityTimeout,
TimeoutType: int(workflow.TimeoutTypeHeartbeat),
VisibilityTimestamp: activityInfo2.LastHeartBeatUpdatedTime.Add(-5 * time.Second),
EventID: scheduleEvent2.GetEventId(),
EventID: scheduledEvent2.GetEventId(),
}

completeEvent1 := addActivityTaskCompletedEvent(mutableState, scheduleEvent1.GetEventId(), startedEvent1.GetEventId(), []byte(nil), identity)
completeEvent1 := addActivityTaskCompletedEvent(mutableState, scheduledEvent1.GetEventId(), startedEvent1.GetEventId(), []byte(nil), identity)

persistenceMutableState := s.createPersistenceMutableState(mutableState, completeEvent1.GetEventId(), completeEvent1.GetVersion())
s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil).Once()
Expand Down
4 changes: 2 additions & 2 deletions service/history/transferQueueActiveTaskExecutor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessActivityTask_Success()
taskID := int64(59)
activityID := "activity-1"
activityType := "some random activity type"
event, ai := addActivityTaskScheduledEvent(mutableState, event.GetEventId(), activityID, activityType, taskListName, []byte{}, 1, 1, 1)
event, ai := addActivityTaskScheduledEvent(mutableState, event.GetEventId(), activityID, activityType, taskListName, []byte{}, 1, 1, 1, 1)

transferTask := &persistence.TransferTaskInfo{
Version: s.version,
Expand Down Expand Up @@ -293,7 +293,7 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessActivityTask_Duplicati
taskID := int64(59)
activityID := "activity-1"
activityType := "some random activity type"
event, ai := addActivityTaskScheduledEvent(mutableState, event.GetEventId(), activityID, activityType, taskListName, []byte{}, 1, 1, 1)
event, ai := addActivityTaskScheduledEvent(mutableState, event.GetEventId(), activityID, activityType, taskListName, []byte{}, 1, 1, 1, 1)

transferTask := &persistence.TransferTaskInfo{
Version: s.version,
Expand Down
6 changes: 3 additions & 3 deletions service/history/transferQueueStandbyTaskExecutor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func (s *transferQueueStandbyTaskExecutorSuite) TestProcessActivityTask_Pending(
taskID := int64(59)
activityID := "activity-1"
activityType := "some random activity type"
event, _ = addActivityTaskScheduledEvent(mutableState, event.GetEventId(), activityID, activityType, taskListName, []byte{}, 1, 1, 1)
event, _ = addActivityTaskScheduledEvent(mutableState, event.GetEventId(), activityID, activityType, taskListName, []byte{}, 1, 1, 1, 1)

now := time.Now()
transferTask := &persistence.TransferTaskInfo{
Expand Down Expand Up @@ -254,7 +254,7 @@ func (s *transferQueueStandbyTaskExecutorSuite) TestProcessActivityTask_Pending_
taskID := int64(59)
activityID := "activity-1"
activityType := "some random activity type"
event, _ = addActivityTaskScheduledEvent(mutableState, event.GetEventId(), activityID, activityType, taskListName, []byte{}, 1, 1, 1)
event, _ = addActivityTaskScheduledEvent(mutableState, event.GetEventId(), activityID, activityType, taskListName, []byte{}, 1, 1, 1, 1)

now := time.Now()
s.mockShard.SetCurrentTime(s.clusterName, now.Add(s.fetchHistoryDuration))
Expand Down Expand Up @@ -311,7 +311,7 @@ func (s *transferQueueStandbyTaskExecutorSuite) TestProcessActivityTask_Success(
taskID := int64(59)
activityID := "activity-1"
activityType := "some random activity type"
event, _ = addActivityTaskScheduledEvent(mutableState, event.GetEventId(), activityID, activityType, taskListName, []byte{}, 1, 1, 1)
event, _ = addActivityTaskScheduledEvent(mutableState, event.GetEventId(), activityID, activityType, taskListName, []byte{}, 1, 1, 1, 1)

now := time.Now()
transferTask := &persistence.TransferTaskInfo{
Expand Down