Fix decision task leak with workflow timeout as ttl #373
taskList := &workflow.TaskList{
	Name: &task.TaskList,
}

// get workflow timeout
Could this value be passed through the transfer task instead of requiring the mutable state to be loaded?
Loading mutable state here does not add a call to the DB, so I kept this solution and optimized the code to avoid the duplicate call.
schema/cadence/schema.cql
Outdated
};

ALTER TYPE workflow_execution ADD workflow_timeout int;
You don't need to call ALTER TYPE here. You can just directly update the workflow_execution TYPE to have the new column.
@@ -0,0 +1 @@
ALTER TYPE workflow_execution ADD workflow_timeout int;
Can you give this file a meaningful name, like workflow_timeout.cql?
@@ -73,11 +73,11 @@ func (s *cassandraPersistenceSuite) TestPersistenceStartWorkflow() {
WorkflowId: common.StringPtr("start-workflow-test"),
Can you also update the existing test to assert that workflow_timeout, after being read back from the DB, matches the expected value?
Please also add a test that validates the value after an update.
Done
@@ -1070,6 +1071,7 @@ func (e *mutableStateBuilder) AddContinueAsNewEvent(decisionCompletedEventID int
InitiatedID: initiatedID,
TaskList: newStateBuilder.executionInfo.TaskList,
WorkflowTypeName: newStateBuilder.executionInfo.WorkflowTypeName,
WorkflowTimeout: newStateBuilder.executionInfo.WorkflowTimeout,
Can you add a test to make sure WorkflowTimeout is valid after a new run of the execution is started?
Will work on adding an integration test for this.
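As a rough illustration of the property such a test would check, here is a minimal, self-contained sketch. The `executionInfo` struct and `continueAsNew` function are hypothetical stand-ins invented for this example, not the project's real types; the only point is that the timeout is carried over to the new run.

```go
package main

import "fmt"

// executionInfo is a hypothetical, trimmed-down stand-in for the
// execution state discussed in the review. It is not the real type.
type executionInfo struct {
	TaskList         string
	WorkflowTypeName string
	WorkflowTimeout  int32 // assumed to be in seconds
}

// continueAsNew is a hypothetical helper that builds the state of the
// next run by copying the fields that must survive continue-as-new.
func continueAsNew(prev executionInfo) executionInfo {
	return executionInfo{
		TaskList:         prev.TaskList,
		WorkflowTypeName: prev.WorkflowTypeName,
		WorkflowTimeout:  prev.WorkflowTimeout,
	}
}

func main() {
	prev := executionInfo{TaskList: "tl", WorkflowTypeName: "wType", WorkflowTimeout: 20}
	next := continueAsNew(prev)

	// The new run should see the same workflow timeout as the old one.
	if next.WorkflowTimeout != prev.WorkflowTimeout {
		panic("workflow timeout was not carried over to the new run")
	}
	fmt.Println("timeout carried over:", next.WorkflowTimeout)
}
```

A real integration test would start a workflow, trigger continue-as-new, and read the timeout back from the new run's persisted state rather than from an in-memory copy.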
@@ -99,7 +99,7 @@ func (s *transferQueueProcessorSuite) TestSingleDecisionTask() {
  workflowExecution := workflow.WorkflowExecution{WorkflowId: common.StringPtr("single-decisiontask-test"),
  RunId: common.StringPtr("0d00698f-08e1-4d36-a3e2-3bf109f5d2d6")}
  taskList := "single-decisiontask-queue"
- task0, err0 := s.CreateWorkflowExecution(domainID, workflowExecution, taskList, "wType", 10, nil, 3, 0, 2, nil)
+ task0, err0 := s.CreateWorkflowExecution(domainID, workflowExecution, taskList, "wType", 20, 10, nil, 3, 0, 2, nil)
Can we add an assertion to validate that scheduleToStartTimeout has the expected value when AddDecisionTask is called on the matching engine?
Done, line 112
Execution: &execution,
ScheduleId: &scheduleID,
TaskList: taskList,
ScheduleToStartTimeoutSeconds: common.Int32Ptr(1),
This could cause test flakiness if we are reading these tasks back. Do we use the same timeout for ActivityTask?
I do use the same timeout as ActivityTask, so this should be fine.
@@ -1603,7 +1603,7 @@ func (s *integrationSuite) TestContinueAsNewWorkflow() {
  for i := 0; i < 10; i++ {
  err := poller.pollAndProcessDecisionTask(false, false)
  s.logger.Infof("pollAndProcessDecisionTask: %v", err)
- s.Nil(err, i)
+ s.Nil(err, string(i))
This is most likely not what you want: string(3) != "3".
Thanks for catching this! Fix in #387
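For reference, the difference behind the `string(3) != "3"` remark can be seen in isolation. In Go, converting an integer to `string` yields the UTF-8 encoding of that code point, not its decimal digits; `strconv.Itoa` gives the digits. The helper names `asRune` and `asDecimal` below are made up for this sketch.

```go
package main

import (
	"fmt"
	"strconv"
)

// asRune mirrors what string(i) does for an integer i: it returns the
// UTF-8 encoding of code point i, not its decimal representation.
// The explicit rune conversion keeps `go vet` quiet.
func asRune(i int) string {
	return string(rune(i))
}

// asDecimal returns the base-10 text form of i, e.g. 3 -> "3".
func asDecimal(i int) string {
	return strconv.Itoa(i)
}

func main() {
	fmt.Printf("%q\n", asRune(3))    // prints "\x03" (a control character)
	fmt.Printf("%q\n", asDecimal(3)) // prints "3"
}
```

If the assertion helper accepts variadic message arguments, as testify's `Nil` does, passing `i` directly (the original `s.Nil(err, i)`) is probably the simplest way to get the loop index into the failure message.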
No description provided.