Skip to content

Commit

Permalink
Fix race in publisher/pipeline retry loop (#13922)
Browse files Browse the repository at this point in the history
* add WaitGroup to retry loop

* move workQueue close loop in controller

* tweak WG usage

* remove extra newlines
  • Loading branch information
fearful-symmetry authored Oct 10, 2019
1 parent 1c9cd3d commit 660daa1
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 14 deletions.
5 changes: 2 additions & 3 deletions libbeat/publisher/pipeline/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ func newOutputController(

func (c *outputController) Close() error {
c.consumer.sigPause()
c.consumer.close()
c.retryer.close()

if c.out != nil {
for _, out := range c.out.outputs {
Expand All @@ -92,9 +94,6 @@ func (c *outputController) Close() error {
close(c.out.workQueue)
}

c.consumer.close()
c.retryer.close()

return nil
}

Expand Down
29 changes: 18 additions & 11 deletions libbeat/publisher/pipeline/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package pipeline

import (
"sync"

"github.com/elastic/beats/libbeat/logp"
)

Expand All @@ -36,9 +38,10 @@ type retryer struct {

consumer *eventConsumer

sig chan retryerSignal
out workQueue
in retryQueue
sig chan retryerSignal
out workQueue
in retryQueue
doneWaiter sync.WaitGroup
}

type retryQueue chan batchEvent
Expand Down Expand Up @@ -75,20 +78,24 @@ func newRetryer(
c *eventConsumer,
) *retryer {
r := &retryer{
logger: log,
observer: observer,
done: make(chan struct{}),
sig: make(chan retryerSignal, 3),
in: retryQueue(make(chan batchEvent, 3)),
out: out,
consumer: c,
logger: log,
observer: observer,
done: make(chan struct{}),
sig: make(chan retryerSignal, 3),
in: retryQueue(make(chan batchEvent, 3)),
out: out,
consumer: c,
doneWaiter: sync.WaitGroup{},
}
r.doneWaiter.Add(1)
go r.loop()
return r
}

func (r *retryer) close() {
close(r.done)
//Block until loop() is properly closed
r.doneWaiter.Wait()
}

func (r *retryer) sigOutputAdded() {
Expand All @@ -115,6 +122,7 @@ func (r *retryer) cancelled(b *Batch) {
}

func (r *retryer) loop() {
defer r.doneWaiter.Done()
var (
out workQueue
consumerBlocked bool
Expand All @@ -131,7 +139,6 @@ func (r *retryer) loop() {
select {
case <-r.done:
return

case evt := <-r.in:
var (
countFailed int
Expand Down

0 comments on commit 660daa1

Please sign in to comment.