FEATURE: unique jobs
Unique jobs ensure that only one job with a given name/arguments is on the queue at once. This covers both the normal queue and the scheduled queue.
cypriss committed Jul 15, 2016
1 parent 0ee600d commit f0fb3b3
Showing 7 changed files with 295 additions and 7 deletions.
18 changes: 18 additions & 0 deletions README.md
@@ -7,6 +7,7 @@ gocraft/work lets you enqueue and processes background jobs in Go. Jobs are dura
* Middleware on jobs -- good for metrics instrumentation, logging, etc.
* If a job fails, it will be retried a specified number of times.
* Schedule jobs to happen in the future.
* Enqueue unique jobs so that only one job with a given name/arguments exists in the queue at once.
* Web UI to manage failed jobs and observe the system.
* Periodically enqueue jobs on a cron-like schedule.

@@ -184,6 +185,17 @@ enqueuer.EnqueueIn("send_welcome_email", secondsInTheFuture, work.Q{"address": "

```

### Unique Jobs

You can enqueue unique jobs so that only one job with a given name/arguments exists in the queue at once. For instance, you might have a worker that expires the cache of an object. It doesn't make sense for multiple such jobs to exist at once. Also note that unique jobs are supported for normal enqueues as well as scheduled enqueues.

```go
enqueuer := work.NewEnqueuer("my_app_namespace", redisPool)
ok, err := enqueuer.EnqueueUnique("clear_cache", work.Q{"object_id_": "123"}) // ok=true
ok, err = enqueuer.EnqueueUnique("clear_cache", work.Q{"object_id_": "123"}) // ok=false -- this duplicate job isn't enqueued.
ok, err = enqueuer.EnqueueUniqueIn("clear_cache", 300, work.Q{"object_id_": "789"}) // ok=true (diff id)
```
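
The `(bool, error)` return separates "already enqueued" from "could not talk to Redis", and callers usually want to treat the two differently. The sketch below shows one way to branch on it. It is illustrative only: `uniqueEnqueuer`, `fakeEnqueuer`, and `requestCacheClear` are hypothetical names that are not part of gocraft/work, and the fake stands in for a real `*work.Enqueuer` so the example runs without Redis.

```go
package main

import (
	"errors"
	"fmt"
)

// uniqueEnqueuer mirrors the EnqueueUnique signature shown above.
// (Hypothetical interface, defined here only so the example is self-contained.)
type uniqueEnqueuer interface {
	EnqueueUnique(jobName string, args map[string]interface{}) (bool, error)
}

// fakeEnqueuer is an in-memory stand-in for illustration; it is NOT the library's implementation.
type fakeEnqueuer struct {
	seen map[string]bool
	fail bool // when true, simulate a Redis failure
}

func (f *fakeEnqueuer) EnqueueUnique(jobName string, args map[string]interface{}) (bool, error) {
	if f.fail {
		return false, errors.New("redis unavailable")
	}
	// fmt prints maps with sorted keys, so this string is stable for equal args.
	key := fmt.Sprintf("%s:%v", jobName, args)
	if f.seen[key] {
		return false, nil // duplicate: not an error
	}
	f.seen[key] = true
	return true, nil
}

// requestCacheClear reports "enqueued", "duplicate", or "error".
func requestCacheClear(e uniqueEnqueuer, objectID string) string {
	ok, err := e.EnqueueUnique("clear_cache", map[string]interface{}{"object_id": objectID})
	switch {
	case err != nil:
		return "error" // check err first: ok is false here too
	case !ok:
		return "duplicate"
	default:
		return "enqueued"
	}
}

func main() {
	e := &fakeEnqueuer{seen: map[string]bool{}}
	fmt.Println(requestCacheClear(e, "123"))
	fmt.Println(requestCacheClear(e, "123"))
	fmt.Println(requestCacheClear(e, "789"))
}
```

Checking `err` before `ok` matters because `ok` is also `false` when an error is returned.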

### Periodic Enqueueing (Cron)

You can periodically enqueue jobs on your gocraft/work cluster using your worker pool. The [scheduling specification](https://godoc.org/github.com/robfig/cron#hdr-CRON_Expression_Format) uses a Cron syntax where the fields represent seconds, minutes, hours, day of the month, month, and day of the week, respectively. Even if you have multiple worker pools on different machines, they'll all coordinate and only enqueue your job once.
@@ -265,6 +277,12 @@ You'll see a view that looks like this:
* If a process crashes hard (eg, the power on the server turns off or the kernel freezes), some jobs may be in progress and we won't want to lose them. They're safe in their in-progress queue.
* The reaper will look for worker pools without a heartbeat. It will scan their in-progress queues and requeue anything it finds.

### Unique jobs

* You can enqueue unique jobs such that only one job with a given name/arguments is on the queue at once.
* Both normal queues and the scheduled queue are considered.
* When a unique job is enqueued, we'll atomically set a redis key that includes the job name and arguments and enqueue the job.
* When the job is processed, we'll delete that key to permit another job to be enqueued.

### Periodic Jobs

93 changes: 87 additions & 6 deletions enqueue.go
@@ -10,17 +10,21 @@ type Enqueuer struct {
Namespace string // eg, "myapp-work"
Pool *redis.Pool

queuePrefix string // eg, "myapp-work:jobs:"
knownJobs map[string]int64
enqueueUniqueScript *redis.Script
enqueueUniqueInScript *redis.Script
}

// NewEnqueuer creates a new enqueuer with the specified Redis namespace and Redis pool.
func NewEnqueuer(namespace string, pool *redis.Pool) *Enqueuer {
return &Enqueuer{
Namespace: namespace,
Pool: pool,
queuePrefix: redisKeyJobsPrefix(namespace),
knownJobs: make(map[string]int64),
enqueueUniqueScript: redis.NewScript(2, redisLuaEnqueueUnique),
enqueueUniqueInScript: redis.NewScript(2, redisLuaEnqueueUniqueIn),
}
}

@@ -82,6 +86,83 @@ func (e *Enqueuer) EnqueueIn(jobName string, secondsFromNow int64, args map[stri
return nil
}

// EnqueueUnique enqueues a job unless a job is already enqueued with the same name and arguments. The already-enqueued job can be in the normal work queue or in the scheduled job queue. Once a worker begins processing a job, another job with the same name and arguments can be enqueued again. Any failed jobs in the retry queue or dead queue don't count against the uniqueness -- so if a job fails and is retried, two unique jobs with the same name and arguments can be enqueued at once.
// In order to add robustness to the system, jobs are only unique for 24 hours after they're enqueued. This is mostly relevant for scheduled jobs.
// EnqueueUnique returns true if the job is enqueued and false if it wasn't enqueued.
func (e *Enqueuer) EnqueueUnique(jobName string, args map[string]interface{}) (bool, error) {
uniqueKey, err := redisKeyUniqueJob(e.Namespace, jobName, args)
if err != nil {
return false, err
}

job := &Job{
Name: jobName,
ID: makeIdentifier(),
EnqueuedAt: nowEpochSeconds(),
Args: args,
Unique: true,
}

rawJSON, err := job.serialize()
if err != nil {
return false, err
}

conn := e.Pool.Get()
defer conn.Close()

if err := e.addToKnownJobs(conn, jobName); err != nil {
return false, err
}

scriptArgs := make([]interface{}, 0, 3)
scriptArgs = append(scriptArgs, e.queuePrefix+jobName) // KEYS[1]
scriptArgs = append(scriptArgs, uniqueKey) // KEYS[2]
scriptArgs = append(scriptArgs, rawJSON) // ARGV[1]

res, err := redis.String(e.enqueueUniqueScript.Do(conn, scriptArgs...))

return res == "ok", err
}

// EnqueueUniqueIn enqueues a unique job in the scheduled job queue for execution in secondsFromNow seconds. See EnqueueUnique for the semantics of unique jobs.
func (e *Enqueuer) EnqueueUniqueIn(jobName string, secondsFromNow int64, args map[string]interface{}) (bool, error) {
uniqueKey, err := redisKeyUniqueJob(e.Namespace, jobName, args)
if err != nil {
return false, err
}

job := &Job{
Name: jobName,
ID: makeIdentifier(),
EnqueuedAt: nowEpochSeconds(),
Args: args,
Unique: true,
}

rawJSON, err := job.serialize()
if err != nil {
return false, err
}

conn := e.Pool.Get()
defer conn.Close()

if err := e.addToKnownJobs(conn, jobName); err != nil {
return false, err
}

scriptArgs := make([]interface{}, 0, 4)
scriptArgs = append(scriptArgs, redisKeyScheduled(e.Namespace)) // KEYS[1]
scriptArgs = append(scriptArgs, uniqueKey) // KEYS[2]
scriptArgs = append(scriptArgs, rawJSON) // ARGV[1]
scriptArgs = append(scriptArgs, nowEpochSeconds()+secondsFromNow) // ARGV[2]

res, err := redis.String(e.enqueueUniqueInScript.Do(conn, scriptArgs...))

return res == "ok", err
}

func (e *Enqueuer) addToKnownJobs(conn redis.Conn, jobName string) error {
needSadd := true
now := time.Now().Unix()
113 changes: 113 additions & 0 deletions enqueue_test.go
@@ -1,6 +1,7 @@
package work

import (
"fmt"
"github.com/stretchr/testify/assert"
"testing"
"time"
@@ -79,3 +80,115 @@ func TestEnqueueIn(t *testing.T) {
assert.EqualValues(t, 1, j.ArgInt64("a"))
assert.NoError(t, j.ArgError())
}

func TestEnqueueUnique(t *testing.T) {
pool := newTestPool(":6379")
ns := "work"
cleanKeyspace(ns, pool)
enqueuer := NewEnqueuer(ns, pool)

ok, err := enqueuer.EnqueueUnique("wat", Q{"a": 1, "b": "cool"})
assert.NoError(t, err)
assert.True(t, ok)

ok, err = enqueuer.EnqueueUnique("wat", Q{"a": 1, "b": "cool"})
assert.NoError(t, err)
assert.False(t, ok)

ok, err = enqueuer.EnqueueUnique("wat", Q{"a": 1, "b": "coolio"})
assert.NoError(t, err)
assert.True(t, ok)

ok, err = enqueuer.EnqueueUnique("wat", nil)
assert.NoError(t, err)
assert.True(t, ok)

ok, err = enqueuer.EnqueueUnique("wat", nil)
assert.NoError(t, err)
assert.False(t, ok)

ok, err = enqueuer.EnqueueUnique("taw", nil)
assert.NoError(t, err)
assert.True(t, ok)

// Process the queues. Ensure the right number of jobs was processed.
var wats, taws int64
wp := NewWorkerPool(TestContext{}, 3, ns, pool)
wp.JobWithOptions("wat", JobOptions{Priority: 1, MaxFails: 1}, func(job *Job) error {
wats++
return nil
})
wp.JobWithOptions("taw", JobOptions{Priority: 1, MaxFails: 1}, func(job *Job) error {
taws++
return fmt.Errorf("ohno")
})
wp.Start()
wp.Drain()
wp.Stop()

assert.EqualValues(t, 3, wats)
assert.EqualValues(t, 1, taws)

// Enqueue again. Ensure we can.
ok, err = enqueuer.EnqueueUnique("wat", Q{"a": 1, "b": "cool"})
assert.NoError(t, err)
assert.True(t, ok)

ok, err = enqueuer.EnqueueUnique("wat", Q{"a": 1, "b": "coolio"})
assert.NoError(t, err)
assert.True(t, ok)

// Even though taw resulted in an error, we should still be able to re-queue it.
// This could result in multiple taws enqueued at the same time in a production system.
ok, err = enqueuer.EnqueueUnique("taw", nil)
assert.NoError(t, err)
assert.True(t, ok)
}

func TestEnqueueUniqueIn(t *testing.T) {
pool := newTestPool(":6379")
ns := "work"
cleanKeyspace(ns, pool)
enqueuer := NewEnqueuer(ns, pool)

// Enqueue two unique jobs -- ensure one job sticks.
ok, err := enqueuer.EnqueueUniqueIn("wat", 300, Q{"a": 1, "b": "cool"})
assert.NoError(t, err)
assert.True(t, ok)

ok, err = enqueuer.EnqueueUniqueIn("wat", 10, Q{"a": 1, "b": "cool"})
assert.NoError(t, err)
assert.False(t, ok)

// Get the job
score, j := jobOnZset(pool, redisKeyScheduled(ns))

assert.True(t, score > time.Now().Unix()+290) // We don't want to overwrite the time
assert.True(t, score <= time.Now().Unix()+300)

assert.Equal(t, "wat", j.Name)
assert.True(t, len(j.ID) > 10) // Something is in it
assert.True(t, j.EnqueuedAt > (time.Now().Unix()-10)) // Within 10 seconds
assert.True(t, j.EnqueuedAt < (time.Now().Unix()+10)) // Within 10 seconds
assert.Equal(t, "cool", j.ArgString("b"))
assert.EqualValues(t, 1, j.ArgInt64("a"))
assert.NoError(t, j.ArgError())
assert.True(t, j.Unique)

// Now try to enqueue more jobs and ensure uniqueness is still enforced per name/arguments.
ok, err = enqueuer.EnqueueUniqueIn("wat", 300, Q{"a": 1, "b": "coolio"})
assert.NoError(t, err)
assert.True(t, ok)

ok, err = enqueuer.EnqueueUniqueIn("wat", 300, nil)
assert.NoError(t, err)
assert.True(t, ok)

ok, err = enqueuer.EnqueueUniqueIn("wat", 300, nil)
assert.NoError(t, err)
assert.False(t, ok)

ok, err = enqueuer.EnqueueUniqueIn("taw", 300, nil)
assert.NoError(t, err)
assert.True(t, ok)
}
1 change: 1 addition & 0 deletions job.go
@@ -14,6 +14,7 @@ type Job struct {
ID string `json:"id"`
EnqueuedAt int64 `json:"t"`
Args map[string]interface{} `json:"args"`
Unique bool `json:"unique,omitempty"`

// Inputs when retrying
Fails int64 `json:"fails,omitempty"` // number of times this job has failed
47 changes: 46 additions & 1 deletion redis.go
@@ -1,6 +1,10 @@
package work

import (
"bytes"
"encoding/json"
"fmt"
)

func redisNamespacePrefix(namespace string) string {
l := len(namespace)
@@ -52,6 +56,24 @@ func redisKeyHeartbeat(namespace, workerPoolID string) string {
return redisNamespacePrefix(namespace) + "worker_pools:" + workerPoolID
}

func redisKeyUniqueJob(namespace, jobName string, args map[string]interface{}) (string, error) {
var buf bytes.Buffer

buf.WriteString(redisNamespacePrefix(namespace))
buf.WriteString("unique:")
buf.WriteString(jobName)
buf.WriteRune(':')

if args != nil {
err := json.NewEncoder(&buf).Encode(args)
if err != nil {
return "", err
}
}

return buf.String(), nil
}

func redisKeyLastPeriodicEnqueue(namespace string) string {
return redisNamespacePrefix(namespace) + "last_periodic_enqueue"
}
@@ -130,3 +152,26 @@ for i=1,jobCount do
end
return requeuedCount
`

// KEYS[1] = job queue to push onto
// KEYS[2] = Unique job's key. Test for existence and set if we push.
// ARGV[1] = job
var redisLuaEnqueueUnique = `
if redis.call('set', KEYS[2], '1', 'NX', 'EX', '86400') then
redis.call('lpush', KEYS[1], ARGV[1])
return 'ok'
end
return 'dup'
`

// KEYS[1] = scheduled job queue
// KEYS[2] = Unique job's key. Test for existence and set if we push.
// ARGV[1] = job
// ARGV[2] = epoch seconds for job to be run at
var redisLuaEnqueueUniqueIn = `
if redis.call('set', KEYS[2], '1', 'NX', 'EX', '86400') then
redis.call('zadd', KEYS[1], ARGV[2], ARGV[1])
return 'ok'
end
return 'dup'
`
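
The uniqueness check is only as good as the key, so it is worth seeing how `redisKeyUniqueJob` behaves for different argument maps. The sketch below reproduces its structure in a standalone program. The function name `uniqueKey` and the `"work:"` prefix are assumptions for illustration (the prefix stands in for whatever `redisNamespacePrefix` returns). It relies on two documented `encoding/json` behaviors: map keys are encoded in sorted order, and `Encoder.Encode` appends a newline.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// uniqueKey follows the shape of redisKeyUniqueJob above.
// prefix is a stand-in for redisNamespacePrefix(namespace) (assumed value).
func uniqueKey(prefix, jobName string, args map[string]interface{}) (string, error) {
	var buf bytes.Buffer
	buf.WriteString(prefix)
	buf.WriteString("unique:")
	buf.WriteString(jobName)
	buf.WriteRune(':')
	if args != nil {
		// Map keys are written in sorted order, so equal maps yield equal keys.
		if err := json.NewEncoder(&buf).Encode(args); err != nil {
			return "", err
		}
	}
	return buf.String(), nil
}

func main() {
	a, _ := uniqueKey("work:", "wat", map[string]interface{}{"a": 1, "b": "cool"})
	b, _ := uniqueKey("work:", "wat", map[string]interface{}{"b": "cool", "a": 1})
	c, _ := uniqueKey("work:", "wat", nil)
	d, _ := uniqueKey("work:", "wat", map[string]interface{}{})

	fmt.Printf("%q\n", a)
	fmt.Println(a == b) // true: literal order of the map entries does not matter
	fmt.Println(c == d) // false: nil args and an empty map produce different keys
}
```

One consequence visible here is that `nil` arguments and an empty map are treated as different jobs, so callers should pick one form and use it consistently.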
13 changes: 13 additions & 0 deletions todo.txt
@@ -1,3 +1,16 @@
Enqueue API change
- return job
- update documentation

Client refactor:
- delete/retry dead jobs should take the explicit params instead of job

Client:
- delete scheduled jobs. Take into account unique jobs.

refactor:
- clean up enqueue code -- bit too much duplication.

IDEAS/TODO:
----
- zero context each time -- see if that affects performance
17 changes: 17 additions & 0 deletions worker.go
@@ -176,6 +176,9 @@ func (w *worker) fetchJob() (*Job, error) {
}

func (w *worker) processJob(job *Job) {
if job.Unique {
w.deleteUniqueJob(job)
}
if jt, ok := w.jobTypes[job.Name]; ok {
w.observeStarted(job.Name, job.ID, job.Args)
job.observer = w.observer // for Checkin
@@ -196,6 +199,20 @@ func (w *worker) processJob(job *Job) {
}
}

func (w *worker) deleteUniqueJob(job *Job) {
uniqueKey, err := redisKeyUniqueJob(w.namespace, job.Name, job.Args)
if err != nil {
logError("worker.delete_unique_job.key", err)
return // without a key there is nothing to delete; avoid issuing DEL with an empty key
}
conn := w.pool.Get()
defer conn.Close()

_, err = conn.Do("DEL", uniqueKey)
if err != nil {
logError("worker.delete_unique_job.del", err)
}
}

func (w *worker) removeJobFromInProgress(job *Job) {
conn := w.pool.Get()
defer conn.Close()