From 660daa1bb82ab0d636d821c4085310a6b5cf7ab0 Mon Sep 17 00:00:00 2001 From: Alex K <8418476+fearful-symmetry@users.noreply.github.com> Date: Thu, 10 Oct 2019 10:53:11 -0700 Subject: [PATCH] Fix race in publisher/pipeline retry loop (#13922) * add WaitGroup to retry loop * move workQueue close loop in controller * tweak WG usage * remove extra newlines --- libbeat/publisher/pipeline/controller.go | 5 ++-- libbeat/publisher/pipeline/retry.go | 29 +++++++++++++++--------- 2 files changed, 20 insertions(+), 14 deletions(-) diff --git a/libbeat/publisher/pipeline/controller.go b/libbeat/publisher/pipeline/controller.go index cbd1b5205374..885d0cd02941 100644 --- a/libbeat/publisher/pipeline/controller.go +++ b/libbeat/publisher/pipeline/controller.go @@ -84,6 +84,8 @@ func newOutputController( func (c *outputController) Close() error { c.consumer.sigPause() + c.consumer.close() + c.retryer.close() if c.out != nil { for _, out := range c.out.outputs { @@ -92,9 +94,6 @@ func (c *outputController) Close() error { close(c.out.workQueue) } - c.consumer.close() - c.retryer.close() - return nil } diff --git a/libbeat/publisher/pipeline/retry.go b/libbeat/publisher/pipeline/retry.go index 250713b03a7f..9dd3385c40ee 100644 --- a/libbeat/publisher/pipeline/retry.go +++ b/libbeat/publisher/pipeline/retry.go @@ -18,6 +18,8 @@ package pipeline import ( + "sync" + "github.com/elastic/beats/libbeat/logp" ) @@ -36,9 +38,10 @@ type retryer struct { consumer *eventConsumer - sig chan retryerSignal - out workQueue - in retryQueue + sig chan retryerSignal + out workQueue + in retryQueue + doneWaiter sync.WaitGroup } type retryQueue chan batchEvent @@ -75,20 +78,24 @@ func newRetryer( c *eventConsumer, ) *retryer { r := &retryer{ - logger: log, - observer: observer, - done: make(chan struct{}), - sig: make(chan retryerSignal, 3), - in: retryQueue(make(chan batchEvent, 3)), - out: out, - consumer: c, + logger: log, + observer: observer, + done: make(chan struct{}), + sig: make(chan retryerSignal, 3), + in: retryQueue(make(chan batchEvent, 3)), + out: out, + consumer: c, + doneWaiter: sync.WaitGroup{}, } + r.doneWaiter.Add(1) go r.loop() return r } func (r *retryer) close() { close(r.done) + //Block until loop() is properly closed + r.doneWaiter.Wait() } func (r *retryer) sigOutputAdded() { @@ -115,6 +122,7 @@ func (r *retryer) cancelled(b *Batch) { } func (r *retryer) loop() { + defer r.doneWaiter.Done() var ( out workQueue consumerBlocked bool @@ -131,7 +139,6 @@ func (r *retryer) loop() { select { case <-r.done: return - case evt := <-r.in: var ( countFailed int