Skip to content

Commit

Permalink
Reduce tombstones for open workflow executions (uber#395)
Browse files Browse the repository at this point in the history
  • Loading branch information
madhuravi authored Nov 1, 2017
1 parent df770f5 commit b3d8ab3
Show file tree
Hide file tree
Showing 9 changed files with 48 additions and 16 deletions.
5 changes: 4 additions & 1 deletion common/persistence/cassandraVisibilityPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,13 @@ import (
const (
domainPartition = 0
defaultCloseTTLSeconds = 86400
openExecutionTTLBuffer = int64(20)
)

const (
templateCreateWorkflowExecutionStarted = `INSERT INTO open_executions (` +
`domain_id, domain_partition, workflow_id, run_id, start_time, workflow_type_name) ` +
`VALUES (?, ?, ?, ?, ?, ?)`
`VALUES (?, ?, ?, ?, ?, ?) using TTL ?`

templateDeleteWorkflowExecutionStarted = `DELETE FROM open_executions ` +
`WHERE domain_id = ? ` +
Expand Down Expand Up @@ -149,13 +150,15 @@ func (v *cassandraVisibilityPersistence) Close() {

func (v *cassandraVisibilityPersistence) RecordWorkflowExecutionStarted(
request *RecordWorkflowExecutionStartedRequest) error {
ttl := request.WorkflowTimeout + openExecutionTTLBuffer
query := v.session.Query(templateCreateWorkflowExecutionStarted,
request.DomainUUID,
domainPartition,
*request.Execution.WorkflowId,
*request.Execution.RunId,
common.UnixNanoToCQLTimestamp(request.StartTimestamp),
request.WorkflowTypeName,
ttl,
)
query = query.WithTimestamp(common.UnixNanoToCQLTimestamp(request.StartTimestamp))
err := query.Exec()
Expand Down
1 change: 1 addition & 0 deletions common/persistence/visibilityInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type (
Execution s.WorkflowExecution
WorkflowTypeName string
StartTimestamp int64
WorkflowTimeout int64
}

// RecordWorkflowExecutionClosedRequest is used to add a record of a newly
Expand Down
5 changes: 3 additions & 2 deletions schema/visibility/schema.cql
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ CREATE TABLE open_executions (
PRIMARY KEY ((domain_id, domain_partition), start_time, run_id)
) WITH CLUSTERING ORDER BY (start_time DESC)
AND COMPACTION = {
'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy'
'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy',
'tombstone_threshold': 0.6
}
AND GC_GRACE_SECONDS = 172800;
AND GC_GRACE_SECONDS = 60;


CREATE INDEX open_by_workflow_id ON open_executions (workflow_id);
Expand Down
8 changes: 8 additions & 0 deletions schema/visibility/versioned/v0.2/manifest.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"CurrVersion": "0.2",
"MinCompatibleVersion": "0.2",
"Description": "lower gc_grace_seconds and tweak compaction for open_executions table",
"SchemaUpdateCqlFiles": [
"reduce_open_workflow_tombstones.cql"
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
ALTER TABLE open_executions WITH gc_grace_seconds=60 AND compaction = {
'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy',
'tombstone_threshold': 0.4
};
1 change: 1 addition & 0 deletions service/history/historyEngine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2580,6 +2580,7 @@ func copyWorkflowExecutionInfo(sourceInfo *persistence.WorkflowExecutionInfo) *p
CompletionEvent: sourceInfo.CompletionEvent,
TaskList: sourceInfo.TaskList,
WorkflowTypeName: sourceInfo.WorkflowTypeName,
WorkflowTimeout: sourceInfo.WorkflowTimeout,
DecisionTimeoutValue: sourceInfo.DecisionTimeoutValue,
ExecutionContext: sourceInfo.ExecutionContext,
State: sourceInfo.State,
Expand Down
8 changes: 5 additions & 3 deletions service/history/transferQueueProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ func (t *transferQueueProcessorImpl) processDecisionTask(task *persistence.Trans
}

if task.ScheduleID == firstEventID+1 {
err = t.recordWorkflowExecutionStarted(execution, task, wfTypeName, startTimestamp)
err = t.recordWorkflowExecutionStarted(execution, task, wfTypeName, startTimestamp, timeout)
}

if err != nil {
Expand Down Expand Up @@ -695,13 +695,15 @@ func (t *transferQueueProcessorImpl) processStartChildExecution(task *persistenc
}

func (t *transferQueueProcessorImpl) recordWorkflowExecutionStarted(
execution workflow.WorkflowExecution, task *persistence.TransferTaskInfo, wfTypeName string, startTimestamp time.Time) error {

execution workflow.WorkflowExecution, task *persistence.TransferTaskInfo, wfTypeName string,
startTimestamp time.Time, timeout int32,
) error {
err := t.visibilityManager.RecordWorkflowExecutionStarted(&persistence.RecordWorkflowExecutionStartedRequest{
DomainUUID: task.DomainID,
Execution: execution,
WorkflowTypeName: wfTypeName,
StartTimestamp: startTimestamp.UnixNano(),
WorkflowTimeout: int64(timeout),
})

return err
Expand Down
30 changes: 21 additions & 9 deletions service/history/transferQueueProcessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,10 @@ func (s *transferQueueProcessorSuite) TestDeleteExecutionTransferTasks() {
}
taskList := "delete-execution-transfertasks-queue"
identity := "delete-execution-transfertasks-test"
task0, err0 := s.CreateWorkflowExecution(domainID, workflowExecution, taskList, "wType", 20, 10, nil, 3, 0, 2, nil)
wtimeout := int32(20)
task0, err0 := s.CreateWorkflowExecution(
domainID, workflowExecution, taskList, "wType", wtimeout, 10, nil, 3, 0, 2, nil,
)
s.Nil(err0, "No error expected.")
s.NotEmpty(task0, "Expected non empty task identifier.")

Expand All @@ -205,9 +208,13 @@ workerPump:
select {
case task := <-tasksCh:
if task.TaskType == persistence.TransferTaskTypeDecisionTask {
s.mockMatching.On("AddDecisionTask", mock.Anything, createAddRequestFromTask(task, 0)).Once().Return(nil)
s.mockMatching.On("AddDecisionTask", mock.Anything, createAddRequestFromTask(task, wtimeout)).Once().Return(nil)
if task.ScheduleID == firstEventID+1 {
s.mockVisibilityMgr.On("RecordWorkflowExecutionStarted", mock.Anything).Once().Return(nil)
s.mockVisibilityMgr.On("RecordWorkflowExecutionStarted", mock.MatchedBy(
func(request *persistence.RecordWorkflowExecutionStartedRequest) bool {
return request.WorkflowTimeout == int64(wtimeout)
},
)).Once().Return(nil)
}
} else if task.TaskType == persistence.TransferTaskTypeDeleteExecution {
s.mockMetadataMgr.On("GetDomain", mock.Anything).Once().Return(&persistence.GetDomainResponse{
Expand Down Expand Up @@ -238,7 +245,8 @@ func (s *transferQueueProcessorSuite) TestDeleteExecutionTransferTasksDomainNotE
}
taskList := "delete-execution-transfertasks-domain-queue"
identity := "delete-execution-transfertasks-domain-test"
task0, err0 := s.CreateWorkflowExecution(domainID, workflowExecution, taskList, "wType", 20, 10, nil, 3, 0, 2, nil)
wtimeout := int32(20)
task0, err0 := s.CreateWorkflowExecution(domainID, workflowExecution, taskList, "wType", wtimeout, 10, nil, 3, 0, 2, nil)
s.Nil(err0, "No error expected.")
s.NotEmpty(task0, "Expected non empty task identifier.")

Expand All @@ -263,7 +271,7 @@ workerPump:
select {
case task := <-tasksCh:
if task.TaskType == persistence.TransferTaskTypeDecisionTask {
s.mockMatching.On("AddDecisionTask", mock.Anything, createAddRequestFromTask(task, 0)).Once().Return(nil)
s.mockMatching.On("AddDecisionTask", mock.Anything, createAddRequestFromTask(task, wtimeout)).Once().Return(nil)
if task.ScheduleID == firstEventID+1 {
s.mockVisibilityMgr.On("RecordWorkflowExecutionStarted", mock.Anything).Once().Return(nil)
}
Expand Down Expand Up @@ -292,7 +300,8 @@ func (s *transferQueueProcessorSuite) TestCancelRemoteExecutionTransferTasks() {
targetDomain := "f2bfaab6-7e8b-4fac-9a62-17da8d37becb"
targetWorkflowID := "target-workflow_id"
targetRunID := "0d00698f-08e1-4d36-a3e2-3bf109f5d2d6"
task0, err0 := s.CreateWorkflowExecution(domainID, workflowExecution, taskList, "wType", 20, 10, nil, 3, 0, 2, nil)
wtimeout := int32(20)
task0, err0 := s.CreateWorkflowExecution(domainID, workflowExecution, taskList, "wType", wtimeout, 10, nil, 3, 0, 2, nil)
s.Nil(err0, "No error expected.")
s.NotEmpty(task0, "Expected non empty task identifier.")

Expand Down Expand Up @@ -324,7 +333,7 @@ workerPump:
case task := <-tasksCh:
s.logger.Infof("Processing transfer task type: %v", task.TaskType)
if task.TaskType == persistence.TransferTaskTypeDecisionTask {
s.mockMatching.On("AddDecisionTask", mock.Anything, createAddRequestFromTask(task, 0)).Once().Return(nil)
s.mockMatching.On("AddDecisionTask", mock.Anything, createAddRequestFromTask(task, wtimeout)).Once().Return(nil)
if task.ScheduleID == firstEventID+1 {
s.mockVisibilityMgr.On("RecordWorkflowExecutionStarted", mock.Anything).Once().Return(nil)
}
Expand All @@ -351,7 +360,10 @@ func (s *transferQueueProcessorSuite) TestCancelRemoteExecutionTransferTask_Requ
targetWorkflowID := "target-workflow_id"
targetRunID := "0d00698f-08e1-4d36-a3e2-3bf109f5d2d6"

task0, err0 := s.CreateWorkflowExecution(domainID, workflowExecution, taskList, "wType", 20, 10, nil, 3, 0, 2, nil)
wtimeout := int32(20)
task0, err0 := s.CreateWorkflowExecution(
domainID, workflowExecution, taskList, "wType", wtimeout, 10, nil, 3, 0, 2, nil,
)
s.Nil(err0, "No error expected.")
s.NotEmpty(task0, "Expected non empty task identifier.")

Expand Down Expand Up @@ -384,7 +396,7 @@ workerPump:
s.logger.Infof("Processing transfer task type: %v, TaskID: %v, Task.ScheduleID: %v", task.TaskType,
task.TaskID, task.ScheduleID)
if task.TaskType == persistence.TransferTaskTypeDecisionTask {
s.mockMatching.On("AddDecisionTask", mock.Anything, createAddRequestFromTask(task, 0)).Once().Return(nil)
s.mockMatching.On("AddDecisionTask", mock.Anything, createAddRequestFromTask(task, wtimeout)).Once().Return(nil)
if task.ScheduleID == firstEventID+1 {
s.mockVisibilityMgr.On("RecordWorkflowExecutionStarted", mock.Anything).Once().Return(nil)
}
Expand Down
2 changes: 1 addition & 1 deletion tools/cassandra/updateTask.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,6 @@ func readSchemaDir(dir string, startVer string, endVer string) ([]string, error)
// sets up a temporary dryrun keyspace for
// executing the cassandra schema update
func setupDryrunKeyspace(config *UpdateSchemaConfig) error {

client, err := newCQLClient(config.CassHosts, config.CassPort, config.CassUser, config.CassPassword, systemKeyspace)
if err != nil {
return err
Expand All @@ -372,6 +371,7 @@ func setupDryrunKeyspace(config *UpdateSchemaConfig) error {
setupConfig := &SetupSchemaConfig{
BaseConfig: BaseConfig{
CassHosts: config.CassHosts,
CassPort: config.CassPort,
CassKeyspace: dryrunKeyspace,
},
Overwrite: true,
Expand Down

0 comments on commit b3d8ab3

Please sign in to comment.