Skip to content

Commit

Permalink
Remove stale lock cleanup code -- TODO for another day
Browse files Browse the repository at this point in the history
  • Loading branch information
Shaun Dunning committed Apr 4, 2017
1 parent bccd9e7 commit 4fef315
Show file tree
Hide file tree
Showing 3 changed files with 1 addition and 77 deletions.
19 changes: 0 additions & 19 deletions redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,22 +341,3 @@ if redis.call('set', KEYS[2], '1', 'NX', 'EX', '86400') then
end
return 'dup'
`

// KEYS[1] = jobs run queue
var redisRemoveStaleKeys = fmt.Sprintf(`
-- getLockKey will be inserted below
%s
-- getConcurrencyKey will be inserted below
%s
-- TODO: need something more efficient than KEYS cmd
local function isInProgress(jobQueue)
return redis.call('keys', jobQueue .. ':*:inprogress')
end
local jobQueue = KEYS[1]
if next(isInProgress(jobQueue)) == nil then
redis.call('del', getLockKey(jobQueue))
redis.call('del', getConcurrencyKey(jobQueue))
end
return 0`, redisLuaJobsLockedKey, redisLuaJobsConcurrencyKey)
18 changes: 1 addition & 17 deletions worker_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,7 @@ func (wp *WorkerPool) Start() {
}
wp.started = true

// TODO: need to fix Lua script to remove stale keys
// wp.removeStaleKeys()
// TODO: we should cleanup stale keys on startup from previously registered jobs
wp.writeConcurrencyControlsToRedis()
go wp.writeKnownJobsToRedis()

Expand Down Expand Up @@ -281,21 +280,6 @@ func (wp *WorkerPool) writeConcurrencyControlsToRedis() {
}
}

func (wp *WorkerPool) removeStaleKeys() {
if len(wp.jobTypes) == 0 {
return
}

conn := wp.pool.Get()
defer conn.Close()
staleKeysScript := redis.NewScript(1, redisRemoveStaleKeys)
for k := range wp.jobTypes {
if _, err := staleKeysScript.Do(conn, redisKeyJobs(wp.namespace, k)); err != nil {
logError("remove_stale_keys", err)
}
}
}

func newJobTypeGeneric(name string, opts JobOptions, handler GenericHandler) *jobType {
return &jobType{
Name: name,
Expand Down
41 changes: 0 additions & 41 deletions worker_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,47 +189,6 @@ func TestWorkerPoolPauseSingleThreadedJobs(t *testing.T) {
assert.EqualValues(t, 0, listSize(pool, redisKeyJobsInProgress(ns, wp.workerPoolID, job1)))
}

func TestWorkerPoolStartCleansStaleJobLocks(t *testing.T) {
pool := newTestPool(":6379")
ns, job1 := "work", "job1"
wp := setupTestWorkerPool(pool, ns, job1, 1, JobOptions{Priority: 1, MaxConcurrency: 5})

conn := pool.Get()
defer conn.Close()
// create a stale lock (no jobs in progress)
_, err := conn.Do("SET", redisKeyJobsLock(ns, job1), "1")
assert.NoError(t, err)

// make sure stale lock is deleted
wp.removeStaleKeys()
lockKey, err := conn.Do("GET", redisKeyJobsLock(ns, job1))
assert.NoError(t, err)
assert.Nil(t, lockKey)
wp.Stop()
}

func TestWorkerPoolStartSkipsInProgressQueueLocks(t *testing.T) {
pool := newTestPool(":6379")
ns, job1 := "work", "job1"
wp := setupTestWorkerPool(pool, ns, job1, 1, JobOptions{Priority: 1, MaxConcurrency: 2})

conn := pool.Get()
defer conn.Close()
// create a queue lock
_, err := conn.Do("SET", redisKeyJobsLock(ns, job1), "1")
assert.NoError(t, err)
// set jobs in progress key
_, err = conn.Do("SET", redisKeyJobsInProgress(ns, "1", job1), "1")
assert.NoError(t, err)

// make sure active queue locks are not deleted
wp.removeStaleKeys()
lockKey, err := conn.Do("GET", redisKeyJobsLock(ns, job1))
assert.NoError(t, err)
assert.NotNil(t, lockKey)
wp.Stop()
}

// Test Helpers
func (t *TestContext) SleepyJob(job *Job) error {
sleepTime := time.Duration(job.ArgInt64("sleep"))
Expand Down

0 comments on commit 4fef315

Please sign in to comment.