Cleanup dead worker pools with no heartbeat (gocraft#51)
shdunning committed Jun 20, 2017
1 parent b6bd24c commit d91c48b
Showing 4 changed files with 28 additions and 24 deletions.
17 changes: 9 additions & 8 deletions dead_pool_reaper.go
@@ -86,16 +86,15 @@ func (r *deadPoolReaper) reap() error {
 
 	// Cleanup all dead pools
 	for deadPoolID, jobTypes := range deadPoolIDs {
-		// Requeue all dangling jobs
-		r.requeueInProgressJobs(deadPoolID, jobTypes)
-
-		// Remove hearbeat
-		_, err = conn.Do("DEL", redisKeyHeartbeat(r.namespace, deadPoolID))
-		if err != nil {
-			return err
+		// if we found jobs from the heartbeat, requeue them and remove the heartbeat
+		if len(jobTypes) > 0 {
+			r.requeueInProgressJobs(deadPoolID, jobTypes)
+			if _, err = conn.Do("DEL", redisKeyHeartbeat(r.namespace, deadPoolID)); err != nil {
+				return err
+			}
 		}
 
-		// Remove from set
+		// Remove dead pool from worker pools set
 		_, err = conn.Do("SREM", workerPoolsKey, deadPoolID)
 		if err != nil {
 			return err
@@ -151,6 +150,8 @@ func (r *deadPoolReaper) findDeadPools() (map[string][]string, error) {
 		// Check that last heartbeat was long enough ago to consider the pool dead
 		heartbeatAt, err := redis.Int64(conn.Do("HGET", heartbeatKey, "heartbeat_at"))
 		if err == redis.ErrNil {
+			// dead pool with no heartbeat
+			deadPools[workerPoolID] = []string{}
 			continue
 		}
 		if err != nil {
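
Read together, the two hunks above make the reaper handle pools that never wrote a heartbeat: findDeadPools now records such a pool with an empty job-type slice, and reap() skips the requeue/DEL step for it but still removes its ID from the worker-pools set. A condensed sketch of that loop, not the full implementation (the reaper struct, its Redis pool wiring, and the dead-pool scan are elided; the redigo import path may differ from the one the project pins):

// Sketch only: a free function standing in for reap()'s cleanup loop after this
// change. redisKeyHeartbeat and redisKeyWorkerPools are the key helpers used in
// the diff; requeueInProgressJobs stands in for r.requeueInProgressJobs.
package work

import "github.com/gomodule/redigo/redis"

func reapDeadPoolsSketch(conn redis.Conn, namespace string, deadPools map[string][]string,
	requeueInProgressJobs func(poolID string, jobTypes []string)) error {

	workerPoolsKey := redisKeyWorkerPools(namespace)

	for deadPoolID, jobTypes := range deadPools {
		// Only pools whose heartbeat listed job types have in-progress
		// lists to requeue and a heartbeat hash to delete.
		if len(jobTypes) > 0 {
			requeueInProgressJobs(deadPoolID, jobTypes)
			if _, err := conn.Do("DEL", redisKeyHeartbeat(namespace, deadPoolID)); err != nil {
				return err
			}
		}

		// A pool with no heartbeat (empty jobTypes) is still pruned from
		// the worker-pools set, which is the behavior this commit adds.
		if _, err := conn.Do("SREM", workerPoolsKey, deadPoolID); err != nil {
			return err
		}
	}
	return nil
}
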
18 changes: 14 additions & 4 deletions dead_pool_reaper_test.go
@@ -109,11 +109,11 @@ func TestDeadPoolReaperNoHeartbeat(t *testing.T) {
 	err = conn.Flush()
 	assert.NoError(t, err)
 
-	// Test getting dead pool
+	// Test getting dead pool ids
 	reaper := newDeadPoolReaper(ns, pool)
 	deadPools, err := reaper.findDeadPools()
 	assert.NoError(t, err)
-	assert.Equal(t, deadPools, map[string][]string{})
+	assert.Equal(t, deadPools, map[string][]string{"1": {}, "2": {}, "3": {}})
 
 	// Test requeueing jobs
 	_, err = conn.Do("lpush", redisKeyJobsInProgress(ns, "2", "type1"), "foo")
@@ -129,19 +129,29 @@ func TestDeadPoolReaperNoHeartbeat(t *testing.T) {
 	assert.NoError(t, err)
 	assert.Equal(t, 1, jobsCount)
 
+	// Ensure dead worker pools still in the set
+	jobsCount, err = redis.Int(conn.Do("scard", redisKeyWorkerPools(ns)))
+	assert.NoError(t, err)
+	assert.Equal(t, 3, jobsCount)
+
 	// Reap
 	err = reaper.reap()
 	assert.NoError(t, err)
 
-	// Ensure 0 jobs in jobs queue
+	// Ensure jobs queue was not altered
 	jobsCount, err = redis.Int(conn.Do("llen", redisKeyJobs(ns, "type1")))
 	assert.NoError(t, err)
 	assert.Equal(t, 0, jobsCount)
 
-	// Ensure 1 job in inprogress queue
+	// Ensure inprogress queue was not altered
 	jobsCount, err = redis.Int(conn.Do("llen", redisKeyJobsInProgress(ns, "2", "type1")))
 	assert.NoError(t, err)
 	assert.Equal(t, 1, jobsCount)
+
+	// Ensure dead worker pools were removed from the set
+	jobsCount, err = redis.Int(conn.Do("scard", redisKeyWorkerPools(ns)))
+	assert.NoError(t, err)
+	assert.Equal(t, 0, jobsCount)
 }
 
 func TestDeadPoolReaperNoJobTypes(t *testing.T) {
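
The deadPools value asserted above, {"1": {}, "2": {}, "3": {}}, is built by findDeadPools. A rough sketch of the per-pool check behind it, under stated assumptions (the dead-time window is a parameter with an assumed name here, and reading job types out of a stale heartbeat is elided):

// Sketch of how each member of the worker-pools set is classified; not the
// verbatim findDeadPools code.
package work

import (
	"time"

	"github.com/gomodule/redigo/redis"
)

func isPoolDeadSketch(conn redis.Conn, heartbeatKey string, deadTime time.Duration) (dead, hasHeartbeat bool, err error) {
	heartbeatAt, err := redis.Int64(conn.Do("HGET", heartbeatKey, "heartbeat_at"))
	if err == redis.ErrNil {
		// No heartbeat hash at all: previously such a pool was skipped;
		// it is now reported as dead with an empty job-type list so that
		// reap() can SREM it from the worker-pools set.
		return true, false, nil
	}
	if err != nil {
		return false, false, err
	}
	// Otherwise the pool is dead only if its last heartbeat is older than the window.
	return time.Unix(heartbeatAt, 0).Add(deadTime).Before(time.Now()), true, nil
}
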
2 changes: 1 addition & 1 deletion observer.go
@@ -120,7 +120,7 @@ func (o *observer) observeCheckin(jobName, jobID, checkin string) {
 }
 
 func (o *observer) loop() {
-	// Ever tick, we'll update redis if necessary
+	// Every tick we'll update redis if necessary
 	// We don't update it on every job because the only purpose of this data is for humans to inspect the system,
 	// and a fast worker could move onto new jobs every few ms.
 	ticker := time.Tick(1000 * time.Millisecond)
15 changes: 4 additions & 11 deletions worker_pool.go
@@ -69,7 +69,8 @@ type middlewareHandler struct {
 	GenericMiddlewareHandler GenericMiddlewareHandler
 }
 
-// NewWorkerPool creates a new worker pool. ctx should be a struct literal whose type will be used for middleware and handlers. concurrency specifies how many workers to spin up - each worker can process jobs concurrently.
+// NewWorkerPool creates a new worker pool. ctx should be a struct literal whose type will be used for middleware and handlers.
+// concurrency specifies how many workers to spin up - each worker can process jobs concurrently.
 func NewWorkerPool(ctx interface{}, concurrency uint, namespace string, pool *redis.Pool) *WorkerPool {
 	if pool == nil {
 		panic("NewWorkerPool needs a non-nil *redis.Pool")
@@ -127,7 +128,8 @@ func (wp *WorkerPool) Job(name string, fn interface{}) *WorkerPool {
 	return wp.JobWithOptions(name, JobOptions{}, fn)
 }
 
-// JobWithOptions adds a handler for 'name' jobs as per the Job function, but permits you specify additional options such as a job's priority, retry count, and whether to send dead jobs to the dead job queue or trash them.
+// JobWithOptions adds a handler for 'name' jobs as per the Job function, but permits you specify additional options
+// such as a job's priority, retry count, and whether to send dead jobs to the dead job queue or trash them.
 func (wp *WorkerPool) JobWithOptions(name string, jobOpts JobOptions, fn interface{}) *WorkerPool {
 	jobOpts = applyDefaultsAndValidate(jobOpts)
 
@@ -280,15 +282,6 @@ func (wp *WorkerPool) writeConcurrencyControlsToRedis() {
 	}
 }
 
-func newJobTypeGeneric(name string, opts JobOptions, handler GenericHandler) *jobType {
-	return &jobType{
-		Name:           name,
-		JobOptions:     opts,
-		IsGeneric:      true,
-		GenericHandler: handler,
-	}
-}
-
 // validateContextType will panic if context is invalid
 func validateContextType(ctxType reflect.Type) {
 	if ctxType.Kind() != reflect.Struct {
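
For context on the two doc comments reflowed above, a minimal usage sketch in the style of the project README; the context type, job names, option values, and Redis address are illustrative, and the redigo import path may differ from the one the project pins:

package main

import (
	"os"
	"os/signal"

	"github.com/gocraft/work"
	"github.com/gomodule/redigo/redis"
)

type Context struct{}

func (c *Context) SendEmail(job *work.Job) error {
	// Handler body elided; this signature is what the pool's reflection expects.
	return nil
}

func main() {
	redisPool := &redis.Pool{
		MaxActive: 5,
		MaxIdle:   5,
		Dial: func() (redis.Conn, error) {
			return redis.Dial("tcp", ":6379")
		},
	}

	// ctx is a struct literal whose type drives middleware/handler lookup;
	// 10 is the concurrency (number of workers) described above.
	pool := work.NewWorkerPool(Context{}, 10, "my_app_namespace", redisPool)

	pool.Job("send_email", (*Context).SendEmail)
	pool.JobWithOptions("send_welcome_email", work.JobOptions{Priority: 10, MaxFails: 2}, (*Context).SendEmail)

	pool.Start()

	// Block until interrupted, then stop the workers cleanly.
	signalChan := make(chan os.Signal, 1)
	signal.Notify(signalChan, os.Interrupt)
	<-signalChan
	pool.Stop()
}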
