Skip to content

Commit

Permalink
TWEAK: make worker pool start/stop idempotent
Browse files Browse the repository at this point in the history
  • Loading branch information
cypriss committed Jul 6, 2016
1 parent 6c1de6e commit 7ec320a
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 1 deletion.
12 changes: 11 additions & 1 deletion worker_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type WorkerPool struct {
contextType reflect.Type
jobTypes map[string]*jobType
middleware []*middlewareHandler
started bool

workers []*worker
heartbeater *workerPoolHeartbeater
Expand Down Expand Up @@ -139,8 +140,12 @@ func (wp *WorkerPool) JobWithOptions(name string, jobOpts JobOptions, fn interfa

// Start starts the workers and associated processes.
func (wp *WorkerPool) Start() {
if wp.started {
return
}
wp.started = true

go wp.writeKnownJobsToRedis()
// todo: what if already started?
for _, w := range wp.workers {
go w.start()
}
Expand All @@ -152,6 +157,11 @@ func (wp *WorkerPool) Start() {

// Stop stops the workers and associated processes.
func (wp *WorkerPool) Stop() {
if !wp.started {
return
}
wp.started = false

wg := sync.WaitGroup{}
for _, w := range wp.workers {
wg.Add(1)
Expand Down
10 changes: 10 additions & 0 deletions worker_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,13 @@ func TestWorkerPoolMiddlewareValidations(t *testing.T) {
}
}
}

func TestWorkerPoolStartStop(t *testing.T) {
pool := newTestPool(":6379")
ns := "work"
wp := NewWorkerPool(TestContext{}, 10, ns, pool)
wp.Start()
wp.Start()
wp.Stop()
wp.Stop()
}

0 comments on commit 7ec320a

Please sign in to comment.