Fix infinite worker loop preventing shutdown when Stop() called (gocraft#54)

* Use 1 second timeout for bad connect test

* Remove infinite loop preventing shutdown

* Add test for worker pool stop if jobs in queue
tylerb authored and taylorchu committed Jul 11, 2017
1 parent e8fb31a commit d3a863d
Showing 2 changed files with 64 additions and 23 deletions.
worker.go (18 additions, 22 deletions)

@@ -111,29 +111,25 @@ func (w *worker) loop() {
 			drained = true
 			timer.Reset(0)
 		case <-timer.C:
-			gotJob := true
-			for gotJob {
-				job, err := w.fetchJob()
-				if err != nil {
-					logError("worker.fetch", err)
-					gotJob = false
-					timer.Reset(10 * time.Millisecond)
-				} else if job != nil {
-					w.processJob(job)
-					consequtiveNoJobs = 0
-				} else {
-					gotJob = false
-					if drained {
-						w.doneDrainingChan <- struct{}{}
-						drained = false
-					}
-					consequtiveNoJobs++
-					idx := consequtiveNoJobs
-					if idx >= int64(len(sleepBackoffsInMilliseconds)) {
-						idx = int64(len(sleepBackoffsInMilliseconds)) - 1
-					}
-					timer.Reset(time.Duration(sleepBackoffsInMilliseconds[idx]) * time.Millisecond)
-				}
-			}
+			job, err := w.fetchJob()
+			if err != nil {
+				logError("worker.fetch", err)
+				timer.Reset(10 * time.Millisecond)
+			} else if job != nil {
+				w.processJob(job)
+				consequtiveNoJobs = 0
+				timer.Reset(0)
+			} else {
+				if drained {
+					w.doneDrainingChan <- struct{}{}
+					drained = false
+				}
+				consequtiveNoJobs++
+				idx := consequtiveNoJobs
+				if idx >= int64(len(sleepBackoffsInMilliseconds)) {
+					idx = int64(len(sleepBackoffsInMilliseconds)) - 1
+				}
+				timer.Reset(time.Duration(sleepBackoffsInMilliseconds[idx]) * time.Millisecond)
+			}
 		}
 	}
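Why this fixes the shutdown problem: the removed for gotJob loop kept fetching and processing jobs until the queue was empty, so while work remained the worker never returned to the enclosing select and could not observe a stop request. The replacement handles at most one job per timer tick, then resets the timer and falls back to the select. A minimal, self-contained sketch of that pattern follows; this is not the library's code, and the names jobs, stopChan, and workerLoop are illustrative only.

package main

import (
	"fmt"
	"time"
)

// workerLoop processes at most one job per timer tick, then falls back to the
// outer select, so stopChan is observed even while jobs are still queued.
func workerLoop(jobs <-chan int, stopChan <-chan struct{}, done chan<- struct{}) {
	defer close(done)
	timer := time.NewTimer(0)
	for {
		select {
		case <-stopChan:
			return // honor a stop request promptly, without draining the queue first
		case <-timer.C:
			select {
			case j := <-jobs:
				time.Sleep(time.Millisecond) // simulate doing the work
				fmt.Println("processed job", j)
				timer.Reset(0) // re-enter the outer select before the next job
			default:
				timer.Reset(10 * time.Millisecond) // queue empty: back off briefly
			}
		}
	}
}

func main() {
	jobs := make(chan int, 100)
	for i := 0; i < 100; i++ {
		jobs <- i
	}
	stop := make(chan struct{})
	done := make(chan struct{})
	go workerLoop(jobs, stop, done)

	time.Sleep(20 * time.Millisecond) // let a handful of jobs run
	close(stop)                       // stop while most jobs are still queued
	<-done
	fmt.Println("worker stopped with", len(jobs), "jobs still queued")
}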
worker_test.go (46 additions, 1 deletion)

@@ -3,10 +3,12 @@ package work
 import (
 	"fmt"
 	"strconv"
+	"sync/atomic"
 	"testing"
 	"time"

 	"github.com/garyburd/redigo/redis"
+	"github.com/gocraft/work"
 	"github.com/stretchr/testify/assert"
 )

@@ -361,7 +363,7 @@ func TestWorkersPaused(t *testing.T) {
 func TestStop(t *testing.T) {
 	redisPool := &redis.Pool{
 		Dial: func() (redis.Conn, error) {
-			c, err := redis.Dial("tcp", "notworking:6379")
+			c, err := redis.Dial("tcp", "notworking:6379", redis.DialConnectTimeout(1*time.Second))
 			if err != nil {
 				return nil, err
 			}
@@ -581,3 +583,46 @@ func deletePausedAndLockedKeys(namespace, jobName string, pool *redis.Pool) error {
 	}
 	return nil
 }
+
+type emptyCtx struct{}
+
+// Starts up a pool with two workers emptying it as fast as they can
+// The pool is Stop()ped while jobs are still going on. Tests that the
+// pool processing is really stopped and that it's not first completely
+// drained before returning.
+// https://github.com/gocraft/work/issues/24
+func TestWorkerPoolStop(t *testing.T) {
+	ns := "will_it_end"
+	pool := newTestPool(":6379")
+	var started, stopped int32
+	num_iters := 30
+
+	wp := NewWorkerPool(emptyCtx{}, 2, ns, pool)
+
+	wp.Job("sample_job", func(c *emptyCtx, job *Job) error {
+		atomic.AddInt32(&started, 1)
+		time.Sleep(1 * time.Second)
+		atomic.AddInt32(&stopped, 1)
+		return nil
+	})
+
+	var enqueuer = work.NewEnqueuer(ns, pool)
+
+	for i := 0; i <= num_iters; i++ {
+		enqueuer.Enqueue("sample_job", work.Q{})
+	}
+
+	// Start the pool and quit before it has had a chance to complete
+	// all the jobs.
+	wp.Start()
+	time.Sleep(5 * time.Second)
+	wp.Stop()
+
+	if started != stopped {
+		t.Errorf("Expected that jobs were finished and not killed while processing (started=%d, stopped=%d)", started, stopped)
+	}
+
+	if started >= int32(num_iters) {
+		t.Errorf("Expected that jobs queue was not completely emptied.")
+	}
+}
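The new test checks both properties of a clean stop: started == stopped means every job that began was allowed to finish rather than being killed mid-run, and started < num_iters means Stop() returned before the queue had been drained. For reference, a caller-side sketch of the start/stop sequence this protects, loosely following the pattern shown in the gocraft/work README; it assumes a reachable Redis on :6379, and names such as Context and sample_job are illustrative.

package main

import (
	"os"
	"os/signal"

	"github.com/garyburd/redigo/redis"
	"github.com/gocraft/work"
)

type Context struct{}

func main() {
	redisPool := &redis.Pool{
		MaxActive: 5,
		MaxIdle:   5,
		Wait:      true,
		Dial: func() (redis.Conn, error) {
			return redis.Dial("tcp", ":6379")
		},
	}

	pool := work.NewWorkerPool(Context{}, 2, "example_namespace", redisPool)
	pool.Job("sample_job", func(c *Context, job *work.Job) error {
		// do the work here
		return nil
	})
	pool.Start()

	// Wait for an interrupt; Stop() should return promptly instead of draining the queue first.
	signalChan := make(chan os.Signal, 1)
	signal.Notify(signalChan, os.Interrupt)
	<-signalChan
	pool.Stop()
}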
