From 3288c1347b1ef5d9f6a76375fc29d73d6cdac167 Mon Sep 17 00:00:00 2001 From: Kavish Gambhir <33604147+kavishgambhir@users.noreply.github.com> Date: Tue, 9 Mar 2021 11:08:41 +0530 Subject: [PATCH] Requeue in progress jobs and clean stale lock info on stop (#1) In case there is a failover with a Redis Sentinel cluster with data loss there can be stale lock information which can cause job processing to be stuck if the max concurrency limit is reached. Therefore, re-enqueue the jobs which were in progress and also clean the stale lock info for the worker pool. For the cleanup to be thorough, the stop would need to be called on each worker pool instance. Co-authored-by: Kavish Gambhir Co-authored-by: Suhas Karanth --- go.mod | 2 +- worker_pool.go | 14 ++++++++ worker_pool_test.go | 4 +-- worker_test.go | 86 +++++++++++++++++++++++++++++++++++++++++---- 4 files changed, 97 insertions(+), 9 deletions(-) diff --git a/go.mod b/go.mod index e09db58f..bc63c5ea 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.14 require ( github.com/albrow/jobs v0.4.2 - github.com/alicebob/miniredis/v2 v2.14.3 // indirect + github.com/alicebob/miniredis/v2 v2.14.3 github.com/benmanns/goworker v0.1.3 github.com/bitly/go-simplejson v0.5.0 // indirect github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect diff --git a/worker_pool.go b/worker_pool.go index d83c7b79..159af7b9 100644 --- a/worker_pool.go +++ b/worker_pool.go @@ -230,6 +230,20 @@ func (wp *WorkerPool) Stop() { }(w) } wg.Wait() + jobTypes := make([]string, 0, len(wp.jobTypes)) + for k := range wp.jobTypes { + jobTypes = append(jobTypes, k) + } + + err := wp.deadPoolReaper.requeueInProgressJobs(wp.workerPoolID, jobTypes) + if err != nil { + logError("dead_pool_reaper.requeue_in_progress_jobs", err) + } + + err = wp.deadPoolReaper.cleanStaleLockInfo(wp.workerPoolID, jobTypes) + if err != nil { + logError("dead_pool_reaper.clean_stale_lock_info", err) + } wp.heartbeater.stop() wp.retrier.stop() wp.scheduler.stop() diff --git a/worker_pool_test.go b/worker_pool_test.go index 2aec6008..feef4ff4 100644 --- a/worker_pool_test.go +++ b/worker_pool_test.go @@ -154,7 +154,7 @@ func TestWorkersPoolRunSingleThreaded(t *testing.T) { assert.EqualValues(t, 0, listSize(pool, redisKeyJobs(ns, job1))) assert.EqualValues(t, 0, listSize(pool, redisKeyJobsInProgress(ns, wp.workerPoolID, job1))) assert.EqualValues(t, 0, getInt64(pool, redisKeyJobsLock(ns, job1))) - assert.EqualValues(t, 0, hgetInt64(pool, redisKeyJobsLockInfo(ns, job1), wp.workerPoolID)) + assert.False(t, hexists(pool, redisKeyJobsLockInfo(ns, job1), wp.workerPoolID)) } func TestWorkerPoolPauseSingleThreadedJobs(t *testing.T) { @@ -203,7 +203,7 @@ func TestWorkerPoolPauseSingleThreadedJobs(t *testing.T) { assert.EqualValues(t, 0, listSize(pool, redisKeyJobs(ns, job1))) assert.EqualValues(t, 0, listSize(pool, redisKeyJobsInProgress(ns, wp.workerPoolID, job1))) assert.EqualValues(t, 0, getInt64(pool, redisKeyJobsLock(ns, job1))) - assert.EqualValues(t, 0, hgetInt64(pool, redisKeyJobsLockInfo(ns, job1), wp.workerPoolID)) + assert.False(t, hexists(pool, redisKeyJobsLockInfo(ns, job1), wp.workerPoolID)) } // Test Helpers diff --git a/worker_test.go b/worker_test.go index baa7ab56..a0c2fdd0 100644 --- a/worker_test.go +++ b/worker_test.go @@ -366,14 +366,10 @@ func TestWorkersPaused(t *testing.T) { // Test that in the case of an unavailable Redis server, // the worker loop exits in the case of a WorkerPool.Stop -func TestStop(t *testing.T) { +func TestStopWithUnavailableRedis(t *testing.T) { redisPool := &redis.Pool{ Dial: func() (redis.Conn, error) { - c, err := redis.Dial("tcp", "notworking:6379", redis.DialConnectTimeout(1*time.Second)) - if err != nil { - return nil, err - } - return c, nil + return redis.Dial("tcp", "notworking:6379", redis.DialConnectTimeout(1*time.Second)) }, } wp := NewWorkerPool(TestContext{}, 10, "work", redisPool) @@ -381,6 +377,73 @@ func TestStop(t *testing.T) { wp.Stop() } +func TestStop(t *testing.T) { + redisPool := newTestPool(t) + + namespace := "work" + wp := NewWorkerPool(TestContext{}, 10, namespace, redisPool) + wp.Start() + wp.Stop() + + // verify cleanup of heartbeat + conn := redisPool.Get() + ok, err := redis.Bool(conn.Do("SISMEMBER", redisKeyWorkerPools(namespace), wp.workerPoolID)) + assert.Nil(t, err) + assert.False(t, ok) + + ok, err = redis.Bool(conn.Do("EXISTS", redisKeyHeartbeat(namespace, wp.workerPoolID))) + assert.Nil(t, err) + assert.False(t, ok) +} + +func TestStopCleanup(t *testing.T) { + redisPool := newTestPool(t) + + namespace := "work" + jobType := "dummyJob" + jobData := `{"name":"dummyJob","id":"40a2206be652914611c777f4","t":1614838373,"args":{"key":0}}` + + wp := NewWorkerPool(TestContext{}, 10, namespace, redisPool) + wp.JobWithOptions(jobType, JobOptions{MaxConcurrency: 10}, func(job *Job) error { + return nil + }) + wp.Start() + + // We are trying to simulate a scenario where there is loss of data which + // can happen when there is a failover on a Sentinel cluster. In such a + // situation, there will be jobs which will appear to be in progress from + // the perspective of redis but won't actually be getting executed in + // workers. The following redis commands are equivalent to a job in + // progress. + conn := redisPool.Get() + err := conn.Send("LPUSH", redisKeyJobsInProgress(namespace, wp.workerPoolID, jobType), jobData) + assert.NoError(t, err) + err = conn.Send("SET", redisKeyJobsLock(namespace, jobType), 1) + assert.NoError(t, err) + err = conn.Send("HSET", redisKeyJobsLockInfo(namespace, jobType), wp.workerPoolID, 1) + assert.NoError(t, err) + err = conn.Flush() + assert.NoError(t, err) + + wp.Stop() + + jobsInProgress, err := redis.Strings(conn.Do("LRANGE", redisKeyJobsInProgress(namespace, wp.workerPoolID, jobType), 0, -1)) + assert.NoError(t, err) + assert.Empty(t, jobsInProgress) + + lockCount, err := redis.Int(conn.Do("GET", redisKeyJobsLock(namespace, jobType))) + assert.NoError(t, err) + assert.Equal(t, 0, lockCount) + + ok, err := redis.Bool(conn.Do("HEXISTS", redisKeyJobsLockInfo(namespace, jobType), wp.workerPoolID)) + assert.NoError(t, err) + assert.False(t, ok) + + jobQueue, err := redis.Strings(conn.Do("LRANGE", redisKeyJobs(namespace, jobType), 0, -1)) + assert.NoError(t, err) + assert.Equal(t, []string{jobData}, jobQueue) +} + func BenchmarkJobProcessing(b *testing.B) { pool := newTestPool(b) ns := "work" @@ -488,6 +551,17 @@ func hgetInt64(pool *redis.Pool, redisKey, hashKey string) int64 { return v } +func hexists(pool *redis.Pool, redisKey, hashKey string) bool { + conn := pool.Get() + defer conn.Close() + + ok, err := redis.Bool(conn.Do("HEXISTS", redisKey, hashKey)) + if err != nil { + panic("could not HEXISTS: " + err.Error()) + } + return ok +} + func jobOnZset(pool *redis.Pool, key string) (int64, *Job) { conn := pool.Get() defer conn.Close()