Skip to content

Commit

Permalink
Fix persistence test for task completion (uber#4245)
Browse files Browse the repository at this point in the history
- Fix execution persistence tests for testing completing transfer and replication tasks
- Remove logic in persistenceTestBase regarding recording transfer/replication task load progress
- Fix rangeCompleteReplicationTasks method in sqlExecutionManager
  • Loading branch information
yycptt authored Jun 2, 2021
1 parent 532dfb1 commit b660112
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 51 deletions.
9 changes: 4 additions & 5 deletions common/persistence/persistence-tests/executionManagerTest.go
Original file line number Diff line number Diff line change
Expand Up @@ -2268,6 +2268,7 @@ func (s *ExecutionManagerSuite) TestReplicationTasks() {
}
}

// TestCrossClusterTasks test
func (s *ExecutionManagerSuite) TestCrossClusterTasks() {
if s.TestBase.Config().DefaultStore != config.StoreTypeCassandra {
// TODO: remove this check once cross cluster queue related methods is impelmented for SQL
Expand Down Expand Up @@ -2447,7 +2448,7 @@ func (s *ExecutionManagerSuite) TestTransferTasksComplete() {
targetDomainID := "8bfb47be-5b57-4d66-9109-5fb35e20b1d0"
targetWorkflowID := "some random target domain ID"
targetRunID := uuid.New()
currentTransferID := s.GetTransferReadLevel()
currentTransferID := task1.TaskID
now := time.Now()
tasks := []p.Task{
&p.ActivityTask{now, currentTransferID + 10001, domainID, tasklist, scheduleID, 111},
Expand Down Expand Up @@ -2555,7 +2556,7 @@ func (s *ExecutionManagerSuite) TestTransferTasksRangeComplete() {
targetDomainID := "8bfb47be-5b57-4d66-9109-5fb35e20b1d0"
targetWorkflowID := "some random target domain ID"
targetRunID := uuid.New()
currentTransferID := s.GetTransferReadLevel()
currentTransferID := task1.TaskID
now := time.Now()
tasks := []p.Task{
&p.ActivityTask{now, currentTransferID + 10001, domainID, tasklist, scheduleID, 111},
Expand Down Expand Up @@ -3305,7 +3306,6 @@ func (s *ExecutionManagerSuite) TestReplicationTransferTaskTasks() {
ctx, cancel := context.WithTimeout(context.Background(), testContextTimeout)
defer cancel()

s.ClearReplicationQueue()
domainID := "2466d7de-6602-4ad8-b939-fb8f8c36c711"
workflowExecution := types.WorkflowExecution{
WorkflowID: "replication-transfer-task-test",
Expand Down Expand Up @@ -3357,7 +3357,7 @@ func (s *ExecutionManagerSuite) TestReplicationTransferTaskTasks() {
s.Equal(int64(3), task1.NextEventID)
s.Equal(int64(9), task1.Version)

err = s.CompleteTransferTask(ctx, task1.TaskID)
err = s.CompleteReplicationTask(ctx, task1.TaskID)
s.NoError(err)
tasks2, err := s.GetReplicationTasks(ctx, 1, false)
s.NoError(err)
Expand All @@ -3369,7 +3369,6 @@ func (s *ExecutionManagerSuite) TestReplicationTransferTaskRangeComplete() {
ctx, cancel := context.WithTimeout(context.Background(), testContextTimeout)
defer cancel()

s.ClearReplicationQueue()
domainID := uuid.New()
workflowExecution := types.WorkflowExecution{
WorkflowID: "replication-transfer-task--range-complete-test",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (s *ExecutionManagerSuiteForEventsV2) TestWorkflowCreation() {
versionHistory := p.NewVersionHistory([]byte{}, []*p.VersionHistoryItem{
{decisionScheduleID, common.EmptyVersion},
})
verisonHistories := p.NewVersionHistories(versionHistory)
versionHistories := p.NewVersionHistories(versionHistory)
_, err0 := s.ExecutionManager.CreateWorkflowExecution(ctx, &p.CreateWorkflowExecutionRequest{
NewWorkflowSnapshot: p.WorkflowSnapshot{
ExecutionInfo: &p.WorkflowExecutionInfo{
Expand Down Expand Up @@ -145,7 +145,7 @@ func (s *ExecutionManagerSuiteForEventsV2) TestWorkflowCreation() {
},
TimerTasks: nil,
Checksum: csum,
VersionHistories: verisonHistories,
VersionHistories: versionHistories,
},
RangeID: s.ShardInfo.RangeID,
})
Expand Down Expand Up @@ -174,7 +174,7 @@ func (s *ExecutionManagerSuiteForEventsV2) TestWorkflowCreation() {
}}
updatedInfo.BranchToken = []byte("branchToken2")

err2 := s.UpdateWorkflowExecution(ctx, updatedInfo, updatedStats, verisonHistories, []int64{int64(4)}, nil, int64(3), nil, nil, nil, timerInfos, nil)
err2 := s.UpdateWorkflowExecution(ctx, updatedInfo, updatedStats, versionHistories, []int64{int64(4)}, nil, int64(3), nil, nil, nil, timerInfos, nil)
s.NoError(err2)

state, err1 := s.GetWorkflowExecutionInfo(ctx, domainID, workflowExecution)
Expand All @@ -188,7 +188,7 @@ func (s *ExecutionManagerSuiteForEventsV2) TestWorkflowCreation() {
s.Equal(int64(5), state.TimerInfos[timerID].StartedID)
s.assertChecksumsEqual(testWorkflowChecksum, state.Checksum)

err2 = s.UpdateWorkflowExecution(ctx, updatedInfo, updatedStats, verisonHistories, nil, nil, int64(5), nil, nil, nil, nil, []string{timerID})
err2 = s.UpdateWorkflowExecution(ctx, updatedInfo, updatedStats, versionHistories, nil, nil, int64(5), nil, nil, nil, nil, []string{timerID})
s.NoError(err2)

state, err2 = s.GetWorkflowExecutionInfo(ctx, domainID, workflowExecution)
Expand Down Expand Up @@ -325,7 +325,7 @@ func (s *ExecutionManagerSuiteForEventsV2) TestContinueAsNew() {
versionHistory := p.NewVersionHistory([]byte{}, []*p.VersionHistoryItem{
{decisionScheduleID, common.EmptyVersion},
})
verisonHistories := p.NewVersionHistories(versionHistory)
versionHistories := p.NewVersionHistories(versionHistory)

newWorkflowExecution := types.WorkflowExecution{
WorkflowID: "continue-as-new-workflow-test",
Expand All @@ -350,7 +350,7 @@ func (s *ExecutionManagerSuiteForEventsV2) TestContinueAsNew() {
DeleteActivityInfos: nil,
UpsertTimerInfos: nil,
DeleteTimerInfos: nil,
VersionHistories: verisonHistories,
VersionHistories: versionHistories,
},
NewWorkflowSnapshot: &p.WorkflowSnapshot{
ExecutionInfo: &p.WorkflowExecutionInfo{
Expand All @@ -375,7 +375,7 @@ func (s *ExecutionManagerSuiteForEventsV2) TestContinueAsNew() {
ExecutionStats: &p.ExecutionStats{},
TransferTasks: nil,
TimerTasks: nil,
VersionHistories: verisonHistories,
VersionHistories: versionHistories,
},
RangeID: s.ShardInfo.RangeID,
Encoding: pickRandomEncoding(),
Expand Down Expand Up @@ -445,7 +445,7 @@ func (s *ExecutionManagerSuiteForEventsV2) createWorkflowExecution(
versionHistory := p.NewVersionHistory([]byte{}, []*p.VersionHistoryItem{
{decisionScheduleID, common.EmptyVersion},
})
verisonHistories := p.NewVersionHistories(versionHistory)
versionHistories := p.NewVersionHistories(versionHistory)
response, err := s.ExecutionManager.CreateWorkflowExecution(ctx, &p.CreateWorkflowExecutionRequest{
NewWorkflowSnapshot: p.WorkflowSnapshot{
ExecutionInfo: &p.WorkflowExecutionInfo{
Expand All @@ -471,7 +471,7 @@ func (s *ExecutionManagerSuiteForEventsV2) createWorkflowExecution(
TransferTasks: transferTasks,
ReplicationTasks: replicationTasks,
Checksum: testWorkflowChecksum,
VersionHistories: verisonHistories,
VersionHistories: versionHistories,
},
RangeID: s.ShardInfo.RangeID,
})
Expand Down
43 changes: 8 additions & 35 deletions common/persistence/persistence-tests/persistenceTestBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,6 @@ type (
ShardInfo *p.ShardInfo
TaskIDGenerator TransferTaskIDGenerator
ClusterMetadata cluster.Metadata
ReadLevel int64
ReplicationReadLevel int64
DefaultTestCluster PersistenceTestCluster
VisibilityTestCluster PersistenceTestCluster
Logger log.Logger
Expand Down Expand Up @@ -217,9 +215,6 @@ func (s *TestBase) Setup() {
s.ExecutionManager, err = factory.NewExecutionManager(shardID)
s.fatalOnError("NewExecutionManager", err)

s.ReadLevel = 0
s.ReplicationReadLevel = 0

domainFilter := &types.DomainFilter{
DomainIDs: []string{},
ReverseMatch: true,
Expand All @@ -234,7 +229,7 @@ func (s *TestBase) Setup() {
},
},
}
transferPQS := types.ProcessingQueueStates{transferPQSMap}
transferPQS := types.ProcessingQueueStates{StatesByCluster: transferPQSMap}
timerPQSMap := map[string][]*types.ProcessingQueueState{
s.ClusterMetadata.GetCurrentClusterName(): {
&types.ProcessingQueueState{
Expand Down Expand Up @@ -1287,16 +1282,15 @@ func (s *TestBase) DeleteCurrentWorkflowExecution(ctx context.Context, info *p.W
}

// GetTransferTasks is a utility method to get tasks from transfer task queue
// Note: this method will save the load progress and continue from the save progress upon later invocations
func (s *TestBase) GetTransferTasks(ctx context.Context, batchSize int, getAll bool) ([]*p.TransferTaskInfo, error) {
result := []*p.TransferTaskInfo{}
var token []byte

Loop:
for {
response, err := s.ExecutionManager.GetTransferTasks(ctx, &p.GetTransferTasksRequest{
ReadLevel: s.GetTransferReadLevel(),
MaxReadLevel: int64(math.MaxInt64),
ReadLevel: 0,
MaxReadLevel: math.MaxInt64,
BatchSize: batchSize,
NextPageToken: token,
})
Expand All @@ -1311,10 +1305,6 @@ Loop:
}
}

for _, task := range result {
atomic.StoreInt64(&s.ReadLevel, task.TaskID)
}

return result, nil
}

Expand Down Expand Up @@ -1346,16 +1336,15 @@ func (s *TestBase) GetCrossClusterTasks(ctx context.Context, targetCluster strin
}

// GetReplicationTasks is a utility method to get tasks from replication task queue
// Note: this method will save the load progress and continue from the save progress upon later invocations
func (s *TestBase) GetReplicationTasks(ctx context.Context, batchSize int, getAll bool) ([]*p.ReplicationTaskInfo, error) {
result := []*p.ReplicationTaskInfo{}
var token []byte

Loop:
for {
response, err := s.ExecutionManager.GetReplicationTasks(ctx, &p.GetReplicationTasksRequest{
ReadLevel: s.GetReplicationReadLevel(),
MaxReadLevel: int64(math.MaxInt64),
ReadLevel: 0,
MaxReadLevel: math.MaxInt64,
BatchSize: batchSize,
NextPageToken: token,
})
Expand All @@ -1370,10 +1359,6 @@ Loop:
}
}

for _, task := range result {
atomic.StoreInt64(&s.ReplicationReadLevel, task.TaskID)
}

return result, nil
}

Expand Down Expand Up @@ -1683,16 +1668,6 @@ func (s *TestBase) GetNextSequenceNumber() int64 {
return taskID
}

// GetTransferReadLevel returns the current read level for shard
func (s *TestBase) GetTransferReadLevel() int64 {
return atomic.LoadInt64(&s.ReadLevel)
}

// GetReplicationReadLevel returns the current read level for shard
func (s *TestBase) GetReplicationReadLevel() int64 {
return atomic.LoadInt64(&s.ReplicationReadLevel)
}

// ClearTasks completes all transfer tasks and replication tasks
func (s *TestBase) ClearTasks() {
s.ClearTransferQueue()
Expand All @@ -1701,7 +1676,7 @@ func (s *TestBase) ClearTasks() {

// ClearTransferQueue completes all tasks in transfer queue
func (s *TestBase) ClearTransferQueue() {
s.Logger.Info("Clearing transfer tasks", tag.ShardRangeID(s.ShardInfo.RangeID), tag.ReadLevel(s.GetTransferReadLevel()))
s.Logger.Info("Clearing transfer tasks", tag.ShardRangeID(s.ShardInfo.RangeID))
tasks, err := s.GetTransferTasks(context.Background(), 100, true)
if err != nil {
s.Logger.Fatal("Error during cleanup", tag.Error(err))
Expand All @@ -1715,12 +1690,11 @@ func (s *TestBase) ClearTransferQueue() {
}

s.Logger.Info("Deleted transfer tasks.", tag.Counter(counter))
atomic.StoreInt64(&s.ReadLevel, 0)
}

// ClearReplicationQueue completes all tasks in replication queue
func (s *TestBase) ClearReplicationQueue() {
s.Logger.Info("Clearing replication tasks", tag.ShardRangeID(s.ShardInfo.RangeID), tag.ReadLevel(s.GetReplicationReadLevel()))
s.Logger.Info("Clearing replication tasks", tag.ShardRangeID(s.ShardInfo.RangeID))
tasks, err := s.GetReplicationTasks(context.Background(), 100, true)
if err != nil {
s.Logger.Fatal("Error during cleanup", tag.Error(err))
Expand All @@ -1734,7 +1708,6 @@ func (s *TestBase) ClearReplicationQueue() {
}

s.Logger.Info("Deleted replication tasks.", tag.Counter(counter))
atomic.StoreInt64(&s.ReplicationReadLevel, 0)
}

// EqualTimesWithPrecision assertion that two times are equal within precision
Expand All @@ -1755,7 +1728,7 @@ func (s *TestBase) validateTimeRange(t time.Time, expectedDuration time.Duration
currentTime := time.Now()
diff := time.Duration(currentTime.UnixNano() - t.UnixNano())
if diff > expectedDuration {
s.Logger.Info("Check Current time, Application time, Differenrce", tag.Timestamp(t), tag.CursorTimestamp(currentTime), tag.Number(int64(diff)))
s.Logger.Info("Check Current time, Application time, Difference", tag.Timestamp(t), tag.CursorTimestamp(currentTime), tag.Number(int64(diff)))
return false
}
return true
Expand Down
4 changes: 2 additions & 2 deletions common/persistence/sql/sqlExecutionManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -928,8 +928,8 @@ func (m *sqlExecutionManager) RangeCompleteReplicationTask(
) error {

if _, err := m.db.RangeDeleteFromReplicationTasks(ctx, &sqlplugin.ReplicationTasksFilter{
ShardID: m.shardID,
TaskID: request.InclusiveEndTaskID,
ShardID: m.shardID,
InclusiveEndTaskID: request.InclusiveEndTaskID,
}); err != nil {
return convertCommonErrors(m.db, "RangeCompleteReplicationTask", "", err)
}
Expand Down

0 comments on commit b660112

Please sign in to comment.