diff --git a/worker.go b/worker.go index 06863cb7..913d5fc8 100644 --- a/worker.go +++ b/worker.go @@ -12,13 +12,14 @@ import ( const fetchKeysPerJobType = 6 type worker struct { - workerID string - poolID string - namespace string - pool *redis.Pool - jobTypes map[string]*jobType - middleware []*middlewareHandler - contextType reflect.Type + workerID string + poolID string + namespace string + pool *redis.Pool + jobTypes map[string]*jobType + sleepBackoffs []int64 + middleware []*middlewareHandler + contextType reflect.Type redisFetchScript *redis.Script sampler prioritySampler @@ -31,16 +32,21 @@ type worker struct { doneDrainingChan chan struct{} } -func newWorker(namespace string, poolID string, pool *redis.Pool, contextType reflect.Type, middleware []*middlewareHandler, jobTypes map[string]*jobType) *worker { +func newWorker(namespace string, poolID string, pool *redis.Pool, contextType reflect.Type, middleware []*middlewareHandler, jobTypes map[string]*jobType, sleepBackoffs []int64) *worker { workerID := makeIdentifier() ob := newObserver(namespace, pool, workerID) + if len(sleepBackoffs) == 0 { + sleepBackoffs = sleepBackoffsInMilliseconds + } + w := &worker{ - workerID: workerID, - poolID: poolID, - namespace: namespace, - pool: pool, - contextType: contextType, + workerID: workerID, + poolID: poolID, + namespace: namespace, + pool: pool, + contextType: contextType, + sleepBackoffs: sleepBackoffs, observer: ob, @@ -126,10 +132,10 @@ func (w *worker) loop() { } consequtiveNoJobs++ idx := consequtiveNoJobs - if idx >= int64(len(sleepBackoffsInMilliseconds)) { - idx = int64(len(sleepBackoffsInMilliseconds)) - 1 + if idx >= int64(len(w.sleepBackoffs)) { + idx = int64(len(w.sleepBackoffs)) - 1 } - timer.Reset(time.Duration(sleepBackoffsInMilliseconds[idx]) * time.Millisecond) + timer.Reset(time.Duration(w.sleepBackoffs[idx]) * time.Millisecond) } } } diff --git a/worker_pool.go b/worker_pool.go index 702922ac..a1a383d4 100644 --- a/worker_pool.go +++ b/worker_pool.go @@ -12,10 +12,11 @@ import ( // WorkerPool represents a pool of workers. It forms the primary API of gocraft/work. WorkerPools provide the public API of gocraft/work. You can attach jobs and middlware to them. You can start and stop them. Based on their concurrency setting, they'll spin up N worker goroutines. type WorkerPool struct { - workerPoolID string - concurrency uint - namespace string // eg, "myapp-work" - pool *redis.Pool + workerPoolID string + concurrency uint + namespace string // eg, "myapp-work" + pool *redis.Pool + sleepBackoffs []int64 contextType reflect.Type jobTypes map[string]*jobType @@ -62,6 +63,11 @@ type JobOptions struct { Backoff BackoffCalculator // If not set, uses the default backoff algorithm } +// WorkerPoolOptions can be passed to NewWorkerPoolWithOptions. +type WorkerPoolOptions struct { + SleepBackoffs []int64 // Sleep backoffs in milliseconds +} + // GenericHandler is a job handler without any custom context. type GenericHandler func(*Job) error @@ -80,6 +86,12 @@ type middlewareHandler struct { // NewWorkerPool creates a new worker pool. ctx should be a struct literal whose type will be used for middleware and handlers. // concurrency specifies how many workers to spin up - each worker can process jobs concurrently. func NewWorkerPool(ctx interface{}, concurrency uint, namespace string, pool *redis.Pool) *WorkerPool { + return NewWorkerPoolWithOptions(ctx, concurrency, namespace, pool, WorkerPoolOptions{}) +} + +// NewWorkerPoolWithOptions creates a new worker pool as per the NewWorkerPool function, but permits you to specify +// additional options such as sleep backoffs. +func NewWorkerPoolWithOptions(ctx interface{}, concurrency uint, namespace string, pool *redis.Pool, workerPoolOpts WorkerPoolOptions) *WorkerPool { if pool == nil { panic("NewWorkerPool needs a non-nil *redis.Pool") } @@ -87,16 +99,17 @@ func NewWorkerPool(ctx interface{}, concurrency uint, namespace string, pool *re ctxType := reflect.TypeOf(ctx) validateContextType(ctxType) wp := &WorkerPool{ - workerPoolID: makeIdentifier(), - concurrency: concurrency, - namespace: namespace, - pool: pool, - contextType: ctxType, - jobTypes: make(map[string]*jobType), + workerPoolID: makeIdentifier(), + concurrency: concurrency, + namespace: namespace, + pool: pool, + sleepBackoffs: workerPoolOpts.SleepBackoffs, + contextType: ctxType, + jobTypes: make(map[string]*jobType), } for i := uint(0); i < wp.concurrency; i++ { - w := newWorker(wp.namespace, wp.workerPoolID, wp.pool, wp.contextType, nil, wp.jobTypes) + w := newWorker(wp.namespace, wp.workerPoolID, wp.pool, wp.contextType, nil, wp.jobTypes, wp.sleepBackoffs) wp.workers = append(wp.workers, w) }