Skip to content

Commit

Permalink
Retryable error for workflow rate limits in task processing (uber#5782)
Browse files Browse the repository at this point in the history
* Retryable error for workflow rate limits in task processing

* Update task_test.go

* Update task_test.go
  • Loading branch information
sankari165 committed Mar 15, 2024
1 parent 2a30afd commit 38930fa
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 0 deletions.
5 changes: 5 additions & 0 deletions service/history/task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,11 @@ func (t *taskImpl) HandleErr(err error) (retErr error) {
return err
}

if err == errWorkflowRateLimited {
// metrics are emitted within the rate limiter
return err
}

// this is a transient error
if isRedispatchErr(err) {
t.scope.IncCounter(metrics.TaskStandbyRetryCounterPerDomain)
Expand Down
23 changes: 23 additions & 0 deletions service/history/task/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package task

import (
"context"
"errors"
"testing"
"time"
Expand Down Expand Up @@ -182,6 +183,15 @@ func (s *taskSuite) TestHandleErr_ErrDomainNotActive() {
s.Equal(err, taskBase.HandleErr(err))
}

func (s *taskSuite) TestHandleErr_ErrWorkflowRateLimited() {
taskBase := s.newTestTask(func(task Info) (bool, error) {
return true, nil
}, nil)

taskBase.submitTime = time.Now()
s.Equal(errWorkflowRateLimited, taskBase.HandleErr(errWorkflowRateLimited))
}

func (s *taskSuite) TestHandleErr_ErrCurrentWorkflowConditionFailed() {
taskBase := s.newTestTask(func(task Info) (bool, error) {
return true, nil
Expand Down Expand Up @@ -280,6 +290,19 @@ func (s *taskSuite) TestHandleErr_ErrMaxAttempts() {
})
}

func (s *taskSuite) TestRetryErr() {
taskBase := s.newTestTask(func(task Info) (bool, error) {
return true, nil
}, nil)

s.Equal(false, taskBase.RetryErr(errWorkflowBusy))
s.Equal(false, taskBase.RetryErr(ErrTaskPendingActive))
s.Equal(false, taskBase.RetryErr(context.DeadlineExceeded))
s.Equal(false, taskBase.RetryErr(&redispatchError{Reason: "random-reason"}))
// rate limited errors are retried
s.Equal(true, taskBase.RetryErr(errWorkflowRateLimited))
}

func (s *taskSuite) newTestTask(
taskFilter Filter,
redispatchFn func(task Task),
Expand Down
1 change: 1 addition & 0 deletions service/history/task/transfer_active_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ var (
errUnknownTransferTask = errors.New("unknown transfer task")
errWorkflowBusy = errors.New("unable to get workflow execution lock within specified timeout")
errTargetDomainNotActive = errors.New("target domain not active")
errWorkflowRateLimited = errors.New("workflow is being rate limited for making too many requests")
)

type (
Expand Down

0 comments on commit 38930fa

Please sign in to comment.