Skip to content

Commit

Permalink
[Wf-Diagnostics] Improve identification of activity timeouts (#6232)
Browse files Browse the repository at this point in the history
  • Loading branch information
sankari165 authored Aug 16, 2024
1 parent beeac59 commit e23c5a6
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 17 deletions.
4 changes: 2 additions & 2 deletions service/worker/diagnostics/invariants/timeout.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,14 @@ func (t *timeout) Check(context.Context) ([]InvariantCheckResult, error) {
})
}
if event.ActivityTaskTimedOutEventAttributes != nil {
timeoutLimit, err := getActivityTaskConfiguredTimeout(event, events)
metadata, err := getActivityTaskMetadata(event, events)
if err != nil {
return nil, err
}
result = append(result, InvariantCheckResult{
InvariantType: TimeoutTypeActivity.String(),
Reason: event.GetActivityTaskTimedOutEventAttributes().GetTimeoutType().String(),
Metadata: timeoutLimitInBytes(timeoutLimit),
Metadata: marshalData(metadata),
})
}
if event.DecisionTaskTimedOutEventAttributes != nil {
Expand Down
53 changes: 45 additions & 8 deletions service/worker/diagnostics/invariants/timeout_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ const (
taskTimeoutSecond = int32(50)
testTimeStamp = int64(2547596872371000000)
timeUnit = time.Second
testTasklist = "test-tasklist"
)

func Test__Check(t *testing.T) {
Expand All @@ -61,10 +62,27 @@ func Test__Check(t *testing.T) {
RunID: "abc",
},
}
activityTimeoutData := ActivityTimeoutMetadata{
TimeoutType: types.TimeoutTypeScheduleToStart.Ptr(),
ConfiguredTimeout: 50 * time.Second,
TimeElapsed: 50 * time.Second,
RetryPolicy: nil,
HeartBeatTimeout: 0,
Tasklist: &types.TaskList{
Name: testTasklist,
Kind: nil,
},
}
workflowTimeoutDataInBytes, err := json.Marshal(workflowTimeoutData)
require.NoError(t, err)
childWfTimeoutDataInBytes, err := json.Marshal(childWfTimeoutData)
require.NoError(t, err)
activityTimeoutDataInBytes, err := json.Marshal(activityTimeoutData)
require.NoError(t, err)
activityTimeoutData.TimeoutType = types.TimeoutTypeHeartbeat.Ptr()
activityTimeoutData.HeartBeatTimeout = 50 * time.Second
activityHeartBeatTimeoutDataInBytes, err := json.Marshal(activityTimeoutData)
require.NoError(t, err)
taskTimeoutSecondInBytes, err := json.Marshal(taskTimeoutSecond)
require.NoError(t, err)
testCases := []struct {
Expand Down Expand Up @@ -104,12 +122,12 @@ func Test__Check(t *testing.T) {
{
InvariantType: TimeoutTypeActivity.String(),
Reason: "SCHEDULE_TO_START",
Metadata: taskTimeoutSecondInBytes,
Metadata: activityTimeoutDataInBytes,
},
{
InvariantType: TimeoutTypeActivity.String(),
Reason: "HEARTBEAT",
Metadata: taskTimeoutSecondInBytes,
Metadata: activityHeartBeatTimeoutDataInBytes,
},
},
err: nil,
Expand Down Expand Up @@ -205,28 +223,47 @@ func activityTimeoutHistory() *types.GetWorkflowExecutionHistoryResponse {
History: &types.History{
Events: []*types.HistoryEvent{
{
ID: 5,
ID: 1,
Timestamp: common.Int64Ptr(testTimeStamp),
ActivityTaskScheduledEventAttributes: &types.ActivityTaskScheduledEventAttributes{
ScheduleToStartTimeoutSeconds: common.Int32Ptr(taskTimeoutSecond),
TaskList: &types.TaskList{
Name: testTasklist,
Kind: nil,
},
},
},
{
ID: 2,
Timestamp: common.Int64Ptr(testTimeStamp + int64(taskTimeoutSecond)*timeUnit.Nanoseconds()),
ActivityTaskTimedOutEventAttributes: &types.ActivityTaskTimedOutEventAttributes{
ScheduledEventID: 5,
StartedEventID: 6,
ScheduledEventID: 1,
TimeoutType: types.TimeoutTypeScheduleToStart.Ptr(),
},
},
{
ID: 21,
ID: 3,
ActivityTaskScheduledEventAttributes: &types.ActivityTaskScheduledEventAttributes{
HeartbeatTimeoutSeconds: common.Int32Ptr(taskTimeoutSecond),
TaskList: &types.TaskList{
Name: testTasklist,
Kind: nil,
},
},
},
{
ActivityTaskTimedOutEventAttributes: &types.ActivityTaskTimedOutEventAttributes{
ID: 4,
Timestamp: common.Int64Ptr(testTimeStamp),
ActivityTaskStartedEventAttributes: &types.ActivityTaskStartedEventAttributes{
ScheduledEventID: 21,
StartedEventID: 22,
},
},
{
ID: 5,
Timestamp: common.Int64Ptr(testTimeStamp + int64(taskTimeoutSecond)*timeUnit.Nanoseconds()),
ActivityTaskTimedOutEventAttributes: &types.ActivityTaskTimedOutEventAttributes{
ScheduledEventID: 3,
StartedEventID: 4,
TimeoutType: types.TimeoutTypeHeartbeat.Ptr(),
},
},
Expand Down
30 changes: 23 additions & 7 deletions service/worker/diagnostics/invariants/timeout_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,27 +56,43 @@ func getWorkflowExecutionConfiguredTimeout(events []*types.HistoryEvent) int32 {
return 0
}

func getActivityTaskConfiguredTimeout(e *types.HistoryEvent, events []*types.HistoryEvent) (int32, error) {
func getActivityTaskMetadata(e *types.HistoryEvent, events []*types.HistoryEvent) (ActivityTimeoutMetadata, error) {
eventScheduledID := e.GetActivityTaskTimedOutEventAttributes().GetScheduledEventID()
eventstartedID := e.GetActivityTaskTimedOutEventAttributes().StartedEventID
timeoutType := e.GetActivityTaskTimedOutEventAttributes().GetTimeoutType()
var configuredTimeout int32
var timeElapsed time.Duration
for _, event := range events {
if event.ID == eventScheduledID {
attr := event.GetActivityTaskScheduledEventAttributes()
switch timeoutType {
case types.TimeoutTypeHeartbeat:
return attr.GetHeartbeatTimeoutSeconds(), nil
configuredTimeout = attr.GetHeartbeatTimeoutSeconds()
timeElapsed = getExecutionTime(eventstartedID, e.ID, events)
case types.TimeoutTypeScheduleToClose:
return attr.GetScheduleToCloseTimeoutSeconds(), nil
configuredTimeout = attr.GetScheduleToCloseTimeoutSeconds()
timeElapsed = getExecutionTime(eventScheduledID, e.ID, events)
case types.TimeoutTypeScheduleToStart:
return attr.GetScheduleToStartTimeoutSeconds(), nil
configuredTimeout = attr.GetScheduleToStartTimeoutSeconds()
timeElapsed = getExecutionTime(eventScheduledID, e.ID, events)
case types.TimeoutTypeStartToClose:
return attr.GetStartToCloseTimeoutSeconds(), nil
configuredTimeout = attr.GetStartToCloseTimeoutSeconds()
timeElapsed = getExecutionTime(eventstartedID, e.ID, events)
default:
return 0, fmt.Errorf("unknown timeout type")
return ActivityTimeoutMetadata{}, fmt.Errorf("unknown timeout type")
}
return ActivityTimeoutMetadata{
TimeoutType: timeoutType.Ptr(),
ConfiguredTimeout: time.Duration(configuredTimeout) * time.Second,
TimeElapsed: timeElapsed,
RetryPolicy: attr.RetryPolicy,
HeartBeatTimeout: time.Duration(attr.GetHeartbeatTimeoutSeconds()) * time.Second,
Tasklist: attr.TaskList,
}, nil
}

}
return 0, fmt.Errorf("activity scheduled event not found")
return ActivityTimeoutMetadata{}, fmt.Errorf("activity scheduled event not found")
}

func getDecisionTaskConfiguredTimeout(eventScheduledID int64, events []*types.HistoryEvent) int32 {
Expand Down
9 changes: 9 additions & 0 deletions service/worker/diagnostics/invariants/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,12 @@ type ChildWfTimeoutMetadata struct {
ConfiguredTimeout time.Duration
Execution *types.WorkflowExecution
}

type ActivityTimeoutMetadata struct {
TimeoutType *types.TimeoutType
ConfiguredTimeout time.Duration
TimeElapsed time.Duration
RetryPolicy *types.RetryPolicy
HeartBeatTimeout time.Duration
Tasklist *types.TaskList
}

0 comments on commit e23c5a6

Please sign in to comment.