Skip to content

Commit

Permalink
Reap stale locks (gocraft#52)
Browse files Browse the repository at this point in the history
  • Loading branch information
shdunning committed Jun 22, 2017
1 parent d91c48b commit e8fb31a
Show file tree
Hide file tree
Showing 9 changed files with 309 additions and 105 deletions.
64 changes: 46 additions & 18 deletions dead_pool_reaper.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,27 +10,30 @@ import (
)

const (
deadTime = 5 * time.Minute
reapPeriod = 10 * time.Minute
reapJitterSecs = 30
deadTime = 5 * time.Minute
reapPeriod = 10 * time.Minute
reapJitterSecs = 30
requeueKeysPerJob = 4
)

type deadPoolReaper struct {
namespace string
pool *redis.Pool
deadTime time.Duration
reapPeriod time.Duration
namespace string
pool *redis.Pool
deadTime time.Duration
reapPeriod time.Duration
curJobTypes []string

stopChan chan struct{}
doneStoppingChan chan struct{}
}

func newDeadPoolReaper(namespace string, pool *redis.Pool) *deadPoolReaper {
func newDeadPoolReaper(namespace string, pool *redis.Pool, curJobTypes []string) *deadPoolReaper {
return &deadPoolReaper{
namespace: namespace,
pool: pool,
deadTime: deadTime,
reapPeriod: reapPeriod,
curJobTypes: curJobTypes,
stopChan: make(chan struct{}),
doneStoppingChan: make(chan struct{}),
}
Expand Down Expand Up @@ -86,33 +89,59 @@ func (r *deadPoolReaper) reap() error {

// Cleanup all dead pools
for deadPoolID, jobTypes := range deadPoolIDs {
lockJobTypes := jobTypes
// if we found jobs from the heartbeat, requeue them and remove the heartbeat
if len(jobTypes) > 0 {
r.requeueInProgressJobs(deadPoolID, jobTypes)
if _, err = conn.Do("DEL", redisKeyHeartbeat(r.namespace, deadPoolID)); err != nil {
return err
}
} else {
// try to clean up locks for the current set of jobs if heartbeat was not found
lockJobTypes = r.curJobTypes
}

// Remove dead pool from worker pools set
_, err = conn.Do("SREM", workerPoolsKey, deadPoolID)
if err != nil {
if _, err = conn.Do("SREM", workerPoolsKey, deadPoolID); err != nil {
return err
}
// Cleanup any stale lock info
if err = r.cleanStaleLockInfo(deadPoolID, lockJobTypes); err != nil {
return err
}
}

return nil
}

func (r *deadPoolReaper) cleanStaleLockInfo(poolID string, jobTypes []string) error {
numKeys := len(jobTypes) * 2
redisReapLocksScript := redis.NewScript(numKeys, redisLuaReapStaleLocks)
var scriptArgs = make([]interface{}, 0, numKeys+1) // +1 for argv[1]

for _, jobType := range jobTypes {
scriptArgs = append(scriptArgs, redisKeyJobsLock(r.namespace, jobType), redisKeyJobsLockInfo(r.namespace, jobType))
}
scriptArgs = append(scriptArgs, poolID) // ARGV[1]

conn := r.pool.Get()
defer conn.Close()
if _, err := redisReapLocksScript.Do(conn, scriptArgs...); err != nil {
return err
}

return nil
}

func (r *deadPoolReaper) requeueInProgressJobs(poolID string, jobTypes []string) error {
numArgs := len(jobTypes) * 2
redisRequeueScript := redis.NewScript(numArgs, redisLuaReenqueueJob)
var scriptArgs = make([]interface{}, 0, numArgs)
numKeys := len(jobTypes) * requeueKeysPerJob
redisRequeueScript := redis.NewScript(numKeys, redisLuaReenqueueJob)
var scriptArgs = make([]interface{}, 0, numKeys+1)

for _, jobType := range jobTypes {
// pops from in progress, push into job queue and decrement the queue lock
scriptArgs = append(scriptArgs, redisKeyJobsInProgress(r.namespace, poolID, jobType), redisKeyJobs(r.namespace, jobType))
scriptArgs = append(scriptArgs, redisKeyJobsInProgress(r.namespace, poolID, jobType), redisKeyJobs(r.namespace, jobType), redisKeyJobsLock(r.namespace, jobType), redisKeyJobsLockInfo(r.namespace, jobType)) // KEYS[1-4 * N]
}
scriptArgs = append(scriptArgs, poolID) // ARGV[1]

conn := r.pool.Get()
defer conn.Close()
Expand Down Expand Up @@ -146,18 +175,17 @@ func (r *deadPoolReaper) findDeadPools() (map[string][]string, error) {
deadPools := map[string][]string{}
for _, workerPoolID := range workerPoolIDs {
heartbeatKey := redisKeyHeartbeat(r.namespace, workerPoolID)

// Check that last heartbeat was long enough ago to consider the pool dead
heartbeatAt, err := redis.Int64(conn.Do("HGET", heartbeatKey, "heartbeat_at"))
if err == redis.ErrNil {
// dead pool with no heartbeat
// heartbeat expired, save dead pool and use cur set of jobs from reaper
deadPools[workerPoolID] = []string{}
continue
}
if err != nil {
return nil, err
}

// Check that last heartbeat was long enough ago to consider the pool dead
if time.Unix(heartbeatAt, 0).Add(r.deadTime).After(time.Now()) {
continue
}
Expand Down
88 changes: 79 additions & 9 deletions dead_pool_reaper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func TestDeadPoolReaper(t *testing.T) {
assert.NoError(t, err)

// Test getting dead pool
reaper := newDeadPoolReaper(ns, pool)
reaper := newDeadPoolReaper(ns, pool, []string{})
deadPools, err := reaper.findDeadPools()
assert.NoError(t, err)
assert.Equal(t, map[string][]string{"2": {"type1", "type2"}, "3": {"type1", "type2"}}, deadPools)
Expand All @@ -57,6 +57,8 @@ func TestDeadPoolReaper(t *testing.T) {
assert.NoError(t, err)
_, err = conn.Do("incr", redisKeyJobsLock(ns, "type1"))
assert.NoError(t, err)
_, err = conn.Do("hincrby", redisKeyJobsLockInfo(ns, "type1"), "2", 1) // worker pool 2 has lock
assert.NoError(t, err)

// Ensure 0 jobs in jobs queue
jobsCount, err := redis.Int(conn.Do("llen", redisKeyJobs(ns, "type1")))
Expand All @@ -82,10 +84,10 @@ func TestDeadPoolReaper(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, 0, jobsCount)

// Lock count should get decremented
lockCount, err := redis.Int(conn.Do("get", redisKeyJobsLock(ns, "type1")))
assert.NoError(t, err)
assert.Equal(t, 0, lockCount)
// Locks should get cleaned up
assert.EqualValues(t, 0, getInt64(pool, redisKeyJobsLock(ns, "type1")))
v, _ := conn.Do("HGET", redisKeyJobsLockInfo(ns, "type1"), "2")
assert.Nil(t, v)
}

func TestDeadPoolReaperNoHeartbeat(t *testing.T) {
Expand All @@ -106,14 +108,23 @@ func TestDeadPoolReaperNoHeartbeat(t *testing.T) {
assert.NoError(t, err)
err = conn.Send("SADD", workerPoolsKey, "3")
assert.NoError(t, err)
// stale lock info
err = conn.Send("SET", redisKeyJobsLock(ns, "type1"), 3)
assert.NoError(t, err)
err = conn.Send("HSET", redisKeyJobsLockInfo(ns, "type1"), "1", 1)
assert.NoError(t, err)
err = conn.Send("HSET", redisKeyJobsLockInfo(ns, "type1"), "2", 1)
assert.NoError(t, err)
err = conn.Send("HSET", redisKeyJobsLockInfo(ns, "type1"), "3", 1)
assert.NoError(t, err)
err = conn.Flush()
assert.NoError(t, err)

// Test getting dead pool ids
reaper := newDeadPoolReaper(ns, pool)
reaper := newDeadPoolReaper(ns, pool, []string{"type1"})
deadPools, err := reaper.findDeadPools()
assert.NoError(t, err)
assert.Equal(t, deadPools, map[string][]string{"1": {}, "2": {}, "3": {}})
assert.Equal(t, map[string][]string{"1": {}, "2": {}, "3": {}}, deadPools)

// Test requeueing jobs
_, err = conn.Do("lpush", redisKeyJobsInProgress(ns, "2", "type1"), "foo")
Expand Down Expand Up @@ -152,6 +163,13 @@ func TestDeadPoolReaperNoHeartbeat(t *testing.T) {
jobsCount, err = redis.Int(conn.Do("scard", redisKeyWorkerPools(ns)))
assert.NoError(t, err)
assert.Equal(t, 0, jobsCount)

// Stale lock info was cleaned up using reap.curJobTypes
assert.EqualValues(t, 0, getInt64(pool, redisKeyJobsLock(ns, "type1")))
for _, poolID := range []string{"1", "2", "3"} {
v, _ := conn.Do("HGET", redisKeyJobsLockInfo(ns, "type1"), poolID)
assert.Nil(t, v)
}
}

func TestDeadPoolReaperNoJobTypes(t *testing.T) {
Expand Down Expand Up @@ -186,7 +204,7 @@ func TestDeadPoolReaperNoJobTypes(t *testing.T) {
assert.NoError(t, err)

// Test getting dead pool
reaper := newDeadPoolReaper(ns, pool)
reaper := newDeadPoolReaper(ns, pool, []string{})
deadPools, err := reaper.findDeadPools()
assert.NoError(t, err)
assert.Equal(t, map[string][]string{"2": {"type1", "type2"}}, deadPools)
Expand Down Expand Up @@ -261,7 +279,7 @@ func TestDeadPoolReaperWithWorkerPools(t *testing.T) {

// setup a worker pool and start the reaper, which should restart the stale job above
wp := setupTestWorkerPool(pool, ns, job1, 1, JobOptions{Priority: 1})
wp.deadPoolReaper = newDeadPoolReaper(wp.namespace, wp.pool)
wp.deadPoolReaper = newDeadPoolReaper(wp.namespace, wp.pool, []string{"job1"})
wp.deadPoolReaper.deadTime = expectedDeadTime
wp.deadPoolReaper.start()
// provide some initialization time
Expand All @@ -273,3 +291,55 @@ func TestDeadPoolReaperWithWorkerPools(t *testing.T) {
staleHeart.stop()
wp.deadPoolReaper.stop()
}

func TestDeadPoolReaperCleanStaleLocks(t *testing.T) {
pool := newTestPool(":6379")
ns := "work"
cleanKeyspace(ns, pool)

conn := pool.Get()
defer conn.Close()
job1, job2 := "type1", "type2"
jobNames := []string{job1, job2}
workerPoolID1, workerPoolID2 := "1", "2"
lock1 := redisKeyJobsLock(ns, job1)
lock2 := redisKeyJobsLock(ns, job2)
lockInfo1 := redisKeyJobsLockInfo(ns, job1)
lockInfo2 := redisKeyJobsLockInfo(ns, job2)

// Create redis data
var err error
err = conn.Send("SET", lock1, 3)
assert.NoError(t, err)
err = conn.Send("SET", lock2, 1)
assert.NoError(t, err)
err = conn.Send("HSET", lockInfo1, workerPoolID1, 1) // workerPoolID1 holds 1 lock on job1
assert.NoError(t, err)
err = conn.Send("HSET", lockInfo1, workerPoolID2, 2) // workerPoolID2 holds 2 locks on job1
assert.NoError(t, err)
err = conn.Send("HSET", lockInfo2, workerPoolID2, 2) // test that we don't go below 0 on job2 lock
assert.NoError(t, err)
err = conn.Flush()
assert.NoError(t, err)

reaper := newDeadPoolReaper(ns, pool, jobNames)
// clean lock info for workerPoolID1
reaper.cleanStaleLockInfo(workerPoolID1, jobNames)
assert.NoError(t, err)
assert.EqualValues(t, 2, getInt64(pool, lock1)) // job1 lock should be decr by 1
assert.EqualValues(t, 1, getInt64(pool, lock2)) // job2 lock is unchanged
v, _ := conn.Do("HGET", lockInfo1, workerPoolID1) // workerPoolID1 removed from job1's lock info
assert.Nil(t, v)

// now clean lock info for workerPoolID2
reaper.cleanStaleLockInfo(workerPoolID2, jobNames)
assert.NoError(t, err)
// both locks should be at 0
assert.EqualValues(t, 0, getInt64(pool, lock1))
assert.EqualValues(t, 0, getInt64(pool, lock2))
// worker pool ID 2 removed from both lock info hashes
v, err = conn.Do("HGET", lockInfo1, workerPoolID2)
assert.Nil(t, v)
v, err = conn.Do("HGET", lockInfo2, workerPoolID2)
assert.Nil(t, v)
}
20 changes: 14 additions & 6 deletions priority_sampler.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,23 @@ type sampleItem struct {
priority uint

// payload:
redisJobs string
redisJobsInProg string
redisJobs string
redisJobsInProg string
redisJobsPaused string
redisJobsLock string
redisJobsLockInfo string
redisJobsMaxConcurrency string
}

func (s *prioritySampler) add(priority uint, redisJobs, redisJobsInProg string) {
func (s *prioritySampler) add(priority uint, redisJobs, redisJobsInProg, redisJobsPaused, redisJobsLock, redisJobsLockInfo, redisJobsMaxConcurrency string) {
sample := sampleItem{
priority: priority,
redisJobs: redisJobs,
redisJobsInProg: redisJobsInProg,
priority: priority,
redisJobs: redisJobs,
redisJobsInProg: redisJobsInProg,
redisJobsPaused: redisJobsPaused,
redisJobsLock: redisJobsLock,
redisJobsLockInfo: redisJobsLockInfo,
redisJobsMaxConcurrency: redisJobsMaxConcurrency,
}
s.samples = append(s.samples, sample)
s.sum += priority
Expand Down
15 changes: 11 additions & 4 deletions priority_sampler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ import (

func TestPrioritySampler(t *testing.T) {
ps := prioritySampler{}
ps.add(5, "jobs.5", "jobsinprog.5")
ps.add(2, "jobs.2a", "jobsinprog.2a")
ps.add(1, "jobs.1b", "jobsinprog.1b")

ps.add(5, "jobs.5", "jobsinprog.5", "jobspaused.5", "jobslock.5", "jobslockinfo.5", "jobsconcurrency.5")
ps.add(2, "jobs.2a", "jobsinprog.2a", "jobspaused.2a", "jobslock.2a", "jobslockinfo.2a", "jobsconcurrency.2a")
ps.add(1, "jobs.1b", "jobsinprog.1b", "jobspaused.1b", "jobslock.1b", "jobslockinfo.1b", "jobsconcurrency.1b")

var c5 = 0
var c2 = 0
Expand Down Expand Up @@ -41,7 +42,13 @@ func TestPrioritySampler(t *testing.T) {
func BenchmarkPrioritySampler(b *testing.B) {
ps := prioritySampler{}
for i := 0; i < 200; i++ {
ps.add(uint(i)+1, "jobs."+fmt.Sprint(i), "jobsinprog."+fmt.Sprint(i))
ps.add(uint(i)+1,
"jobs."+fmt.Sprint(i),
"jobsinprog."+fmt.Sprint(i),
"jobspaused."+fmt.Sprint(i),
"jobslock."+fmt.Sprint(i),
"jobslockinfo."+fmt.Sprint(i),
"jobsmaxconcurrency."+fmt.Sprint(i))
}

b.ResetTimer()
Expand Down
Loading

0 comments on commit e8fb31a

Please sign in to comment.