From 175312665a9bacc465caa8ff90d952d9bcbb88f0 Mon Sep 17 00:00:00 2001 From: Bowei Xu Date: Tue, 3 Dec 2019 15:22:38 -0800 Subject: [PATCH] Fix missing properties in continueasnew when workflow fail (#2883) --- host/continueasnew_test.go | 12 +- host/decision_test.go | 16 +-- host/gethistory_test.go | 18 +-- host/integration_test.go | 121 ++++++++++++++++++- host/queryworkflow_test.go | 38 +++--- host/signalworkflow_test.go | 6 +- service/history/timerQueueActiveProcessor.go | 6 +- 7 files changed, 164 insertions(+), 53 deletions(-) diff --git a/host/continueasnew_test.go b/host/continueasnew_test.go index 92e31f41d9a..29cbff55767 100644 --- a/host/continueasnew_test.go +++ b/host/continueasnew_test.go @@ -34,9 +34,9 @@ import ( ) func (s *integrationSuite) TestContinueAsNewWorkflow() { - id := "interation-continue-as-new-workflow-test" - wt := "interation-continue-as-new-workflow-test-type" - tl := "interation-continue-as-new-workflow-test-tasklist" + id := "integration-continue-as-new-workflow-test" + wt := "integration-continue-as-new-workflow-test-type" + tl := "integration-continue-as-new-workflow-test-tasklist" identity := "worker1" workflowType := &workflow.WorkflowType{} @@ -140,9 +140,9 @@ func (s *integrationSuite) TestContinueAsNewWorkflow() { } func (s *integrationSuite) TestContinueAsNewWorkflow_Timeout() { - id := "interation-continue-as-new-workflow-timeout-test" - wt := "interation-continue-as-new-workflow-timeout-test-type" - tl := "interation-continue-as-new-workflow-timeout-test-tasklist" + id := "integration-continue-as-new-workflow-timeout-test" + wt := "integration-continue-as-new-workflow-timeout-test-type" + tl := "integration-continue-as-new-workflow-timeout-test-tasklist" identity := "worker1" workflowType := &workflow.WorkflowType{} diff --git a/host/decision_test.go b/host/decision_test.go index fe1dc32c655..92a28c7ca3d 100644 --- a/host/decision_test.go +++ b/host/decision_test.go @@ -33,7 +33,7 @@ import ( func (s *integrationSuite) TestDecisionHeartbeatingWithEmptyResult() { id := uuid.New() - wt := "interation-workflow-decision-heartbeating-local-activities" + wt := "integration-workflow-decision-heartbeating-local-activities" tl := id identity := "worker1" @@ -140,7 +140,7 @@ func (s *integrationSuite) TestDecisionHeartbeatingWithEmptyResult() { func (s *integrationSuite) TestDecisionHeartbeatingWithLocalActivitiesResult() { id := uuid.New() - wt := "interation-workflow-decision-heartbeating-local-activities" + wt := "integration-workflow-decision-heartbeating-local-activities" tl := id identity := "worker1" @@ -284,7 +284,7 @@ func (s *integrationSuite) TestDecisionHeartbeatingWithLocalActivitiesResult() { func (s *integrationSuite) TestWorkflowTerminationSignalBeforeRegularDecisionStarted() { id := uuid.New() - wt := "interation-workflow-transient-decision-test-type" + wt := "integration-workflow-transient-decision-test-type" tl := id identity := "worker1" @@ -359,7 +359,7 @@ func (s *integrationSuite) TestWorkflowTerminationSignalBeforeRegularDecisionSta func (s *integrationSuite) TestWorkflowTerminationSignalAfterRegularDecisionStarted() { id := uuid.New() - wt := "interation-workflow-transient-decision-test-type" + wt := "integration-workflow-transient-decision-test-type" tl := id identity := "worker1" @@ -434,7 +434,7 @@ func (s *integrationSuite) TestWorkflowTerminationSignalAfterRegularDecisionStar func (s *integrationSuite) TestWorkflowTerminationSignalAfterRegularDecisionStartedAndFailDecision() { id := uuid.New() - wt := "interation-workflow-transient-decision-test-type" + wt := "integration-workflow-transient-decision-test-type" tl := id identity := "worker1" @@ -521,7 +521,7 @@ func (s *integrationSuite) TestWorkflowTerminationSignalAfterRegularDecisionStar func (s *integrationSuite) TestWorkflowTerminationSignalBeforeTransientDecisionStarted() { id := uuid.New() - wt := "interation-workflow-transient-decision-test-type" + wt := "integration-workflow-transient-decision-test-type" tl := id identity := "worker1" @@ -626,7 +626,7 @@ func (s *integrationSuite) TestWorkflowTerminationSignalBeforeTransientDecisionS func (s *integrationSuite) TestWorkflowTerminationSignalAfterTransientDecisionStarted() { id := uuid.New() - wt := "interation-workflow-transient-decision-test-type" + wt := "integration-workflow-transient-decision-test-type" tl := id identity := "worker1" @@ -728,7 +728,7 @@ func (s *integrationSuite) TestWorkflowTerminationSignalAfterTransientDecisionSt func (s *integrationSuite) TestWorkflowTerminationSignalAfterTransientDecisionStartedAndFailDecision() { id := uuid.New() - wt := "interation-workflow-transient-decision-test-type" + wt := "integration-workflow-transient-decision-test-type" tl := id identity := "worker1" diff --git a/host/gethistory_test.go b/host/gethistory_test.go index 2691b6aaaeb..1be6f75c6ff 100644 --- a/host/gethistory_test.go +++ b/host/gethistory_test.go @@ -36,9 +36,9 @@ import ( ) func (s *integrationSuite) TestGetWorkflowExecutionHistory_All() { - workflowID := "interation-get-workflow-history-events-long-poll-test-all" - workflowTypeName := "interation-get-workflow-history-events-long-poll-test-all-type" - tasklistName := "interation-get-workflow-history-events-long-poll-test-all-tasklist" + workflowID := "integration-get-workflow-history-events-long-poll-test-all" + workflowTypeName := "integration-get-workflow-history-events-long-poll-test-all-type" + tasklistName := "integration-get-workflow-history-events-long-poll-test-all-tasklist" identity := "worker1" activityName := "activity_type1" @@ -204,9 +204,9 @@ func (s *integrationSuite) TestGetWorkflowExecutionHistory_All() { } func (s *integrationSuite) TestGetWorkflowExecutionHistory_Close() { - workflowID := "interation-get-workflow-history-events-long-poll-test-close" - workflowTypeName := "interation-get-workflow-history-events-long-poll-test-close-type" - tasklistName := "interation-get-workflow-history-events-long-poll-test-close-tasklist" + workflowID := "integration-get-workflow-history-events-long-poll-test-close" + workflowTypeName := "integration-get-workflow-history-events-long-poll-test-close-type" + tasklistName := "integration-get-workflow-history-events-long-poll-test-close-tasklist" identity := "worker1" activityName := "activity_type1" @@ -363,9 +363,9 @@ func (s *integrationSuite) TestGetWorkflowExecutionHistory_Close() { } func (s *integrationSuite) TestGetWorkflowExecutionRawHistory_All() { - workflowID := "interation-get-workflow-history-raw-events-all" - workflowTypeName := "interation-get-workflow-history-raw-events-all-type" - tasklistName := "interation-get-workflow-history-raw-events-all-tasklist" + workflowID := "integration-get-workflow-history-raw-events-all" + workflowTypeName := "integration-get-workflow-history-raw-events-all-type" + tasklistName := "integration-get-workflow-history-raw-events-all-tasklist" identity := "worker1" activityName := "activity_type1" diff --git a/host/integration_test.go b/host/integration_test.go index 3c9f1703036..fd2d0bde682 100644 --- a/host/integration_test.go +++ b/host/integration_test.go @@ -279,9 +279,9 @@ StartNewExecutionLoop: } func (s *integrationSuite) TestSequentialWorkflow() { - id := "interation-sequential-workflow-test" - wt := "interation-sequential-workflow-test-type" - tl := "interation-sequential-workflow-test-tasklist" + id := "integration-sequential-workflow-test" + wt := "integration-sequential-workflow-test-type" + tl := "integration-sequential-workflow-test-tasklist" identity := "worker1" activityName := "activity_type1" @@ -389,9 +389,9 @@ func (s *integrationSuite) TestSequentialWorkflow() { } func (s *integrationSuite) TestCompleteDecisionTaskAndCreateNewOne() { - id := "interation-complete-decision-create-new-test" - wt := "interation-complete-decision-create-new-test-type" - tl := "interation-complete-decision-create-new-test-tasklist" + id := "integration-complete-decision-create-new-test" + wt := "integration-complete-decision-create-new-test-type" + tl := "integration-complete-decision-create-new-test-tasklist" identity := "worker1" workflowType := &workflow.WorkflowType{} @@ -868,6 +868,9 @@ func (s *integrationSuite) TestCronWorkflow() { memo := &workflow.Memo{ Fields: map[string][]byte{"memoKey": []byte("memoVal")}, } + searchAttr := &workflow.SearchAttributes{ + IndexedFields: map[string][]byte{"CustomKeywordField": []byte("1")}, + } request := &workflow.StartWorkflowExecutionRequest{ RequestId: common.StringPtr(uuid.New()), @@ -881,6 +884,7 @@ func (s *integrationSuite) TestCronWorkflow() { Identity: common.StringPtr(identity), CronSchedule: common.StringPtr(cronSchedule), //minimum interval by standard spec is 1m (* * * * *), use non-standard descriptor for short interval for test Memo: memo, + SearchAttributes: searchAttr, } startWorkflowTS := time.Now() @@ -978,6 +982,7 @@ func (s *integrationSuite) TestCronWorkflow() { s.Equal("cron-test-error", attributes.GetFailureReason()) s.Equal(0, len(attributes.GetLastCompletionResult())) s.Equal(memo, attributes.Memo) + s.Equal(searchAttr, attributes.SearchAttributes) events = s.getHistory(s.domainName, executions[1]) lastEvent = events[len(events)-1] @@ -987,6 +992,7 @@ func (s *integrationSuite) TestCronWorkflow() { s.Equal("", attributes.GetFailureReason()) s.Equal("cron-test-result", string(attributes.GetLastCompletionResult())) s.Equal(memo, attributes.Memo) + s.Equal(searchAttr, attributes.SearchAttributes) events = s.getHistory(s.domainName, executions[2]) lastEvent = events[len(events)-1] @@ -996,6 +1002,7 @@ func (s *integrationSuite) TestCronWorkflow() { s.Equal("cron-test-error", attributes.GetFailureReason()) s.Equal("cron-test-result", string(attributes.GetLastCompletionResult())) s.Equal(memo, attributes.Memo) + s.Equal(searchAttr, attributes.SearchAttributes) startFilter.LatestTime = common.Int64Ptr(time.Now().UnixNano()) var closedExecutions []*workflow.WorkflowExecutionInfo @@ -1046,6 +1053,108 @@ func (s *integrationSuite) TestCronWorkflow() { } } +func (s *integrationSuite) TestCronWorkflowTimeout() { + id := "integration-wf-cron-timeout-test" + wt := "integration-wf-cron-timeout-type" + tl := "integration-wf-cron-timeout-tasklist" + identity := "worker1" + cronSchedule := "@every 3s" + + workflowType := &workflow.WorkflowType{} + workflowType.Name = common.StringPtr(wt) + + taskList := &workflow.TaskList{} + taskList.Name = common.StringPtr(tl) + + memo := &workflow.Memo{ + Fields: map[string][]byte{"memoKey": []byte("memoVal")}, + } + searchAttr := &workflow.SearchAttributes{ + IndexedFields: map[string][]byte{"CustomKeywordField": []byte("1")}, + } + + request := &workflow.StartWorkflowExecutionRequest{ + RequestId: common.StringPtr(uuid.New()), + Domain: common.StringPtr(s.domainName), + WorkflowId: common.StringPtr(id), + WorkflowType: workflowType, + TaskList: taskList, + Input: nil, + ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(1), // set workflow timeout to 1s + TaskStartToCloseTimeoutSeconds: common.Int32Ptr(1), + Identity: common.StringPtr(identity), + CronSchedule: common.StringPtr(cronSchedule), //minimum interval by standard spec is 1m (* * * * *), use non-standard descriptor for short interval for test + Memo: memo, + SearchAttributes: searchAttr, + } + + we, err0 := s.engine.StartWorkflowExecution(createContext(), request) + s.Nil(err0) + + s.Logger.Info("StartWorkflowExecution", tag.WorkflowRunID(*we.RunId)) + + var executions []*workflow.WorkflowExecution + dtHandler := func(execution *workflow.WorkflowExecution, wt *workflow.WorkflowType, + previousStartedEventID, startedEventID int64, history *workflow.History) ([]byte, []*workflow.Decision, error) { + + executions = append(executions, execution) + return nil, []*workflow.Decision{ + { + DecisionType: common.DecisionTypePtr(workflow.DecisionTypeStartTimer), + + StartTimerDecisionAttributes: &workflow.StartTimerDecisionAttributes{ + TimerId: common.StringPtr("timer-id"), + StartToFireTimeoutSeconds: common.Int64Ptr(5), + }, + }}, nil + } + + poller := &TaskPoller{ + Engine: s.engine, + Domain: s.domainName, + TaskList: taskList, + Identity: identity, + DecisionHandler: dtHandler, + Logger: s.Logger, + T: s.T(), + } + + _, err := poller.PollAndProcessDecisionTask(false, false) + s.True(err == nil, err) + + time.Sleep(1 * time.Second) // wait for workflow timeout + + // check when workflow timeout, continueAsNew event contains expected fields + events := s.getHistory(s.domainName, executions[0]) + lastEvent := events[len(events)-1] + s.Equal(workflow.EventTypeWorkflowExecutionContinuedAsNew, lastEvent.GetEventType()) + attributes := lastEvent.WorkflowExecutionContinuedAsNewEventAttributes + s.Equal(workflow.ContinueAsNewInitiatorCronSchedule, attributes.GetInitiator()) + s.Equal("cadenceInternal:Timeout START_TO_CLOSE", attributes.GetFailureReason()) + s.Equal(memo, attributes.Memo) + s.Equal(searchAttr, attributes.SearchAttributes) + + _, err = poller.PollAndProcessDecisionTask(false, false) + s.True(err == nil, err) + + // check new run contains expected fields + events = s.getHistory(s.domainName, executions[1]) + firstEvent := events[0] + s.Equal(workflow.EventTypeWorkflowExecutionStarted, firstEvent.GetEventType()) + startAttributes := firstEvent.WorkflowExecutionStartedEventAttributes + s.Equal(memo, startAttributes.Memo) + s.Equal(searchAttr, startAttributes.SearchAttributes) + + // terminate cron + terminateErr := s.engine.TerminateWorkflowExecution(createContext(), &workflow.TerminateWorkflowExecutionRequest{ + Domain: common.StringPtr(s.domainName), + WorkflowExecution: &workflow.WorkflowExecution{ + WorkflowId: common.StringPtr(id), + }, + }) + s.NoError(terminateErr) +} + func (s *integrationSuite) TestSequential_UserTimers() { id := "integration-sequential-user-timers-test" wt := "integration-sequential-user-timers-test-type" diff --git a/host/queryworkflow_test.go b/host/queryworkflow_test.go index 91ac7601cd4..cb61dc1de60 100644 --- a/host/queryworkflow_test.go +++ b/host/queryworkflow_test.go @@ -37,9 +37,9 @@ import ( ) func (s *integrationSuite) TestQueryWorkflow_NonSticky() { - id := "interation-query-workflow-test-non-sticky" - wt := "interation-query-workflow-test-non-sticky-type" - tl := "interation-query-workflow-test-non-sticky-tasklist" + id := "integration-query-workflow-test-non-sticky" + wt := "integration-query-workflow-test-non-sticky-type" + tl := "integration-query-workflow-test-non-sticky-tasklist" identity := "worker1" activityName := "activity_type1" queryType := "test-query" @@ -244,9 +244,9 @@ func (s *integrationSuite) TestQueryWorkflow_NonSticky() { } func (s *integrationSuite) TestQueryWorkflow_Consistent_PiggybackQuery() { - id := "interation-query-workflow-test-consistent-piggyback-query" - wt := "interation-query-workflow-test-consistent-piggyback-query-type" - tl := "interation-query-workflow-test-consistent-piggyback-query-tasklist" + id := "integration-query-workflow-test-consistent-piggyback-query" + wt := "integration-query-workflow-test-consistent-piggyback-query-type" + tl := "integration-query-workflow-test-consistent-piggyback-query-tasklist" identity := "worker1" activityName := "activity_type1" queryType := "test-query" @@ -429,9 +429,9 @@ func (s *integrationSuite) TestQueryWorkflow_Consistent_PiggybackQuery() { } func (s *integrationSuite) TestQueryWorkflow_Consistent_Timeout() { - id := "interation-query-workflow-test-consistent-timeout" - wt := "interation-query-workflow-test-consistent-timeout-type" - tl := "interation-query-workflow-test-consistent-timeout-tasklist" + id := "integration-query-workflow-test-consistent-timeout" + wt := "integration-query-workflow-test-consistent-timeout-type" + tl := "integration-query-workflow-test-consistent-timeout-tasklist" identity := "worker1" activityName := "activity_type1" queryType := "test-query" @@ -590,9 +590,9 @@ func (s *integrationSuite) TestQueryWorkflow_Consistent_Timeout() { } func (s *integrationSuite) TestQueryWorkflow_Consistent_BlockedByStarted_NonSticky() { - id := "interation-query-workflow-test-consistent-blocked-by-started-non-sticky" - wt := "interation-query-workflow-test-consistent-blocked-by-started-non-sticky-type" - tl := "interation-query-workflow-test-consistent-blocked-by-started-non-sticky-tasklist" + id := "integration-query-workflow-test-consistent-blocked-by-started-non-sticky" + wt := "integration-query-workflow-test-consistent-blocked-by-started-non-sticky-type" + tl := "integration-query-workflow-test-consistent-blocked-by-started-non-sticky-tasklist" identity := "worker1" activityName := "activity_type1" queryType := "test-query" @@ -774,10 +774,10 @@ func (s *integrationSuite) TestQueryWorkflow_Consistent_BlockedByStarted_NonStic } func (s *integrationSuite) TestQueryWorkflow_Consistent_NewDecisionTask_Sticky() { - id := "interation-query-workflow-test-consistent-new-decision-task-sticky" - wt := "interation-query-workflow-test-consistent-new-decision-task-sticky-type" - tl := "interation-query-workflow-test-consistent-new-decision-task-sticky-tasklist" - stl := "interation-query-workflow-test-consistent-new-decision-task-sticky-tasklist-sticky" + id := "integration-query-workflow-test-consistent-new-decision-task-sticky" + wt := "integration-query-workflow-test-consistent-new-decision-task-sticky-type" + tl := "integration-query-workflow-test-consistent-new-decision-task-sticky-tasklist" + stl := "integration-query-workflow-test-consistent-new-decision-task-sticky-tasklist-sticky" identity := "worker1" activityName := "activity_type1" queryType := "test-query" @@ -993,9 +993,9 @@ func (s *integrationSuite) TestQueryWorkflow_Consistent_NewDecisionTask_Sticky() } func (s *integrationSuite) TestQueryWorkflow_BeforeFirstDecision() { - id := "interation-test-query-workflow-before-first-decision" - wt := "interation-test-query-workflow-before-first-decision-type" - tl := "interation-test-query-workflow-before-first-decision-tasklist" + id := "integration-test-query-workflow-before-first-decision" + wt := "integration-test-query-workflow-before-first-decision-type" + tl := "integration-test-query-workflow-before-first-decision-tasklist" identity := "worker1" queryType := "test-query" diff --git a/host/signalworkflow_test.go b/host/signalworkflow_test.go index 12fd36f9fad..f0d37cb0d08 100644 --- a/host/signalworkflow_test.go +++ b/host/signalworkflow_test.go @@ -226,9 +226,9 @@ func (s *integrationSuite) TestSignalWorkflow() { } func (s *integrationSuite) TestSignalWorkflow_DuplicateRequest() { - id := "interation-signal-workflow-test-duplicate" - wt := "interation-signal-workflow-test-duplicate-type" - tl := "interation-signal-workflow-test-duplicate-tasklist" + id := "integration-signal-workflow-test-duplicate" + wt := "integration-signal-workflow-test-duplicate-type" + tl := "integration-signal-workflow-test-duplicate-tasklist" identity := "worker1" activityName := "activity_type1" diff --git a/service/history/timerQueueActiveProcessor.go b/service/history/timerQueueActiveProcessor.go index bbb1e00a165..42a38b24096 100644 --- a/service/history/timerQueueActiveProcessor.go +++ b/service/history/timerQueueActiveProcessor.go @@ -697,15 +697,17 @@ func (t *timerQueueActiveProcessorImpl) processWorkflowTimeout( continueAsnewAttributes := &workflow.ContinueAsNewWorkflowExecutionDecisionAttributes{ WorkflowType: startAttributes.WorkflowType, TaskList: startAttributes.TaskList, - RetryPolicy: startAttributes.RetryPolicy, Input: startAttributes.Input, - Header: startAttributes.Header, ExecutionStartToCloseTimeoutSeconds: startAttributes.ExecutionStartToCloseTimeoutSeconds, TaskStartToCloseTimeoutSeconds: startAttributes.TaskStartToCloseTimeoutSeconds, BackoffStartIntervalInSeconds: common.Int32Ptr(int32(backoffInterval.Seconds())), + RetryPolicy: startAttributes.RetryPolicy, Initiator: continueAsNewInitiator.Ptr(), FailureReason: common.StringPtr(timeoutReason), CronSchedule: common.StringPtr(mutableState.GetExecutionInfo().CronSchedule), + Header: startAttributes.Header, + Memo: startAttributes.Memo, + SearchAttributes: startAttributes.SearchAttributes, } newMutableState, err := retryWorkflow( mutableState,