Skip to content
This repository has been archived by the owner on Sep 20, 2022. It is now read-only.

Commit

Permalink
passing tests
Browse files Browse the repository at this point in the history
(cherry picked from commit 2ceff7e)
  • Loading branch information
yadvendar committed Mar 14, 2017
1 parent 18a51ad commit 0eba5d1
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 25 deletions.
3 changes: 0 additions & 3 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,6 @@ func (c *configType) SetEnvPrefix(p string) {
return
}
c.envPrefix = p
Keys.JobsTimeIndex = c.GetKeyPrefix() + Keys.JobsTimeIndex
Keys.JobsTemp = c.GetKeyPrefix() + Keys.JobsTemp
Keys.ActivePools = c.GetKeyPrefix() + Keys.ActivePools
}
func (c *configType) GetKeyPrefix() string {
var hardCodedPrefix = "jobs:"
Expand Down
2 changes: 1 addition & 1 deletion job.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func (t *transaction) saveJob(job *Job) {
// add the job id to the time index with a score equal to the job's time field.
// If the job has been destroyed, addJobToTimeIndex will have no effect.
func (t *transaction) addJobToTimeIndex(job *Job) {
t.addJobToSet(job, Keys.JobsTimeIndex, float64(job.time))
t.addJobToSet(job, Keys.JobsTimeIndex.Key(), float64(job.time))
}

// Refresh mutates the job by setting its fields to the most recent data
Expand Down
6 changes: 3 additions & 3 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func (p *Pool) addToPoolSet() error {
p.RLock()
thisId := p.id
p.RUnlock()
if _, err := conn.Do("SADD", Keys.ActivePools, thisId); err != nil {
if _, err := conn.Do("SADD", Keys.ActivePools.Key(), thisId); err != nil {
return err
}
return nil
Expand All @@ -150,7 +150,7 @@ func (p *Pool) removeFromPoolSet() error {
p.RLock()
thisId := p.id
p.RUnlock()
if _, err := conn.Do("SREM", Keys.ActivePools, thisId); err != nil {
if _, err := conn.Do("SREM", Keys.ActivePools.Key(), thisId); err != nil {
return err
}
return nil
Expand Down Expand Up @@ -210,7 +210,7 @@ func (p *Pool) pongKey() string {
func (p *Pool) purgeStalePools() error {
conn := redisPool.Get()
defer conn.Close()
poolIds, err := redis.Strings(conn.Do("SMEMBERS", Keys.ActivePools))
poolIds, err := redis.Strings(conn.Do("SMEMBERS", Keys.ActivePools.Key()))
if err != nil {
return err
}
Expand Down
6 changes: 3 additions & 3 deletions pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ func TestPoolIdSet(t *testing.T) {
if err := pool.Start(); err != nil {
t.Errorf("Unexpected error in pool.Start(): %s", err.Error())
}
expectSetContains(t, Keys.ActivePools, pool.id)
expectSetContains(t, Keys.ActivePools.Key(), pool.id)
pool.Close()
if err := pool.Wait(); err != nil {
t.Errorf("Unexpected error in pool.Wait(): %s", err.Error())
}
expectSetDoesNotContain(t, Keys.ActivePools, pool.id)
expectSetDoesNotContain(t, Keys.ActivePools.Key(), pool.id)
}

// TestGetNextJobs tests the getNextJobs function, which queries the database to find
Expand Down Expand Up @@ -940,7 +940,7 @@ func TestStalePoolsArePurged(t *testing.T) {
}

// At this point, the stale pool should have been fully purged.
expectSetDoesNotContain(t, Keys.ActivePools, oldId)
expectSetDoesNotContain(t, Keys.ActivePools.Key(), oldId)
expectJobFieldEquals(t, job, "poolId", newPool.id, stringConverter)
}

Expand Down
12 changes: 9 additions & 3 deletions redis_keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,26 @@

package jobs

type redisKey string

// keys stores any constant redis keys. By storing them all here,
// we avoid using string literals which are prone to typos.
var Keys = struct {
// jobsTimeIndex is the key for a sorted set which keeps all outstanding
// jobs sorted by their time field.
JobsTimeIndex string
JobsTimeIndex redisKey
// jobsTemp is the key for a temporary set which is created and then destroyed
// during the process of getting the next jobs in the queue.
JobsTemp string
JobsTemp redisKey
// activePools is the key for a set which holds the pool ids for all active
// pools.
ActivePools string
ActivePools redisKey
}{
JobsTimeIndex: "time",
JobsTemp: "temp",
ActivePools: "pools:active",
}

func (rk *redisKey) Key() string {
return Config.GetKeyPrefix() + string(*rk)
}
17 changes: 9 additions & 8 deletions scripts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@
package jobs

import (
"github.com/garyburd/redigo/redis"
"reflect"
"testing"
"time"

"github.com/garyburd/redigo/redis"
)

func TestPopNextJobsScript(t *testing.T) {
Expand All @@ -21,7 +22,7 @@ func TestPopNextJobsScript(t *testing.T) {
// Set up the database
tx0 := newTransaction()
// One set will mimic the ready and sorted jobs
tx0.command("ZADD", redis.Args{Keys.JobsTimeIndex, pastTime, "two", pastTime, "four"}, nil)
tx0.command("ZADD", redis.Args{Keys.JobsTimeIndex.Key(), pastTime, "two", pastTime, "four"}, nil)
// One set will mimic the queued set
tx0.command("ZADD", redis.Args{StatusQueued.Key(), 1, "one", 2, "two", 3, "three", 4, "four"}, nil)
// One set will mimic the executing set
Expand Down Expand Up @@ -67,7 +68,7 @@ func TestPopNextJobsScript(t *testing.T) {
if !reflect.DeepEqual(expectedQueued, gotQueued) {
t.Errorf("Ids in the queued set were incorrect.\n\tExpected: %v\n\tBut got: %v", expectedQueued, gotQueued)
}
expectKeyNotExists(t, Keys.JobsTemp)
expectKeyNotExists(t, Keys.JobsTemp.Key())
}

func TestRetryOrFailJobScript(t *testing.T) {
Expand Down Expand Up @@ -218,7 +219,7 @@ func TestPurgeStalePoolScript(t *testing.T) {
// Add both pools to the set of active pools
conn := redisPool.Get()
defer conn.Close()
if _, err := conn.Do("SADD", Keys.ActivePools, stalePoolId, activePoolId); err != nil {
if _, err := conn.Do("SADD", Keys.ActivePools.Key(), stalePoolId, activePoolId); err != nil {
t.Errorf("Unexpected error adding pools to set: %s", err)
}

Expand All @@ -231,8 +232,8 @@ func TestPurgeStalePoolScript(t *testing.T) {

// Check the result
// The active pools set should contain only the activePoolId
expectSetDoesNotContain(t, Keys.ActivePools, stalePoolId)
expectSetContains(t, Keys.ActivePools, activePoolId)
expectSetDoesNotContain(t, Keys.ActivePools.Key(), stalePoolId)
expectSetContains(t, Keys.ActivePools.Key(), activePoolId)
// All the active jobs should still be executing
for _, job := range activeJobs {
if err := job.Refresh(); err != nil {
Expand Down Expand Up @@ -338,15 +339,15 @@ func TestAddJobToSetScript(t *testing.T) {
// Add the job to the time index with a score of 7 days ago
tx := newTransaction()
expectedScore := float64(time.Now().Add(-7 * 24 * time.Hour).UTC().UnixNano())
tx.addJobToSet(job, Keys.JobsTimeIndex, expectedScore)
tx.addJobToSet(job, Keys.JobsTimeIndex.Key(), expectedScore)
if err := tx.exec(); err != nil {
t.Errorf("Unexpected err in tx.exec(): %s", err.Error())
}

// Make sure the job was added to the set properly
conn := redisPool.Get()
defer conn.Close()
score, err := redis.Float64(conn.Do("ZSCORE", Keys.JobsTimeIndex, job.id))
score, err := redis.Float64(conn.Do("ZSCORE", Keys.JobsTimeIndex.Key(), job.id))
if err != nil {
t.Errorf("Unexpected error in ZSCORE: %s", err.Error())
}
Expand Down
8 changes: 4 additions & 4 deletions test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func expectJobInStatusSet(t *testing.T, j *Job, status Status) {
func expectJobInTimeIndex(t *testing.T, j *Job) {
conn := redisPool.Get()
defer conn.Close()
gotIds, err := redis.Strings(conn.Do("ZRANGEBYSCORE", Keys.JobsTimeIndex, j.time, j.time))
gotIds, err := redis.Strings(conn.Do("ZRANGEBYSCORE", Keys.JobsTimeIndex.Key(), j.time, j.time))
if err != nil {
t.Errorf("Unexpected error: %s", err.Error())
}
Expand All @@ -237,7 +237,7 @@ func expectJobInTimeIndex(t *testing.T, j *Job) {
}
}
// If we reached here, we did not find the job we were looking for
t.Errorf("job:%s was not found in set %s", j.id, Keys.JobsTimeIndex)
t.Errorf("job:%s was not found in set %s", j.id, Keys.JobsTimeIndex.Key())
}

// expectJobNotInStatusSet sets an error via t.Errorf if job is in the status set
Expand All @@ -262,14 +262,14 @@ func expectJobNotInStatusSet(t *testing.T, j *Job, status Status) {
func expectJobNotInTimeIndex(t *testing.T, j *Job) {
conn := redisPool.Get()
defer conn.Close()
gotIds, err := redis.Strings(conn.Do("ZRANGEBYSCORE", Keys.JobsTimeIndex, j.time, j.time))
gotIds, err := redis.Strings(conn.Do("ZRANGEBYSCORE", Keys.JobsTimeIndex.Key(), j.time, j.time))
if err != nil {
t.Errorf("Unexpected error: %s", err.Error())
}
for _, id := range gotIds {
if id == j.id {
// We found the job, but it wasn't supposed to be here!
t.Errorf("job:%s was found in set %s but expected it to be removed", j.id, Keys.JobsTimeIndex)
t.Errorf("job:%s was found in set %s but expected it to be removed", j.id, Keys.JobsTimeIndex.Key())
}
}
}
Expand Down

0 comments on commit 0eba5d1

Please sign in to comment.