Skip to content

Commit

Permalink
Requeue in progress jobs and clean stale lock info on stop (#1)
Browse files Browse the repository at this point in the history
If a failover occurs in a Redis Sentinel cluster and data is lost, stale lock
information can remain, which can cause job processing to become stuck once the
max concurrency limit is reached. Therefore, re-enqueue the jobs that were in
progress and also clean up the stale lock info for the worker pool.

For the cleanup to be thorough, the stop would need to be called on each
worker pool instance.

Co-authored-by: Kavish Gambhir <kavish.gambhir@go-jek.com>
Co-authored-by: Suhas Karanth <sudo.suhas@gmail.com>
  • Loading branch information
3 people committed Mar 9, 2021
1 parent 6d45b29 commit 3288c13
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 9 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.14

require (
github.com/albrow/jobs v0.4.2
github.com/alicebob/miniredis/v2 v2.14.3 // indirect
github.com/alicebob/miniredis/v2 v2.14.3
github.com/benmanns/goworker v0.1.3
github.com/bitly/go-simplejson v0.5.0 // indirect
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect
Expand Down
14 changes: 14 additions & 0 deletions worker_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,20 @@ func (wp *WorkerPool) Stop() {
}(w)
}
wg.Wait()
jobTypes := make([]string, 0, len(wp.jobTypes))
for k := range wp.jobTypes {
jobTypes = append(jobTypes, k)
}

err := wp.deadPoolReaper.requeueInProgressJobs(wp.workerPoolID, jobTypes)
if err != nil {
logError("dead_pool_reaper.requeue_in_progress_jobs", err)
}

err = wp.deadPoolReaper.cleanStaleLockInfo(wp.workerPoolID, jobTypes)
if err != nil {
logError("dead_pool_reaper.clean_stale_lock_info", err)
}
wp.heartbeater.stop()
wp.retrier.stop()
wp.scheduler.stop()
Expand Down
4 changes: 2 additions & 2 deletions worker_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func TestWorkersPoolRunSingleThreaded(t *testing.T) {
assert.EqualValues(t, 0, listSize(pool, redisKeyJobs(ns, job1)))
assert.EqualValues(t, 0, listSize(pool, redisKeyJobsInProgress(ns, wp.workerPoolID, job1)))
assert.EqualValues(t, 0, getInt64(pool, redisKeyJobsLock(ns, job1)))
assert.EqualValues(t, 0, hgetInt64(pool, redisKeyJobsLockInfo(ns, job1), wp.workerPoolID))
assert.False(t, hexists(pool, redisKeyJobsLockInfo(ns, job1), wp.workerPoolID))
}

func TestWorkerPoolPauseSingleThreadedJobs(t *testing.T) {
Expand Down Expand Up @@ -203,7 +203,7 @@ func TestWorkerPoolPauseSingleThreadedJobs(t *testing.T) {
assert.EqualValues(t, 0, listSize(pool, redisKeyJobs(ns, job1)))
assert.EqualValues(t, 0, listSize(pool, redisKeyJobsInProgress(ns, wp.workerPoolID, job1)))
assert.EqualValues(t, 0, getInt64(pool, redisKeyJobsLock(ns, job1)))
assert.EqualValues(t, 0, hgetInt64(pool, redisKeyJobsLockInfo(ns, job1), wp.workerPoolID))
assert.False(t, hexists(pool, redisKeyJobsLockInfo(ns, job1), wp.workerPoolID))
}

// Test Helpers
Expand Down
86 changes: 80 additions & 6 deletions worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,21 +366,84 @@ func TestWorkersPaused(t *testing.T) {

// Test that in the case of an unavailable Redis server,
// the worker loop exits in the case of a WorkerPool.Stop
func TestStop(t *testing.T) {
func TestStopWithUnavailableRedis(t *testing.T) {
redisPool := &redis.Pool{
Dial: func() (redis.Conn, error) {
c, err := redis.Dial("tcp", "notworking:6379", redis.DialConnectTimeout(1*time.Second))
if err != nil {
return nil, err
}
return c, nil
return redis.Dial("tcp", "notworking:6379", redis.DialConnectTimeout(1*time.Second))
},
}
wp := NewWorkerPool(TestContext{}, 10, "work", redisPool)
wp.Start()
wp.Stop()
}

// TestStop verifies that stopping a worker pool cleans up its heartbeat
// state in Redis: the pool ID is removed from the worker-pools set and
// the heartbeat hash is deleted.
func TestStop(t *testing.T) {
	redisPool := newTestPool(t)

	namespace := "work"
	wp := NewWorkerPool(TestContext{}, 10, namespace, redisPool)
	wp.Start()
	wp.Stop()

	conn := redisPool.Get()
	// Return the connection to the pool when the test finishes;
	// otherwise it leaks for the lifetime of the pool.
	defer conn.Close()

	// verify cleanup of heartbeat: the pool ID must no longer be a
	// member of the worker-pools set ...
	ok, err := redis.Bool(conn.Do("SISMEMBER", redisKeyWorkerPools(namespace), wp.workerPoolID))
	assert.Nil(t, err)
	assert.False(t, ok)

	// ... and the heartbeat hash itself must be gone.
	ok, err = redis.Bool(conn.Do("EXISTS", redisKeyHeartbeat(namespace, wp.workerPoolID)))
	assert.Nil(t, err)
	assert.False(t, ok)
}

// TestStopCleanup verifies that WorkerPool.Stop re-enqueues jobs that
// appear to be in progress and removes stale lock state, as would be
// needed after a Sentinel failover with data loss.
func TestStopCleanup(t *testing.T) {
	redisPool := newTestPool(t)

	namespace := "work"
	jobType := "dummyJob"
	jobData := `{"name":"dummyJob","id":"40a2206be652914611c777f4","t":1614838373,"args":{"key":0}}`

	wp := NewWorkerPool(TestContext{}, 10, namespace, redisPool)
	wp.JobWithOptions(jobType, JobOptions{MaxConcurrency: 10}, func(job *Job) error {
		return nil
	})
	wp.Start()

	// We are trying to simulate a scenario where there is loss of data which
	// can happen when there is a failover on a Sentinel cluster. In such a
	// situation, there will be jobs which will appear to be in progress from
	// the perspective of redis but won't actually be getting executed in
	// workers. The following redis commands are equivalent to a job in
	// progress.
	conn := redisPool.Get()
	// Return the connection to the pool when the test finishes;
	// otherwise it leaks for the lifetime of the pool.
	defer conn.Close()
	err := conn.Send("LPUSH", redisKeyJobsInProgress(namespace, wp.workerPoolID, jobType), jobData)
	assert.NoError(t, err)
	err = conn.Send("SET", redisKeyJobsLock(namespace, jobType), 1)
	assert.NoError(t, err)
	err = conn.Send("HSET", redisKeyJobsLockInfo(namespace, jobType), wp.workerPoolID, 1)
	assert.NoError(t, err)
	err = conn.Flush()
	assert.NoError(t, err)

	wp.Stop()

	// The in-progress list for this pool must be drained ...
	jobsInProgress, err := redis.Strings(conn.Do("LRANGE", redisKeyJobsInProgress(namespace, wp.workerPoolID, jobType), 0, -1))
	assert.NoError(t, err)
	assert.Empty(t, jobsInProgress)

	// ... the concurrency lock counter must be back to zero ...
	lockCount, err := redis.Int(conn.Do("GET", redisKeyJobsLock(namespace, jobType)))
	assert.NoError(t, err)
	assert.Equal(t, 0, lockCount)

	// ... the per-pool lock-info entry must be removed ...
	ok, err := redis.Bool(conn.Do("HEXISTS", redisKeyJobsLockInfo(namespace, jobType), wp.workerPoolID))
	assert.NoError(t, err)
	assert.False(t, ok)

	// ... and the job must be back on the main queue for re-processing.
	jobQueue, err := redis.Strings(conn.Do("LRANGE", redisKeyJobs(namespace, jobType), 0, -1))
	assert.NoError(t, err)
	assert.Equal(t, []string{jobData}, jobQueue)
}

func BenchmarkJobProcessing(b *testing.B) {
pool := newTestPool(b)
ns := "work"
Expand Down Expand Up @@ -488,6 +551,17 @@ func hgetInt64(pool *redis.Pool, redisKey, hashKey string) int64 {
return v
}

// hexists reports whether the field hashKey is present in the Redis hash
// stored at redisKey. It panics if the HEXISTS command cannot be executed.
func hexists(pool *redis.Pool, redisKey, hashKey string) bool {
	conn := pool.Get()
	defer conn.Close()

	found, err := redis.Bool(conn.Do("HEXISTS", redisKey, hashKey))
	if err != nil {
		panic("could not HEXISTS: " + err.Error())
	}
	return found
}

func jobOnZset(pool *redis.Pool, key string) (int64, *Job) {
conn := pool.Get()
defer conn.Close()
Expand Down

0 comments on commit 3288c13

Please sign in to comment.