From d91c48b8a041f3669b25f4b0507cf9e7ffe6ae68 Mon Sep 17 00:00:00 2001 From: Shaun Dunning Date: Tue, 20 Jun 2017 16:23:22 -0400 Subject: [PATCH] Cleanup dead worker pools with no heartbeat (#51) --- dead_pool_reaper.go | 17 +++++++++-------- dead_pool_reaper_test.go | 18 ++++++++++++++---- observer.go | 2 +- worker_pool.go | 15 ++++----------- 4 files changed, 28 insertions(+), 24 deletions(-) diff --git a/dead_pool_reaper.go b/dead_pool_reaper.go index 2d11fa03..8ec3f864 100644 --- a/dead_pool_reaper.go +++ b/dead_pool_reaper.go @@ -86,16 +86,15 @@ func (r *deadPoolReaper) reap() error { // Cleanup all dead pools for deadPoolID, jobTypes := range deadPoolIDs { - // Requeue all dangling jobs - r.requeueInProgressJobs(deadPoolID, jobTypes) - - // Remove hearbeat - _, err = conn.Do("DEL", redisKeyHeartbeat(r.namespace, deadPoolID)) - if err != nil { - return err + // if we found jobs from the heartbeat, requeue them and remove the heartbeat + if len(jobTypes) > 0 { + r.requeueInProgressJobs(deadPoolID, jobTypes) + if _, err = conn.Do("DEL", redisKeyHeartbeat(r.namespace, deadPoolID)); err != nil { + return err + } } - // Remove from set + // Remove dead pool from worker pools set _, err = conn.Do("SREM", workerPoolsKey, deadPoolID) if err != nil { return err @@ -151,6 +150,8 @@ func (r *deadPoolReaper) findDeadPools() (map[string][]string, error) { // Check that last heartbeat was long enough ago to consider the pool dead heartbeatAt, err := redis.Int64(conn.Do("HGET", heartbeatKey, "heartbeat_at")) if err == redis.ErrNil { + // dead pool with no heartbeat + deadPools[workerPoolID] = []string{} continue } if err != nil { diff --git a/dead_pool_reaper_test.go b/dead_pool_reaper_test.go index c677a693..c4bc38e2 100644 --- a/dead_pool_reaper_test.go +++ b/dead_pool_reaper_test.go @@ -109,11 +109,11 @@ func TestDeadPoolReaperNoHeartbeat(t *testing.T) { err = conn.Flush() assert.NoError(t, err) - // Test getting dead pool + // Test getting dead pool ids reaper := newDeadPoolReaper(ns, pool) deadPools, err := reaper.findDeadPools() assert.NoError(t, err) - assert.Equal(t, deadPools, map[string][]string{}) + assert.Equal(t, deadPools, map[string][]string{"1": {}, "2": {}, "3": {}}) // Test requeueing jobs _, err = conn.Do("lpush", redisKeyJobsInProgress(ns, "2", "type1"), "foo") @@ -129,19 +129,29 @@ func TestDeadPoolReaperNoHeartbeat(t *testing.T) { assert.NoError(t, err) assert.Equal(t, 1, jobsCount) + // Ensure dead worker pools still in the set + jobsCount, err = redis.Int(conn.Do("scard", redisKeyWorkerPools(ns))) + assert.NoError(t, err) + assert.Equal(t, 3, jobsCount) + // Reap err = reaper.reap() assert.NoError(t, err) - // Ensure 0 jobs in jobs queue + // Ensure jobs queue was not altered jobsCount, err = redis.Int(conn.Do("llen", redisKeyJobs(ns, "type1"))) assert.NoError(t, err) assert.Equal(t, 0, jobsCount) - // Ensure 1 job in inprogress queue + // Ensure inprogress queue was not altered jobsCount, err = redis.Int(conn.Do("llen", redisKeyJobsInProgress(ns, "2", "type1"))) assert.NoError(t, err) assert.Equal(t, 1, jobsCount) + + // Ensure dead worker pools were removed from the set + jobsCount, err = redis.Int(conn.Do("scard", redisKeyWorkerPools(ns))) + assert.NoError(t, err) + assert.Equal(t, 0, jobsCount) } func TestDeadPoolReaperNoJobTypes(t *testing.T) { diff --git a/observer.go b/observer.go index d92c74e5..5ddc0c53 100644 --- a/observer.go +++ b/observer.go @@ -120,7 +120,7 @@ func (o *observer) observeCheckin(jobName, jobID, checkin string) { } func (o *observer) loop() { - // Ever tick, we'll update redis if necessary + // Every tick we'll update redis if necessary // We don't update it on every job because the only purpose of this data is for humans to inspect the system, // and a fast worker could move onto new jobs every few ms. ticker := time.Tick(1000 * time.Millisecond) diff --git a/worker_pool.go b/worker_pool.go index 08e3d652..04f19c9a 100644 --- a/worker_pool.go +++ b/worker_pool.go @@ -69,7 +69,8 @@ type middlewareHandler struct { GenericMiddlewareHandler GenericMiddlewareHandler } -// NewWorkerPool creates a new worker pool. ctx should be a struct literal whose type will be used for middleware and handlers. concurrency specifies how many workers to spin up - each worker can process jobs concurrently. +// NewWorkerPool creates a new worker pool. ctx should be a struct literal whose type will be used for middleware and handlers. +// concurrency specifies how many workers to spin up - each worker can process jobs concurrently. func NewWorkerPool(ctx interface{}, concurrency uint, namespace string, pool *redis.Pool) *WorkerPool { if pool == nil { panic("NewWorkerPool needs a non-nil *redis.Pool") @@ -127,7 +128,8 @@ func (wp *WorkerPool) Job(name string, fn interface{}) *WorkerPool { return wp.JobWithOptions(name, JobOptions{}, fn) } -// JobWithOptions adds a handler for 'name' jobs as per the Job function, but permits you specify additional options such as a job's priority, retry count, and whether to send dead jobs to the dead job queue or trash them. +// JobWithOptions adds a handler for 'name' jobs as per the Job function, but permits you specify additional options +// such as a job's priority, retry count, and whether to send dead jobs to the dead job queue or trash them. func (wp *WorkerPool) JobWithOptions(name string, jobOpts JobOptions, fn interface{}) *WorkerPool { jobOpts = applyDefaultsAndValidate(jobOpts) @@ -280,15 +282,6 @@ func (wp *WorkerPool) writeConcurrencyControlsToRedis() { } } -func newJobTypeGeneric(name string, opts JobOptions, handler GenericHandler) *jobType { - return &jobType{ - Name: name, - JobOptions: opts, - IsGeneric: true, - GenericHandler: handler, - } -} - // validateContextType will panic if context is invalid func validateContextType(ctxType reflect.Type) { if ctxType.Kind() != reflect.Struct {