From efa68ee5e26c7c1cd3ea65606c0e8f19404b31d1 Mon Sep 17 00:00:00 2001 From: Venkat Date: Mon, 9 Dec 2019 11:54:07 -0800 Subject: [PATCH] frontend.PollForDecisionTask: add validation to verify history is complete (#2890) --- common/metrics/defs.go | 2 + host/integration_test.go | 4 +- service/frontend/workflowHandler.go | 51 ++++++++++++++++++++++++ service/frontend/workflowHandler_test.go | 36 +++++++++++++++++ 4 files changed, 91 insertions(+), 2 deletions(-) diff --git a/common/metrics/defs.go b/common/metrics/defs.go index 4db89f4a155..846e36562b3 100644 --- a/common/metrics/defs.go +++ b/common/metrics/defs.go @@ -1368,6 +1368,7 @@ const ( CadenceErrRetryTaskCounter CadenceErrBadBinaryCounter CadenceErrClientVersionNotSupportedCounter + CadenceErrIncompleteHistoryCounter PersistenceRequests PersistenceFailures PersistenceLatency @@ -1706,6 +1707,7 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{ CadenceErrRetryTaskCounter: {metricName: "cadence_errors_retry_task", metricType: Counter}, CadenceErrBadBinaryCounter: {metricName: "cadence_errors_bad_binary", metricType: Counter}, CadenceErrClientVersionNotSupportedCounter: {metricName: "cadence_errors_client_version_not_supported", metricType: Counter}, + CadenceErrIncompleteHistoryCounter: {metricName: "cadence_errors_incomplete_history", metricType: Counter}, PersistenceRequests: {metricName: "persistence_requests", metricType: Counter}, PersistenceFailures: {metricName: "persistence_errors", metricType: Counter}, PersistenceLatency: {metricName: "persistence_latency", metricType: Timer}, diff --git a/host/integration_test.go b/host/integration_test.go index fd2d0bde682..ba052981fc6 100644 --- a/host/integration_test.go +++ b/host/integration_test.go @@ -560,7 +560,7 @@ func (s *integrationSuite) TestDecisionAndActivityTimeoutsWorkflow() { for i := 0; i < 8; i++ { dropDecisionTask := (i%2 == 0) - s.Logger.Info("Calling Decision Task: %d", tag.Counter(i)) + s.Logger.Info("Calling Decision Task", tag.Counter(i)) var err error if dropDecisionTask { _, err = poller.PollAndProcessDecisionTask(true, true) @@ -579,7 +579,7 @@ func (s *integrationSuite) TestDecisionAndActivityTimeoutsWorkflow() { history := historyResponse.History common.PrettyPrintHistory(history, s.Logger) } - s.True(err == nil || err == matching.ErrNoTasks, "Error", tag.Error(err)) + s.True(err == nil || err == matching.ErrNoTasks, "%v", err) if !dropDecisionTask { s.Logger.Info("Calling Activity Task: %d", tag.Counter(i)) err = poller.PollAndProcessActivityTask(i%4 == 0) diff --git a/service/frontend/workflowHandler.go b/service/frontend/workflowHandler.go index cb08d3346fa..1bb09fb2503 100644 --- a/service/frontend/workflowHandler.go +++ b/service/frontend/workflowHandler.go @@ -3219,6 +3219,25 @@ func (wh *WorkflowHandler) createPollForDecisionTaskResponse( return nil, err } + // we expect history to contain all events up to the current DecisionStartedEvent + lastEventID := matchingResp.GetStartedEventId() + if matchingResp.Query != nil { + // for query tasks, startedEventID is irrelevant, we expect history to + // contain all events upto nextEventID-1 + lastEventID = nextEventID - 1 + } + + if err := verifyHistoryIsComplete(history, firstEventID, lastEventID); err != nil { + scope.IncCounter(metrics.CadenceErrIncompleteHistoryCounter) + wh.GetLogger().Error("PollForDecisionTask: incomplete history", + tag.WorkflowDomainID(domainID), + tag.WorkflowDomainName(domain.GetInfo().Name), + tag.WorkflowID(matchingResp.WorkflowExecution.GetWorkflowId()), + tag.WorkflowRunID(matchingResp.WorkflowExecution.GetRunId()), + tag.Error(err)) + return nil, err + } + if len(persistenceToken) != 0 { continuation, err = serializeHistoryToken(&getHistoryContinuationToken{ RunID: matchingResp.WorkflowExecution.GetRunId(), @@ -3254,6 +3273,38 @@ func (wh *WorkflowHandler) createPollForDecisionTaskResponse( return resp, nil } +func verifyHistoryIsComplete( + history *gen.History, + expectedFirstEventID int64, + expectedLastEventID int64, +) error { + + firstEventID := int64(-1) + lastEventID := int64(-1) + events := history.GetEvents() + nEvents := len(events) + if nEvents > 0 { + firstEventID = events[0].GetEventId() + lastEventID = events[nEvents-1].GetEventId() + } + + nExpectedEvents := expectedLastEventID - expectedFirstEventID + 1 + + if firstEventID == expectedFirstEventID && + lastEventID == expectedLastEventID && + int64(nEvents) == nExpectedEvents { + return nil + } + + return fmt.Errorf( + "incomplete history: expected events [%v-%v] but got events [%v-%v] of length %v", + expectedFirstEventID, + expectedLastEventID, + firstEventID, + lastEventID, + nEvents) +} + func deserializeHistoryToken(bytes []byte) (*getHistoryContinuationToken, error) { token := &getHistoryContinuationToken{} err := json.Unmarshal(bytes, token) diff --git a/service/frontend/workflowHandler_test.go b/service/frontend/workflowHandler_test.go index 9af619eb150..3ec51d1d23f 100644 --- a/service/frontend/workflowHandler_test.go +++ b/service/frontend/workflowHandler_test.go @@ -1221,6 +1221,42 @@ func (s *workflowHandlerSuite) TestConvertIndexedKeyToThrift() { }) } +func (s *workflowHandlerSuite) TestVerifyHistoryIsComplete() { + events := make([]*shared.HistoryEvent, 50) + for i := 0; i < len(events); i++ { + events[i] = &shared.HistoryEvent{EventId: common.Int64Ptr(int64(i + 1))} + } + var eventsWithHoles []*shared.HistoryEvent + eventsWithHoles = append(eventsWithHoles, events[9:12]...) + eventsWithHoles = append(eventsWithHoles, events[20:31]...) + + testCases := []struct { + events []*shared.HistoryEvent + firstEventID int64 + lastEventID int64 + isResultErr bool + }{ + {events[:1], 1, 1, false}, + {events[:5], 1, 5, false}, + {events[9:31], 10, 31, false}, + {eventsWithHoles, 10, 31, true}, + {events[9:31], 9, 31, true}, + {events[9:31], 11, 31, true}, + {events[9:31], 10, 30, true}, + {events[9:31], 10, 32, true}, + } + + for _, tc := range testCases { + history := &shared.History{tc.events} + err := verifyHistoryIsComplete(history, tc.firstEventID, tc.lastEventID) + if tc.isResultErr { + s.Error(err) + } else { + s.NoError(err) + } + } +} + func (s *workflowHandlerSuite) newConfig() *Config { return NewConfig(dc.NewCollection(dc.NewNopClient(), s.mockResource.GetLogger()), numHistoryShards, false) }