Skip to content
This repository has been archived by the owner on Sep 20, 2022. It is now read-only.

Support env prefix to redis keys #43

Open
wants to merge 3 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,10 @@

package jobs

import ()

// configType holds different config variables
type configType struct {
Db databaseConfig
Db databaseConfig
envPrefix string
}

// databaseConfig holds config variables specific to the database
Expand Down Expand Up @@ -38,4 +37,19 @@ var Config = configType{
// "" (an empty string).
Password: "",
},
envPrefix: "",
}

func (c *configType) SetEnvPrefix(p string) {
if p == "" {
return
}
c.envPrefix = p
}
func (c *configType) GetKeyPrefix() string {
var hardCodedPrefix = "jobs:"
if c.envPrefix == "" {
return hardCodedPrefix
}
return c.envPrefix + ":" + hardCodedPrefix
}
7 changes: 4 additions & 3 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ package jobs

import (
"fmt"
"github.com/garyburd/redigo/redis"
"time"

"github.com/garyburd/redigo/redis"
)

// Job represents a discrete piece of work to be done by a worker.
Expand Down Expand Up @@ -117,7 +118,7 @@ func (j *Job) Duration() time.Duration {
// Key returns the key used for the hash in redis which stores all the
// fields for this job.
func (j *Job) Key() string {
return "jobs:" + j.id
return Config.GetKeyPrefix() + j.id
}

// IsRecurring returns true iff the job is recurring
Expand Down Expand Up @@ -176,7 +177,7 @@ func (t *transaction) saveJob(job *Job) {
// add the job id to the time index with a score equal to the job's time field.
// If the job has been destroyed, addJobToTimeIndex will have no effect.
func (t *transaction) addJobToTimeIndex(job *Job) {
t.addJobToSet(job, Keys.JobsTimeIndex, float64(job.time))
t.addJobToSet(job, Keys.JobsTimeIndex.Key(), float64(job.time))
}

// Refresh mutates the job by setting its fields to the most recent data
Expand Down
2 changes: 1 addition & 1 deletion job_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ const (
// key returns the key used for the sorted set in redis which will hold
// all jobs with this status.
func (status Status) Key() string {
return "jobs:" + string(status)
return Config.GetKeyPrefix() + string(status)
}

// Count returns the number of jobs that currently have the given status
Expand Down
6 changes: 3 additions & 3 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func (p *Pool) addToPoolSet() error {
p.RLock()
thisId := p.id
p.RUnlock()
if _, err := conn.Do("SADD", Keys.ActivePools, thisId); err != nil {
if _, err := conn.Do("SADD", Keys.ActivePools.Key(), thisId); err != nil {
return err
}
return nil
Expand All @@ -150,7 +150,7 @@ func (p *Pool) removeFromPoolSet() error {
p.RLock()
thisId := p.id
p.RUnlock()
if _, err := conn.Do("SREM", Keys.ActivePools, thisId); err != nil {
if _, err := conn.Do("SREM", Keys.ActivePools.Key(), thisId); err != nil {
return err
}
return nil
Expand Down Expand Up @@ -210,7 +210,7 @@ func (p *Pool) pongKey() string {
func (p *Pool) purgeStalePools() error {
conn := redisPool.Get()
defer conn.Close()
poolIds, err := redis.Strings(conn.Do("SMEMBERS", Keys.ActivePools))
poolIds, err := redis.Strings(conn.Do("SMEMBERS", Keys.ActivePools.Key()))
if err != nil {
return err
}
Expand Down
6 changes: 3 additions & 3 deletions pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ func TestPoolIdSet(t *testing.T) {
if err := pool.Start(); err != nil {
t.Errorf("Unexpected error in pool.Start(): %s", err.Error())
}
expectSetContains(t, Keys.ActivePools, pool.id)
expectSetContains(t, Keys.ActivePools.Key(), pool.id)
pool.Close()
if err := pool.Wait(); err != nil {
t.Errorf("Unexpected error in pool.Wait(): %s", err.Error())
}
expectSetDoesNotContain(t, Keys.ActivePools, pool.id)
expectSetDoesNotContain(t, Keys.ActivePools.Key(), pool.id)
}

// TestGetNextJobs tests the getNextJobs function, which queries the database to find
Expand Down Expand Up @@ -940,7 +940,7 @@ func TestStalePoolsArePurged(t *testing.T) {
}

// At this point, the stale pool should have been fully purged.
expectSetDoesNotContain(t, Keys.ActivePools, oldId)
expectSetDoesNotContain(t, Keys.ActivePools.Key(), oldId)
expectJobFieldEquals(t, job, "poolId", newPool.id, stringConverter)
}

Expand Down
16 changes: 11 additions & 5 deletions redis_keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,26 @@

package jobs

type redisKey string

// keys stores any constant redis keys. By storing them all here,
// we avoid using string literals which are prone to typos.
var Keys = struct {
// jobsTimeIndex is the key for a sorted set which keeps all outstanding
// jobs sorted by their time field.
JobsTimeIndex string
JobsTimeIndex redisKey
// jobsTemp is the key for a temporary set which is created and then destroyed
// during the process of getting the next jobs in the queue.
JobsTemp string
JobsTemp redisKey
// activePools is the key for a set which holds the pool ids for all active
// pools.
ActivePools string
ActivePools redisKey
}{
JobsTimeIndex: "jobs:time",
JobsTemp: "jobs:temp",
JobsTimeIndex: "time",
JobsTemp: "temp",
ActivePools: "pools:active",
}

func (rk *redisKey) Key() string {
return Config.GetKeyPrefix() + string(*rk)
}
62 changes: 35 additions & 27 deletions scripts.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ var (
local jobId = ARGV[1]
local setName = ARGV[2]
local score = ARGV[3]
local jobKey = 'jobs:' .. jobId
local prefix = ARGV[4]
local jobKey = prefix .. jobId
-- Make sure the job hasn't already been destroyed
local exists = redis.call('EXISTS', jobKey)
if exists ~= 1 then
Expand All @@ -54,15 +55,16 @@ redis.call('ZADD', setName, score, jobId)`)

-- Assign args to variables for easy reference
local jobId = ARGV[1]
local jobKey = 'jobs:' .. jobId
local prefix = ARGV[2]
local jobKey = prefix .. jobId
-- Remove the job from the status set
local status = redis.call('HGET', jobKey, 'status')
if status ~= '' then
local statusSet = 'jobs:' .. status
local statusSet = prefix .. status
redis.call('ZREM', statusSet, jobId)
end
-- Remove the job from the time index
redis.call('ZREM', 'jobs:time', jobId)
redis.call('ZREM', prefix .. 'time', jobId)
-- Remove the main hash for the job
redis.call('DEL', jobKey)`)
getJobsByIdsScript = redis.NewScript(0, `-- Copyright 2015 Alex Browne. All rights reserved.
Expand Down Expand Up @@ -107,13 +109,14 @@ redis.call('DEL', jobKey)`)

-- Assign keys to variables for easy access
local setKey = ARGV[1]
local prefix = ARGV[2]
-- Get all the ids from the set name
local jobIds = redis.call('ZREVRANGE', setKey, 0, -1)
local allJobs = {}
if #jobIds > 0 then
-- Iterate over the ids and find each job
for i, jobId in ipairs(jobIds) do
local jobKey = 'jobs:' .. jobId
local jobKey = prefix .. jobId
local jobFields = redis.call('HGETALL', jobKey)
-- Add the id itself to the fields
jobFields[#jobFields+1] = 'id'
Expand Down Expand Up @@ -169,30 +172,31 @@ return allJobs`)
local n = ARGV[1]
local currentTime = ARGV[2]
local poolId = ARGV[3]
local prefix = ARGV[4]
-- Copy the time index set to a new temporary set
redis.call('ZUNIONSTORE', 'jobs:temp', 1, 'jobs:time')
redis.call('ZUNIONSTORE', prefix .. 'temp', 1, prefix .. 'time')
-- Trim the new temporary set we just created to leave only the jobs which have a time
-- parameter in the past
redis.call('ZREMRANGEBYSCORE', 'jobs:temp', currentTime, '+inf')
redis.call('ZREMRANGEBYSCORE', prefix .. 'temp', currentTime, '+inf')
-- Intersect the jobs which are ready based on their time with those in the
-- queued set. Use the weights parameter to set the scores entirely based on the
-- queued set, effectively sorting the jobs by priority. Store the results in the
-- temporary set.
redis.call('ZINTERSTORE', 'jobs:temp', 2, 'jobs:queued', 'jobs:temp', 'WEIGHTS', 1, 0)
redis.call('ZINTERSTORE', prefix .. 'temp', 2, prefix .. 'queued', prefix .. 'temp', 'WEIGHTS', 1, 0)
-- Trim the temp set, so it contains only the first n jobs ordered by
-- priority
redis.call('ZREMRANGEBYRANK', 'jobs:temp', 0, -n - 1)
redis.call('ZREMRANGEBYRANK', prefix .. 'temp', 0, -n - 1)
-- Get all job ids from the temp set
local jobIds = redis.call('ZREVRANGE', 'jobs:temp', 0, -1)
local jobIds = redis.call('ZREVRANGE', prefix .. 'temp', 0, -1)
local allJobs = {}
if #jobIds > 0 then
-- Add job ids to the executing set
redis.call('ZUNIONSTORE', 'jobs:executing', 2, 'jobs:executing', 'jobs:temp')
redis.call('ZUNIONSTORE', prefix .. 'executing', 2, prefix .. 'executing', prefix .. 'temp')
-- Now we are ready to construct our response.
for i, jobId in ipairs(jobIds) do
local jobKey = 'jobs:' .. jobId
local jobKey = prefix .. jobId
-- Remove the job from the queued set
redis.call('ZREM', 'jobs:queued', jobId)
redis.call('ZREM', prefix .. 'queued', jobId)
-- Set the poolId field for the job
redis.call('HSET', jobKey, 'poolId', poolId)
-- Set the job status to executing
Expand All @@ -207,7 +211,7 @@ if #jobIds > 0 then
end
end
-- Delete the temporary set
redis.call('DEL', 'jobs:temp')
redis.call('DEL', prefix .. 'temp')
-- Return all the fields for all the jobs
return allJobs`)
purgeStalePoolScript = redis.NewScript(0, `-- Copyright 2015 Alex Browne. All rights reserved.
Expand All @@ -227,25 +231,26 @@ return allJobs`)

-- Assign args to variables for easy reference
local stalePoolId = ARGV[1]
local prefix = ARGV[2]
-- Check if the stale pool is in the set of active pools first
local isActive = redis.call('SISMEMBER', 'pools:active', stalePoolId)
local isActive = redis.call('SISMEMBER', prefix .. 'pools:active', stalePoolId)
if isActive then
-- Remove the stale pool from the set of active pools
redis.call('SREM', 'pools:active', stalePoolId)
redis.call('SREM', prefix .. 'pools:active', stalePoolId)
-- Get all the jobs in the executing set
local jobIds = redis.call('ZRANGE', 'jobs:executing', 0, -1)
local jobIds = redis.call('ZRANGE', prefix .. 'executing', 0, -1)
for i, jobId in ipairs(jobIds) do
local jobKey = 'jobs:' .. jobId
local jobKey = prefix .. jobId
-- Check the poolId field
-- If the poolId is equal to the stale id, then this job is stuck
-- in the executing set even though no worker is actually executing it
local poolId = redis.call('HGET', jobKey, 'poolId')
if poolId == stalePoolId then
local jobPriority = redis.call('HGET', jobKey, 'priority')
-- Move the job into the queued set
redis.call('ZADD', 'jobs:queued', jobPriority, jobId)
redis.call('ZADD', prefix .. 'queued', jobPriority, jobId)
-- Remove the job from the executing set
redis.call('ZREM', 'jobs:executing', jobId)
redis.call('ZREM', prefix .. 'executing', jobId)
-- Set the job status to queued and the pool id to blank
redis.call('HMSET', jobKey, 'status', 'queued', 'poolId', '')
end
Expand Down Expand Up @@ -273,7 +278,8 @@ end

-- Assign args to variables for easy reference
local jobId = ARGV[1]
local jobKey = 'jobs:' .. jobId
local prefix = ARGV[2]
local jobKey = prefix .. jobId
-- Make sure the job hasn't already been destroyed
local exists = redis.call('EXISTS', jobKey)
if exists ~= 1 then
Expand All @@ -294,12 +300,12 @@ end
-- Get the job priority (used as score)
local jobPriority = redis.call('HGET', jobKey, 'priority')
-- Add the job to the appropriate new set
local newStatusSet = 'jobs:' .. newStatus
local newStatusSet = prefix .. newStatus
redis.call('ZADD', newStatusSet, jobPriority, jobId)
-- Remove the job from the old status set
local oldStatus = redis.call('HGET', jobKey, 'status')
if ((oldStatus ~= '') and (oldStatus ~= newStatus)) then
local oldStatusSet = 'jobs:' .. oldStatus
local oldStatusSet = prefix .. oldStatus
redis.call('ZREM', oldStatusSet, jobId)
end
-- Set the job status in the hash
Expand Down Expand Up @@ -330,7 +336,8 @@ end`)
local jobId = ARGV[1]
local fieldName = ARGV[2]
local fieldVal = ARGV[3]
local jobKey = 'jobs:' .. jobId
local prefix = ARGV[4]
local jobKey = prefix .. jobId
-- Make sure the job hasn't already been destroyed
local exists = redis.call('EXISTS', jobKey)
if exists ~= 1 then
Expand All @@ -354,20 +361,21 @@ redis.call('HSET', jobKey, fieldName, fieldVal)`)
-- Assign args to variables for easy reference
local jobId = ARGV[1]
local newStatus = ARGV[2]
local jobKey = 'jobs:' .. jobId
local prefix = ARGV[3]
local jobKey = prefix .. jobId
-- Make sure the job hasn't already been destroyed
local exists = redis.call('EXISTS', jobKey)
if exists ~= 1 then
return
end
local newStatusSet = 'jobs:' .. newStatus
local newStatusSet = prefix .. newStatus
-- Add the job to the new status set
local jobPriority = redis.call('HGET', jobKey, 'priority')
redis.call('ZADD', newStatusSet, jobPriority, jobId)
-- Remove the job from the old status set
local oldStatus = redis.call('HGET', jobKey, 'status')
if ((oldStatus ~= '') and (oldStatus ~= newStatus)) then
local oldStatusSet = 'jobs:' .. oldStatus
local oldStatusSet = prefix .. oldStatus
redis.call('ZREM', oldStatusSet, jobId)
end
-- Set the status field
Expand Down
3 changes: 2 additions & 1 deletion scripts/add_job_to_set.lua
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
local jobId = ARGV[1]
local setName = ARGV[2]
local score = ARGV[3]
local jobKey = 'jobs:' .. jobId
local prefix = ARGV[4]
local jobKey = prefix .. jobId
-- Make sure the job hasn't already been destroyed
local exists = redis.call('EXISTS', jobKey)
if exists ~= 1 then
Expand Down
7 changes: 4 additions & 3 deletions scripts/destroy_job.lua
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@

-- Assign args to variables for easy reference
local jobId = ARGV[1]
local jobKey = 'jobs:' .. jobId
local prefix = ARGV[2]
local jobKey = prefix .. jobId
-- Remove the job from the status set
local status = redis.call('HGET', jobKey, 'status')
if status ~= '' then
local statusSet = 'jobs:' .. status
local statusSet = prefix .. status
redis.call('ZREM', statusSet, jobId)
end
-- Remove the job from the time index
redis.call('ZREM', '{{.timeIndexSet}}', jobId)
redis.call('ZREM', prefix .. '{{.timeIndexSet}}', jobId)
-- Remove the main hash for the job
redis.call('DEL', jobKey)
Loading