diff --git a/pkg/jobs/redis_jobs.go b/pkg/jobs/redis_jobs.go index 7729468d4ab..794b786fe12 100644 --- a/pkg/jobs/redis_jobs.go +++ b/pkg/jobs/redis_jobs.go @@ -16,7 +16,6 @@ const redisPrefix = "j/" type redisBroker struct { client redis.UniversalClient - queues map[string]chan *Job nbWorkers int workers []*Worker running uint32 @@ -29,7 +28,6 @@ func NewRedisBroker(nbWorkers int, client redis.UniversalClient) Broker { return &redisBroker{ client: client, nbWorkers: nbWorkers, - queues: make(map[string]chan *Job), closed: make(chan struct{}), } } @@ -46,23 +44,20 @@ func (b *redisBroker) Start(ws WorkersList) error { setNbSlots(b.nbWorkers) joblog.Infof("Starting redis broker with %d workers", b.nbWorkers) - var keys []string + b.workers = make([]*Worker, 0, len(ws)) for workerType, conf := range ws { ch := make(chan *Job) w := &Worker{ Type: workerType, Conf: conf, } - b.queues[workerType] = ch b.workers = append(b.workers, w) - keys = append(keys, redisPrefix+workerType) if err := w.Start(ch); err != nil { return err } + go b.pollLoop(redisPrefix+workerType, ch) } - go b.pollLoop(keys) - return nil } @@ -85,11 +80,13 @@ func (b *redisBroker) Shutdown(ctx context.Context) error { fmt.Print(" shutting down redis broker...") defer b.client.Close() - select { - case <-ctx.Done(): - fmt.Println("failed:", ctx.Err()) - return ctx.Err() - case <-b.closed: + for i := 0; i < len(b.workers); i++ { + select { + case <-ctx.Done(): + fmt.Println("failed:", ctx.Err()) + return ctx.Err() + case <-b.closed: + } } errs := make(chan error) @@ -114,7 +111,7 @@ func (b *redisBroker) Shutdown(ctx context.Context) error { var redisBRPopTimeout = 30 * time.Second -func (b *redisBroker) pollLoop(keys []string) { +func (b *redisBroker) pollLoop(key string, ch chan<- *Job) { defer func() { b.closed <- struct{}{} }() @@ -124,7 +121,7 @@ func (b *redisBroker) pollLoop(keys []string) { return } - results, err := b.client.BRPop(redisBRPopTimeout, keys...).Result() + results, err := b.client.BRPop(redisBRPopTimeout, key).Result() if err != nil || len(results) < 2 { time.Sleep(100 * time.Millisecond) continue @@ -143,13 +140,6 @@ func (b *redisBroker) pollLoop(keys []string) { continue } - workerType := key[len(redisPrefix):] - ch, ok := b.queues[workerType] - if !ok { - joblog.Warnf("Unknown workerType: %s", workerType) - continue - } - domain, jobID := parts[0], parts[1] job, err := Get(domain, jobID) if err != nil {