-
Notifications
You must be signed in to change notification settings - Fork 343
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Adding support for enqueue unique by specified key #110
Changes from 6 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -230,3 +230,127 @@ func TestEnqueueUniqueIn(t *testing.T) { | |
assert.NoError(t, err) | ||
assert.NotNil(t, job) | ||
} | ||
|
||
func TestEnqueueUniqueByKey(t *testing.T) { | ||
var arg3 string | ||
var arg4 string | ||
|
||
pool := newTestPool(":6379") | ||
ns := "work" | ||
cleanKeyspace(ns, pool) | ||
enqueuer := NewEnqueuer(ns, pool) | ||
var mutex = &sync.Mutex{} | ||
job, err := enqueuer.EnqueueUniqueByKey("wat", Q{"a": 3, "b": "foo"}, Q{"key": "123"}) | ||
assert.NoError(t, err) | ||
if assert.NotNil(t, job) { | ||
assert.Equal(t, "wat", job.Name) | ||
assert.True(t, len(job.ID) > 10) // Something is in it | ||
assert.True(t, job.EnqueuedAt > (time.Now().Unix()-10)) // Within 10 seconds | ||
assert.True(t, job.EnqueuedAt < (time.Now().Unix()+10)) // Within 10 seconds | ||
assert.Equal(t, "foo", job.ArgString("b")) | ||
assert.EqualValues(t, 3, job.ArgInt64("a")) | ||
assert.NoError(t, job.ArgError()) | ||
} | ||
|
||
job, err = enqueuer.EnqueueUniqueByKey("wat", Q{"a": 3, "b": "bar"}, Q{"key": "123"}) | ||
assert.NoError(t, err) | ||
assert.Nil(t, job) | ||
|
||
job, err = enqueuer.EnqueueUniqueByKey("wat", Q{"a": 4, "b": "baz"}, Q{"key": "124"}) | ||
assert.NoError(t, err) | ||
assert.NotNil(t, job) | ||
|
||
job, err = enqueuer.EnqueueUniqueByKey("taw", nil, Q{"key": "125"}) | ||
assert.NoError(t, err) | ||
assert.NotNil(t, job) | ||
|
||
// Process the queues. Ensure the right number of jobs were processed | ||
var wats, taws int64 | ||
wp := NewWorkerPool(TestContext{}, 3, ns, pool) | ||
wp.JobWithOptions("wat", JobOptions{Priority: 1, MaxFails: 1}, func(job *Job) error { | ||
mutex.Lock() | ||
argA := job.Args["a"].(float64) | ||
argB := job.Args["b"].(string) | ||
if argA == 3 { | ||
arg3 = argB | ||
} | ||
if argA == 4 { | ||
arg4 = argB | ||
} | ||
|
||
wats++ | ||
mutex.Unlock() | ||
return nil | ||
}) | ||
wp.JobWithOptions("taw", JobOptions{Priority: 1, MaxFails: 1}, func(job *Job) error { | ||
mutex.Lock() | ||
taws++ | ||
mutex.Unlock() | ||
return fmt.Errorf("ohno") | ||
}) | ||
wp.Start() | ||
wp.Drain() | ||
wp.Stop() | ||
|
||
assert.EqualValues(t, 2, wats) | ||
assert.EqualValues(t, 1, taws) | ||
|
||
// Chewck that arguments got updated to new value | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. typo |
||
assert.EqualValues(t, "bar", arg3) | ||
assert.EqualValues(t, "baz", arg4) | ||
|
||
// Enqueue again. Ensure we can. | ||
job, err = enqueuer.EnqueueUniqueByKey("wat", Q{"a": 1, "b": "cool"}, Q{"key": "123"}) | ||
assert.NoError(t, err) | ||
assert.NotNil(t, job) | ||
|
||
job, err = enqueuer.EnqueueUniqueByKey("wat", Q{"a": 1, "b": "coolio"}, Q{"key": "124"}) | ||
assert.NoError(t, err) | ||
assert.NotNil(t, job) | ||
|
||
// Even though taw resulted in an error, we should still be able to re-queue it. | ||
// This could result in multiple taws enqueued at the same time in a production system. | ||
job, err = enqueuer.EnqueueUniqueByKey("taw", nil, Q{"key": "123"}) | ||
assert.NoError(t, err) | ||
assert.NotNil(t, job) | ||
} | ||
|
||
func EnqueueUniqueByKeyIn(t *testing.T) { | ||
pool := newTestPool(":6379") | ||
ns := "work" | ||
cleanKeyspace(ns, pool) | ||
enqueuer := NewEnqueuer(ns, pool) | ||
|
||
// Enqueue two unique jobs -- ensure one job sticks. | ||
job, err := enqueuer.EnqueueUniqueByKeyIn("wat", 300, Q{"a": 1, "b": "cool"}, Q{"key": "123"}) | ||
assert.NoError(t, err) | ||
if assert.NotNil(t, job) { | ||
assert.Equal(t, "wat", job.Name) | ||
assert.True(t, len(job.ID) > 10) // Something is in it | ||
assert.True(t, job.EnqueuedAt > (time.Now().Unix()-10)) // Within 10 seconds | ||
assert.True(t, job.EnqueuedAt < (time.Now().Unix()+10)) // Within 10 seconds | ||
assert.Equal(t, "cool", job.ArgString("b")) | ||
assert.EqualValues(t, 1, job.ArgInt64("a")) | ||
assert.NoError(t, job.ArgError()) | ||
assert.EqualValues(t, job.EnqueuedAt+300, job.RunAt) | ||
} | ||
|
||
job, err = enqueuer.EnqueueUniqueByKeyIn("wat", 10, Q{"a": 1, "b": "cool"}, Q{"key": "123"}) | ||
assert.NoError(t, err) | ||
assert.Nil(t, job) | ||
|
||
// Get the job | ||
score, j := jobOnZset(pool, redisKeyScheduled(ns)) | ||
|
||
assert.True(t, score > time.Now().Unix()+290) // We don't want to overwrite the time | ||
assert.True(t, score <= time.Now().Unix()+300) | ||
|
||
assert.Equal(t, "wat", j.Name) | ||
assert.True(t, len(j.ID) > 10) // Something is in it | ||
assert.True(t, j.EnqueuedAt > (time.Now().Unix()-10)) // Within 10 seconds | ||
assert.True(t, j.EnqueuedAt < (time.Now().Unix()+10)) // Within 10 seconds | ||
assert.Equal(t, "cool", j.ArgString("b")) | ||
assert.EqualValues(t, 1, j.ArgInt64("a")) | ||
assert.NoError(t, j.ArgError()) | ||
assert.True(t, j.Unique) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -348,22 +348,28 @@ return requeuedCount | |
// KEYS[1] = job queue to push onto | ||
// KEYS[2] = Unique job's key. Test for existence and set if we push. | ||
// ARGV[1] = job | ||
// ARGV[2] = updated job or just a 1 if arguments don't update | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
var redisLuaEnqueueUnique = ` | ||
if redis.call('set', KEYS[2], '1', 'NX', 'EX', '86400') then | ||
if redis.call('set', KEYS[2], ARGV[2], 'NX', 'EX', '86400') then | ||
redis.call('lpush', KEYS[1], ARGV[1]) | ||
return 'ok' | ||
else | ||
redis.call('set', KEYS[2], ARGV[2], 'EX', '86400') | ||
end | ||
return 'dup' | ||
` | ||
|
||
// KEYS[1] = scheduled job queue | ||
// KEYS[2] = Unique job's key. Test for existence and set if we push. | ||
// ARGV[1] = job | ||
// ARGV[2] = epoch seconds for job to be run at | ||
// ARGV[2] = updated job or just a 1 if arguments don't update | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same here, I think |
||
// ARGV[3] = epoch seconds for job to be run at | ||
var redisLuaEnqueueUniqueIn = ` | ||
if redis.call('set', KEYS[2], '1', 'NX', 'EX', '86400') then | ||
redis.call('zadd', KEYS[1], ARGV[2], ARGV[1]) | ||
if redis.call('set', KEYS[2], ARGV[2], 'NX', 'EX', '86400') then | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why is the TTL not following the |
||
redis.call('zadd', KEYS[1], ARGV[3], ARGV[1]) | ||
return 'ok' | ||
else | ||
redis.call('set', KEYS[2], ARGV[2], 'EX', '86400') | ||
end | ||
return 'dup' | ||
` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🤔 i think
EnqueueUniqueInByKey
is a better fit for the name.