Skip to content

Commit

Permalink
Fix incorrect history length in visibility (#1361)
Browse files Browse the repository at this point in the history
* Fix incorrect history length in visibility

* Address comments
  • Loading branch information
vancexu authored Dec 28, 2018
1 parent 39f0f40 commit e5f00c2
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 5 deletions.
9 changes: 6 additions & 3 deletions host/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3452,7 +3452,7 @@ func (s *integrationSuite) TestVisibility() {
s.Nil(err1)

// wait until the start workflow is done
var nexttoken []byte
var nextToken []byte
historyEventFilterType := workflow.HistoryEventFilterTypeCloseEvent
for {
historyResponse, historyErr := s.engine.GetWorkflowExecutionHistory(createContext(), &workflow.GetWorkflowExecutionHistoryRequest{
Expand All @@ -3463,14 +3463,14 @@ func (s *integrationSuite) TestVisibility() {
},
WaitForNewEvent: common.BoolPtr(true),
HistoryEventFilterType: &historyEventFilterType,
NextPageToken: nexttoken,
NextPageToken: nextToken,
})
s.Nil(historyErr)
if len(historyResponse.NextPageToken) == 0 {
break
}

nexttoken = historyResponse.NextPageToken
nextToken = historyResponse.NextPageToken
}

startRequest = &workflow.StartWorkflowExecutionRequest{
Expand All @@ -3495,6 +3495,7 @@ func (s *integrationSuite) TestVisibility() {
closedCount := 0
openCount := 0

var historyLength int64
for i := 0; i < 10; i++ {
resp, err3 := s.engine.ListClosedWorkflowExecutions(createContext(), &workflow.ListClosedWorkflowExecutionsRequest{
Domain: common.StringPtr(s.domainName),
Expand All @@ -3504,12 +3505,14 @@ func (s *integrationSuite) TestVisibility() {
s.Nil(err3)
closedCount = len(resp.Executions)
if closedCount == 1 {
historyLength = *(resp.Executions[0].HistoryLength)
break
}
s.logger.Info("Closed WorkflowExecution is not yet visible")
time.Sleep(100 * time.Millisecond)
}
s.Equal(1, closedCount)
s.Equal(int64(5), historyLength)

for i := 0; i < 10; i++ {
resp, err4 := s.engine.ListOpenWorkflowExecutions(createContext(), &workflow.ListOpenWorkflowExecutionsRequest{
Expand Down
2 changes: 1 addition & 1 deletion service/history/transferQueueActiveProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ func (t *transferQueueActiveProcessorImpl) processCloseExecution(task *persisten
workflowStartTimestamp := executionInfo.StartTimestamp.UnixNano()
workflowCloseTimestamp := msBuilder.GetLastUpdatedTimestamp()
workflowCloseStatus := getWorkflowExecutionCloseStatus(executionInfo.CloseStatus)
workflowHistoryLength := msBuilder.GetNextEventID()
workflowHistoryLength := msBuilder.GetNextEventID() - 1

// release the context lock since we no longer need mutable state builder and
// the rest of logic is making RPC call, which takes time.
Expand Down
2 changes: 1 addition & 1 deletion service/history/transferQueueStandbyProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ func (t *transferQueueStandbyProcessorImpl) processCloseExecution(transferTask *
workflowStartTimestamp := executionInfo.StartTimestamp.UnixNano()
workflowCloseTimestamp := msBuilder.GetLastUpdatedTimestamp()
workflowCloseStatus := getWorkflowExecutionCloseStatus(executionInfo.CloseStatus)
workflowHistoryLength := msBuilder.GetNextEventID()
workflowHistoryLength := msBuilder.GetNextEventID() - 1

ok, err := verifyTaskVersion(t.shard, t.logger, transferTask.DomainID, msBuilder.GetLastWriteVersion(), transferTask.Version, transferTask)
if err != nil {
Expand Down

0 comments on commit e5f00c2

Please sign in to comment.