Skip to content

Commit

Permalink
sql: delete current_executions row as part of workflow deletion (uber…
Browse files Browse the repository at this point in the history
  • Loading branch information
venkat1109 authored Feb 14, 2019
1 parent fc01b09 commit 6876051
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 28 deletions.
66 changes: 60 additions & 6 deletions common/persistence/persistence-tests/executionManagerTest.go
Original file line number Diff line number Diff line change
Expand Up @@ -821,7 +821,7 @@ func (s *ExecutionManagerSuite) TestDeleteWorkflow() {
// TestDeleteCurrentWorkflow test
func (s *ExecutionManagerSuite) TestDeleteCurrentWorkflow() {
if s.ExecutionManager.GetName() != "cassandra" {
s.T().Skip("SQL doesn't support retention yet")
s.T().Skip("this test is only applicable for cassandra (uses TTL based deletes)")
}
finishedCurrentExecutionRetentionTTL := int32(3) // 3 seconds
domainID := "54d15308-e20e-4b91-a00f-a518a3892790"
Expand Down Expand Up @@ -864,6 +864,60 @@ func (s *ExecutionManagerSuite) TestDeleteCurrentWorkflow() {
s.NoError(err2)
}

// TestUpdateDeleteWorkflow verifies that an update workflow (with FinishExecution set to true)
// followed by DeleteWorkflowExecution clears all state associated with the workflow. The
// reason for having this test is because cassandra deletes current_executions row with TTL
// as part of the UpdateWFExecution API whereas SQL deletes current_executions entry as part of
// timer task / DeleteWorkflowExecution API - so, an update followed by delete should clear
// all state in both sql/cassandra
func (s *ExecutionManagerSuite) TestUpdateDeleteWorkflow() {
finishedCurrentExecutionRetentionTTL := int32(2)
domainID := "54d15308-e20e-4b91-a00f-a518a3892790"
workflowExecution := gen.WorkflowExecution{
WorkflowId: common.StringPtr("update-delete-workflow-test"),
RunId: common.StringPtr("6cae4054-6ba7-46d3-8755-e3c2db6f74ea"),
}

task0, err0 := s.CreateWorkflowExecution(domainID, workflowExecution, "queue1", "wType", 20, 13, nil, 3, 0, 2, nil)
s.NoError(err0)
s.NotNil(task0, "Expected non empty task identifier.")

runID0, err1 := s.GetCurrentWorkflowRunID(domainID, *workflowExecution.WorkflowId)
s.NoError(err1)
s.Equal(*workflowExecution.RunId, runID0)

info0, err2 := s.GetWorkflowExecutionInfo(domainID, workflowExecution)
s.NoError(err2)

updatedInfo1 := copyWorkflowExecutionInfo(info0.ExecutionInfo)
updatedInfo1.NextEventID = int64(6)
updatedInfo1.LastProcessedEvent = int64(2)
err3 := s.UpdateWorkflowExecutionAndFinish(updatedInfo1, int64(3), finishedCurrentExecutionRetentionTTL)
s.NoError(err3)

runID4, err4 := s.GetCurrentWorkflowRunID(domainID, *workflowExecution.WorkflowId)
s.NoError(err4)
s.Equal(*workflowExecution.RunId, runID4)

// simulate a timer_task deleting execution after retention
err5 := s.DeleteWorkflowExecution(info0.ExecutionInfo)
s.NoError(err5)

time.Sleep(time.Duration(finishedCurrentExecutionRetentionTTL*2) * time.Second)

runID0, err1 = s.GetCurrentWorkflowRunID(domainID, *workflowExecution.WorkflowId)
s.Error(err1)
s.Empty(runID0)
_, ok := err1.(*gen.EntityNotExistsError)
s.True(ok)

// execution record should still be there
info0, err2 = s.GetWorkflowExecutionInfo(domainID, workflowExecution)
s.Error(err2)
_, ok = err2.(*gen.EntityNotExistsError)
s.True(ok)
}

// TestGetCurrentWorkflow test
func (s *ExecutionManagerSuite) TestGetCurrentWorkflow() {
domainID := "54d15308-e20e-4b91-a00f-a518a3892790"
Expand Down Expand Up @@ -983,14 +1037,14 @@ func (s *ExecutionManagerSuite) TestTransferTasksThroughUpdate() {
s.Equal(p.TransferTaskTypeCloseExecution, task3.TaskType)
s.Equal("", task3.TargetRunID)

err8 := s.DeleteWorkflowExecution(info1)
err8 := s.CompleteTransferTask(task3.TaskID)
s.NoError(err8)

err9 := s.CompleteTransferTask(task3.TaskID)
s.NoError(err9)
_, err9 := s.CreateWorkflowExecution(domainID, newExecution, "queue1", "wType", 20, 13, nil, 3, 0, 2, nil)
s.Error(err9, "createWFExecution (brand_new) must fail when there is a previous instance of workflow state already in DB")

_, err10 := s.CreateWorkflowExecution(domainID, newExecution, "queue1", "wType", 20, 13, nil, 3, 0, 2, nil)
s.Error(err10, "Error expected.")
err10 := s.DeleteWorkflowExecution(info1)
s.NoError(err10)
}

// TestCancelTransferTaskTasks test
Expand Down
37 changes: 22 additions & 15 deletions common/persistence/sql/sqlExecutionManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -566,10 +566,6 @@ func (m *sqlExecutionManager) updateWorkflowExecutionTx(tx sqldb.Tx, request *p.
startVersion = request.ReplicationState.StartVersion
lastWriteVersion = request.ReplicationState.LastWriteVersion
}
if request.FinishExecution {
m.logger.Info("Finish Execution")
// TODO when finish execution, the current record should be marked with a TTL
} //else {
// this is only to update the current record
if err := continueAsNew(tx,
m.shardID,
Expand All @@ -586,7 +582,6 @@ func (m *sqlExecutionManager) updateWorkflowExecutionTx(tx sqldb.Tx, request *p.
Message: fmt.Sprintf("UpdateWorkflowExecution operation failed. Failed to update current execution. Error: %v", err),
}
}
//}
}
return nil
}
Expand Down Expand Up @@ -838,17 +833,29 @@ func (m *sqlExecutionManager) resetMutableStateTx(tx sqldb.Tx, request *p.Intern
}

func (m *sqlExecutionManager) DeleteWorkflowExecution(request *p.DeleteWorkflowExecutionRequest) error {
if _, err := m.db.DeleteFromExecutions(&sqldb.ExecutionsFilter{
ShardID: m.shardID,
DomainID: sqldb.MustParseUUID(request.DomainID),
WorkflowID: request.WorkflowID,
RunID: sqldb.MustParseUUID(request.RunID),
}); err != nil {
return &workflow.InternalServiceError{
Message: fmt.Sprintf("DeleteWorkflowExecution operation failed. Error: %v", err),
domainID := sqldb.MustParseUUID(request.DomainID)
runID := sqldb.MustParseUUID(request.RunID)
return m.txExecute("deleteWorkflowExecution", func(tx sqldb.Tx) error {
if _, err := tx.DeleteFromExecutions(&sqldb.ExecutionsFilter{
ShardID: m.shardID,
DomainID: domainID,
WorkflowID: request.WorkflowID,
RunID: runID,
}); err != nil {
return err
}
}
return nil
// its possible for a new run of the same workflow to have started after the run we are deleting
// here was finished. In that case, current_executions table will have the same workflowID but different
// runID. The following code will delete the row from current_executions if and only if the runID is
// same as the one we are trying to delete here
_, err := tx.DeleteFromCurrentExecutions(&sqldb.CurrentExecutionsFilter{
ShardID: int64(m.shardID),
DomainID: domainID,
WorkflowID: request.WorkflowID,
RunID: runID,
})
return err
})
}

func (m *sqlExecutionManager) GetCurrentExecution(request *p.GetCurrentExecutionRequest) (*p.GetCurrentExecutionResponse, error) {
Expand Down
15 changes: 8 additions & 7 deletions common/persistence/sql/storage/mysql/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,16 +277,17 @@ task_id <= ?
(shard_id, domain_id, workflow_id, run_id, create_request_id, state, close_status, start_version, last_write_version) VALUES
(:shard_id, :domain_id, :workflow_id, :run_id, :create_request_id, :state, :close_status, :start_version, :last_write_version)`

deleteCurrentExecutionQry = "DELETE FROM current_executions WHERE shard_id=? AND domain_id=? AND workflow_id=?"
deleteCurrentExecutionQry = "DELETE FROM current_executions WHERE shard_id=? AND domain_id=? AND workflow_id=? AND run_id=?"

getCurrentExecutionQry = `SELECT
shard_id, domain_id, workflow_id, run_id, create_request_id, state, close_status, start_version, last_write_version
FROM current_executions WHERE shard_id = ? AND domain_id = ? AND workflow_id = ?`

lockCurrentExecutionJoinExecutionsQry = `SELECT
ce.shard_id, ce.domain_id, ce.workflow_id, ce.run_id, ce.create_request_id, ce.state, ce.close_status, ce.start_version, e.last_write_version
FROM current_executions ce
INNER JOIN executions e ON e.shard_id = ce.shard_id AND e.domain_id = ce.domain_id AND e.workflow_id = ce.workflow_id AND e.run_id = ce.run_id
WHERE ce.shard_id = ? AND ce.domain_id = ? AND ce.workflow_id = ?
`

getCurrentExecutionQryForUpdate = getCurrentExecutionQry + " FOR UPDATE"
WHERE ce.shard_id = ? AND ce.domain_id = ? AND ce.workflow_id = ? FOR UPDATE`

lockCurrentExecutionQry = `SELECT run_id FROM current_executions WHERE
shard_id = ? AND
Expand Down Expand Up @@ -458,7 +459,7 @@ func (mdb *DB) SelectFromCurrentExecutions(filter *sqldb.CurrentExecutionsFilter

// DeleteFromCurrentExecutions deletes a single row in current_executions table
func (mdb *DB) DeleteFromCurrentExecutions(filter *sqldb.CurrentExecutionsFilter) (sql.Result, error) {
return mdb.conn.Exec(deleteCurrentExecutionQry, filter.ShardID, filter.DomainID, filter.WorkflowID)
return mdb.conn.Exec(deleteCurrentExecutionQry, filter.ShardID, filter.DomainID, filter.WorkflowID, filter.RunID)
}

// LockCurrentExecutions acquires a write lock on a single row in current_executions table
Expand All @@ -472,7 +473,7 @@ func (mdb *DB) LockCurrentExecutions(filter *sqldb.CurrentExecutionsFilter) (sql
// write lock on the result
func (mdb *DB) LockCurrentExecutionsJoinExecutions(filter *sqldb.CurrentExecutionsFilter) ([]sqldb.CurrentExecutionsRow, error) {
var rows []sqldb.CurrentExecutionsRow
err := mdb.conn.Select(&rows, getCurrentExecutionQryForUpdate, filter.ShardID, filter.DomainID, filter.WorkflowID)
err := mdb.conn.Select(&rows, lockCurrentExecutionJoinExecutionsQry, filter.ShardID, filter.DomainID, filter.WorkflowID)
return rows, err
}

Expand Down
8 changes: 8 additions & 0 deletions common/persistence/sql/storage/sqldb/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ type (
ShardID int64
DomainID UUID
WorkflowID string
RunID UUID
}

// BufferedEventsRow represents a row in buffered_events table
Expand Down Expand Up @@ -606,7 +607,14 @@ type (

InsertIntoCurrentExecutions(row *CurrentExecutionsRow) (sql.Result, error)
UpdateCurrentExecutions(row *CurrentExecutionsRow) (sql.Result, error)
// SelectFromCurrentExecutions returns one or more rows from current_executions table
// Required params - {shardID, domainID, workflowID}
SelectFromCurrentExecutions(filter *CurrentExecutionsFilter) (*CurrentExecutionsRow, error)
// DeleteFromCurrentExecutions deletes a single row that matches the filter criteria
// If a row exist, that row will be deleted and this method will return success
// If there is no row matching the filter criteria, this method will still return success
// Callers can check the output of Result.RowsAffected() to see if a row was deleted or not
// Required params - {shardID, domainID, workflowID, runID}
DeleteFromCurrentExecutions(filter *CurrentExecutionsFilter) (sql.Result, error)
LockCurrentExecutions(filter *CurrentExecutionsFilter) (UUID, error)

Expand Down

0 comments on commit 6876051

Please sign in to comment.