Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding support for enqueue unique by specified key #110

Merged
merged 7 commits into from
Sep 12, 2018
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,13 @@ job, err = enqueuer.EnqueueUnique("clear_cache", work.Q{"object_id_": "123"}) //
job, err = enqueuer.EnqueueUniqueIn("clear_cache", 300, work.Q{"object_id_": "789"}) // job != nil (diff id)
```

Alternatively, you can provide your own key for making a job unique. When another job is enqueued with the same key as a job already in the queue, it will simply update the arguments.
```go
enqueuer := work.NewEnqueuer("my_app_namespace", redisPool)
job, err := enqueuer.EnqueueUniqueByKey("clear_cache", work.Q{"object_id_": "123"}, map[string]interface{}{"my_key": "586"})
job, err = enqueuer.EnqueueUniqueByKeyIn("clear_cache", 300, work.Q{"object_id_": "789"}, map[string]interface{}{"my_key": "586"})
```

### Periodic Enqueueing (Cron)

You can periodically enqueue jobs on your gocraft/work cluster using your worker pool. The [scheduling specification](https://godoc.org/github.com/robfig/cron#hdr-CRON_Expression_Format) uses a Cron syntax where the fields represent seconds, minutes, hours, day of the month, month, and week of the day, respectively. Even if you have multiple worker pools on different machines, they'll all coordinate and only enqueue your job once.
Expand Down
140 changes: 85 additions & 55 deletions enqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,83 +105,48 @@ func (e *Enqueuer) EnqueueIn(jobName string, secondsFromNow int64, args map[stri
// In order to add robustness to the system, jobs are only unique for 24 hours after they're enqueued. This is mostly relevant for scheduled jobs.
// EnqueueUnique returns the job if it was enqueued and nil if it wasn't
func (e *Enqueuer) EnqueueUnique(jobName string, args map[string]interface{}) (*Job, error) {
uniqueKey, err := redisKeyUniqueJob(e.Namespace, jobName, args)
if err != nil {
return nil, err
}
return e.EnqueueUniqueByKey(jobName, args, nil)
}

job := &Job{
Name: jobName,
ID: makeIdentifier(),
EnqueuedAt: nowEpochSeconds(),
Args: args,
Unique: true,
}
// EnqueueUniqueIn enqueues a unique job in the scheduled job queue for execution in secondsFromNow seconds. See EnqueueUnique for the semantics of unique jobs.
func (e *Enqueuer) EnqueueUniqueIn(jobName string, secondsFromNow int64, args map[string]interface{}) (*ScheduledJob, error) {
return e.EnqueueUniqueByKeyIn(jobName, secondsFromNow, args, nil)
}

rawJSON, err := job.serialize()
// EnqueueUniqueByKey enqueues a job unless a job is already enqueued with the same name and key, updating arguments.
// The already-enqueued job can be in the normal work queue or in the scheduled job queue.
// Once a worker begins processing a job, another job with the same name and key can be enqueued again.
// Any failed jobs in the retry queue or dead queue don't count against the uniqueness -- so if a job fails and is retried, two unique jobs with the same name and arguments can be enqueued at once.
// In order to add robustness to the system, jobs are only unique for 24 hours after they're enqueued. This is mostly relevant for scheduled jobs.
// EnqueueUniqueByKey returns the job if it was enqueued and nil if it wasn't
func (e *Enqueuer) EnqueueUniqueByKey(jobName string, args map[string]interface{}, keyMap map[string]interface{}) (*Job, error) {
enqueue, job, err := e.uniqueJobHelper(jobName, args, keyMap)
if err != nil {
return nil, err
}

conn := e.Pool.Get()
defer conn.Close()

if err := e.addToKnownJobs(conn, jobName); err != nil {
return nil, err
}

scriptArgs := make([]interface{}, 0, 3)
scriptArgs = append(scriptArgs, e.queuePrefix+jobName) // KEY[1]
scriptArgs = append(scriptArgs, uniqueKey) // KEY[2]
scriptArgs = append(scriptArgs, rawJSON) // ARGV[1]
res, err := enqueue(nil)

res, err := redis.String(e.enqueueUniqueScript.Do(conn, scriptArgs...))
if res == "ok" && err == nil {
return job, nil
}
return nil, err
}

// EnqueueUniqueIn enqueues a unique job in the scheduled job queue for execution in secondsFromNow seconds. See EnqueueUnique for the semantics of unique jobs.
func (e *Enqueuer) EnqueueUniqueIn(jobName string, secondsFromNow int64, args map[string]interface{}) (*ScheduledJob, error) {
uniqueKey, err := redisKeyUniqueJob(e.Namespace, jobName, args)
if err != nil {
return nil, err
}

job := &Job{
Name: jobName,
ID: makeIdentifier(),
EnqueuedAt: nowEpochSeconds(),
Args: args,
Unique: true,
}

rawJSON, err := job.serialize()
// EnqueueUniqueByKeyIn enqueues a job in the scheduled job queue that is unique on specified key for execution in secondsFromNow seconds. See EnqueueUnique for the semantics of unique jobs.
// Subsequent calls with same key will update arguments
func (e *Enqueuer) EnqueueUniqueByKeyIn(jobName string, secondsFromNow int64, args map[string]interface{}, keyMap map[string]interface{}) (*ScheduledJob, error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔 i think EnqueueUniqueInByKey is a better fit for the name.

enqueue, job, err := e.uniqueJobHelper(jobName, args, keyMap)
if err != nil {
return nil, err
}

conn := e.Pool.Get()
defer conn.Close()

if err := e.addToKnownJobs(conn, jobName); err != nil {
return nil, err
}

scheduledJob := &ScheduledJob{
RunAt: nowEpochSeconds() + secondsFromNow,
Job: job,
}

scriptArgs := make([]interface{}, 0, 4)
scriptArgs = append(scriptArgs, redisKeyScheduled(e.Namespace)) // KEY[1]
scriptArgs = append(scriptArgs, uniqueKey) // KEY[2]
scriptArgs = append(scriptArgs, rawJSON) // ARGV[1]
scriptArgs = append(scriptArgs, scheduledJob.RunAt) // ARGV[2]

res, err := redis.String(e.enqueueUniqueInScript.Do(conn, scriptArgs...))

res, err := enqueue(&scheduledJob.RunAt)
if res == "ok" && err == nil {
return scheduledJob, nil
}
Expand Down Expand Up @@ -213,3 +178,68 @@ func (e *Enqueuer) addToKnownJobs(conn redis.Conn, jobName string) error {

return nil
}

type enqueueFnType func(*int64) (string, error)

func (e *Enqueuer) uniqueJobHelper(jobName string, args map[string]interface{}, keyMap map[string]interface{}) (enqueueFnType, *Job, error) {
useDefaultKeys := false
if keyMap == nil {
useDefaultKeys = true
keyMap = args
}

uniqueKey, err := redisKeyUniqueJob(e.Namespace, jobName, keyMap)
if err != nil {
return nil, nil, err
}

job := &Job{
Name: jobName,
ID: makeIdentifier(),
EnqueuedAt: nowEpochSeconds(),
Args: args,
Unique: true,
UniqueKey: uniqueKey,
}

rawJSON, err := job.serialize()
if err != nil {
return nil, nil, err
}

enqueueFn := func(runAt *int64) (string, error) {
conn := e.Pool.Get()
defer conn.Close()

if err := e.addToKnownJobs(conn, jobName); err != nil {
return "", err
}

scriptArgs := []interface{}{}
script := e.enqueueUniqueScript

scriptArgs = append(scriptArgs, e.queuePrefix+jobName) // KEY[1]
scriptArgs = append(scriptArgs, uniqueKey) // KEY[2]
scriptArgs = append(scriptArgs, rawJSON) // ARGV[1]
if useDefaultKeys {
// keying on arguments so arguments can't be updated
// we'll just get them off the original job so to save space, make this "1"
scriptArgs = append(scriptArgs, "1") // ARGV[2]
} else {
// we'll use this for updated arguments since the job on the queue
// doesn't get updated
scriptArgs = append(scriptArgs, rawJSON) // ARGV[2]
}

if runAt != nil { // Scheduled job so different job queue with additional arg
scriptArgs[0] = redisKeyScheduled(e.Namespace) // KEY[1]
scriptArgs = append(scriptArgs, *runAt) // ARGV[3]

script = e.enqueueUniqueInScript
}

return redis.String(script.Do(conn, scriptArgs...))
}

return enqueueFn, job, nil
}
124 changes: 124 additions & 0 deletions enqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,3 +230,127 @@ func TestEnqueueUniqueIn(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, job)
}

func TestEnqueueUniqueByKey(t *testing.T) {
var arg3 string
var arg4 string

pool := newTestPool(":6379")
ns := "work"
cleanKeyspace(ns, pool)
enqueuer := NewEnqueuer(ns, pool)
var mutex = &sync.Mutex{}
job, err := enqueuer.EnqueueUniqueByKey("wat", Q{"a": 3, "b": "foo"}, Q{"key": "123"})
assert.NoError(t, err)
if assert.NotNil(t, job) {
assert.Equal(t, "wat", job.Name)
assert.True(t, len(job.ID) > 10) // Something is in it
assert.True(t, job.EnqueuedAt > (time.Now().Unix()-10)) // Within 10 seconds
assert.True(t, job.EnqueuedAt < (time.Now().Unix()+10)) // Within 10 seconds
assert.Equal(t, "foo", job.ArgString("b"))
assert.EqualValues(t, 3, job.ArgInt64("a"))
assert.NoError(t, job.ArgError())
}

job, err = enqueuer.EnqueueUniqueByKey("wat", Q{"a": 3, "b": "bar"}, Q{"key": "123"})
assert.NoError(t, err)
assert.Nil(t, job)

job, err = enqueuer.EnqueueUniqueByKey("wat", Q{"a": 4, "b": "baz"}, Q{"key": "124"})
assert.NoError(t, err)
assert.NotNil(t, job)

job, err = enqueuer.EnqueueUniqueByKey("taw", nil, Q{"key": "125"})
assert.NoError(t, err)
assert.NotNil(t, job)

// Process the queues. Ensure the right number of jobs were processed
var wats, taws int64
wp := NewWorkerPool(TestContext{}, 3, ns, pool)
wp.JobWithOptions("wat", JobOptions{Priority: 1, MaxFails: 1}, func(job *Job) error {
mutex.Lock()
argA := job.Args["a"].(float64)
argB := job.Args["b"].(string)
if argA == 3 {
arg3 = argB
}
if argA == 4 {
arg4 = argB
}

wats++
mutex.Unlock()
return nil
})
wp.JobWithOptions("taw", JobOptions{Priority: 1, MaxFails: 1}, func(job *Job) error {
mutex.Lock()
taws++
mutex.Unlock()
return fmt.Errorf("ohno")
})
wp.Start()
wp.Drain()
wp.Stop()

assert.EqualValues(t, 2, wats)
assert.EqualValues(t, 1, taws)

// Chewck that arguments got updated to new value
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo

assert.EqualValues(t, "bar", arg3)
assert.EqualValues(t, "baz", arg4)

// Enqueue again. Ensure we can.
job, err = enqueuer.EnqueueUniqueByKey("wat", Q{"a": 1, "b": "cool"}, Q{"key": "123"})
assert.NoError(t, err)
assert.NotNil(t, job)

job, err = enqueuer.EnqueueUniqueByKey("wat", Q{"a": 1, "b": "coolio"}, Q{"key": "124"})
assert.NoError(t, err)
assert.NotNil(t, job)

// Even though taw resulted in an error, we should still be able to re-queue it.
// This could result in multiple taws enqueued at the same time in a production system.
job, err = enqueuer.EnqueueUniqueByKey("taw", nil, Q{"key": "123"})
assert.NoError(t, err)
assert.NotNil(t, job)
}

func EnqueueUniqueByKeyIn(t *testing.T) {
pool := newTestPool(":6379")
ns := "work"
cleanKeyspace(ns, pool)
enqueuer := NewEnqueuer(ns, pool)

// Enqueue two unique jobs -- ensure one job sticks.
job, err := enqueuer.EnqueueUniqueByKeyIn("wat", 300, Q{"a": 1, "b": "cool"}, Q{"key": "123"})
assert.NoError(t, err)
if assert.NotNil(t, job) {
assert.Equal(t, "wat", job.Name)
assert.True(t, len(job.ID) > 10) // Something is in it
assert.True(t, job.EnqueuedAt > (time.Now().Unix()-10)) // Within 10 seconds
assert.True(t, job.EnqueuedAt < (time.Now().Unix()+10)) // Within 10 seconds
assert.Equal(t, "cool", job.ArgString("b"))
assert.EqualValues(t, 1, job.ArgInt64("a"))
assert.NoError(t, job.ArgError())
assert.EqualValues(t, job.EnqueuedAt+300, job.RunAt)
}

job, err = enqueuer.EnqueueUniqueByKeyIn("wat", 10, Q{"a": 1, "b": "cool"}, Q{"key": "123"})
assert.NoError(t, err)
assert.Nil(t, job)

// Get the job
score, j := jobOnZset(pool, redisKeyScheduled(ns))

assert.True(t, score > time.Now().Unix()+290) // We don't want to overwrite the time
assert.True(t, score <= time.Now().Unix()+300)

assert.Equal(t, "wat", j.Name)
assert.True(t, len(j.ID) > 10) // Something is in it
assert.True(t, j.EnqueuedAt > (time.Now().Unix()-10)) // Within 10 seconds
assert.True(t, j.EnqueuedAt < (time.Now().Unix()+10)) // Within 10 seconds
assert.Equal(t, "cool", j.ArgString("b"))
assert.EqualValues(t, 1, j.ArgInt64("a"))
assert.NoError(t, j.ArgError())
assert.True(t, j.Unique)
}
1 change: 1 addition & 0 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type Job struct {
EnqueuedAt int64 `json:"t"`
Args map[string]interface{} `json:"args"`
Unique bool `json:"unique,omitempty"`
UniqueKey string `json:"unique_key,omitempty"`

// Inputs when retrying
Fails int64 `json:"fails,omitempty"` // number of times this job has failed
Expand Down
14 changes: 10 additions & 4 deletions redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,22 +348,28 @@ return requeuedCount
// KEYS[1] = job queue to push onto
// KEYS[2] = Unique job's key. Test for existence and set if we push.
// ARGV[1] = job
// ARGV[2] = updated job or just a 1 if arguments don't update
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ARGV[2] is updated job args, right?

var redisLuaEnqueueUnique = `
if redis.call('set', KEYS[2], '1', 'NX', 'EX', '86400') then
if redis.call('set', KEYS[2], ARGV[2], 'NX', 'EX', '86400') then
redis.call('lpush', KEYS[1], ARGV[1])
return 'ok'
else
redis.call('set', KEYS[2], ARGV[2], 'EX', '86400')
end
return 'dup'
`

// KEYS[1] = scheduled job queue
// KEYS[2] = Unique job's key. Test for existence and set if we push.
// ARGV[1] = job
// ARGV[2] = epoch seconds for job to be run at
// ARGV[2] = updated job or just a 1 if arguments don't update
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here, I think

// ARGV[3] = epoch seconds for job to be run at
var redisLuaEnqueueUniqueIn = `
if redis.call('set', KEYS[2], '1', 'NX', 'EX', '86400') then
redis.call('zadd', KEYS[1], ARGV[2], ARGV[1])
if redis.call('set', KEYS[2], ARGV[2], 'NX', 'EX', '86400') then
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is the TTL not following the secondsFromNow value, instead being always 24h

redis.call('zadd', KEYS[1], ARGV[3], ARGV[1])
return 'ok'
else
redis.call('set', KEYS[2], ARGV[2], 'EX', '86400')
end
return 'dup'
`
Loading