Skip to content

Commit

Permalink
Record error info for retried activities (#1873) (#3116)
Browse files Browse the repository at this point in the history
* Record error info for retried activities (#1873)

Exposes lastFailureReason and lastFailureDetails, which already exist in the mutable state, through
the ActivityTaskStartedEvent in the history. Both fields were added to ActivityTaskStartedEvent
as part of uber/cadence-idl#15.

* Add tests to verify error info in retried activities
  • Loading branch information
emrahs authored Mar 20, 2020
1 parent 1e52f96 commit 74c2c69
Show file tree
Hide file tree
Showing 5 changed files with 240 additions and 54 deletions.
102 changes: 94 additions & 8 deletions .gen/go/shared/shared.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion idls
Submodule idls updated 2 files
+1 −0 .gitignore
+2 −0 thrift/shared.thrift
25 changes: 20 additions & 5 deletions service/history/historyBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,16 @@ func (b *historyBuilder) AddActivityTaskScheduledEvent(decisionCompletedEventID
return b.addEventToHistory(event)
}

func (b *historyBuilder) AddActivityTaskStartedEvent(scheduleEventID int64, attempt int32, requestID string,
identity string) *workflow.HistoryEvent {
event := b.newActivityTaskStartedEvent(scheduleEventID, attempt, requestID, identity)
// AddActivityTaskStartedEvent creates an ActivityTaskStarted event from the
// supplied arguments (including the last failure reason and details) using
// newActivityTaskStartedEvent, passes it to addEventToHistory, and returns
// the result of that call.
func (b *historyBuilder) AddActivityTaskStartedEvent(
	scheduleEventID int64,
	attempt int32,
	requestID, identity, lastFailureReason string,
	lastFailureDetails []byte,
) *workflow.HistoryEvent {
	startedEvent := b.newActivityTaskStartedEvent(
		scheduleEventID,
		attempt,
		requestID,
		identity,
		lastFailureReason,
		lastFailureDetails,
	)
	return b.addEventToHistory(startedEvent)
}
Expand Down Expand Up @@ -591,14 +598,22 @@ func (b *historyBuilder) newActivityTaskScheduledEvent(decisionTaskCompletedEven
return historyEvent
}

func (b *historyBuilder) newActivityTaskStartedEvent(scheduledEventID int64, attempt int32, requestID string,
identity string) *workflow.HistoryEvent {
func (b *historyBuilder) newActivityTaskStartedEvent(
scheduledEventID int64,
attempt int32,
requestID string,
identity string,
lastFailureReason string,
lastFailureDetails []byte,
) *workflow.HistoryEvent {
historyEvent := b.msBuilder.CreateNewHistoryEvent(workflow.EventTypeActivityTaskStarted)
attributes := &workflow.ActivityTaskStartedEventAttributes{}
attributes.ScheduledEventId = common.Int64Ptr(scheduledEventID)
attributes.Attempt = common.Int32Ptr(attempt)
attributes.Identity = common.StringPtr(identity)
attributes.RequestId = common.StringPtr(requestID)
attributes.LastFailureReason = common.StringPtr(lastFailureReason)
attributes.LastFailureDetails = lastFailureDetails
historyEvent.ActivityTaskStartedEventAttributes = attributes

return historyEvent
Expand Down
159 changes: 121 additions & 38 deletions service/history/historyBuilder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,17 +73,18 @@ func (s *historyBuilderSuite) SetupTest() {
NewDynamicConfigForTest(),
)

s.mockDomainCache = s.mockShard.resource.DomainCache
s.mockEventsCache = s.mockShard.mockEventsCache
s.mockDomainCache.EXPECT().GetDomain(gomock.Any()).Return(s.domainEntry, nil).AnyTimes()
s.mockEventsCache.EXPECT().putEvent(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()

s.logger = s.mockShard.GetLogger()
s.logger = log.NewNoop()

// Have to define our overridden assertions in the test setup. If we did it earlier, s.T() will return nil
s.domainID = testDomainID
s.domainEntry = cache.NewLocalDomainCacheEntryForTest(&persistence.DomainInfo{ID: s.domainID}, &persistence.DomainConfig{}, "", nil)

s.mockDomainCache = s.mockShard.resource.DomainCache
s.mockEventsCache = s.mockShard.mockEventsCache
s.mockDomainCache.EXPECT().GetDomain(gomock.Any()).Return(s.domainEntry, nil).AnyTimes()
s.mockDomainCache.EXPECT().GetDomainByID(gomock.Any()).Return(s.domainEntry, nil).AnyTimes()
s.mockEventsCache.EXPECT().putEvent(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()

s.msBuilder = newMutableStateBuilder(s.mockShard, s.mockEventsCache,
s.logger, testLocalDomainEntry)
s.builder = newHistoryBuilder(s.msBuilder, s.logger)
Expand Down Expand Up @@ -148,7 +149,7 @@ func (s *historyBuilderSuite) TestHistoryBuilderDynamicSuccess() {
activity1Input := []byte("dynamic-historybuilder-success-activity1-input")
activity1Result := []byte("dynamic-historybuilder-success-activity1-result")
activity1ScheduledEvent, _ := s.addActivityTaskScheduledEvent(4, activity1ID, activity1Type,
activityTaskList, activity1Input, activityTimeout, queueTimeout, hearbeatTimeout)
activityTaskList, activity1Input, activityTimeout, queueTimeout, hearbeatTimeout, nil)
s.validateActivityTaskScheduledEvent(activity1ScheduledEvent, 5, 4, activity1ID, activity1Type,
activityTaskList, activity1Input, activityTimeout, queueTimeout, hearbeatTimeout)
s.Equal(int64(6), s.getNextEventID())
Expand All @@ -163,7 +164,7 @@ func (s *historyBuilderSuite) TestHistoryBuilderDynamicSuccess() {
activity2Reason := "dynamic-historybuilder-success-activity2-failed"
activity2Details := []byte("dynamic-historybuilder-success-activity2-callstack")
activity2ScheduledEvent, _ := s.addActivityTaskScheduledEvent(4, activity2ID, activity2Type,
activityTaskList, activity2Input, activityTimeout, queueTimeout, hearbeatTimeout)
activityTaskList, activity2Input, activityTimeout, queueTimeout, hearbeatTimeout, nil)
s.validateActivityTaskScheduledEvent(activity2ScheduledEvent, 6, 4, activity2ID, activity2Type,
activityTaskList, activity2Input, activityTimeout, queueTimeout, hearbeatTimeout)
s.Equal(int64(7), s.getNextEventID())
Expand All @@ -172,57 +173,123 @@ func (s *historyBuilderSuite) TestHistoryBuilderDynamicSuccess() {
s.Equal(common.EmptyEventID, ai2.StartedID)
s.Equal(int64(3), s.getPreviousDecisionStartedEventID())

activity3ID := "activity3"
activity3Type := "dynamic-historybuilder-success-activity3-type"
activity3Input := []byte("dynamic-historybuilder-success-activity3-input")
activity3RetryPolicy := &workflow.RetryPolicy{
InitialIntervalInSeconds: common.Int32Ptr(1),
MaximumAttempts: common.Int32Ptr(3),
MaximumIntervalInSeconds: common.Int32Ptr(1),
NonRetriableErrorReasons: []string{"bad-bug"},
BackoffCoefficient: common.Float64Ptr(1),
ExpirationIntervalInSeconds: common.Int32Ptr(100),
}
activity3ScheduledEvent, _ := s.addActivityTaskScheduledEvent(4, activity3ID, activity3Type,
activityTaskList, activity3Input, activityTimeout, queueTimeout, hearbeatTimeout, activity3RetryPolicy)
s.validateActivityTaskScheduledEvent(activity3ScheduledEvent, 7, 4, activity3ID, activity3Type,
activityTaskList, activity3Input, activityTimeout, queueTimeout, hearbeatTimeout)
s.Equal(int64(8), s.getNextEventID())
ai2, activity3Running0 := s.msBuilder.GetActivityInfo(6)
s.True(activity3Running0)
s.Equal(common.EmptyEventID, ai2.StartedID)
s.Equal(int64(3), s.getPreviousDecisionStartedEventID())

activityStartedEvent := s.addActivityTaskStartedEvent(5, activityTaskList, identity)
s.validateActivityTaskStartedEvent(activityStartedEvent, common.BufferedEventID, 5, identity)
s.validateActivityTaskStartedEvent(activityStartedEvent, common.BufferedEventID, 5, identity,
0, "", nil)
s.Nil(s.msBuilder.FlushBufferedEvents())
s.validateActivityTaskStartedEvent(activityStartedEvent, 7, 5, identity)
s.Equal(int64(8), s.getNextEventID())
s.validateActivityTaskStartedEvent(activityStartedEvent, 8, 5, identity,
0, "", nil)
s.Equal(int64(9), s.getNextEventID())
ai3, activity1Running1 := s.msBuilder.GetActivityInfo(5)
s.True(activity1Running1)
s.Equal(int64(7), ai3.StartedID)
s.Equal(int64(8), ai3.StartedID)
s.Equal(int64(3), s.getPreviousDecisionStartedEventID())

activityCompletedEvent := s.addActivityTaskCompletedEvent(5, 7, activity1Result, identity)
s.validateActivityTaskCompletedEvent(activityCompletedEvent, common.BufferedEventID, 5, 7, activity1Result,
activityCompletedEvent := s.addActivityTaskCompletedEvent(5, 8, activity1Result, identity)
s.validateActivityTaskCompletedEvent(activityCompletedEvent, common.BufferedEventID, 5, 8, activity1Result,
identity)
s.Nil(s.msBuilder.FlushBufferedEvents())
s.validateActivityTaskCompletedEvent(activityCompletedEvent, 8, 5, 7, activity1Result,
s.validateActivityTaskCompletedEvent(activityCompletedEvent, 9, 5, 8, activity1Result,
identity)
s.Equal(int64(9), s.getNextEventID())
s.Equal(int64(10), s.getNextEventID())
_, activity1Running2 := s.msBuilder.GetActivityInfo(5)
s.False(activity1Running2)
s.Equal(int64(3), s.getPreviousDecisionStartedEventID())

di2 := s.addDecisionTaskScheduledEvent()
s.validateDecisionTaskScheduledEvent(di2, 9, tl, taskTimeout)
s.Equal(int64(10), s.getNextEventID())
di3, decisionRunning3 := s.msBuilder.GetDecisionInfo(9)
s.validateDecisionTaskScheduledEvent(di2, 10, tl, taskTimeout)
s.Equal(int64(11), s.getNextEventID())
di3, decisionRunning3 := s.msBuilder.GetDecisionInfo(10)
s.True(decisionRunning3)
s.Equal(common.EmptyEventID, di3.StartedID)
s.Equal(int64(3), s.getPreviousDecisionStartedEventID())

activity2StartedEvent := s.addActivityTaskStartedEvent(6, activityTaskList, identity)
s.validateActivityTaskStartedEvent(activity2StartedEvent, common.BufferedEventID, 6, identity)
s.validateActivityTaskStartedEvent(activity2StartedEvent, common.BufferedEventID, 6, identity,
0, "", nil)
s.Nil(s.msBuilder.FlushBufferedEvents())
s.validateActivityTaskStartedEvent(activity2StartedEvent, 10, 6, identity)
s.Equal(int64(11), s.getNextEventID())
s.validateActivityTaskStartedEvent(activity2StartedEvent, 11, 6, identity,
0, "", nil)
s.Equal(int64(12), s.getNextEventID())
ai4, activity2Running1 := s.msBuilder.GetActivityInfo(6)
s.True(activity2Running1)
s.Equal(int64(10), ai4.StartedID)
s.Equal(int64(11), ai4.StartedID)
s.Equal(int64(3), s.getPreviousDecisionStartedEventID())

activity2FailedEvent := s.addActivityTaskFailedEvent(6, 10, activity2Reason, activity2Details,
activity2FailedEvent := s.addActivityTaskFailedEvent(6, 11, activity2Reason, activity2Details,
identity)
s.validateActivityTaskFailedEvent(activity2FailedEvent, common.BufferedEventID, 6, 10, activity2Reason,
s.validateActivityTaskFailedEvent(activity2FailedEvent, common.BufferedEventID, 6, 11, activity2Reason,
activity2Details, identity)
s.Nil(s.msBuilder.FlushBufferedEvents())
s.validateActivityTaskFailedEvent(activity2FailedEvent, 11, 6, 10, activity2Reason,
s.validateActivityTaskFailedEvent(activity2FailedEvent, 12, 6, 11, activity2Reason,
activity2Details, identity)
s.Equal(int64(12), s.getNextEventID())
_, activity2Running2 := s.msBuilder.GetActivityInfo(6)
s.False(activity2Running2)
s.Equal(int64(13), s.getNextEventID())
_, activity2Running3 := s.msBuilder.GetActivityInfo(6)
s.False(activity2Running3)
s.Equal(int64(3), s.getPreviousDecisionStartedEventID())

activity3StartedEvent := s.addActivityTaskStartedEvent(7, activityTaskList, identity)
s.validateTransientActivityTaskStartedEvent(activity3StartedEvent, common.TransientEventID, 7, identity)
s.Equal(int64(13), s.getNextEventID())
ai5, activity3Running1 := s.msBuilder.GetActivityInfo(7)
s.True(activity3Running1)
s.Equal(common.TransientEventID, ai5.StartedID)
s.Equal(int64(3), s.getPreviousDecisionStartedEventID())

activity3Reason := "dynamic-historybuilder-success-activity3-failed"
activity3Details := []byte("dynamic-historybuilder-success-activity3-callstack")
s.msBuilder.RetryActivity(ai5, activity3Reason, activity3Details)
ai6, activity3Running2 := s.msBuilder.GetActivityInfo(7)
s.Equal(activity3Reason, ai6.LastFailureReason)
s.Equal(activity3Details, ai6.LastFailureDetails)
s.True(activity3Running2)

activity3StartedEvent2 := s.addActivityTaskStartedEvent(7, activityTaskList, identity)
s.validateTransientActivityTaskStartedEvent(activity3StartedEvent2, common.TransientEventID, 7, identity)
s.Equal(int64(13), s.getNextEventID())
ai7, activity3Running3 := s.msBuilder.GetActivityInfo(7)
s.True(activity3Running3)
s.Equal(common.TransientEventID, ai7.StartedID)
s.Equal(int64(3), s.getPreviousDecisionStartedEventID())

activity3Result := []byte("dynamic-historybuilder-success-activity1-result")
activity3CompletedEvent := s.addActivityTaskCompletedEvent(7, common.TransientEventID, activity3Result, identity)
s.validateActivityTaskCompletedEvent(activity3CompletedEvent, common.BufferedEventID, 7, common.TransientEventID,
activity3Result, identity)
s.Nil(s.msBuilder.FlushBufferedEvents())
s.validateActivityTaskCompletedEvent(activity3CompletedEvent, 14, 7, 13, activity3Result, identity)
s.Equal(int64(15), s.getNextEventID())
ai8, activity3Running4 := s.msBuilder.GetActivityInfo(7)
s.Nil(ai8)
s.False(activity3Running4)
s.Equal(int64(3), s.getPreviousDecisionStartedEventID())

// Verify the last ActivityTaskStartedEvent which should show the error from the first attempt
historyEvents := s.msBuilder.GetHistoryBuilder().GetHistory().GetEvents()
s.Len(historyEvents, 14)
s.validateActivityTaskStartedEvent(historyEvents[12], 13, 7, identity, 1, activity3Reason, activity3Details)

markerDetails := []byte("dynamic-historybuilder-success-marker-details")
markerHeaderField1 := []byte("dynamic-historybuilder-success-marker-header1")
markerHeaderField2 := []byte("dynamic-historybuilder-success-marker-header2")
Expand All @@ -232,9 +299,9 @@ func (s *historyBuilderSuite) TestHistoryBuilderDynamicSuccess() {
}
markerEvent := s.addMarkerRecordedEvent(4, "testMarker", markerDetails,
&markerHeader)
s.validateMarkerRecordedEvent(markerEvent, 12, 4, "testMarker", markerDetails, &markerHeader)
s.validateMarkerRecordedEvent(markerEvent, 15, 4, "testMarker", markerDetails, &markerHeader)
s.Nil(s.msBuilder.FlushBufferedEvents())
s.Equal(int64(13), s.getNextEventID())
s.Equal(int64(16), s.getNextEventID())
s.Equal(int64(3), s.getPreviousDecisionStartedEventID())
}

Expand Down Expand Up @@ -437,7 +504,7 @@ func (s *historyBuilderSuite) TestHistoryBuilderFlushBufferedEvents() {
activity1Input := []byte("flush-buffered-events-activity1-input")
activity1Result := []byte("flush-buffered-events-activity1-result")
activity1ScheduledEvent, _ := s.addActivityTaskScheduledEvent(4, activity1ID, activity1Type,
activityTaskList, activity1Input, activityTimeout, queueTimeout, hearbeatTimeout)
activityTaskList, activity1Input, activityTimeout, queueTimeout, hearbeatTimeout, nil)
s.validateActivityTaskScheduledEvent(activity1ScheduledEvent, 5, 4, activity1ID, activity1Type,
activityTaskList, activity1Input, activityTimeout, queueTimeout, hearbeatTimeout)
s.Equal(int64(6), s.getNextEventID())
Expand All @@ -451,7 +518,7 @@ func (s *historyBuilderSuite) TestHistoryBuilderFlushBufferedEvents() {
activity2Type := "flush-buffered-events-activity2-type"
activity2Input := []byte("flush-buffered-events-activity2-input")
activity2ScheduledEvent, _ := s.addActivityTaskScheduledEvent(4, activity2ID, activity2Type,
activityTaskList, activity2Input, activityTimeout, queueTimeout, hearbeatTimeout)
activityTaskList, activity2Input, activityTimeout, queueTimeout, hearbeatTimeout, nil)
s.validateActivityTaskScheduledEvent(activity2ScheduledEvent, 6, 4, activity2ID, activity2Type,
activityTaskList, activity2Input, activityTimeout, queueTimeout, hearbeatTimeout)
s.Equal(int64(7), s.getNextEventID())
Expand All @@ -462,9 +529,11 @@ func (s *historyBuilderSuite) TestHistoryBuilderFlushBufferedEvents() {

// 7 activity1 started
activityStartedEvent := s.addActivityTaskStartedEvent(5, activityTaskList, identity)
s.validateActivityTaskStartedEvent(activityStartedEvent, common.BufferedEventID, 5, identity)
s.validateActivityTaskStartedEvent(activityStartedEvent, common.BufferedEventID, 5, identity,
0, "", nil)
s.Nil(s.msBuilder.FlushBufferedEvents())
s.validateActivityTaskStartedEvent(activityStartedEvent, 7, 5, identity)
s.validateActivityTaskStartedEvent(activityStartedEvent, 7, 5, identity,
0, "", nil)
s.Equal(int64(8), s.getNextEventID())
ai3, activity1Running1 := s.msBuilder.GetActivityInfo(5)
s.True(activity1Running1)
Expand Down Expand Up @@ -503,7 +572,8 @@ func (s *historyBuilderSuite) TestHistoryBuilderFlushBufferedEvents() {

// 11 (buffered) activity2 started
activity2StartedEvent := s.addActivityTaskStartedEvent(6, activityTaskList, identity)
s.validateActivityTaskStartedEvent(activity2StartedEvent, common.BufferedEventID, 6, identity)
s.validateActivityTaskStartedEvent(activity2StartedEvent, common.BufferedEventID, 6, identity,
0, "", nil)
s.Equal(int64(11), s.getNextEventID())
ai4, activity2Running := s.msBuilder.GetActivityInfo(6)
s.True(activity2Running)
Expand Down Expand Up @@ -756,7 +826,7 @@ func (s *historyBuilderSuite) addDecisionTaskCompletedEvent(scheduleID, startedI
}

func (s *historyBuilderSuite) addActivityTaskScheduledEvent(decisionCompletedID int64, activityID, activityType,
taskList string, input []byte, timeout, queueTimeout, hearbeatTimeout int32) (*workflow.HistoryEvent,
taskList string, input []byte, timeout, queueTimeout, hearbeatTimeout int32, retryPolicy *workflow.RetryPolicy) (*workflow.HistoryEvent,
*persistence.ActivityInfo) {
event, ai, err := s.msBuilder.AddActivityTaskScheduledEvent(decisionCompletedID,
&workflow.ScheduleActivityTaskDecisionAttributes{
Expand All @@ -768,6 +838,7 @@ func (s *historyBuilderSuite) addActivityTaskScheduledEvent(decisionCompletedID
ScheduleToStartTimeoutSeconds: common.Int32Ptr(queueTimeout),
HeartbeatTimeoutSeconds: common.Int32Ptr(hearbeatTimeout),
StartToCloseTimeoutSeconds: common.Int32Ptr(1),
RetryPolicy: retryPolicy,
})
s.Nil(err)
return event, ai
Expand Down Expand Up @@ -922,14 +993,26 @@ func (s *historyBuilderSuite) validateActivityTaskScheduledEvent(event *workflow
}

func (s *historyBuilderSuite) validateActivityTaskStartedEvent(event *workflow.HistoryEvent, eventID, scheduleID int64,
identity string) {
identity string, attempt int64, lastFailureReason string, lastFailureDetails []byte) {
s.NotNil(event)
s.Equal(workflow.EventTypeActivityTaskStarted, *event.EventType)
s.Equal(eventID, *event.EventId)
attributes := event.ActivityTaskStartedEventAttributes
s.NotNil(attributes)
s.Equal(scheduleID, *attributes.ScheduledEventId)
s.Equal(identity, *attributes.Identity)
s.Equal(lastFailureReason, *attributes.LastFailureReason)
s.Equal(lastFailureDetails, attributes.LastFailureDetails)
}

// validateTransientActivityTaskStartedEvent covers the case where starting an
// activity yields no history event: event must be nil, and the pending
// activity entry for scheduleID in the mutable state must exist and carry the
// expected schedule ID and started identity. eventID is not inspected.
func (s *historyBuilderSuite) validateTransientActivityTaskStartedEvent(
	event *workflow.HistoryEvent,
	eventID, scheduleID int64,
	identity string,
) {
	s.Nil(event)

	pendingActivities := s.msBuilder.GetPendingActivityInfos()
	pending, found := pendingActivities[scheduleID]
	s.True(found)
	s.NotNil(pending)
	s.Equal(scheduleID, pending.ScheduleID)
	s.Equal(identity, pending.StartedIdentity)
}

func (s *historyBuilderSuite) validateActivityTaskCompletedEvent(event *workflow.HistoryEvent, eventID,
Expand Down
6 changes: 4 additions & 2 deletions service/history/mutableStateBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2195,7 +2195,8 @@ func (e *mutableStateBuilder) addTransientActivityStartedEvent(
}

// activity task was started (as transient event), we need to add it now.
event := e.hBuilder.AddActivityTaskStartedEvent(scheduleEventID, ai.Attempt, ai.RequestID, ai.StartedIdentity)
event := e.hBuilder.AddActivityTaskStartedEvent(scheduleEventID, ai.Attempt, ai.RequestID, ai.StartedIdentity,
ai.LastFailureReason, ai.LastFailureDetails)
if !ai.StartedTime.IsZero() {
// overwrite started event time to the one recorded in ActivityInfo
event.Timestamp = common.Int64Ptr(ai.StartedTime.UnixNano())
Expand All @@ -2216,7 +2217,8 @@ func (e *mutableStateBuilder) AddActivityTaskStartedEvent(
}

if !ai.HasRetryPolicy {
event := e.hBuilder.AddActivityTaskStartedEvent(scheduleEventID, ai.Attempt, requestID, identity)
event := e.hBuilder.AddActivityTaskStartedEvent(scheduleEventID, ai.Attempt, requestID, identity,
ai.LastFailureReason, ai.LastFailureDetails)
if err := e.ReplicateActivityTaskStartedEvent(event); err != nil {
return nil, err
}
Expand Down

0 comments on commit 74c2c69

Please sign in to comment.