diff --git a/go.mod b/go.mod
index e09db58f..bc63c5ea 100644
--- a/go.mod
+++ b/go.mod
@@ -4,7 +4,7 @@ go 1.14
 
 require (
 	github.com/albrow/jobs v0.4.2
-	github.com/alicebob/miniredis/v2 v2.14.3 // indirect
+	github.com/alicebob/miniredis/v2 v2.14.3
 	github.com/benmanns/goworker v0.1.3
 	github.com/bitly/go-simplejson v0.5.0 // indirect
 	github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect
diff --git a/worker_pool.go b/worker_pool.go
index d83c7b79..159af7b9 100644
--- a/worker_pool.go
+++ b/worker_pool.go
@@ -230,6 +230,20 @@ func (wp *WorkerPool) Stop() {
 		}(w)
 	}
 	wg.Wait()
+	jobTypes := make([]string, 0, len(wp.jobTypes))
+	for k := range wp.jobTypes {
+		jobTypes = append(jobTypes, k)
+	}
+
+	err := wp.deadPoolReaper.requeueInProgressJobs(wp.workerPoolID, jobTypes)
+	if err != nil {
+		logError("dead_pool_reaper.requeue_in_progress_jobs", err)
+	}
+
+	err = wp.deadPoolReaper.cleanStaleLockInfo(wp.workerPoolID, jobTypes)
+	if err != nil {
+		logError("dead_pool_reaper.clean_stale_lock_info", err)
+	}
 	wp.heartbeater.stop()
 	wp.retrier.stop()
 	wp.scheduler.stop()
