Skip to content

Commit

Permalink
Fix missing properties in continueasnew when workflow fail (#2883)
Browse files Browse the repository at this point in the history
  • Loading branch information
vancexu authored Dec 3, 2019
1 parent 38ba642 commit 1753126
Show file tree
Hide file tree
Showing 7 changed files with 164 additions and 53 deletions.
12 changes: 6 additions & 6 deletions host/continueasnew_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ import (
)

func (s *integrationSuite) TestContinueAsNewWorkflow() {
id := "interation-continue-as-new-workflow-test"
wt := "interation-continue-as-new-workflow-test-type"
tl := "interation-continue-as-new-workflow-test-tasklist"
id := "integration-continue-as-new-workflow-test"
wt := "integration-continue-as-new-workflow-test-type"
tl := "integration-continue-as-new-workflow-test-tasklist"
identity := "worker1"

workflowType := &workflow.WorkflowType{}
Expand Down Expand Up @@ -140,9 +140,9 @@ func (s *integrationSuite) TestContinueAsNewWorkflow() {
}

func (s *integrationSuite) TestContinueAsNewWorkflow_Timeout() {
id := "interation-continue-as-new-workflow-timeout-test"
wt := "interation-continue-as-new-workflow-timeout-test-type"
tl := "interation-continue-as-new-workflow-timeout-test-tasklist"
id := "integration-continue-as-new-workflow-timeout-test"
wt := "integration-continue-as-new-workflow-timeout-test-type"
tl := "integration-continue-as-new-workflow-timeout-test-tasklist"
identity := "worker1"

workflowType := &workflow.WorkflowType{}
Expand Down
16 changes: 8 additions & 8 deletions host/decision_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (

func (s *integrationSuite) TestDecisionHeartbeatingWithEmptyResult() {
id := uuid.New()
wt := "interation-workflow-decision-heartbeating-local-activities"
wt := "integration-workflow-decision-heartbeating-local-activities"
tl := id
identity := "worker1"

Expand Down Expand Up @@ -140,7 +140,7 @@ func (s *integrationSuite) TestDecisionHeartbeatingWithEmptyResult() {

func (s *integrationSuite) TestDecisionHeartbeatingWithLocalActivitiesResult() {
id := uuid.New()
wt := "interation-workflow-decision-heartbeating-local-activities"
wt := "integration-workflow-decision-heartbeating-local-activities"
tl := id
identity := "worker1"

Expand Down Expand Up @@ -284,7 +284,7 @@ func (s *integrationSuite) TestDecisionHeartbeatingWithLocalActivitiesResult() {

func (s *integrationSuite) TestWorkflowTerminationSignalBeforeRegularDecisionStarted() {
id := uuid.New()
wt := "interation-workflow-transient-decision-test-type"
wt := "integration-workflow-transient-decision-test-type"
tl := id
identity := "worker1"

Expand Down Expand Up @@ -359,7 +359,7 @@ func (s *integrationSuite) TestWorkflowTerminationSignalBeforeRegularDecisionSta

func (s *integrationSuite) TestWorkflowTerminationSignalAfterRegularDecisionStarted() {
id := uuid.New()
wt := "interation-workflow-transient-decision-test-type"
wt := "integration-workflow-transient-decision-test-type"
tl := id
identity := "worker1"

Expand Down Expand Up @@ -434,7 +434,7 @@ func (s *integrationSuite) TestWorkflowTerminationSignalAfterRegularDecisionStar

func (s *integrationSuite) TestWorkflowTerminationSignalAfterRegularDecisionStartedAndFailDecision() {
id := uuid.New()
wt := "interation-workflow-transient-decision-test-type"
wt := "integration-workflow-transient-decision-test-type"
tl := id
identity := "worker1"

Expand Down Expand Up @@ -521,7 +521,7 @@ func (s *integrationSuite) TestWorkflowTerminationSignalAfterRegularDecisionStar

func (s *integrationSuite) TestWorkflowTerminationSignalBeforeTransientDecisionStarted() {
id := uuid.New()
wt := "interation-workflow-transient-decision-test-type"
wt := "integration-workflow-transient-decision-test-type"
tl := id
identity := "worker1"

Expand Down Expand Up @@ -626,7 +626,7 @@ func (s *integrationSuite) TestWorkflowTerminationSignalBeforeTransientDecisionS

func (s *integrationSuite) TestWorkflowTerminationSignalAfterTransientDecisionStarted() {
id := uuid.New()
wt := "interation-workflow-transient-decision-test-type"
wt := "integration-workflow-transient-decision-test-type"
tl := id
identity := "worker1"

Expand Down Expand Up @@ -728,7 +728,7 @@ func (s *integrationSuite) TestWorkflowTerminationSignalAfterTransientDecisionSt

func (s *integrationSuite) TestWorkflowTerminationSignalAfterTransientDecisionStartedAndFailDecision() {
id := uuid.New()
wt := "interation-workflow-transient-decision-test-type"
wt := "integration-workflow-transient-decision-test-type"
tl := id
identity := "worker1"

Expand Down
18 changes: 9 additions & 9 deletions host/gethistory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ import (
)

func (s *integrationSuite) TestGetWorkflowExecutionHistory_All() {
workflowID := "interation-get-workflow-history-events-long-poll-test-all"
workflowTypeName := "interation-get-workflow-history-events-long-poll-test-all-type"
tasklistName := "interation-get-workflow-history-events-long-poll-test-all-tasklist"
workflowID := "integration-get-workflow-history-events-long-poll-test-all"
workflowTypeName := "integration-get-workflow-history-events-long-poll-test-all-type"
tasklistName := "integration-get-workflow-history-events-long-poll-test-all-tasklist"
identity := "worker1"
activityName := "activity_type1"

Expand Down Expand Up @@ -204,9 +204,9 @@ func (s *integrationSuite) TestGetWorkflowExecutionHistory_All() {
}

func (s *integrationSuite) TestGetWorkflowExecutionHistory_Close() {
workflowID := "interation-get-workflow-history-events-long-poll-test-close"
workflowTypeName := "interation-get-workflow-history-events-long-poll-test-close-type"
tasklistName := "interation-get-workflow-history-events-long-poll-test-close-tasklist"
workflowID := "integration-get-workflow-history-events-long-poll-test-close"
workflowTypeName := "integration-get-workflow-history-events-long-poll-test-close-type"
tasklistName := "integration-get-workflow-history-events-long-poll-test-close-tasklist"
identity := "worker1"
activityName := "activity_type1"

Expand Down Expand Up @@ -363,9 +363,9 @@ func (s *integrationSuite) TestGetWorkflowExecutionHistory_Close() {
}

func (s *integrationSuite) TestGetWorkflowExecutionRawHistory_All() {
workflowID := "interation-get-workflow-history-raw-events-all"
workflowTypeName := "interation-get-workflow-history-raw-events-all-type"
tasklistName := "interation-get-workflow-history-raw-events-all-tasklist"
workflowID := "integration-get-workflow-history-raw-events-all"
workflowTypeName := "integration-get-workflow-history-raw-events-all-type"
tasklistName := "integration-get-workflow-history-raw-events-all-tasklist"
identity := "worker1"
activityName := "activity_type1"

Expand Down
121 changes: 115 additions & 6 deletions host/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,9 +279,9 @@ StartNewExecutionLoop:
}

func (s *integrationSuite) TestSequentialWorkflow() {
id := "interation-sequential-workflow-test"
wt := "interation-sequential-workflow-test-type"
tl := "interation-sequential-workflow-test-tasklist"
id := "integration-sequential-workflow-test"
wt := "integration-sequential-workflow-test-type"
tl := "integration-sequential-workflow-test-tasklist"
identity := "worker1"
activityName := "activity_type1"

Expand Down Expand Up @@ -389,9 +389,9 @@ func (s *integrationSuite) TestSequentialWorkflow() {
}

func (s *integrationSuite) TestCompleteDecisionTaskAndCreateNewOne() {
id := "interation-complete-decision-create-new-test"
wt := "interation-complete-decision-create-new-test-type"
tl := "interation-complete-decision-create-new-test-tasklist"
id := "integration-complete-decision-create-new-test"
wt := "integration-complete-decision-create-new-test-type"
tl := "integration-complete-decision-create-new-test-tasklist"
identity := "worker1"

workflowType := &workflow.WorkflowType{}
Expand Down Expand Up @@ -868,6 +868,9 @@ func (s *integrationSuite) TestCronWorkflow() {
memo := &workflow.Memo{
Fields: map[string][]byte{"memoKey": []byte("memoVal")},
}
searchAttr := &workflow.SearchAttributes{
IndexedFields: map[string][]byte{"CustomKeywordField": []byte("1")},
}

request := &workflow.StartWorkflowExecutionRequest{
RequestId: common.StringPtr(uuid.New()),
Expand All @@ -881,6 +884,7 @@ func (s *integrationSuite) TestCronWorkflow() {
Identity: common.StringPtr(identity),
CronSchedule: common.StringPtr(cronSchedule), //minimum interval by standard spec is 1m (* * * * *), use non-standard descriptor for short interval for test
Memo: memo,
SearchAttributes: searchAttr,
}

startWorkflowTS := time.Now()
Expand Down Expand Up @@ -978,6 +982,7 @@ func (s *integrationSuite) TestCronWorkflow() {
s.Equal("cron-test-error", attributes.GetFailureReason())
s.Equal(0, len(attributes.GetLastCompletionResult()))
s.Equal(memo, attributes.Memo)
s.Equal(searchAttr, attributes.SearchAttributes)

events = s.getHistory(s.domainName, executions[1])
lastEvent = events[len(events)-1]
Expand All @@ -987,6 +992,7 @@ func (s *integrationSuite) TestCronWorkflow() {
s.Equal("", attributes.GetFailureReason())
s.Equal("cron-test-result", string(attributes.GetLastCompletionResult()))
s.Equal(memo, attributes.Memo)
s.Equal(searchAttr, attributes.SearchAttributes)

events = s.getHistory(s.domainName, executions[2])
lastEvent = events[len(events)-1]
Expand All @@ -996,6 +1002,7 @@ func (s *integrationSuite) TestCronWorkflow() {
s.Equal("cron-test-error", attributes.GetFailureReason())
s.Equal("cron-test-result", string(attributes.GetLastCompletionResult()))
s.Equal(memo, attributes.Memo)
s.Equal(searchAttr, attributes.SearchAttributes)

startFilter.LatestTime = common.Int64Ptr(time.Now().UnixNano())
var closedExecutions []*workflow.WorkflowExecutionInfo
Expand Down Expand Up @@ -1046,6 +1053,108 @@ func (s *integrationSuite) TestCronWorkflow() {
}
}

func (s *integrationSuite) TestCronWorkflowTimeout() {
id := "integration-wf-cron-timeout-test"
wt := "integration-wf-cron-timeout-type"
tl := "integration-wf-cron-timeout-tasklist"
identity := "worker1"
cronSchedule := "@every 3s"

workflowType := &workflow.WorkflowType{}
workflowType.Name = common.StringPtr(wt)

taskList := &workflow.TaskList{}
taskList.Name = common.StringPtr(tl)

memo := &workflow.Memo{
Fields: map[string][]byte{"memoKey": []byte("memoVal")},
}
searchAttr := &workflow.SearchAttributes{
IndexedFields: map[string][]byte{"CustomKeywordField": []byte("1")},
}

request := &workflow.StartWorkflowExecutionRequest{
RequestId: common.StringPtr(uuid.New()),
Domain: common.StringPtr(s.domainName),
WorkflowId: common.StringPtr(id),
WorkflowType: workflowType,
TaskList: taskList,
Input: nil,
ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(1), // set workflow timeout to 1s
TaskStartToCloseTimeoutSeconds: common.Int32Ptr(1),
Identity: common.StringPtr(identity),
CronSchedule: common.StringPtr(cronSchedule), //minimum interval by standard spec is 1m (* * * * *), use non-standard descriptor for short interval for test
Memo: memo,
SearchAttributes: searchAttr,
}

we, err0 := s.engine.StartWorkflowExecution(createContext(), request)
s.Nil(err0)

s.Logger.Info("StartWorkflowExecution", tag.WorkflowRunID(*we.RunId))

var executions []*workflow.WorkflowExecution
dtHandler := func(execution *workflow.WorkflowExecution, wt *workflow.WorkflowType,
previousStartedEventID, startedEventID int64, history *workflow.History) ([]byte, []*workflow.Decision, error) {

executions = append(executions, execution)
return nil, []*workflow.Decision{
{
DecisionType: common.DecisionTypePtr(workflow.DecisionTypeStartTimer),

StartTimerDecisionAttributes: &workflow.StartTimerDecisionAttributes{
TimerId: common.StringPtr("timer-id"),
StartToFireTimeoutSeconds: common.Int64Ptr(5),
},
}}, nil
}

poller := &TaskPoller{
Engine: s.engine,
Domain: s.domainName,
TaskList: taskList,
Identity: identity,
DecisionHandler: dtHandler,
Logger: s.Logger,
T: s.T(),
}

_, err := poller.PollAndProcessDecisionTask(false, false)
s.True(err == nil, err)

time.Sleep(1 * time.Second) // wait for workflow timeout

// check when workflow timeout, continueAsNew event contains expected fields
events := s.getHistory(s.domainName, executions[0])
lastEvent := events[len(events)-1]
s.Equal(workflow.EventTypeWorkflowExecutionContinuedAsNew, lastEvent.GetEventType())
attributes := lastEvent.WorkflowExecutionContinuedAsNewEventAttributes
s.Equal(workflow.ContinueAsNewInitiatorCronSchedule, attributes.GetInitiator())
s.Equal("cadenceInternal:Timeout START_TO_CLOSE", attributes.GetFailureReason())
s.Equal(memo, attributes.Memo)
s.Equal(searchAttr, attributes.SearchAttributes)

_, err = poller.PollAndProcessDecisionTask(false, false)
s.True(err == nil, err)

// check new run contains expected fields
events = s.getHistory(s.domainName, executions[1])
firstEvent := events[0]
s.Equal(workflow.EventTypeWorkflowExecutionStarted, firstEvent.GetEventType())
startAttributes := firstEvent.WorkflowExecutionStartedEventAttributes
s.Equal(memo, startAttributes.Memo)
s.Equal(searchAttr, startAttributes.SearchAttributes)

// terminate cron
terminateErr := s.engine.TerminateWorkflowExecution(createContext(), &workflow.TerminateWorkflowExecutionRequest{
Domain: common.StringPtr(s.domainName),
WorkflowExecution: &workflow.WorkflowExecution{
WorkflowId: common.StringPtr(id),
},
})
s.NoError(terminateErr)
}

func (s *integrationSuite) TestSequential_UserTimers() {
id := "integration-sequential-user-timers-test"
wt := "integration-sequential-user-timers-test-type"
Expand Down
38 changes: 19 additions & 19 deletions host/queryworkflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ import (
)

func (s *integrationSuite) TestQueryWorkflow_NonSticky() {
id := "interation-query-workflow-test-non-sticky"
wt := "interation-query-workflow-test-non-sticky-type"
tl := "interation-query-workflow-test-non-sticky-tasklist"
id := "integration-query-workflow-test-non-sticky"
wt := "integration-query-workflow-test-non-sticky-type"
tl := "integration-query-workflow-test-non-sticky-tasklist"
identity := "worker1"
activityName := "activity_type1"
queryType := "test-query"
Expand Down Expand Up @@ -244,9 +244,9 @@ func (s *integrationSuite) TestQueryWorkflow_NonSticky() {
}

func (s *integrationSuite) TestQueryWorkflow_Consistent_PiggybackQuery() {
id := "interation-query-workflow-test-consistent-piggyback-query"
wt := "interation-query-workflow-test-consistent-piggyback-query-type"
tl := "interation-query-workflow-test-consistent-piggyback-query-tasklist"
id := "integration-query-workflow-test-consistent-piggyback-query"
wt := "integration-query-workflow-test-consistent-piggyback-query-type"
tl := "integration-query-workflow-test-consistent-piggyback-query-tasklist"
identity := "worker1"
activityName := "activity_type1"
queryType := "test-query"
Expand Down Expand Up @@ -429,9 +429,9 @@ func (s *integrationSuite) TestQueryWorkflow_Consistent_PiggybackQuery() {
}

func (s *integrationSuite) TestQueryWorkflow_Consistent_Timeout() {
id := "interation-query-workflow-test-consistent-timeout"
wt := "interation-query-workflow-test-consistent-timeout-type"
tl := "interation-query-workflow-test-consistent-timeout-tasklist"
id := "integration-query-workflow-test-consistent-timeout"
wt := "integration-query-workflow-test-consistent-timeout-type"
tl := "integration-query-workflow-test-consistent-timeout-tasklist"
identity := "worker1"
activityName := "activity_type1"
queryType := "test-query"
Expand Down Expand Up @@ -590,9 +590,9 @@ func (s *integrationSuite) TestQueryWorkflow_Consistent_Timeout() {
}

func (s *integrationSuite) TestQueryWorkflow_Consistent_BlockedByStarted_NonSticky() {
id := "interation-query-workflow-test-consistent-blocked-by-started-non-sticky"
wt := "interation-query-workflow-test-consistent-blocked-by-started-non-sticky-type"
tl := "interation-query-workflow-test-consistent-blocked-by-started-non-sticky-tasklist"
id := "integration-query-workflow-test-consistent-blocked-by-started-non-sticky"
wt := "integration-query-workflow-test-consistent-blocked-by-started-non-sticky-type"
tl := "integration-query-workflow-test-consistent-blocked-by-started-non-sticky-tasklist"
identity := "worker1"
activityName := "activity_type1"
queryType := "test-query"
Expand Down Expand Up @@ -774,10 +774,10 @@ func (s *integrationSuite) TestQueryWorkflow_Consistent_BlockedByStarted_NonStic
}

func (s *integrationSuite) TestQueryWorkflow_Consistent_NewDecisionTask_Sticky() {
id := "interation-query-workflow-test-consistent-new-decision-task-sticky"
wt := "interation-query-workflow-test-consistent-new-decision-task-sticky-type"
tl := "interation-query-workflow-test-consistent-new-decision-task-sticky-tasklist"
stl := "interation-query-workflow-test-consistent-new-decision-task-sticky-tasklist-sticky"
id := "integration-query-workflow-test-consistent-new-decision-task-sticky"
wt := "integration-query-workflow-test-consistent-new-decision-task-sticky-type"
tl := "integration-query-workflow-test-consistent-new-decision-task-sticky-tasklist"
stl := "integration-query-workflow-test-consistent-new-decision-task-sticky-tasklist-sticky"
identity := "worker1"
activityName := "activity_type1"
queryType := "test-query"
Expand Down Expand Up @@ -993,9 +993,9 @@ func (s *integrationSuite) TestQueryWorkflow_Consistent_NewDecisionTask_Sticky()
}

func (s *integrationSuite) TestQueryWorkflow_BeforeFirstDecision() {
id := "interation-test-query-workflow-before-first-decision"
wt := "interation-test-query-workflow-before-first-decision-type"
tl := "interation-test-query-workflow-before-first-decision-tasklist"
id := "integration-test-query-workflow-before-first-decision"
wt := "integration-test-query-workflow-before-first-decision-type"
tl := "integration-test-query-workflow-before-first-decision-tasklist"
identity := "worker1"
queryType := "test-query"

Expand Down
Loading

0 comments on commit 1753126

Please sign in to comment.