Skip to content

Commit

Permalink
FEATURE: add MaxConcurrency control to job options (#41)
Browse files Browse the repository at this point in the history
  • Loading branch information
shdunning authored Apr 3, 2017
1 parent f5d6a0b commit e59a139
Show file tree
Hide file tree
Showing 12 changed files with 370 additions and 53 deletions.
17 changes: 16 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ gocraft/work lets you enqueue and processes background jobs in Go. Jobs are dura
* Enqueue unique jobs so that only one job with a given name/arguments exists in the queue at once.
* Web UI to manage failed jobs and observe the system.
* Periodically enqueue jobs on a cron-like schedule.
* Pause / unpause jobs and control concurrency within and across processes

## Enqueue new jobs

Expand Down Expand Up @@ -245,7 +246,8 @@ You'll see a view that looks like this:
### Processing a job

* To process a job, a worker will execute a Lua script to atomically move a job its queue to an in-progress queue.
* The worker will then run the job. The job will either finish successfully or result in an error or panic.
* A job is dequeued and moved to in-progress if the job queue is not paused and the number of active jobs does not exceed concurrency limit for the job type
* The worker will then run the job and increment the job lock. The job will either finish successfully or result in an error or panic.
* If the process completely crashes, the reaper will eventually find it in its in-progress queue and requeue it.
* If the job is successful, we'll simply remove the job from the in-progress queue.
* If the job returns an error or panic, we'll see how many retries a job has left. If it doesn't have any, we'll move it to the dead queue. If it has retries left, we'll consume a retry and add the job to the retry queue.
Expand Down Expand Up @@ -294,6 +296,18 @@ You'll see a view that looks like this:
* You can pause jobs from being processed from a specific queue by setting a "paused" redis key (see `redisKeyJobsPaused`)
* Conversely, jobs in the queue will resume being processed once the paused redis key is removed

## Job concurrency

* You can control job concurrency using `JobOptions{MaxConcurrency: <num>}`.
* Unlike the WorkerPool concurrency, this controls the limit on the number jobs of that type that can be active at one time by within a single redis instance
* This works by putting a precondition on enqueuing function, meaning a new job will not be scheduled if we are at or over a job's `MaxConcurrency` limit
* A redis key (see `redisKeyJobsLock`) is used as a counting semaphore in order to track job concurrency per job type
* The default value is `0`, which means "no limit on job concurrency"
* **Note:** if you want to run jobs "single threaded" then you can set the `MaxConcurrency` accordingly:
```go
worker_pool.JobWithOptions(jobName, JobOptions{MaxConcurrency: 1}, (*Context).WorkFxn)
```

### Terminology reference
* "worker pool" - a pool of workers
* "worker" - an individual worker in a single goroutine. Gets a job from redis, does job, gets next job...
Expand All @@ -310,6 +324,7 @@ You'll see a view that looks like this:
* "scheduled jobs" - jobs enqueued to be run in th future will be put on a scheduled job queue.
* "dead jobs" - if a job exceeds its MaxFails count, it will be put on the dead job queue.
* "paused jobs" - if paused key is present for a queue, then no jobs from that queue will be processed by any workers until that queue's paused key is removed
* "job concurrency" - the number of jobs being actively processed of a particular type across worker pool processes but within a single redis instance

## Benchmarks

Expand Down
8 changes: 5 additions & 3 deletions dead_pool_reaper.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,13 @@ func (r *deadPoolReaper) reap() error {
}

func (r *deadPoolReaper) requeueInProgressJobs(poolID string, jobTypes []string) error {
redisRequeueScript := redis.NewScript(len(jobTypes)*3, redisLuaRpoplpushMultiCmd)
numArgs := len(jobTypes) * 2
redisRequeueScript := redis.NewScript(numArgs, redisLuaReenqueueJob)
var scriptArgs = make([]interface{}, 0, numArgs)

var scriptArgs = make([]interface{}, 0, len(jobTypes)*3)
for _, jobType := range jobTypes {
scriptArgs = append(scriptArgs, redisKeyJobsInProgress(r.namespace, poolID, jobType), redisKeyJobs(r.namespace, jobType), redisKeyJobsPaused(r.namespace, jobType))
// pops from in progress, push into job queue and decrement the queue lock
scriptArgs = append(scriptArgs, redisKeyJobsInProgress(r.namespace, poolID, jobType), redisKeyJobs(r.namespace, jobType))
}

conn := r.pool.Get()
Expand Down
7 changes: 7 additions & 0 deletions dead_pool_reaper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ func TestDeadPoolReaper(t *testing.T) {
// Test requeueing jobs
_, err = conn.Do("lpush", redisKeyJobsInProgress(ns, "2", "type1"), "foo")
assert.NoError(t, err)
_, err = conn.Do("incr", redisKeyJobsLock(ns, "type1"))
assert.NoError(t, err)

// Ensure 0 jobs in jobs queue
jobsCount, err := redis.Int(conn.Do("llen", redisKeyJobs(ns, "type1")))
Expand All @@ -79,6 +81,11 @@ func TestDeadPoolReaper(t *testing.T) {
jobsCount, err = redis.Int(conn.Do("llen", redisKeyJobsInProgress(ns, "2", "type1")))
assert.NoError(t, err)
assert.Equal(t, 0, jobsCount)

// Lock count should get decremented
lockCount, err := redis.Int(conn.Do("get", redisKeyJobsLock(ns, "type1")))
assert.NoError(t, err)
assert.Equal(t, 0, lockCount)
}

func TestDeadPoolReaperNoHeartbeat(t *testing.T) {
Expand Down
5 changes: 4 additions & 1 deletion enqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,10 @@ func (e *Enqueuer) EnqueueIn(jobName string, secondsFromNow int64, args map[stri
return scheduledJob, nil
}

// EnqueueUnique enqueues a job unless a job is already enqueued with the same name and arguments. The already-enqueued job can be in the normal work queue or in the scheduled job queue. Once a worker begins processing a job, another job with the same name and arguments can be enqueued again. Any failed jobs in the retry queue or dead queue don't count against the uniqueness -- so if a job fails and is retried, two unique jobs with the same name and arguments can be enqueued at once.
// EnqueueUnique enqueues a job unless a job is already enqueued with the same name and arguments.
// The already-enqueued job can be in the normal work queue or in the scheduled job queue.
// Once a worker begins processing a job, another job with the same name and arguments can be enqueued again.
// Any failed jobs in the retry queue or dead queue don't count against the uniqueness -- so if a job fails and is retried, two unique jobs with the same name and arguments can be enqueued at once.
// In order to add robustness to the system, jobs are only unique for 24 hours after they're enqueued. This is mostly relevant for scheduled jobs.
// EnqueueUnique returns the job if it was enqueued and nil if it wasn't
func (e *Enqueuer) EnqueueUnique(jobName string, args map[string]interface{}) (*Job, error) {
Expand Down
12 changes: 9 additions & 3 deletions enqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package work

import (
"fmt"
"github.com/stretchr/testify/assert"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestEnqueue(t *testing.T) {
Expand Down Expand Up @@ -101,7 +103,7 @@ func TestEnqueueUnique(t *testing.T) {
ns := "work"
cleanKeyspace(ns, pool)
enqueuer := NewEnqueuer(ns, pool)

var mutex = &sync.Mutex{}
job, err := enqueuer.EnqueueUnique("wat", Q{"a": 1, "b": "cool"})
assert.NoError(t, err)
if assert.NotNil(t, job) {
Expand Down Expand Up @@ -134,15 +136,19 @@ func TestEnqueueUnique(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, job)

// Process the queues. Ensure the right numbero of jobs was processed
// Process the queues. Ensure the right number of jobs were processed
var wats, taws int64
wp := NewWorkerPool(TestContext{}, 3, ns, pool)
wp.JobWithOptions("wat", JobOptions{Priority: 1, MaxFails: 1}, func(job *Job) error {
mutex.Lock()
wats++
mutex.Unlock()
return nil
})
wp.JobWithOptions("taw", JobOptions{Priority: 1, MaxFails: 1}, func(job *Job) error {
mutex.Lock()
taws++
mutex.Unlock()
return fmt.Errorf("ohno")
})
wp.Start()
Expand Down
4 changes: 1 addition & 3 deletions priority_sampler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,13 @@ type sampleItem struct {
// payload:
redisJobs string
redisJobsInProg string
redisJobsPaused string
}

func (s *prioritySampler) add(priority uint, redisJobs, redisJobsInProg, redisJobsPaused string) {
func (s *prioritySampler) add(priority uint, redisJobs, redisJobsInProg string) {
sample := sampleItem{
priority: priority,
redisJobs: redisJobs,
redisJobsInProg: redisJobsInProg,
redisJobsPaused: redisJobsPaused,
}
s.samples = append(s.samples, sample)
s.sum += priority
Expand Down
8 changes: 4 additions & 4 deletions priority_sampler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ import (

func TestPrioritySampler(t *testing.T) {
ps := prioritySampler{}
ps.add(5, "jobs.5", "jobsinprog.5", "jobs.5.paused")
ps.add(2, "jobs.2a", "jobsinprog.2a", "jobs.2a.paused")
ps.add(1, "jobs.1b", "jobsinprog.1b", "jobs.1b.paused")
ps.add(5, "jobs.5", "jobsinprog.5")
ps.add(2, "jobs.2a", "jobsinprog.2a")
ps.add(1, "jobs.1b", "jobsinprog.1b")

var c5 = 0
var c2 = 0
Expand Down Expand Up @@ -41,7 +41,7 @@ func TestPrioritySampler(t *testing.T) {
func BenchmarkPrioritySampler(b *testing.B) {
ps := prioritySampler{}
for i := 0; i < 200; i++ {
ps.add(uint(i)+1, "jobs."+fmt.Sprint(i), "jobsinprog."+fmt.Sprint(i), "jobspaused."+fmt.Sprint(i))
ps.add(uint(i)+1, "jobs."+fmt.Sprint(i), "jobsinprog."+fmt.Sprint(i))
}

b.ResetTimer()
Expand Down
137 changes: 119 additions & 18 deletions redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,19 @@ func redisKeyHeartbeat(namespace, workerPoolID string) string {
return redisNamespacePrefix(namespace) + "worker_pools:" + workerPoolID
}

var pauseKeySuffix = "paused"
func redisKeyJobsPaused(namespace, jobName string) string {
return fmt.Sprintf("%s:%s", redisKeyJobs(namespace, jobName), "paused")
return redisKeyJobs(namespace, jobName) + ":" + pauseKeySuffix
}

var lockKeySuffix = "lock"
func redisKeyJobsLock(namespace, jobName string) string {
return redisKeyJobs(namespace, jobName) + ":" + lockKeySuffix
}

var concurrencyKeySuffix = "max_concurrency"
func redisKeyJobsConcurrency(namespace, jobName string) string {
return redisKeyJobs(namespace, jobName) + ":" + concurrencyKeySuffix
}

func redisKeyUniqueJob(namespace, jobName string, args map[string]interface{}) (string, error) {
Expand All @@ -82,32 +93,104 @@ func redisKeyLastPeriodicEnqueue(namespace string) string {
return redisNamespacePrefix(namespace) + "last_periodic_enqueue"
}

// Used by Lua scripts below and needs to follow same naming convention as redisKeyJobs* functions above
// note: all assume the local var jobQueue is present, which is the the val of redisKeyJobs()
var redisLuaJobsPausedKey = fmt.Sprintf(`
local function getPauseKey(jobQueue)
return string.format("%%s:%s", jobQueue)
end`, pauseKeySuffix)

var redisLuaJobsLockedKey = fmt.Sprintf(`
local function getLockKey(jobQueue)
return string.format("%%s:%s", jobQueue)
end`, lockKeySuffix)

var redisLuaJobsConcurrencyKey = fmt.Sprintf(`
local function getConcurrencyKey(jobQueue)
return string.format("%%s:%s", jobQueue)
end`, concurrencyKeySuffix)

// Used to fetch the next job to run
//
// KEYS[1] = the 1st job queue we want to try, eg, "work:jobs:emails"
// KEYS[2] = the 1st job queue's in prog queue, eg, "work:jobs:emails:97c84119d13cb54119a38743:inprogress"
// KEYS[3] = the 1st job queue's paused key, eg, "work:jobs:emails:paused"
// KEYS[4] = the 2nd job queue...
// KEYS[5] = the 2nd job queue's in prog queue...
// KEYS[6] = the 2nd job queue's paused key...
// KEYS[3] = the 2nd job queue...
// KEYS[4] = the 2nd job queue's in prog queue...
// ...
// KEYS[N] = the last job queue...
// KEYS[N+1] = the last job queue's in prog queue...
// KEYS[N+2] = the last job queue's paused key...
var redisLuaRpoplpushMultiCmd = `
local function isPaused(pausekey)
return redis.call('get', pausekey)
var redisLuaFetchJob = fmt.Sprintf(`
-- getPauseKey will be inserted below
%s
-- getLockKey will be inserted below
%s
-- getConcurrencyKey will be inserted below
%s
local function haveJobs(jobQueue)
return redis.call('llen', jobQueue) > 0
end
local res
local function isPaused(pauseKey)
return redis.call('get', pauseKey)
end
local function canRun(lockKey, maxConcurrency)
local activeJobs = tonumber(redis.call('get', lockKey))
if (not maxConcurrency or maxConcurrency == 0) or (not activeJobs or activeJobs < maxConcurrency) then
-- default case: maxConcurrency not defined or set to 0 means no cap on concurrent jobs OR
-- maxConcurrency set, but lock does not yet exist OR
-- maxConcurrency set, lock is set, but not yet at max concurrency
redis.call('incr', lockKey)
return true
else
-- we are at max capacity for running jobs
return false
end
end
local res, jobQueue, inProgQueue, pauseKey, lockKey, maxConcurrency
local keylen = #KEYS
for i=1,keylen,3 do
if not isPaused(KEYS[i+2]) then
res = redis.call('rpoplpush', KEYS[i], KEYS[i+1])
if res then
return {res, KEYS[i], KEYS[i+1]}
end
for i=1,keylen,2 do
jobQueue = KEYS[i]
inProgQueue = KEYS[i+1]
pauseKey = getPauseKey(jobQueue)
lockKey = getLockKey(jobQueue)
maxConcurrency = tonumber(redis.call('get', getConcurrencyKey(jobQueue)))
if haveJobs(jobQueue) and not isPaused(pauseKey) and canRun(lockKey, maxConcurrency) then
res = redis.call('rpoplpush', jobQueue, inProgQueue)
return {res, jobQueue, inProgQueue}
end
end
return nil
`
return nil`, redisLuaJobsPausedKey, redisLuaJobsLockedKey, redisLuaJobsConcurrencyKey)

// Used by the reaper to re-enqueue jobs that were in progress
//
// KEYS[1] = the 1st job's in progress queue
// KEYS[2] = the 1st job's job queue
// KEYS[3] = the 2nd job's in progress queue
// KEYS[4] = the 2nd job's job queue
// ...
// KEYS[N] = the last job's in progress queue
// KEYS[N+1] = the last job's job queue
var redisLuaReenqueueJob = fmt.Sprintf(`
-- getLockKey inserted below
%s
local keylen = #KEYS
local res, jobQueue, inProgQueue
for i=1,keylen,2 do
inProgQueue = KEYS[i]
jobQueue = KEYS[i+1]
res = redis.call('rpoplpush', inProgQueue, jobQueue)
if res then
redis.call('decr', getLockKey(jobQueue))
return {res, inProgQueue, jobQueue}
end
end
return nil`, redisLuaJobsLockedKey)

// KEYS[1] = zset of jobs (retry or scheduled), eg work:retry
// KEYS[2] = zset of dead, eg work:dead. If we don't know the jobName of a job, we'll put it in dead.
Expand Down Expand Up @@ -258,3 +341,21 @@ if redis.call('set', KEYS[2], '1', 'NX', 'EX', '86400') then
end
return 'dup'
`

// KEYS[1] = jobs run queue
var redisRemoveStaleKeys = fmt.Sprintf(`
-- getLockKey will be inserted below
%s
-- getConcurrencyKey will be inserted below
%s
local function isInProgress(jobQueue)
return redis.call('keys', jobQueue .. ':*:inprogress')
end
local jobQueue = KEYS[1]
if next(isInProgress(jobQueue)) == nil then
redis.call('del', getLockKey(jobQueue))
redis.call('del', getConcurrencyKey(jobQueue))
end
return 0`, redisLuaJobsLockedKey, redisLuaJobsConcurrencyKey)
Loading

0 comments on commit e59a139

Please sign in to comment.