diff --git a/redis.go b/redis.go index 934ded0a..9da02018 100644 --- a/redis.go +++ b/redis.go @@ -341,22 +341,3 @@ if redis.call('set', KEYS[2], '1', 'NX', 'EX', '86400') then end return 'dup' ` - -// KEYS[1] = jobs run queue -var redisRemoveStaleKeys = fmt.Sprintf(` --- getLockKey will be inserted below -%s --- getConcurrencyKey will be inserted below -%s - --- TODO: need something more efficient than KEYS cmd -local function isInProgress(jobQueue) - return redis.call('keys', jobQueue .. ':*:inprogress') -end - -local jobQueue = KEYS[1] -if next(isInProgress(jobQueue)) == nil then - redis.call('del', getLockKey(jobQueue)) - redis.call('del', getConcurrencyKey(jobQueue)) -end -return 0`, redisLuaJobsLockedKey, redisLuaJobsConcurrencyKey) diff --git a/worker_pool.go b/worker_pool.go index a0f89069..08e3d652 100644 --- a/worker_pool.go +++ b/worker_pool.go @@ -174,8 +174,7 @@ func (wp *WorkerPool) Start() { } wp.started = true - // TODO: need to fix Lua script to remove stale keys - // wp.removeStaleKeys() + // TODO: we should cleanup stale keys on startup from previously registered jobs wp.writeConcurrencyControlsToRedis() go wp.writeKnownJobsToRedis() @@ -281,21 +280,6 @@ func (wp *WorkerPool) writeConcurrencyControlsToRedis() { } } -func (wp *WorkerPool) removeStaleKeys() { - if len(wp.jobTypes) == 0 { - return - } - - conn := wp.pool.Get() - defer conn.Close() - staleKeysScript := redis.NewScript(1, redisRemoveStaleKeys) - for k := range wp.jobTypes { - if _, err := staleKeysScript.Do(conn, redisKeyJobs(wp.namespace, k)); err != nil { - logError("remove_stale_keys", err) - } - } -} - func newJobTypeGeneric(name string, opts JobOptions, handler GenericHandler) *jobType { return &jobType{ Name: name, diff --git a/worker_pool_test.go b/worker_pool_test.go index e4f3d15e..5b95ef11 100644 --- a/worker_pool_test.go +++ b/worker_pool_test.go @@ -189,47 +189,6 @@ func TestWorkerPoolPauseSingleThreadedJobs(t *testing.T) { assert.EqualValues(t, 0, listSize(pool, redisKeyJobsInProgress(ns, wp.workerPoolID, job1))) } -func TestWorkerPoolStartCleansStaleJobLocks(t *testing.T) { - pool := newTestPool(":6379") - ns, job1 := "work", "job1" - wp := setupTestWorkerPool(pool, ns, job1, 1, JobOptions{Priority: 1, MaxConcurrency: 5}) - - conn := pool.Get() - defer conn.Close() - // create a stale lock (no jobs in progress) - _, err := conn.Do("SET", redisKeyJobsLock(ns, job1), "1") - assert.NoError(t, err) - - // make sure stale lock is deleted - wp.removeStaleKeys() - lockKey, err := conn.Do("GET", redisKeyJobsLock(ns, job1)) - assert.NoError(t, err) - assert.Nil(t, lockKey) - wp.Stop() -} - -func TestWorkerPoolStartSkipsInProgressQueueLocks(t *testing.T) { - pool := newTestPool(":6379") - ns, job1 := "work", "job1" - wp := setupTestWorkerPool(pool, ns, job1, 1, JobOptions{Priority: 1, MaxConcurrency: 2}) - - conn := pool.Get() - defer conn.Close() - // create a queue lock - _, err := conn.Do("SET", redisKeyJobsLock(ns, job1), "1") - assert.NoError(t, err) - // set jobs in progress key - _, err = conn.Do("SET", redisKeyJobsInProgress(ns, "1", job1), "1") - assert.NoError(t, err) - - // make sure active queue locks are not deleted - wp.removeStaleKeys() - lockKey, err := conn.Do("GET", redisKeyJobsLock(ns, job1)) - assert.NoError(t, err) - assert.NotNil(t, lockKey) - wp.Stop() -} - // Test Helpers func (t *TestContext) SleepyJob(job *Job) error { sleepTime := time.Duration(job.ArgInt64("sleep"))