Skip to content

Commit

Permalink
Add sleep backoffs as option when creating new worker pools (gocraft#48)
Browse files Browse the repository at this point in the history
  • Loading branch information
beornf authored and hoffoo committed Jul 30, 2018
1 parent 2716425 commit 23b73b8
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 27 deletions.
38 changes: 22 additions & 16 deletions worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@ import (
const fetchKeysPerJobType = 6

type worker struct {
workerID string
poolID string
namespace string
pool *redis.Pool
jobTypes map[string]*jobType
middleware []*middlewareHandler
contextType reflect.Type
workerID string
poolID string
namespace string
pool *redis.Pool
jobTypes map[string]*jobType
sleepBackoffs []int64
middleware []*middlewareHandler
contextType reflect.Type

redisFetchScript *redis.Script
sampler prioritySampler
Expand All @@ -31,16 +32,21 @@ type worker struct {
doneDrainingChan chan struct{}
}

func newWorker(namespace string, poolID string, pool *redis.Pool, contextType reflect.Type, middleware []*middlewareHandler, jobTypes map[string]*jobType) *worker {
func newWorker(namespace string, poolID string, pool *redis.Pool, contextType reflect.Type, middleware []*middlewareHandler, jobTypes map[string]*jobType, sleepBackoffs []int64) *worker {
workerID := makeIdentifier()
ob := newObserver(namespace, pool, workerID)

if len(sleepBackoffs) == 0 {
sleepBackoffs = sleepBackoffsInMilliseconds
}

w := &worker{
workerID: workerID,
poolID: poolID,
namespace: namespace,
pool: pool,
contextType: contextType,
workerID: workerID,
poolID: poolID,
namespace: namespace,
pool: pool,
contextType: contextType,
sleepBackoffs: sleepBackoffs,

observer: ob,

Expand Down Expand Up @@ -126,10 +132,10 @@ func (w *worker) loop() {
}
consequtiveNoJobs++
idx := consequtiveNoJobs
if idx >= int64(len(sleepBackoffsInMilliseconds)) {
idx = int64(len(sleepBackoffsInMilliseconds)) - 1
if idx >= int64(len(w.sleepBackoffs)) {
idx = int64(len(w.sleepBackoffs)) - 1
}
timer.Reset(time.Duration(sleepBackoffsInMilliseconds[idx]) * time.Millisecond)
timer.Reset(time.Duration(w.sleepBackoffs[idx]) * time.Millisecond)
}
}
}
Expand Down
35 changes: 24 additions & 11 deletions worker_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@ import (

// WorkerPool represents a pool of workers. It forms the primary API of gocraft/work. WorkerPools provide the public API of gocraft/work. You can attach jobs and middlware to them. You can start and stop them. Based on their concurrency setting, they'll spin up N worker goroutines.
type WorkerPool struct {
workerPoolID string
concurrency uint
namespace string // eg, "myapp-work"
pool *redis.Pool
workerPoolID string
concurrency uint
namespace string // eg, "myapp-work"
pool *redis.Pool
sleepBackoffs []int64

contextType reflect.Type
jobTypes map[string]*jobType
Expand Down Expand Up @@ -62,6 +63,11 @@ type JobOptions struct {
Backoff BackoffCalculator // If not set, uses the default backoff algorithm
}

// WorkerPoolOptions can be passed to NewWorkerPoolWithOptions.
type WorkerPoolOptions struct {
SleepBackoffs []int64 // Sleep backoffs in milliseconds
}

// GenericHandler is a job handler without any custom context.
type GenericHandler func(*Job) error

Expand All @@ -80,23 +86,30 @@ type middlewareHandler struct {
// NewWorkerPool creates a new worker pool. ctx should be a struct literal whose type will be used for middleware and handlers.
// concurrency specifies how many workers to spin up - each worker can process jobs concurrently.
func NewWorkerPool(ctx interface{}, concurrency uint, namespace string, pool *redis.Pool) *WorkerPool {
return NewWorkerPoolWithOptions(ctx, concurrency, namespace, pool, WorkerPoolOptions{})
}

// NewWorkerPoolWithOptions creates a new worker pool as per the NewWorkerPool function, but permits you to specify
// additional options such as sleep backoffs.
func NewWorkerPoolWithOptions(ctx interface{}, concurrency uint, namespace string, pool *redis.Pool, workerPoolOpts WorkerPoolOptions) *WorkerPool {
if pool == nil {
panic("NewWorkerPool needs a non-nil *redis.Pool")
}

ctxType := reflect.TypeOf(ctx)
validateContextType(ctxType)
wp := &WorkerPool{
workerPoolID: makeIdentifier(),
concurrency: concurrency,
namespace: namespace,
pool: pool,
contextType: ctxType,
jobTypes: make(map[string]*jobType),
workerPoolID: makeIdentifier(),
concurrency: concurrency,
namespace: namespace,
pool: pool,
sleepBackoffs: workerPoolOpts.SleepBackoffs,
contextType: ctxType,
jobTypes: make(map[string]*jobType),
}

for i := uint(0); i < wp.concurrency; i++ {
w := newWorker(wp.namespace, wp.workerPoolID, wp.pool, wp.contextType, nil, wp.jobTypes)
w := newWorker(wp.namespace, wp.workerPoolID, wp.pool, wp.contextType, nil, wp.jobTypes, wp.sleepBackoffs)
wp.workers = append(wp.workers, w)
}

Expand Down

0 comments on commit 23b73b8

Please sign in to comment.