Fix decision task leak with workflow timeout as ttl #373

Merged
merged 3 commits into from
Oct 17, 2017

Conversation

vancexu
Contributor

@vancexu vancexu commented Oct 10, 2017

No description provided.

taskList := &workflow.TaskList{
Name: &task.TaskList,
}

// get workflow timeout
Contributor

Could this value be passed through the transfer task instead of requiring the mutable state to be loaded?

Contributor Author

@vancexu vancexu Oct 16, 2017

Loading the mutable state here does not add an extra call to the DB, so I kept this solution and optimized the code to avoid a duplicate call.
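
For readers following the thread, the idea named in the PR title can be sketched roughly as below: the workflow timeout is copied into the decision task's schedule-to-start timeout so the task carries a TTL. This is only an illustration. The types `taskList` and `addDecisionTaskRequest` and the helper `newAddDecisionTaskRequest` are hypothetical stand-ins; only the field name `ScheduleToStartTimeoutSeconds` appears in this PR.

```go
package main

import "fmt"

// taskList and addDecisionTaskRequest are hypothetical stand-ins for the
// request types touched by this PR; the real definitions are not shown here.
type taskList struct {
	Name *string
}

type addDecisionTaskRequest struct {
	TaskList                      *taskList
	ScheduleID                    *int64
	ScheduleToStartTimeoutSeconds *int32
}

// newAddDecisionTaskRequest (hypothetical helper) uses the workflow timeout,
// in seconds, as the schedule-to-start timeout of the decision task, so a
// task that is never picked up has a bounded lifetime.
func newAddDecisionTaskRequest(name string, scheduleID int64, workflowTimeout int32) *addDecisionTaskRequest {
	return &addDecisionTaskRequest{
		TaskList:                      &taskList{Name: &name},
		ScheduleID:                    &scheduleID,
		ScheduleToStartTimeoutSeconds: &workflowTimeout,
	}
}

func main() {
	req := newAddDecisionTaskRequest("single-decisiontask-queue", 2, 20)
	fmt.Println(*req.TaskList.Name, *req.ScheduleToStartTimeoutSeconds)
}
```

Whether the timeout arrives via the transfer task or the mutable state, as discussed above, does not change this shape; only the source of `workflowTimeout` differs.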

};

ALTER TYPE workflow_execution ADD workflow_timeout int;
Contributor

You don't need to call ALTER TYPE here. You can update the workflow_execution TYPE definition directly to include the new column.

@@ -0,0 +1 @@
ALTER TYPE workflow_execution ADD workflow_timeout int;
Contributor

Can you give this file a meaningful name, like workflow_timeout.cql?

@@ -73,11 +73,11 @@ func (s *cassandraPersistenceSuite) TestPersistenceStartWorkflow() {
WorkflowId: common.StringPtr("start-workflow-test"),
Contributor

Can you also update the existing test to assert that workflow_timeout, after reading from the DB, matches the expectation?
Also add a test afterwards to validate the value after an update.

Contributor Author

Done

@@ -1070,6 +1071,7 @@ func (e *mutableStateBuilder) AddContinueAsNewEvent(decisionCompletedEventID int
InitiatedID: initiatedID,
TaskList: newStateBuilder.executionInfo.TaskList,
WorkflowTypeName: newStateBuilder.executionInfo.WorkflowTypeName,
WorkflowTimeout: newStateBuilder.executionInfo.WorkflowTimeout,
Contributor

Can you add a test to make sure WorkflowTimeout is valid after a new run of the execution is started?

Contributor Author

Will work on adding an integration test for this.
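
As a rough illustration of the check requested above (not the integration test that was eventually written, which is not shown in this thread), the sketch below carries `WorkflowTimeout` into the next run and reads it back. The `executionInfo` struct is a hypothetical, trimmed-down stand-in and `continueAsNew` is a hypothetical helper; only the three field names come from the diff.

```go
package main

import "fmt"

// executionInfo is a hypothetical stand-in; only these three field names
// appear in the diff under review.
type executionInfo struct {
	TaskList         string
	WorkflowTypeName string
	WorkflowTimeout  int32
}

// continueAsNew (hypothetical) builds the execution info for the next run,
// carrying WorkflowTimeout forward together with the other fields.
func continueAsNew(prev executionInfo) executionInfo {
	return executionInfo{
		TaskList:         prev.TaskList,
		WorkflowTypeName: prev.WorkflowTypeName,
		WorkflowTimeout:  prev.WorkflowTimeout,
	}
}

func main() {
	prev := executionInfo{TaskList: "tl", WorkflowTypeName: "wType", WorkflowTimeout: 20}
	next := continueAsNew(prev)
	// The property under test: the new run keeps the original timeout.
	fmt.Println(next.WorkflowTimeout == prev.WorkflowTimeout)
}
```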

@@ -99,7 +99,7 @@ func (s *transferQueueProcessorSuite) TestSingleDecisionTask() {
workflowExecution := workflow.WorkflowExecution{WorkflowId: common.StringPtr("single-decisiontask-test"),
RunId: common.StringPtr("0d00698f-08e1-4d36-a3e2-3bf109f5d2d6")}
taskList := "single-decisiontask-queue"
-task0, err0 := s.CreateWorkflowExecution(domainID, workflowExecution, taskList, "wType", 10, nil, 3, 0, 2, nil)
+task0, err0 := s.CreateWorkflowExecution(domainID, workflowExecution, taskList, "wType", 20, 10, nil, 3, 0, 2, nil)
Contributor

Can we add an assertion to validate that scheduleToStartTimeout is as expected when AddDecisionTask is called on the matching engine?

Contributor Author

@vancexu vancexu Oct 16, 2017

Done, line 112

Execution: &execution,
ScheduleId: &scheduleID,
TaskList: taskList,
ScheduleToStartTimeoutSeconds: common.Int32Ptr(1),
Contributor

This could cause test flakiness if we are reading these tasks back. Do we use the same timeout for ActivityTask?

Contributor Author

I do use the same timeout as ActivityTask, so it should be fine.

@coveralls

Coverage Status

Coverage decreased (-0.002%) to 66.436% when pulling a8abbf1 on dtleak into 73ffdeb on master.

@coveralls

Coverage Status

Coverage increased (+0.01%) to 66.371% when pulling adb45fd on dtleak into b388ca7 on master.

@vancexu vancexu merged commit cbc769d into master Oct 17, 2017
@vancexu vancexu deleted the dtleak branch October 17, 2017 22:35
@@ -1603,7 +1603,7 @@ func (s *integrationSuite) TestContinueAsNewWorkflow() {
for i := 0; i < 10; i++ {
err := poller.pollAndProcessDecisionTask(false, false)
s.logger.Infof("pollAndProcessDecisionTask: %v", err)
-s.Nil(err, i)
+s.Nil(err, string(i))

This is most likely not what you want: string(3) != "3".

Contributor Author

Thanks for catching this! Fix in #387
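
To spell out the point raised in this thread: in Go, converting an integer to `string` yields the UTF-8 encoding of that code point, not its decimal digits. The sketch below contrasts the two behaviors. The helper names `asCodePoint` and `asDecimal` are made up for illustration, and `strconv.Itoa` is shown as one common way to get the digits; what #387 actually changed is not shown here.

```go
package main

import (
	"fmt"
	"strconv"
)

// asCodePoint mirrors what string(i) does for an integer i: it produces the
// UTF-8 encoding of code point i. The explicit rune conversion keeps go vet
// quiet while behaving the same way.
func asCodePoint(i int) string {
	return string(rune(i))
}

// asDecimal formats i as base-10 text, which is what a loop index in an
// assertion message usually needs.
func asDecimal(i int) string {
	return strconv.Itoa(i)
}

func main() {
	fmt.Printf("%q\n", asCodePoint(3)) // "\x03"
	fmt.Printf("%q\n", asDecimal(3))   // "3"
}
```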

5 participants