Skip to content

Commit

Permalink
Fail inflight decision when buffered events exceeds limit (uber#1326)
Browse files Browse the repository at this point in the history
* Fail inflight decision when buffered events exceeds limit

* Fix integration test TestRateLimitBufferedEvents
  • Loading branch information
yiminc authored Dec 11, 2018
1 parent b01495b commit 7fc51ca
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 100 deletions.
48 changes: 19 additions & 29 deletions host/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ import (
"github.com/uber/cadence/common/mocks"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/persistence/persistence-tests"
"github.com/uber/cadence/service/history"
"github.com/uber/cadence/service/matching"
)

Expand Down Expand Up @@ -72,7 +71,7 @@ func (s *IntegrationBase) setupShards() {
}
}

func TestIntegrationSuite(t *testing.T) {
func TestRateLimitBufferedEventsTestIntegrationSuite(t *testing.T) {
flag.Parse()
if *integration && !*testEventsV2 {
s := new(integrationSuite)
Expand Down Expand Up @@ -2058,16 +2057,12 @@ func (s *integrationSuite) TestRateLimitBufferedEvents() {
s.Nil(s.sendSignal(s.domainName, workflowExecution, "SignalName", buf.Bytes(), identity))
}

// Rate limited signals
for i := 0; i < 10; i++ {
buf := new(bytes.Buffer)
binary.Write(buf, binary.LittleEndian, i)
signalErr := s.sendSignal(s.domainName, workflowExecution, "SignalName", buf.Bytes(), identity)
s.NotNil(signalErr)
s.Equal(history.ErrBufferedEventsLimitExceeded, signalErr)
}
buf := new(bytes.Buffer)
binary.Write(buf, binary.LittleEndian, 101)
signalErr := s.sendSignal(s.domainName, workflowExecution, "SignalName", buf.Bytes(), identity)
s.Nil(signalErr)

// First decision is empty
// this decision will be ignored as the decision task has already failed
return nil, []*workflow.Decision{}, nil
}

Expand All @@ -2091,10 +2086,10 @@ func (s *integrationSuite) TestRateLimitBufferedEvents() {
T: s.T(),
}

// Make first decision to schedule activity
// first decision to send 101 signals, the last signal will force fail decision and flush buffered events.
_, err := poller.PollAndProcessDecisionTask(false, false)
s.logger.Infof("PollAndProcessDecisionTask: %v", err)
s.Nil(err)
s.EqualError(err, "EntityNotExistsError{Message: Decision task not found.}")

// Process signal in decider
_, err = poller.PollAndProcessDecisionTask(true, false)
Expand All @@ -2104,7 +2099,7 @@ func (s *integrationSuite) TestRateLimitBufferedEvents() {
s.printWorkflowHistory(s.domainName, workflowExecution)

s.True(workflowComplete)
s.Equal(100, signalCount)
s.Equal(101, signalCount) // check that all 101 signals are received.
}

func (s *integrationSuite) TestSignalWorkflow_DuplicateRequest() {
Expand Down Expand Up @@ -6471,32 +6466,27 @@ func (s *integrationSuite) TestTaskProcessingProtectionForRateLimitError() {
s.Nil(err)

// Send one signal to create a new decision
for i := 0; i < 1; i++ {
buf := new(bytes.Buffer)
binary.Write(buf, binary.LittleEndian, i)
s.Nil(s.sendSignal(s.domainName, workflowExecution, "SignalName", buf.Bytes(), identity))
}
buf := new(bytes.Buffer)
binary.Write(buf, binary.LittleEndian, 0)
s.Nil(s.sendSignal(s.domainName, workflowExecution, "SignalName", buf.Bytes(), identity))

// Drop decision to cause all events to be buffered from now on
_, err = poller.PollAndProcessDecisionTask(false, true)
s.logger.Infof("PollAndProcessDecisionTask: %v", err)
s.Nil(err)

// Buffered Signals
// Buffered 100 Signals
for i := 1; i < 101; i++ {
buf := new(bytes.Buffer)
binary.Write(buf, binary.LittleEndian, i)
s.Nil(s.sendSignal(s.domainName, workflowExecution, "SignalName", buf.Bytes(), identity))
}

// Rate limited signals
for i := 0; i < 10; i++ {
buf := new(bytes.Buffer)
binary.Write(buf, binary.LittleEndian, i)
signalErr := s.sendSignal(s.domainName, workflowExecution, "SignalName", buf.Bytes(), identity)
s.NotNil(signalErr)
s.Equal(history.ErrBufferedEventsLimitExceeded, signalErr)
}
// 101 signal, which will fail the decision
buf = new(bytes.Buffer)
binary.Write(buf, binary.LittleEndian, 101)
signalErr := s.sendSignal(s.domainName, workflowExecution, "SignalName", buf.Bytes(), identity)
s.Nil(signalErr)

// Process signal in decider
_, err = poller.PollAndProcessDecisionTaskWithAttempt(true, false, false, false, 0)
Expand All @@ -6506,7 +6496,7 @@ func (s *integrationSuite) TestTaskProcessingProtectionForRateLimitError() {
s.printWorkflowHistory(s.domainName, workflowExecution)

s.True(workflowComplete)
s.Equal(101, signalCount)
s.Equal(102, signalCount)
}

func (s *integrationSuite) TestStickyTimeout_NonTransientDecision() {
Expand Down
37 changes: 1 addition & 36 deletions service/history/timerQueueActiveProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -828,42 +828,7 @@ func (t *timerQueueActiveProcessorImpl) updateWorkflowExecution(
t.timerQueueProcessorBase.Stop()
return err
}

// Check if the processing is blocked due to limit exceeded error and fail any outstanding decision to
// unblock processing
if err == ErrBufferedEventsLimitExceeded {
context.clear()

var err1 error
// Reload workflow execution so we can apply the decision task failure event
msBuilder, err1 = context.loadWorkflowExecution()
if err1 != nil {
return err1
}

if di, ok := msBuilder.GetInFlightDecisionTask(); ok {
msBuilder.AddDecisionTaskFailedEvent(di.ScheduleID, di.StartedID,
workflow.DecisionTaskFailedCauseForceCloseDecision, nil, identityHistoryService)

var transT, timerT []persistence.Task
transT, timerT, err1 = context.scheduleNewDecision(transT, timerT)
if err1 != nil {
return err1
}

// Generate a transaction ID for appending events to history
transactionID, err1 := t.historyService.shard.GetNextTransferTaskID()
if err1 != nil {
return err1
}
err1 = context.updateWorkflowExecution(transT, timerT, transactionID)
if err1 != nil {
return err1
}
}

return err
}
return err
}

t.notifyNewTimers(timerTasks)
Expand Down
35 changes: 0 additions & 35 deletions service/history/transferQueueActiveProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1010,41 +1010,6 @@ Update_History_Loop:
if err == ErrConflict {
continue Update_History_Loop
}

// Check if the processing is blocked due to limit exceeded error and fail any outstanding decision to
// unblock processing
if err == ErrBufferedEventsLimitExceeded {
context.clear()

var err1 error
// Reload workflow execution so we can apply the decision task failure event
msBuilder, err1 = context.loadWorkflowExecution()
if err1 != nil {
return err1
}

if di, ok := msBuilder.GetInFlightDecisionTask(); ok {
msBuilder.AddDecisionTaskFailedEvent(di.ScheduleID, di.StartedID,
workflow.DecisionTaskFailedCauseForceCloseDecision, nil, identityHistoryService)

var transT, timerT []persistence.Task
transT, timerT, err1 = context.scheduleNewDecision(transT, timerT)
if err1 != nil {
return err1
}

// Generate a transaction ID for appending events to history
transactionID, err1 := t.historyService.shard.GetNextTransferTaskID()
if err1 != nil {
return err1
}
err1 = context.updateWorkflowExecution(transT, timerT, transactionID)
if err1 != nil {
return err1
}
}
}

return err
}

Expand Down
40 changes: 40 additions & 0 deletions service/history/workflowExecutionContext.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,14 @@ func (c *workflowExecutionContextImpl) update(transferTasks []persistence.Task,
// Take a snapshot of all updates we have accumulated for this execution
updates, err := c.msBuilder.CloseUpdateSession()
if err != nil {
if err == ErrBufferedEventsLimitExceeded {
if err1 := c.failInflightDecision(); err1 != nil {
return err1
}

// Buffered events are flushed, we want upper layer to retry
return ErrConflict
}
return err
}

Expand Down Expand Up @@ -710,6 +718,38 @@ func (c *workflowExecutionContextImpl) scheduleNewDecision(transferTasks []persi
return transferTasks, timerTasks, nil
}

// failInflightDecision force-closes the decision task that is currently in
// flight, if there is one.
//
// It clears the context, reloads the workflow execution, and — when an
// in-flight decision task exists — records a decision-task-failed event with
// cause DecisionTaskFailedCauseForceCloseDecision, collects the tasks produced
// by scheduleNewDecision, and persists everything through
// updateWorkflowExecution under a freshly allocated transfer task ID.
//
// It returns nil when there is no in-flight decision task or when the update
// succeeds; otherwise it returns the first error reported by any of the steps
// above.
func (c *workflowExecutionContextImpl) failInflightDecision() error {
	c.clear()

	// Work against a freshly loaded execution so the failure event can be
	// applied to it.
	mutableState, err := c.loadWorkflowExecution()
	if err != nil {
		return err
	}

	inflight, found := mutableState.GetInFlightDecisionTask()
	if !found {
		// Nothing is in flight, so there is nothing to fail.
		return nil
	}

	mutableState.AddDecisionTaskFailedEvent(
		inflight.ScheduleID,
		inflight.StartedID,
		workflow.DecisionTaskFailedCauseForceCloseDecision,
		nil,
		identityHistoryService,
	)

	// Start from empty task lists; scheduleNewDecision returns the lists to persist.
	transferTasks, timerTasks, err := c.scheduleNewDecision(nil, nil)
	if err != nil {
		return err
	}

	// A transaction ID is required for appending events to history.
	transactionID, err := c.shard.GetNextTransferTaskID()
	if err != nil {
		return err
	}

	return c.updateWorkflowExecution(transferTasks, timerTasks, transactionID)
}

func (c *workflowExecutionContextImpl) emitWorkflowExecutionStats(stats *persistence.MutableStateStats, executionInfoHistorySize int64) {
if stats == nil {
return
Expand Down

0 comments on commit 7fc51ca

Please sign in to comment.