Skip to content

Commit

Permalink
Merge pull request #15 from jalkanen/master
Browse files Browse the repository at this point in the history
Provide a way to specify a custom backoff function for retried jobs.
  • Loading branch information
cypriss authored Sep 1, 2016
2 parents dfa0463 + ca4d7cd commit 8aaf018
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 3 deletions.
18 changes: 15 additions & 3 deletions worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,9 +246,20 @@ func (w *worker) addToRetry(job *Job, runErr error) {
conn := w.pool.Get()
defer conn.Close()

var backoff BackoffCalculator

// Choose the backoff provider
jt, ok := w.jobTypes[job.Name]; if ok {
backoff = jt.Backoff
}

if backoff == nil {
backoff = defaultBackoffCalculator
}

conn.Send("MULTI")
conn.Send("LREM", job.inProgQueue, 1, job.rawJSON)
conn.Send("ZADD", redisKeyRetry(w.namespace), nowEpochSeconds()+backoff(job.Fails), rawJSON)
conn.Send("ZADD", redisKeyRetry(w.namespace), nowEpochSeconds()+backoff(job), rawJSON)
_, err = conn.Do("EXEC")
if err != nil {
logError("worker.add_to_retry.exec", err)
Expand Down Expand Up @@ -281,7 +292,8 @@ func (w *worker) addToDead(job *Job, runErr error) {
}
}

// backoff returns number of seconds t
func backoff(fails int64) int64 {
// Default algorithm returns an fastly increasing backoff counter which grows in an unbounded fashion
func defaultBackoffCalculator(job *Job) int64 {
fails := job.Fails
return (fails * fails * fails * fails) + 15 + (rand.Int63n(30) * (fails + 1))
}
7 changes: 7 additions & 0 deletions worker_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,18 @@ type jobType struct {
DynamicHandler reflect.Value
}

// You may provide your own backoff function for retrying failed jobs or use the builtin one.
// Returns the number of seconds to wait until the next attempt.
//
// The builtin backoff calculator provides an exponentially increasing wait function.
type BackoffCalculator func(job *Job) int64

// JobOptions can be passed to JobWithOptions.
type JobOptions struct {
Priority uint // Priority from 1 to 10000
MaxFails uint // 1: send straight to dead (unless SkipDead)
SkipDead bool // If true, don't send failed jobs to the dead queue when retries are exhausted.
Backoff BackoffCalculator // If not set, uses the default backoff algorithm
}

// GenericHandler is a job handler without any custom context.
Expand Down
51 changes: 51 additions & 0 deletions worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,57 @@ func TestWorkerRetry(t *testing.T) {
assert.True(t, (nowEpochSeconds()-job.FailedAt) <= 2)
}

// Check if a custom backoff function functions functionally.
func TestWorkerRetryWithCustomBackoff(t *testing.T) {
pool := newTestPool(":6379")
ns := "work"
job1 := "job1"
deleteQueue(pool, ns, job1)
deleteRetryAndDead(pool, ns)
calledCustom := 0

custombo := func(job *Job) int64 {
calledCustom++
return 5 // Always 5 seconds
}

jobTypes := make(map[string]*jobType)
jobTypes[job1] = &jobType{
Name: job1,
JobOptions: JobOptions{Priority: 1, MaxFails: 3, Backoff: custombo},
IsGeneric: true,
GenericHandler: func(job *Job) error {
return fmt.Errorf("sorry kid")
},
}

enqueuer := NewEnqueuer(ns, pool)
_, err := enqueuer.Enqueue(job1, Q{"a": 1})
assert.Nil(t, err)
w := newWorker(ns, "1", pool, tstCtxType, nil, jobTypes)
w.start()
w.drain()
w.stop()

// Ensure the right stuff is in our queues:
assert.EqualValues(t, 1, zsetSize(pool, redisKeyRetry(ns)))
assert.EqualValues(t, 0, zsetSize(pool, redisKeyDead(ns)))
assert.EqualValues(t, 0, listSize(pool, redisKeyJobs(ns, job1)))
assert.EqualValues(t, 0, listSize(pool, redisKeyJobsInProgress(ns, "1", job1)))

// Get the job on the retry queue
ts, job := jobOnZset(pool, redisKeyRetry(ns))

assert.True(t, ts > nowEpochSeconds()) // enqueued in the future
assert.True(t, ts < (nowEpochSeconds()+10)) // but less than ten secs in

assert.Equal(t, job1, job.Name) // basics are preserved
assert.EqualValues(t, 1, job.Fails)
assert.Equal(t, "sorry kid", job.LastErr)
assert.True(t, (nowEpochSeconds()-job.FailedAt) <= 2)
assert.Equal(t, 1, calledCustom)
}

func TestWorkerDead(t *testing.T) {
pool := newTestPool(":6379")
ns := "work"
Expand Down

0 comments on commit 8aaf018

Please sign in to comment.