Skip to content

Commit

Permalink
Workflow retry incorrectly set attempt on start event of new run (ube…
Browse files Browse the repository at this point in the history
…r#1486)

Workflow execution with retry policy does not update attempt on
new run and always sets it to 1.  This results in retry policy to
never fail the execution based on attempt.  Fixing the mutablestate
update to use correct value for attempt from the new start
attributes.
Also added integration tests to validate attemp on start attributes
and also test bailout using attempt or non-retryable errors from
the policy.
  • Loading branch information
samarabbas authored Feb 24, 2019
1 parent 3e0ad4b commit 7d1ad39
Show file tree
Hide file tree
Showing 2 changed files with 159 additions and 5 deletions.
162 changes: 158 additions & 4 deletions host/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -998,7 +998,7 @@ func (s *integrationSuite) TestWorkflowRetry() {
Identity: common.StringPtr(identity),
RetryPolicy: &workflow.RetryPolicy{
InitialIntervalInSeconds: common.Int32Ptr(1),
MaximumAttempts: common.Int32Ptr(3),
MaximumAttempts: common.Int32Ptr(5),
MaximumIntervalInSeconds: common.Int32Ptr(1),
NonRetriableErrorReasons: []string{"bad-bug"},
BackoffCoefficient: common.Float64Ptr(1),
Expand All @@ -1019,7 +1019,7 @@ func (s *integrationSuite) TestWorkflowRetry() {
previousStartedEventID, startedEventID int64, history *workflow.History) ([]byte, []*workflow.Decision, error) {
executions = append(executions, execution)
attemptCount++
if attemptCount == 3 {
if attemptCount == 5 {
return nil, []*workflow.Decision{
{
DecisionType: common.DecisionTypePtr(workflow.DecisionTypeCompleteWorkflowExecution),
Expand Down Expand Up @@ -1050,23 +1050,177 @@ func (s *integrationSuite) TestWorkflowRetry() {

_, err := poller.PollAndProcessDecisionTask(false, false)
s.True(err == nil, err)
events := s.getHistory(s.domainName, executions[0])
s.Equal(workflow.EventTypeWorkflowExecutionContinuedAsNew, events[len(events)-1].GetEventType())
s.Equal(int32(0), events[0].GetWorkflowExecutionStartedEventAttributes().GetAttempt())

_, err = poller.PollAndProcessDecisionTask(false, false)
s.True(err == nil, err)
events = s.getHistory(s.domainName, executions[1])
s.Equal(workflow.EventTypeWorkflowExecutionContinuedAsNew, events[len(events)-1].GetEventType())
s.Equal(int32(1), events[0].GetWorkflowExecutionStartedEventAttributes().GetAttempt())

_, err = poller.PollAndProcessDecisionTask(false, false)
s.True(err == nil, err)
events = s.getHistory(s.domainName, executions[2])
s.Equal(workflow.EventTypeWorkflowExecutionContinuedAsNew, events[len(events)-1].GetEventType())
s.Equal(int32(2), events[0].GetWorkflowExecutionStartedEventAttributes().GetAttempt())

_, err = poller.PollAndProcessDecisionTask(false, false)
s.True(err == nil, err)
events = s.getHistory(s.domainName, executions[3])
s.Equal(workflow.EventTypeWorkflowExecutionContinuedAsNew, events[len(events)-1].GetEventType())
s.Equal(int32(3), events[0].GetWorkflowExecutionStartedEventAttributes().GetAttempt())

s.Equal(3, attemptCount)
_, err = poller.PollAndProcessDecisionTask(false, false)
s.True(err == nil, err)
events = s.getHistory(s.domainName, executions[4])
s.Equal(workflow.EventTypeWorkflowExecutionCompleted, events[len(events)-1].GetEventType())
s.Equal(int32(4), events[0].GetWorkflowExecutionStartedEventAttributes().GetAttempt())
}

func (s *integrationSuite) TestWorkflowRetryFailures() {
id := "integration-wf-retry-failures-test"
wt := "integration-wf-retry-failures-type"
tl := "integration-wf-retry-failures-tasklist"
identity := "worker1"

workflowType := &workflow.WorkflowType{}
workflowType.Name = common.StringPtr(wt)

taskList := &workflow.TaskList{}
taskList.Name = common.StringPtr(tl)

workflowImpl := func(attempts int, errorReason string, executions *[]*workflow.WorkflowExecution) decisionTaskHandler {
attemptCount := 0

dtHandler := func(execution *workflow.WorkflowExecution, wt *workflow.WorkflowType,
previousStartedEventID, startedEventID int64, history *workflow.History) ([]byte, []*workflow.Decision, error) {
*executions = append(*executions, execution)
attemptCount++
if attemptCount == attempts {
return nil, []*workflow.Decision{
{
DecisionType: common.DecisionTypePtr(workflow.DecisionTypeCompleteWorkflowExecution),
CompleteWorkflowExecutionDecisionAttributes: &workflow.CompleteWorkflowExecutionDecisionAttributes{
Result: []byte("succeed-after-retry"),
},
}}, nil
}
return nil, []*workflow.Decision{
{
DecisionType: common.DecisionTypePtr(workflow.DecisionTypeFailWorkflowExecution),
FailWorkflowExecutionDecisionAttributes: &workflow.FailWorkflowExecutionDecisionAttributes{
//Reason: common.StringPtr("retryable-error"),
Reason: common.StringPtr(errorReason),
Details: nil,
},
}}, nil
}

return dtHandler
}

// Fail using attempt
request := &workflow.StartWorkflowExecutionRequest{
RequestId: common.StringPtr(uuid.New()),
Domain: common.StringPtr(s.domainName),
WorkflowId: common.StringPtr(id),
WorkflowType: workflowType,
TaskList: taskList,
Input: nil,
ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(100),
TaskStartToCloseTimeoutSeconds: common.Int32Ptr(1),
Identity: common.StringPtr(identity),
RetryPolicy: &workflow.RetryPolicy{
InitialIntervalInSeconds: common.Int32Ptr(1),
MaximumAttempts: common.Int32Ptr(3),
MaximumIntervalInSeconds: common.Int32Ptr(1),
NonRetriableErrorReasons: []string{"bad-bug"},
BackoffCoefficient: common.Float64Ptr(1),
ExpirationIntervalInSeconds: common.Int32Ptr(100),
},
}

we, err0 := s.engine.StartWorkflowExecution(createContext(), request)
s.Nil(err0)

s.logger.Infof("StartWorkflowExecution: response: %v \n", *we.RunId)

executions := []*workflow.WorkflowExecution{}
dtHandler := workflowImpl(5, "retryable-error", &executions)
poller := &TaskPoller{
Engine: s.engine,
Domain: s.domainName,
TaskList: taskList,
Identity: identity,
DecisionHandler: dtHandler,
Logger: s.logger,
T: s.T(),
}

_, err := poller.PollAndProcessDecisionTask(false, false)
s.True(err == nil, err)
events := s.getHistory(s.domainName, executions[0])
s.Equal(workflow.EventTypeWorkflowExecutionContinuedAsNew, events[len(events)-1].GetEventType())
s.Equal(int32(0), events[0].GetWorkflowExecutionStartedEventAttributes().GetAttempt())

_, err = poller.PollAndProcessDecisionTask(false, false)
s.True(err == nil, err)
events = s.getHistory(s.domainName, executions[1])
s.Equal(workflow.EventTypeWorkflowExecutionContinuedAsNew, events[len(events)-1].GetEventType())
s.Equal(int32(1), events[0].GetWorkflowExecutionStartedEventAttributes().GetAttempt())

_, err = poller.PollAndProcessDecisionTask(false, false)
s.True(err == nil, err)
events = s.getHistory(s.domainName, executions[2])
s.Equal(workflow.EventTypeWorkflowExecutionCompleted, events[len(events)-1].GetEventType())
s.Equal(workflow.EventTypeWorkflowExecutionFailed, events[len(events)-1].GetEventType())
s.Equal(int32(2), events[0].GetWorkflowExecutionStartedEventAttributes().GetAttempt())

// Fail error reason
request = &workflow.StartWorkflowExecutionRequest{
RequestId: common.StringPtr(uuid.New()),
Domain: common.StringPtr(s.domainName),
WorkflowId: common.StringPtr(id),
WorkflowType: workflowType,
TaskList: taskList,
Input: nil,
ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(100),
TaskStartToCloseTimeoutSeconds: common.Int32Ptr(1),
Identity: common.StringPtr(identity),
RetryPolicy: &workflow.RetryPolicy{
InitialIntervalInSeconds: common.Int32Ptr(1),
MaximumAttempts: common.Int32Ptr(3),
MaximumIntervalInSeconds: common.Int32Ptr(1),
NonRetriableErrorReasons: []string{"bad-bug"},
BackoffCoefficient: common.Float64Ptr(1),
ExpirationIntervalInSeconds: common.Int32Ptr(100),
},
}

we, err0 = s.engine.StartWorkflowExecution(createContext(), request)
s.Nil(err0)

s.logger.Infof("StartWorkflowExecution: response: %v \n", *we.RunId)

executions = []*workflow.WorkflowExecution{}
dtHandler = workflowImpl(5, "bad-bug", &executions)
poller = &TaskPoller{
Engine: s.engine,
Domain: s.domainName,
TaskList: taskList,
Identity: identity,
DecisionHandler: dtHandler,
Logger: s.logger,
T: s.T(),
}

_, err = poller.PollAndProcessDecisionTask(false, false)
s.True(err == nil, err)
events = s.getHistory(s.domainName, executions[0])
s.Equal(workflow.EventTypeWorkflowExecutionFailed, events[len(events)-1].GetEventType())
s.Equal(int32(0), events[0].GetWorkflowExecutionStartedEventAttributes().GetAttempt())

}

func (s *integrationSuite) TestCronWorkflow() {
Expand Down
2 changes: 1 addition & 1 deletion service/history/mutableStateBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2405,7 +2405,7 @@ func (e *mutableStateBuilder) ReplicateWorkflowExecutionContinuedAsNewEvent(sour

if continueAsNewAttributes.GetInitiator() == workflow.ContinueAsNewInitiatorRetryPolicy {
// retry
continueAsNew.Attempt = continueAsNew.Attempt + 1
continueAsNew.Attempt = startedAttributes.GetAttempt()
continueAsNew.ExpirationTime = e.executionInfo.ExpirationTime
} else {
// by cron or decider
Expand Down

0 comments on commit 7d1ad39

Please sign in to comment.