Skip to content

Commit

Permalink
frontend.PollForDecisionTask: add validation to verify history is com…
Browse files Browse the repository at this point in the history
…plete (#2890)
  • Loading branch information
venkat1109 authored Dec 9, 2019
1 parent ccdf53d commit efa68ee
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 2 deletions.
2 changes: 2 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1368,6 +1368,7 @@ const (
CadenceErrRetryTaskCounter
CadenceErrBadBinaryCounter
CadenceErrClientVersionNotSupportedCounter
CadenceErrIncompleteHistoryCounter
PersistenceRequests
PersistenceFailures
PersistenceLatency
Expand Down Expand Up @@ -1706,6 +1707,7 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
CadenceErrRetryTaskCounter: {metricName: "cadence_errors_retry_task", metricType: Counter},
CadenceErrBadBinaryCounter: {metricName: "cadence_errors_bad_binary", metricType: Counter},
CadenceErrClientVersionNotSupportedCounter: {metricName: "cadence_errors_client_version_not_supported", metricType: Counter},
CadenceErrIncompleteHistoryCounter: {metricName: "cadence_errors_incomplete_history", metricType: Counter},
PersistenceRequests: {metricName: "persistence_requests", metricType: Counter},
PersistenceFailures: {metricName: "persistence_errors", metricType: Counter},
PersistenceLatency: {metricName: "persistence_latency", metricType: Timer},
Expand Down
4 changes: 2 additions & 2 deletions host/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,7 @@ func (s *integrationSuite) TestDecisionAndActivityTimeoutsWorkflow() {

for i := 0; i < 8; i++ {
dropDecisionTask := (i%2 == 0)
s.Logger.Info("Calling Decision Task: %d", tag.Counter(i))
s.Logger.Info("Calling Decision Task", tag.Counter(i))
var err error
if dropDecisionTask {
_, err = poller.PollAndProcessDecisionTask(true, true)
Expand All @@ -579,7 +579,7 @@ func (s *integrationSuite) TestDecisionAndActivityTimeoutsWorkflow() {
history := historyResponse.History
common.PrettyPrintHistory(history, s.Logger)
}
s.True(err == nil || err == matching.ErrNoTasks, "Error", tag.Error(err))
s.True(err == nil || err == matching.ErrNoTasks, "%v", err)
if !dropDecisionTask {
s.Logger.Info("Calling Activity Task: %d", tag.Counter(i))
err = poller.PollAndProcessActivityTask(i%4 == 0)
Expand Down
51 changes: 51 additions & 0 deletions service/frontend/workflowHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3219,6 +3219,25 @@ func (wh *WorkflowHandler) createPollForDecisionTaskResponse(
return nil, err
}

// we expect history to contain all events up to the current DecisionStartedEvent
lastEventID := matchingResp.GetStartedEventId()
if matchingResp.Query != nil {
// for query tasks, startedEventID is irrelevant, we expect history to
// contain all events upto nextEventID-1
lastEventID = nextEventID - 1
}

if err := verifyHistoryIsComplete(history, firstEventID, lastEventID); err != nil {
scope.IncCounter(metrics.CadenceErrIncompleteHistoryCounter)
wh.GetLogger().Error("PollForDecisionTask: incomplete history",
tag.WorkflowDomainID(domainID),
tag.WorkflowDomainName(domain.GetInfo().Name),
tag.WorkflowID(matchingResp.WorkflowExecution.GetWorkflowId()),
tag.WorkflowRunID(matchingResp.WorkflowExecution.GetRunId()),
tag.Error(err))
return nil, err
}

if len(persistenceToken) != 0 {
continuation, err = serializeHistoryToken(&getHistoryContinuationToken{
RunID: matchingResp.WorkflowExecution.GetRunId(),
Expand Down Expand Up @@ -3254,6 +3273,38 @@ func (wh *WorkflowHandler) createPollForDecisionTaskResponse(
return resp, nil
}

func verifyHistoryIsComplete(
history *gen.History,
expectedFirstEventID int64,
expectedLastEventID int64,
) error {

firstEventID := int64(-1)
lastEventID := int64(-1)
events := history.GetEvents()
nEvents := len(events)
if nEvents > 0 {
firstEventID = events[0].GetEventId()
lastEventID = events[nEvents-1].GetEventId()
}

nExpectedEvents := expectedLastEventID - expectedFirstEventID + 1

if firstEventID == expectedFirstEventID &&
lastEventID == expectedLastEventID &&
int64(nEvents) == nExpectedEvents {
return nil
}

return fmt.Errorf(
"incomplete history: expected events [%v-%v] but got events [%v-%v] of length %v",
expectedFirstEventID,
expectedLastEventID,
firstEventID,
lastEventID,
nEvents)
}

func deserializeHistoryToken(bytes []byte) (*getHistoryContinuationToken, error) {
token := &getHistoryContinuationToken{}
err := json.Unmarshal(bytes, token)
Expand Down
36 changes: 36 additions & 0 deletions service/frontend/workflowHandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1221,6 +1221,42 @@ func (s *workflowHandlerSuite) TestConvertIndexedKeyToThrift() {
})
}

func (s *workflowHandlerSuite) TestVerifyHistoryIsComplete() {
events := make([]*shared.HistoryEvent, 50)
for i := 0; i < len(events); i++ {
events[i] = &shared.HistoryEvent{EventId: common.Int64Ptr(int64(i + 1))}
}
var eventsWithHoles []*shared.HistoryEvent
eventsWithHoles = append(eventsWithHoles, events[9:12]...)
eventsWithHoles = append(eventsWithHoles, events[20:31]...)

testCases := []struct {
events []*shared.HistoryEvent
firstEventID int64
lastEventID int64
isResultErr bool
}{
{events[:1], 1, 1, false},
{events[:5], 1, 5, false},
{events[9:31], 10, 31, false},
{eventsWithHoles, 10, 31, true},
{events[9:31], 9, 31, true},
{events[9:31], 11, 31, true},
{events[9:31], 10, 30, true},
{events[9:31], 10, 32, true},
}

for _, tc := range testCases {
history := &shared.History{tc.events}
err := verifyHistoryIsComplete(history, tc.firstEventID, tc.lastEventID)
if tc.isResultErr {
s.Error(err)
} else {
s.NoError(err)
}
}
}

func (s *workflowHandlerSuite) newConfig() *Config {
return NewConfig(dc.NewCollection(dc.NewNopClient(), s.mockResource.GetLogger()), numHistoryShards, false)
}
Expand Down

0 comments on commit efa68ee

Please sign in to comment.