Skip to content

Commit

Permalink
V0.3.12 patch (uber#829)
Browse files Browse the repository at this point in the history
* make transfer / timer / replication persistence layer accept page token

* get task API on the cassandra side should use page size, not limit

* apply pagination token to timer ack manager

* move duplicated transfer code to one place

* modify test to force pagination

* bugfix: queue ack manager should call processor's complete task fn

* do not keep the lock when doing query
  • Loading branch information
wxing1292 authored Jun 9, 2018
1 parent 476c9bb commit 8d47f5a
Show file tree
Hide file tree
Showing 21 changed files with 653 additions and 449 deletions.
25 changes: 17 additions & 8 deletions common/persistence/cassandraPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -686,7 +686,7 @@ const (
`and run_id = ? ` +
`and visibility_ts = ? ` +
`and task_id > ? ` +
`and task_id <= ? LIMIT ?`
`and task_id <= ?`

templateGetReplicationTasksQuery = `SELECT replication ` +
`FROM executions ` +
Expand All @@ -697,7 +697,7 @@ const (
`and run_id = ? ` +
`and visibility_ts = ? ` +
`and task_id > ? ` +
`and task_id <= ? LIMIT ?`
`and task_id <= ?`

templateCompleteTransferTaskQuery = `DELETE FROM executions ` +
`WHERE shard_id = ? ` +
Expand All @@ -716,7 +716,7 @@ const (
`and workflow_id = ?` +
`and run_id = ?` +
`and visibility_ts >= ? ` +
`and visibility_ts < ? LIMIT ?`
`and visibility_ts < ?`

templateCompleteTimerTaskQuery = `DELETE FROM executions ` +
`WHERE shard_id = ? ` +
Expand All @@ -742,7 +742,7 @@ const (
`and task_list_type = ? ` +
`and type = ? ` +
`and task_id > ? ` +
`and task_id <= ? LIMIT ?`
`and task_id <= ?`

templateCompleteTaskQuery = `DELETE FROM tasks ` +
`WHERE domain_id = ? ` +
Expand Down Expand Up @@ -1881,7 +1881,7 @@ func (d *cassandraPersistence) GetTransferTasks(request *GetTransferTasksRequest
defaultVisibilityTimestamp,
request.ReadLevel,
request.MaxReadLevel,
request.BatchSize)
).PageSize(request.BatchSize).PageState(request.NextPageToken)

iter := query.Iter()
if iter == nil {
Expand All @@ -1899,6 +1899,9 @@ func (d *cassandraPersistence) GetTransferTasks(request *GetTransferTasksRequest

response.Tasks = append(response.Tasks, t)
}
nextPageToken := iter.PageState()
response.NextPageToken = make([]byte, len(nextPageToken))
copy(response.NextPageToken, nextPageToken)

if err := iter.Close(); err != nil {
return nil, &workflow.InternalServiceError{
Expand All @@ -1922,7 +1925,7 @@ func (d *cassandraPersistence) GetReplicationTasks(request *GetReplicationTasksR
defaultVisibilityTimestamp,
request.ReadLevel,
request.MaxReadLevel,
request.BatchSize)
).PageSize(request.BatchSize).PageState(request.NextPageToken)

iter := query.Iter()
if iter == nil {
Expand All @@ -1940,6 +1943,9 @@ func (d *cassandraPersistence) GetReplicationTasks(request *GetReplicationTasksR

response.Tasks = append(response.Tasks, t)
}
nextPageToken := iter.PageState()
response.NextPageToken = make([]byte, len(nextPageToken))
copy(response.NextPageToken, nextPageToken)

if err := iter.Close(); err != nil {
return nil, &workflow.InternalServiceError{
Expand Down Expand Up @@ -2275,7 +2281,7 @@ func (d *cassandraPersistence) GetTasks(request *GetTasksRequest) (*GetTasksResp
rowTypeTask,
request.ReadLevel,
request.MaxReadLevel,
request.BatchSize)
).PageSize(request.BatchSize)

iter := query.Iter()
if iter == nil {
Expand Down Expand Up @@ -2348,7 +2354,7 @@ func (d *cassandraPersistence) GetTimerIndexTasks(request *GetTimerIndexTasksReq
rowTypeTimerRunID,
minTimestamp,
maxTimestamp,
request.BatchSize)
).PageSize(request.BatchSize).PageState(request.NextPageToken)

iter := query.Iter()
if iter == nil {
Expand All @@ -2366,6 +2372,9 @@ func (d *cassandraPersistence) GetTimerIndexTasks(request *GetTimerIndexTasksReq

response.Timers = append(response.Timers, t)
}
nextPageToken := iter.PageState()
response.NextPageToken = make([]byte, len(nextPageToken))
copy(response.NextPageToken, nextPageToken)

if err := iter.Close(); err != nil {
if isThrottlingError(err) {
Expand Down
102 changes: 83 additions & 19 deletions common/persistence/cassandraPersistence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ func (s *cassandraPersistenceSuite) TestTransferTasksThroughUpdate() {
s.Nil(err0, "No error expected.")
s.NotEmpty(task0, "Expected non empty task identifier.")

tasks1, err1 := s.GetTransferTasks(1)
tasks1, err1 := s.GetTransferTasks(1, false)
s.Nil(err1, "No error expected.")
s.NotNil(tasks1, "expected valid list of tasks.")
s.Equal(1, len(tasks1), "Expected 1 decision task.")
Expand All @@ -429,7 +429,7 @@ func (s *cassandraPersistenceSuite) TestTransferTasksThroughUpdate() {
err2 := s.UpdateWorkflowExecution(updatedInfo, nil, []int64{int64(4)}, int64(3), nil, nil, nil, nil, nil, nil)
s.Nil(err2, "No error expected.")

tasks2, err1 := s.GetTransferTasks(1)
tasks2, err1 := s.GetTransferTasks(1, false)
s.Nil(err1, "No error expected.")
s.NotNil(tasks2, "expected valid list of tasks.")
s.Equal(1, len(tasks2), "Expected 1 decision task.")
Expand Down Expand Up @@ -461,7 +461,7 @@ func (s *cassandraPersistenceSuite) TestTransferTasksThroughUpdate() {
s.Nil(err6)
s.Equal(*workflowExecution.RunId, runID6)

tasks3, err7 := s.GetTransferTasks(1)
tasks3, err7 := s.GetTransferTasks(1, false)
s.Nil(err7, "No error expected.")
s.NotNil(tasks3, "expected valid list of tasks.")
s.Equal(1, len(tasks3), "Expected 1 decision task.")
Expand Down Expand Up @@ -493,7 +493,7 @@ func (s *cassandraPersistenceSuite) TestCancelTransferTaskTasks() {
s.Nil(err, "No error expected.")
s.NotEmpty(task0, "Expected non empty task identifier.")

taskD, err := s.GetTransferTasks(1)
taskD, err := s.GetTransferTasks(1, false)
s.Equal(1, len(taskD), "Expected 1 decision task.")
err = s.CompleteTransferTask(taskD[0].TaskID)
s.Nil(err)
Expand All @@ -519,7 +519,7 @@ func (s *cassandraPersistenceSuite) TestCancelTransferTaskTasks() {
err = s.UpdateWorkflowExecutionWithTransferTasks(updatedInfo1, int64(3), transferTasks, nil)
s.Nil(err, "No error expected.")

tasks1, err := s.GetTransferTasks(1)
tasks1, err := s.GetTransferTasks(1, false)
s.Nil(err, "No error expected.")
s.NotNil(tasks1, "expected valid list of tasks.")
s.Equal(1, len(tasks1), "Expected 1 cancel task.")
Expand Down Expand Up @@ -558,7 +558,7 @@ func (s *cassandraPersistenceSuite) TestCancelTransferTaskTasks() {
err = s.UpdateWorkflowExecutionWithTransferTasks(updatedInfo2, int64(3), transferTasks, nil)
s.Nil(err, "No error expected.")

tasks2, err := s.GetTransferTasks(1)
tasks2, err := s.GetTransferTasks(1, false)
s.Nil(err, "No error expected.")
s.NotNil(tasks2, "expected valid list of tasks.")
s.Equal(1, len(tasks2), "Expected 1 cancel task.")
Expand Down Expand Up @@ -587,7 +587,7 @@ func (s *cassandraPersistenceSuite) TestSignalTransferTaskTasks() {
s.Nil(err, "No error expected.")
s.NotEmpty(task0, "Expected non empty task identifier.")

taskD, err := s.GetTransferTasks(1)
taskD, err := s.GetTransferTasks(1, false)
s.Equal(1, len(taskD), "Expected 1 decision task.")
err = s.CompleteTransferTask(taskD[0].TaskID)
s.Nil(err)
Expand All @@ -613,7 +613,7 @@ func (s *cassandraPersistenceSuite) TestSignalTransferTaskTasks() {
err = s.UpdateWorkflowExecutionWithTransferTasks(updatedInfo1, int64(3), transferTasks, nil)
s.Nil(err, "No error expected.")

tasks1, err := s.GetTransferTasks(1)
tasks1, err := s.GetTransferTasks(1, false)
s.Nil(err, "No error expected.")
s.NotNil(tasks1, "expected valid list of tasks.")
s.Equal(1, len(tasks1), "Expected 1 cancel task.")
Expand Down Expand Up @@ -652,7 +652,7 @@ func (s *cassandraPersistenceSuite) TestSignalTransferTaskTasks() {
err = s.UpdateWorkflowExecutionWithTransferTasks(updatedInfo2, int64(3), transferTasks, nil)
s.Nil(err, "No error expected.")

tasks2, err := s.GetTransferTasks(1)
tasks2, err := s.GetTransferTasks(1, false)
s.Nil(err, "No error expected.")
s.NotNil(tasks2, "expected valid list of tasks.")
s.Equal(1, len(tasks2), "Expected 1 cancel task.")
Expand Down Expand Up @@ -826,6 +826,70 @@ func (s *cassandraPersistenceSuite) TestLeaseAndUpdateTaskList_Sticky() {
s.NoError(err) // because update with ttl doesn't check rangeID
}

func (s *cassandraPersistenceSuite) TestReplicationTasks() {
domainID := "2466d7de-6602-4ad8-b939-fb8f8c36c711"
workflowExecution := gen.WorkflowExecution{
WorkflowId: common.StringPtr("get-replication-tasks-test"),
RunId: common.StringPtr("aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"),
}

task0, err := s.CreateWorkflowExecution(domainID, workflowExecution, "queue1", "wType", 20, 13, nil, 3, 0, 2, nil)
s.Nil(err, "No error expected.")
s.NotEmpty(task0, "Expected non empty task identifier.")
taskD, err := s.GetTransferTasks(1, false)
s.Equal(1, len(taskD), "Expected 1 decision task.")
err = s.CompleteTransferTask(taskD[0].TaskID)
s.Nil(err)

state1, err := s.GetWorkflowExecutionInfo(domainID, workflowExecution)
s.Nil(err, "No error expected.")
info1 := state1.ExecutionInfo
s.NotNil(info1, "Valid Workflow info expected.")
updatedInfo1 := copyWorkflowExecutionInfo(info1)

replicationTasks := []Task{
&HistoryReplicationTask{
TaskID: s.GetNextSequenceNumber(),
FirstEventID: int64(1),
NextEventID: int64(3),
Version: 123,
LastReplicationInfo: map[string]*ReplicationInfo{
"dc1": &ReplicationInfo{
Version: int64(3),
LastEventID: int64(1),
},
},
},
&HistoryReplicationTask{
TaskID: s.GetNextSequenceNumber(),
FirstEventID: int64(1),
NextEventID: int64(3),
Version: 456,
LastReplicationInfo: map[string]*ReplicationInfo{
"dc1": &ReplicationInfo{
Version: int64(3),
LastEventID: int64(1),
},
},
},
}
err = s.UpdateWorklowStateAndReplication(updatedInfo1, nil, nil, nil, int64(3), replicationTasks)
s.Nil(err, "No error expected.")

repTasks, err := s.GetReplicationTasks(1, true)
s.Nil(err)
s.Equal(len(replicationTasks), len(repTasks))

for index := range replicationTasks {
s.Equal(replicationTasks[index].GetTaskID(), repTasks[index].GetTaskID())
s.Equal(replicationTasks[index].GetType(), repTasks[index].GetTaskType())
s.Equal(replicationTasks[index].GetVersion(), repTasks[index].GetVersion())

err = s.CompleteReplicationTask(repTasks[index].GetTaskID())
s.Nil(err, "No error expected.")
}
}

func (s *cassandraPersistenceSuite) TestTransferTasks() {
domainID := "8bfb47be-5b57-4d55-9109-5fb35e20b1d7"
workflowExecution := gen.WorkflowExecution{
Expand All @@ -838,7 +902,7 @@ func (s *cassandraPersistenceSuite) TestTransferTasks() {
s.Nil(err0, "No error expected.")
s.NotEmpty(task0, "Expected non empty task identifier.")

tasks1, err1 := s.GetTransferTasks(1)
tasks1, err1 := s.GetTransferTasks(1, false)
s.Nil(err1, "No error expected.")
s.NotNil(tasks1, "expected valid list of tasks.")
s.Equal(1, len(tasks1), "Expected 1 decision task.")
Expand Down Expand Up @@ -878,7 +942,7 @@ func (s *cassandraPersistenceSuite) TestTransferTasks() {
err2 := s.UpdateWorklowStateAndReplication(updatedInfo, nil, nil, nil, int64(3), tasks)
s.Nil(err2, "No error expected.")

txTasks, err1 := s.GetTransferTasks(100)
txTasks, err1 := s.GetTransferTasks(1, true) // use page size one to force pagination
s.Nil(err1, "No error expected.")
s.NotNil(txTasks, "expected valid list of tasks.")
s.Equal(len(tasks), len(txTasks))
Expand Down Expand Up @@ -913,7 +977,7 @@ func (s *cassandraPersistenceSuite) TestTransferTasks() {
err2 = s.CompleteTransferTask(txTasks[5].TaskID)
s.Nil(err2, "No error expected.")

txTasks, err2 = s.GetTransferTasks(100)
txTasks, err2 = s.GetTransferTasks(100, false)
s.Nil(err2, "No error expected.")
s.Empty(txTasks, "expected empty task list.")
}
Expand Down Expand Up @@ -947,7 +1011,7 @@ func (s *cassandraPersistenceSuite) TestTimerTasks() {
err2 := s.UpdateWorkflowExecution(updatedInfo, []int64{int64(4)}, nil, int64(3), tasks, nil, nil, nil, nil, nil)
s.Nil(err2, "No error expected.")

timerTasks, err1 := s.GetTimerIndexTasks()
timerTasks, err1 := s.GetTimerIndexTasks(1, true) // use page size one to force pagination
s.Nil(err1, "No error expected.")
s.NotNil(timerTasks, "expected valid list of tasks.")
s.Equal(len(tasks), len(timerTasks))
Expand Down Expand Up @@ -978,7 +1042,7 @@ func (s *cassandraPersistenceSuite) TestTimerTasks() {
err2 = s.CompleteTimerTask(timerTasks[4].VisibilityTimestamp, timerTasks[4].TaskID)
s.Nil(err2, "No error expected.")

timerTasks2, err2 := s.GetTimerIndexTasks()
timerTasks2, err2 := s.GetTimerIndexTasks(100, false)
s.Nil(err2, "No error expected.")
s.Empty(timerTasks2, "expected empty task list.")
}
Expand Down Expand Up @@ -1590,7 +1654,7 @@ func (s *cassandraPersistenceSuite) TestReplicationTransferTaskTasks() {
s.Nil(err, "No error expected.")
s.NotEmpty(task0, "Expected non empty task identifier.")

taskD, err := s.GetTransferTasks(1)
taskD, err := s.GetTransferTasks(1, false)
s.Equal(1, len(taskD), "Expected 1 decision task.")
err = s.CompleteTransferTask(taskD[0].TaskID)
s.Nil(err)
Expand Down Expand Up @@ -1620,7 +1684,7 @@ func (s *cassandraPersistenceSuite) TestReplicationTransferTaskTasks() {
err = s.UpdateWorklowStateAndReplication(updatedInfo1, nil, nil, nil, int64(3), replicationTasks)
s.Nil(err, "No error expected.")

tasks1, err := s.GetReplicationTasks(1)
tasks1, err := s.GetReplicationTasks(1, false)
s.Nil(err, "No error expected.")
s.NotNil(tasks1, "expected valid list of tasks.")
s.Equal(1, len(tasks1), "Expected 1 replication task.")
Expand Down Expand Up @@ -1696,13 +1760,13 @@ func (s *cassandraPersistenceSuite) TestWorkflowReplicationState() {
s.Nil(err0, "No error expected.")
s.NotEmpty(task0, "Expected non empty task identifier.")

taskD, err := s.GetTransferTasks(2)
taskD, err := s.GetTransferTasks(2, false)
s.Equal(1, len(taskD), "Expected 1 decision task.")
s.Equal(TransferTaskTypeDecisionTask, taskD[0].TaskType)
err = s.CompleteTransferTask(taskD[0].TaskID)
s.Nil(err)

taskR, err := s.GetReplicationTasks(1)
taskR, err := s.GetReplicationTasks(1, false)
s.Equal(1, len(taskR), "Expected 1 replication task.")
tsk := taskR[0]
s.Equal(ReplicationTaskTypeHistory, tsk.TaskType)
Expand Down Expand Up @@ -1791,7 +1855,7 @@ func (s *cassandraPersistenceSuite) TestWorkflowReplicationState() {
err2 := s.UpdateWorklowStateAndReplication(updatedInfo, updatedReplicationState, nil, nil, int64(3), replicationTasks1)
s.Nil(err2, "No error expected.")

taskR1, err := s.GetReplicationTasks(1)
taskR1, err := s.GetReplicationTasks(1, false)
s.Equal(1, len(taskR1), "Expected 1 replication task.")
tsk1 := taskR1[0]
s.Equal(ReplicationTaskTypeHistory, tsk1.TaskType)
Expand Down
Loading

0 comments on commit 8d47f5a

Please sign in to comment.