Skip to content

Commit

Permalink
frontend.getHistory: add validation / sanity checks (#2900)
Browse files Browse the repository at this point in the history
  • Loading branch information
venkat1109 authored Dec 12, 2019
1 parent 9e996b0 commit 97de98c
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 51 deletions.
129 changes: 93 additions & 36 deletions service/frontend/workflowHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1734,7 +1734,6 @@ func (wh *WorkflowHandler) GetWorkflowExecutionHistory(
tag.WorkflowID(getRequest.Execution.GetWorkflowId()),
tag.WorkflowRunID(getRequest.Execution.GetRunId()),
tag.WorkflowDomainID(domainID), tag.WorkflowSize(int64(getRequest.GetMaximumPageSize())))

getRequest.MaximumPageSize = common.Int32Ptr(common.GetHistoryMaxPageSize)
}

Expand Down Expand Up @@ -3008,6 +3007,7 @@ func (wh *WorkflowHandler) getHistory(
historyEvents := []*gen.HistoryEvent{}
var size int

isFirstPage := len(nextPageToken) == 0
shardID := common.WorkflowIDToHistoryShard(*execution.WorkflowId, wh.config.NumHistoryShards)
var err error
historyEvents, size, nextPageToken, err = persistence.ReadFullPageV2Events(wh.GetHistoryManager(), &persistence.ReadHistoryBranchRequest{
Expand All @@ -3024,7 +3024,32 @@ func (wh *WorkflowHandler) getHistory(

scope.RecordTimer(metrics.HistorySize, time.Duration(size))

isLastPage := len(nextPageToken) == 0
if err := verifyHistoryIsComplete(
historyEvents,
firstEventID,
nextEventID-1,
isFirstPage,
isLastPage,
int(pageSize)); err != nil {
scope.IncCounter(metrics.CadenceErrIncompleteHistoryCounter)
wh.GetLogger().Error("getHistory: incomplete history",
tag.WorkflowDomainID(domainID),
tag.WorkflowID(execution.GetWorkflowId()),
tag.WorkflowRunID(execution.GetRunId()),
tag.Error(err))
return nil, nil, err
}

if len(nextPageToken) == 0 && transientDecision != nil {
if err := wh.validateTransientDecisionEvents(nextEventID, transientDecision); err != nil {
scope.IncCounter(metrics.CadenceErrIncompleteHistoryCounter)
wh.GetLogger().Error("getHistory error",
tag.WorkflowDomainID(domainID),
tag.WorkflowID(execution.GetWorkflowId()),
tag.WorkflowRunID(execution.GetRunId()),
tag.Error(err))
}
// Append the transient decision events once we are done enumerating everything from the events table
historyEvents = append(historyEvents, transientDecision.ScheduledEvent, transientDecision.StartedEvent)
}
Expand All @@ -3034,6 +3059,25 @@ func (wh *WorkflowHandler) getHistory(
return executionHistory, nextPageToken, nil
}

func (wh *WorkflowHandler) validateTransientDecisionEvents(
expectedNextEventID int64,
decision *gen.TransientDecisionInfo,
) error {

if decision.ScheduledEvent.GetEventId() == expectedNextEventID &&
decision.StartedEvent.GetEventId() == expectedNextEventID+1 {
return nil
}

return fmt.Errorf(
"invalid transient decision: "+
"expectedScheduledEventID=%v expectedStartedEventID=%v but have scheduledEventID=%v startedEventID=%v",
expectedNextEventID,
expectedNextEventID+1,
decision.ScheduledEvent.GetEventId(),
decision.StartedEvent.GetEventId())
}

func (wh *WorkflowHandler) getLoggerForTask(taskToken []byte) log.Logger {
logger := wh.GetLogger()
task, err := wh.tokenSerializer.Deserialize(taskToken)
Expand Down Expand Up @@ -3223,25 +3267,6 @@ func (wh *WorkflowHandler) createPollForDecisionTaskResponse(
return nil, err
}

// we expect history to contain all events up to the current DecisionStartedEvent
lastEventID := matchingResp.GetStartedEventId()
if matchingResp.Query != nil {
// for query tasks, startedEventID is irrelevant, we expect history to
// contain all events upto nextEventID-1
lastEventID = nextEventID - 1
}

if err := verifyHistoryIsComplete(history, firstEventID, lastEventID); err != nil {
scope.IncCounter(metrics.CadenceErrIncompleteHistoryCounter)
wh.GetLogger().Error("PollForDecisionTask: incomplete history",
tag.WorkflowDomainID(domainID),
tag.WorkflowDomainName(domain.GetInfo().Name),
tag.WorkflowID(matchingResp.WorkflowExecution.GetWorkflowId()),
tag.WorkflowRunID(matchingResp.WorkflowExecution.GetRunId()),
tag.Error(err))
return nil, err
}

if len(persistenceToken) != 0 {
continuation, err = serializeHistoryToken(&getHistoryContinuationToken{
RunID: matchingResp.WorkflowExecution.GetRunId(),
Expand Down Expand Up @@ -3278,35 +3303,67 @@ func (wh *WorkflowHandler) createPollForDecisionTaskResponse(
}

func verifyHistoryIsComplete(
history *gen.History,
events []*gen.HistoryEvent,
expectedFirstEventID int64,
expectedLastEventID int64,
isFirstPage bool,
isLastPage bool,
pageSize int,
) error {

firstEventID := int64(-1)
lastEventID := int64(-1)
events := history.GetEvents()
nEvents := len(events)
if nEvents > 0 {
firstEventID = events[0].GetEventId()
lastEventID = events[nEvents-1].GetEventId()
if nEvents == 0 {
if isLastPage {
// we seem to be returning a non-nil pageToken on the lastPage which
// in turn cases the client to call getHistory again - only to find
// there are no more events to consume - bail out if this is the case here
return nil
}
return fmt.Errorf("invalid history: contains zero events")
}

firstEventID := events[0].GetEventId()
lastEventID := events[nEvents-1].GetEventId()

if !isFirstPage { // atleast one page of history has been read previously
if firstEventID <= expectedFirstEventID {
// not first page and no events have been read in the previous pages - not possible
return &gen.InternalServiceError{
Message: fmt.Sprintf(
"invalid history: expected first eventID to be > %v but got %v", expectedFirstEventID, firstEventID),
}
}
expectedFirstEventID = firstEventID
}

if !isLastPage {
// estimate lastEventID based on pageSize. This is a lower bound
// since the persistence layer counts "batch of events" as a single page
expectedLastEventID = expectedFirstEventID + int64(pageSize) - 1
}

nExpectedEvents := expectedLastEventID - expectedFirstEventID + 1

if firstEventID == expectedFirstEventID &&
lastEventID == expectedLastEventID &&
int64(nEvents) == nExpectedEvents {
((isLastPage && lastEventID == expectedLastEventID && int64(nEvents) == nExpectedEvents) ||
(!isLastPage && lastEventID >= expectedLastEventID && int64(nEvents) >= nExpectedEvents)) {
return nil
}

return fmt.Errorf(
"incomplete history: expected events [%v-%v] but got events [%v-%v] of length %v",
expectedFirstEventID,
expectedLastEventID,
firstEventID,
lastEventID,
nEvents)
return &gen.InternalServiceError{
Message: fmt.Sprintf(
"incomplete history: "+
"expected events [%v-%v] but got events [%v-%v] of length %v:"+
"isFirstPage=%v,isLastPage=%v,pageSize=%v",
expectedFirstEventID,
expectedLastEventID,
firstEventID,
lastEventID,
nEvents,
isFirstPage,
isLastPage,
pageSize),
}
}

func deserializeHistoryToken(bytes []byte) (*getHistoryContinuationToken, error) {
Expand Down
47 changes: 32 additions & 15 deletions service/frontend/workflowHandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -979,7 +979,7 @@ func (s *workflowHandlerSuite) TestGetHistory() {
s.mockHistoryV2Mgr.On("ReadHistoryBranch", req).Return(&persistence.ReadHistoryBranchResponse{
HistoryEvents: []*shared.HistoryEvent{
{
EventId: common.Int64Ptr(int64(1)),
EventId: common.Int64Ptr(int64(100)),
},
},
NextPageToken: []byte{},
Expand All @@ -991,9 +991,9 @@ func (s *workflowHandlerSuite) TestGetHistory() {

scope := metrics.NoopScope(metrics.Frontend)
history, token, err := wh.getHistory(scope, domainID, we, firstEventID, nextEventID, 0, []byte{}, nil, branchToken)
s.NoError(err)
s.NotNil(history)
s.Equal([]byte{}, token)
s.NoError(err)
}

func (s *workflowHandlerSuite) TestListArchivedVisibility_Failure_InvalidRequest() {
Expand Down Expand Up @@ -1234,25 +1234,42 @@ func (s *workflowHandlerSuite) TestVerifyHistoryIsComplete() {
events []*shared.HistoryEvent
firstEventID int64
lastEventID int64
isFirstPage bool
isLastPage bool
pageSize int
isResultErr bool
}{
{events[:1], 1, 1, false},
{events[:5], 1, 5, false},
{events[9:31], 10, 31, false},
{eventsWithHoles, 10, 31, true},
{events[9:31], 9, 31, true},
{events[9:31], 11, 31, true},
{events[9:31], 10, 30, true},
{events[9:31], 10, 32, true},
{events[:1], 1, 1, true, true, 1000, false},
{events[:5], 1, 5, true, true, 1000, false},
{events[9:31], 10, 31, true, true, 1000, false},
{events[9:29], 10, 50, true, false, 20, false},
{events[9:30], 10, 50, true, false, 20, false},

{events[9:29], 1, 50, false, false, 20, false},
{events[9:29], 1, 29, false, true, 20, false},

{eventsWithHoles, 1, 50, false, false, 22, true},
{eventsWithHoles, 10, 50, true, false, 22, true},
{eventsWithHoles, 1, 31, false, true, 22, true},
{eventsWithHoles, 10, 31, true, true, 1000, true},

{events[9:31], 9, 31, true, true, 1000, true},
{events[9:31], 9, 50, true, false, 22, true},
{events[9:31], 11, 31, true, true, 1000, true},
{events[9:31], 11, 50, true, false, 22, true},

{events[9:31], 10, 30, true, true, 1000, true},
{events[9:31], 1, 30, false, true, 22, true},
{events[9:31], 10, 32, true, true, 1000, true},
{events[9:31], 1, 32, false, true, 22, true},
}

for _, tc := range testCases {
history := &shared.History{tc.events}
err := verifyHistoryIsComplete(history, tc.firstEventID, tc.lastEventID)
for i, tc := range testCases {
err := verifyHistoryIsComplete(tc.events, tc.firstEventID, tc.lastEventID, tc.isFirstPage, tc.isLastPage, tc.pageSize)
if tc.isResultErr {
s.Error(err)
s.Error(err, "testcase %v failed", i)
} else {
s.NoError(err)
s.NoError(err, "testcase %v failed", i)
}
}
}
Expand Down

0 comments on commit 97de98c

Please sign in to comment.