Skip to content

Commit

Permalink
[libbeat] Cleaning up some unneeded helper types (#31290)
Browse files Browse the repository at this point in the history
  • Loading branch information
faec authored Apr 14, 2022
1 parent f1ce912 commit 2f761cc
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 75 deletions.
18 changes: 7 additions & 11 deletions libbeat/publisher/pipeline/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,6 @@ import (
)

// client connects a beat with the processors and pipeline queue.
//
// TODO: All ackers currently drop any late incoming ACK. Some beats still might
// be interested in handling/waiting for event ACKs more globally
// -> add support for not dropping pending ACKs
type client struct {
pipeline *Pipeline
processors beat.Processor
Expand Down Expand Up @@ -101,8 +97,8 @@ func (c *client) publish(e beat.Event) {
event, err = c.processors.Run(event)
publish = event != nil
if err != nil {
// TODO: introduce dead-letter queue?

// If we introduce a dead-letter queue, this is where we should
// route the event to it.
log.Errorf("Failed to publish event: %v", err)
}
}
Expand All @@ -124,7 +120,7 @@ func (c *client) publish(e beat.Event) {
}

if c.reportEvents {
c.pipeline.waitCloser.inc()
c.pipeline.waitCloseGroup.Add(1)
}

var published bool
Expand All @@ -139,7 +135,7 @@ func (c *client) publish(e beat.Event) {
} else {
c.onDroppedOnPublish(e)
if c.reportEvents {
c.pipeline.waitCloser.dec(1)
c.pipeline.waitCloseGroup.Add(-1)
}
}
}
Expand Down Expand Up @@ -178,7 +174,7 @@ func (c *client) Close() error {
return nil
}

// unlink is the final step of closing a client. It cancells the connect of the
// unlink is the final step of closing a client. It cancels the connect of the
// client as producer to the queue.
func (c *client) unlink() {
log := c.logger()
Expand All @@ -189,7 +185,7 @@ func (c *client) unlink() {
if c.reportEvents {
log.Debugf("client: remove client events")
if n > 0 {
c.pipeline.waitCloser.dec(n)
c.pipeline.waitCloseGroup.Add(-n)
}
}

Expand Down Expand Up @@ -271,7 +267,7 @@ func (w *clientCloseWaiter) ACKEvents(n int) {
}

// The Close signal from the pipeline is ignored. Instead the client
// explicitely uses `signalClose` and `wait` before it continues with the
// explicitly uses `signalClose` and `wait` before it continues with the
// closing sequence.
func (w *clientCloseWaiter) Close() {}

Expand Down
82 changes: 18 additions & 64 deletions libbeat/publisher/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,10 @@ type Pipeline struct {

observer observer

eventer pipelineEventer

// wait close support
waitCloseMode WaitCloseMode
waitOnClose bool
waitCloseTimeout time.Duration
waitCloser *waitCloser
waitCloseGroup sync.WaitGroup

// closeRef signal propagation support
guardStartSigPropagation sync.Once
Expand Down Expand Up @@ -102,10 +100,6 @@ const (
// to ACK any outstanding events. This is independent of Clients asking for
// ACK and/or WaitClose. Clients can still optionally configure WaitClose themselves.
WaitOnPipelineClose

// WaitOnClientClose applies WaitClose timeout to each client connecting to
// the pipeline. Clients are still allowed to overwrite WaitClose with a timeout > 0s.
WaitOnClientClose
)

// OutputReloader interface, that can be queried from an active publisher pipeline.
Expand All @@ -117,16 +111,6 @@ type OutputReloader interface {
) error
}

type pipelineEventer struct {
observer queueObserver
waitClose *waitCloser
}

type waitCloser struct {
// keep track of total number of active events (minus dropped by processors)
events sync.WaitGroup
}

type queueFactory func(queue.ACKListener) (queue.Queue, error)

// New create a new Pipeline instance from a queue instance and a set of outputs.
Expand All @@ -149,24 +133,16 @@ func New(
beatInfo: beat,
monitors: monitors,
observer: nilObserver,
waitCloseMode: settings.WaitCloseMode,
waitOnClose: settings.WaitCloseMode == WaitOnPipelineClose && settings.WaitClose > 0,
waitCloseTimeout: settings.WaitClose,
processors: settings.Processors,
}

if monitors.Metrics != nil {
p.observer = newMetricsObserver(monitors.Metrics)
}
p.eventer.observer = p.observer

if settings.WaitCloseMode == WaitOnPipelineClose && settings.WaitClose > 0 {
p.waitCloser = &waitCloser{}

// waitCloser decrements counter on queue ACK (not per client)
p.eventer.waitClose = p.waitCloser
}

p.queue, err = queueFactory(&p.eventer)
p.queue, err = queueFactory(p)
if err != nil {
return nil, err
}
Expand All @@ -185,6 +161,16 @@ func New(
return p, nil
}

// OnACK implements the queue.ACKListener interface, so the queue can notify the
// Pipeline when events are acknowledged.
func (p *Pipeline) OnACK(n int) {
p.observer.queueACKed(n)

if p.waitOnClose {
p.waitCloseGroup.Add(-n)
}
}

// Close stops the pipeline, outputs and queue.
// If WaitClose with WaitOnPipelineClose mode is configured, Close will block
// for a duration of WaitClose, if there are still active events in the pipeline.
Expand All @@ -194,10 +180,10 @@ func (p *Pipeline) Close() error {

log.Debug("close pipeline")

if p.waitCloser != nil {
if p.waitOnClose {
ch := make(chan struct{})
go func() {
p.waitCloser.wait()
p.waitCloseGroup.Wait()
ch <- struct{}{}
}()

Expand All @@ -208,7 +194,6 @@ func (p *Pipeline) Close() error {
case <-time.After(p.waitCloseTimeout):
// timeout -> close pipeline with pending events
}

}

// Note: active clients are not closed / disconnected.
Expand Down Expand Up @@ -258,16 +243,7 @@ func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) {
}

waitClose := cfg.WaitClose
reportEvents := p.waitCloser != nil

switch p.waitCloseMode {
case NoWaitOnClose:

case WaitOnClientClose:
if waitClose <= 0 {
waitClose = p.waitCloseTimeout
}
}
reportEvents := p.waitOnClose

processors, err := p.createEventProcessing(cfg.Processing, publishDisabled)
if err != nil {
Expand Down Expand Up @@ -296,7 +272,7 @@ func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) {
cfg.Events.DroppedOnPublish(event)
}
if reportEvents {
p.waitCloser.dec(1)
p.waitCloseGroup.Add(-1)
}
}
}
Expand Down Expand Up @@ -420,28 +396,6 @@ func (p *Pipeline) createEventProcessing(cfg beat.ProcessingConfig, noPublish bo
return p.processors.Create(cfg, noPublish)
}

func (e *pipelineEventer) OnACK(n int) {
e.observer.queueACKed(n)

if wc := e.waitClose; wc != nil {
wc.dec(n)
}
}

func (e *waitCloser) inc() {
e.events.Add(1)
}

func (e *waitCloser) dec(n int) {
for i := 0; i < n; i++ {
e.events.Done()
}
}

func (e *waitCloser) wait() {
e.events.Wait()
}

// OutputReloader returns a reloadable object for the output section of this pipeline
func (p *Pipeline) OutputReloader() OutputReloader {
return p.output
Expand Down

0 comments on commit 2f761cc

Please sign in to comment.