Skip to content

Commit

Permalink
One poll loop per worker to avoid starvation
Browse files Browse the repository at this point in the history
  • Loading branch information
jinroh committed Jan 2, 2018
1 parent 456b325 commit ee58afb
Showing 1 changed file with 11 additions and 21 deletions.
32 changes: 11 additions & 21 deletions pkg/jobs/redis_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ const redisPrefix = "j/"

type redisBroker struct {
client redis.UniversalClient
queues map[string]chan *Job
nbWorkers int
workers []*Worker
running uint32
Expand All @@ -29,7 +28,6 @@ func NewRedisBroker(nbWorkers int, client redis.UniversalClient) Broker {
return &redisBroker{
client: client,
nbWorkers: nbWorkers,
queues: make(map[string]chan *Job),
closed: make(chan struct{}),
}
}
Expand All @@ -46,23 +44,20 @@ func (b *redisBroker) Start(ws WorkersList) error {
setNbSlots(b.nbWorkers)
joblog.Infof("Starting redis broker with %d workers", b.nbWorkers)

var keys []string
b.workers = make([]*Worker, 0, len(ws))
for workerType, conf := range ws {
ch := make(chan *Job)
w := &Worker{
Type: workerType,
Conf: conf,
}
b.queues[workerType] = ch
b.workers = append(b.workers, w)
keys = append(keys, redisPrefix+workerType)
if err := w.Start(ch); err != nil {
return err
}
go b.pollLoop(redisPrefix+workerType, ch)
}

go b.pollLoop(keys)

return nil
}

Expand All @@ -85,11 +80,13 @@ func (b *redisBroker) Shutdown(ctx context.Context) error {
fmt.Print(" shutting down redis broker...")
defer b.client.Close()

select {
case <-ctx.Done():
fmt.Println("failed:", ctx.Err())
return ctx.Err()
case <-b.closed:
for i := 0; i < len(b.workers); i++ {
select {
case <-ctx.Done():
fmt.Println("failed:", ctx.Err())
return ctx.Err()
case <-b.closed:
}
}

errs := make(chan error)
Expand All @@ -114,7 +111,7 @@ func (b *redisBroker) Shutdown(ctx context.Context) error {

var redisBRPopTimeout = 30 * time.Second

func (b *redisBroker) pollLoop(keys []string) {
func (b *redisBroker) pollLoop(key string, ch chan<- *Job) {
defer func() {
b.closed <- struct{}{}
}()
Expand All @@ -124,7 +121,7 @@ func (b *redisBroker) pollLoop(keys []string) {
return
}

results, err := b.client.BRPop(redisBRPopTimeout, keys...).Result()
results, err := b.client.BRPop(redisBRPopTimeout, key).Result()
if err != nil || len(results) < 2 {
time.Sleep(100 * time.Millisecond)
continue
Expand All @@ -143,13 +140,6 @@ func (b *redisBroker) pollLoop(keys []string) {
continue
}

workerType := key[len(redisPrefix):]
ch, ok := b.queues[workerType]
if !ok {
joblog.Warnf("Unknown workerType: %s", workerType)
continue
}

domain, jobID := parts[0], parts[1]
job, err := Get(domain, jobID)
if err != nil {
Expand Down

0 comments on commit ee58afb

Please sign in to comment.