diff --git a/worker_pool_test.go b/worker_pool_test.go
index 2aec6008..feef4ff4 100644
--- a/worker_pool_test.go
+++ b/worker_pool_test.go
@@ -154,7 +154,7 @@ func TestWorkersPoolRunSingleThreaded(t *testing.T) {
 	assert.EqualValues(t, 0, listSize(pool, redisKeyJobs(ns, job1)))
 	assert.EqualValues(t, 0, listSize(pool, redisKeyJobsInProgress(ns, wp.workerPoolID, job1)))
 	assert.EqualValues(t, 0, getInt64(pool, redisKeyJobsLock(ns, job1)))
-	assert.EqualValues(t, 0, hgetInt64(pool, redisKeyJobsLockInfo(ns, job1), wp.workerPoolID))
+	assert.False(t, hexists(pool, redisKeyJobsLockInfo(ns, job1), wp.workerPoolID))
 }
 
 func TestWorkerPoolPauseSingleThreadedJobs(t *testing.T) {
@@ -203,7 +203,7 @@ func TestWorkerPoolPauseSingleThreadedJobs(t *testing.T) {
 	assert.EqualValues(t, 0, listSize(pool, redisKeyJobs(ns, job1)))
 	assert.EqualValues(t, 0, listSize(pool, redisKeyJobsInProgress(ns, wp.workerPoolID, job1)))
 	assert.EqualValues(t, 0, getInt64(pool, redisKeyJobsLock(ns, job1)))
-	assert.EqualValues(t, 0, hgetInt64(pool, redisKeyJobsLockInfo(ns, job1), wp.workerPoolID))
+	assert.False(t, hexists(pool, redisKeyJobsLockInfo(ns, job1), wp.workerPoolID))
 }
 
 // Test Helpers
diff --git a/worker_test.go b/worker_test.go
index baa7ab56..a0c2fdd0 100644
--- a/worker_test.go
+++ b/worker_test.go
@@ -366,14 +366,10 @@ func TestWorkersPaused(t *testing.T) {
 
 // Test that in the case of an unavailable Redis server,
 // the worker loop exits in the case of a WorkerPool.Stop
-func TestStop(t *testing.T) {
+func TestStopWithUnavailableRedis(t *testing.T) {
 	redisPool := &redis.Pool{
 		Dial: func() (redis.Conn, error) {
-			c, err := redis.Dial("tcp", "notworking:6379", redis.DialConnectTimeout(1*time.Second))
-			if err != nil {
-				return nil, err
-			}
-			return c, nil
+			return redis.Dial("tcp", "notworking:6379", redis.DialConnectTimeout(1*time.Second))
 		},
 	}
 	wp := NewWorkerPool(TestContext{}, 10, "work", redisPool)
@@ -381,6 +377,73 @@
 	wp.Stop()
 }
 
+func TestStop(t *testing.T) {
+	redisPool := newTestPool(t)
+
+	namespace := "work"
+	wp := NewWorkerPool(TestContext{}, 10, namespace, redisPool)
+	wp.Start()
+	wp.Stop()
+
+	// verify cleanup of heartbeat
+	conn := redisPool.Get()
+	ok, err := redis.Bool(conn.Do("SISMEMBER", redisKeyWorkerPools(namespace), wp.workerPoolID))
+	assert.Nil(t, err)
+	assert.False(t, ok)
+
+	ok, err = redis.Bool(conn.Do("EXISTS", redisKeyHeartbeat(namespace, wp.workerPoolID)))
+	assert.Nil(t, err)
+	assert.False(t, ok)
+}
+
+func TestStopCleanup(t *testing.T) {
+	redisPool := newTestPool(t)
+
+	namespace := "work"
+	jobType := "dummyJob"
+	jobData := `{"name":"dummyJob","id":"40a2206be652914611c777f4","t":1614838373,"args":{"key":0}}`
+
+	wp := NewWorkerPool(TestContext{}, 10, namespace, redisPool)
+	wp.JobWithOptions(jobType, JobOptions{MaxConcurrency: 10}, func(job *Job) error {
+		return nil
+	})
+	wp.Start()
+
+	// We are trying to simulate a scenario where there is loss of data which
+	// can happen when there is a failover on a Sentinel cluster. In such a
+	// situation, there will be jobs which will appear to be in progress from
+	// the perspective of redis but won't actually be getting executed in
+	// workers. The following redis commands are equivalent to a job in
+	// progress.
+	conn := redisPool.Get()
+	err := conn.Send("LPUSH", redisKeyJobsInProgress(namespace, wp.workerPoolID, jobType), jobData)
+	assert.NoError(t, err)
+	err = conn.Send("SET", redisKeyJobsLock(namespace, jobType), 1)
+	assert.NoError(t, err)
+	err = conn.Send("HSET", redisKeyJobsLockInfo(namespace, jobType), wp.workerPoolID, 1)
+	assert.NoError(t, err)
+	err = conn.Flush()
+	assert.NoError(t, err)
+
+	wp.Stop()
+
+	jobsInProgress, err := redis.Strings(conn.Do("LRANGE", redisKeyJobsInProgress(namespace, wp.workerPoolID, jobType), 0, -1))
+	assert.NoError(t, err)
+	assert.Empty(t, jobsInProgress)
+
+	lockCount, err := redis.Int(conn.Do("GET", redisKeyJobsLock(namespace, jobType)))
+	assert.NoError(t, err)
+	assert.Equal(t, 0, lockCount)
+
+	ok, err := redis.Bool(conn.Do("HEXISTS", redisKeyJobsLockInfo(namespace, jobType), wp.workerPoolID))
+	assert.NoError(t, err)
+	assert.False(t, ok)
+
+	jobQueue, err := redis.Strings(conn.Do("LRANGE", redisKeyJobs(namespace, jobType), 0, -1))
+	assert.NoError(t, err)
+	assert.Equal(t, []string{jobData}, jobQueue)
+}
+
 func BenchmarkJobProcessing(b *testing.B) {
 	pool := newTestPool(b)
 	ns := "work"
@@ -488,6 +551,17 @@ func hgetInt64(pool *redis.Pool, redisKey, hashKey string) int64 {
 	return v
 }
 
+func hexists(pool *redis.Pool, redisKey, hashKey string) bool {
+	conn := pool.Get()
+	defer conn.Close()
+
+	ok, err := redis.Bool(conn.Do("HEXISTS", redisKey, hashKey))
+	if err != nil {
+		panic("could not HEXISTS: " + err.Error())
+	}
+	return ok
+}
+
 func jobOnZset(pool *redis.Pool, key string) (int64, *Job) {
 	conn := pool.Get()
 	defer conn.Close()