diff --git a/libbeat/outputs/outest/batch.go b/libbeat/outputs/outest/batch.go
index 11d4bf1a266..4aeab9cbb5f 100644
--- a/libbeat/outputs/outest/batch.go
+++ b/libbeat/outputs/outest/batch.go
@@ -41,7 +41,6 @@ const (
     BatchRetry
     BatchRetryEvents
     BatchCancelled
-    BatchCancelledEvents
 )
 
 func NewBatch(in ...beat.Event) *Batch {
@@ -76,10 +75,6 @@ func (b *Batch) Cancelled() {
     b.doSignal(BatchSignal{Tag: BatchCancelled})
 }
 
-func (b *Batch) CancelledEvents(events []publisher.Event) {
-    b.doSignal(BatchSignal{Tag: BatchCancelledEvents, Events: events})
-}
-
 func (b *Batch) doSignal(sig BatchSignal) {
     b.Signals = append(b.Signals, sig)
     if b.OnSignal != nil {
diff --git a/libbeat/publisher/event.go b/libbeat/publisher/event.go
index 29625319e7c..e127d012d87 100644
--- a/libbeat/publisher/event.go
+++ b/libbeat/publisher/event.go
@@ -24,17 +24,25 @@ import (
 
 // Batch is used to pass a batch of events to the outputs and asynchronously listening
 // for signals from these outpts. After a batch is processed (completed or
-// errors), one of the signal methods must be called.
+// errors), one of the signal methods must be called. In normal operation
+// every batch will eventually receive an ACK() or a Drop().
 type Batch interface {
     Events() []Event
 
-    // signals
+    // All events have been acknowledged by the output.
     ACK()
+
+    // Give up on these events permanently without sending.
     Drop()
+
+    // Try sending this batch again
     Retry()
+
+    // Try sending the events in this list again; all others are acknowledged.
     RetryEvents(events []Event)
+
+    // Send was aborted, try again but don't decrease the batch's TTL counter.
     Cancelled()
-    CancelledEvents(events []Event)
 }
 
 // Event is used by the publisher pipeline and broker to pass additional
diff --git a/libbeat/publisher/pipeline/client.go b/libbeat/publisher/pipeline/client.go
index edb5a3f1eb3..29d243257d1 100644
--- a/libbeat/publisher/pipeline/client.go
+++ b/libbeat/publisher/pipeline/client.go
@@ -201,7 +201,6 @@ func (c *client) logger() *logp.Logger {
 }
 
 func (c *client) onClosing() {
-    c.pipeline.observer.clientClosing()
     if c.eventer != nil {
         c.eventer.Closing()
     }
diff --git a/libbeat/publisher/pipeline/output.go b/libbeat/publisher/pipeline/client_worker.go
similarity index 93%
rename from libbeat/publisher/pipeline/output.go
rename to libbeat/publisher/pipeline/client_worker.go
index 00c3fc54281..008d774303d 100644
--- a/libbeat/publisher/pipeline/output.go
+++ b/libbeat/publisher/pipeline/client_worker.go
@@ -29,9 +29,8 @@ import (
 )
 
 type worker struct {
-    id       uint
     observer outputObserver
-    qu       workQueue
+    qu       chan publisher.Batch
     done     chan struct{}
 }
 
@@ -46,14 +45,12 @@ type netClientWorker struct {
     worker
     client outputs.NetworkClient
 
-    batchSize  int
-    batchSizer func() int
-    logger     logger
+    logger logger
 
     tracer *apm.Tracer
 }
 
-func makeClientWorker(observer outputObserver, qu workQueue, client outputs.Client, logger logger, tracer *apm.Tracer) outputWorker {
+func makeClientWorker(observer outputObserver, qu chan publisher.Batch, client outputs.Client, logger logger, tracer *apm.Tracer) outputWorker {
     w := worker{
         observer: observer,
         qu:       qu,
@@ -102,7 +99,6 @@ func (w *clientWorker) run() {
         if batch == nil {
             continue
         }
-        w.observer.outBatchSend(len(batch.Events()))
         if err := w.client.Publish(context.TODO(), batch); err != nil {
             return
         }
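With the `workQueue` alias gone, the worker simply drains a bare `chan publisher.Batch` until told to stop. A minimal, self-contained sketch of the drain-loop shape that `clientWorker.run()` follows — `sketchBatch` and `sketchWorker` are illustrative stand-ins, not the pipeline's own types:

```go
package main

import (
	"fmt"
	"sync"
)

type sketchBatch struct{ events []string }

type sketchWorker struct {
	qu   chan *sketchBatch
	done chan struct{}
	wg   sync.WaitGroup
}

func (w *sketchWorker) run() {
	defer w.wg.Done()
	for {
		// Wait for either shutdown or the next batch from the shared queue.
		select {
		case <-w.done:
			return
		case batch := <-w.qu:
			if batch == nil {
				continue // guard against nil batches, as clientWorker.run does
			}
			fmt.Printf("publishing %d events\n", len(batch.events))
		}
	}
}

func main() {
	w := &sketchWorker{qu: make(chan *sketchBatch), done: make(chan struct{})}
	w.wg.Add(1)
	go w.run()

	w.qu <- &sketchBatch{events: []string{"a", "b"}} // blocks until the worker picks it up
	close(w.done)                                    // signal shutdown
	w.wg.Wait()                                      // wait for the worker goroutine to exit
}
```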
diff --git a/libbeat/publisher/pipeline/output_test.go b/libbeat/publisher/pipeline/client_worker_test.go
similarity index 91%
rename from libbeat/publisher/pipeline/output_test.go
rename to libbeat/publisher/pipeline/client_worker_test.go
index 7bde6e137ab..921847ab0be 100644
--- a/libbeat/publisher/pipeline/output_test.go
+++ b/libbeat/publisher/pipeline/client_worker_test.go
@@ -52,8 +52,8 @@ func TestMakeClientWorker(t *testing.T) {
 
             logger := makeBufLogger(t)
 
-            wqu := makeWorkQueue()
-            retryer := newRetryer(logger, nilObserver, wqu, nil)
+            workQueue := make(chan publisher.Batch)
+            retryer := newStandaloneRetryer(workQueue)
             defer retryer.close()
 
             var published atomic.Uint
@@ -64,13 +64,13 @@ func TestMakeClientWorker(t *testing.T) {
 
             client := ctor(publishFn)
 
-            worker := makeClientWorker(nilObserver, wqu, client, logger, nil)
+            worker := makeClientWorker(nilObserver, workQueue, client, logger, nil)
             defer worker.Close()
 
             for i := uint(0); i < numBatches; i++ {
                 batch := randomBatch(50, 150).withRetryer(retryer)
                 numEvents += uint(len(batch.Events()))
-                wqu <- batch
+                workQueue <- batch
             }
 
             // Give some time for events to be published
@@ -114,14 +114,16 @@ func TestReplaceClientWorker(t *testing.T) {
 
             logger := makeBufLogger(t)
 
-            wqu := makeWorkQueue()
-            retryer := newRetryer(logger, nilObserver, wqu, nil)
+            workQueue := make(chan publisher.Batch)
+            retryer := newStandaloneRetryer(workQueue)
             defer retryer.close()
 
             var batches []publisher.Batch
             var numEvents int
             for i := uint(0); i < numBatches; i++ {
-                batch := randomBatch(minEventsInBatch, maxEventsInBatch).withRetryer(retryer)
+                batch := randomBatch(
+                    minEventsInBatch, maxEventsInBatch,
+                ).withRetryer(retryer)
                 batch.events[0].Content.Private = i
                 numEvents += batch.Len()
                 batches = append(batches, batch)
@@ -133,7 +135,7 @@ func TestReplaceClientWorker(t *testing.T) {
                 defer wg.Done()
                 for _, batch := range batches {
                     t.Logf("publish batch: %v", batch.(*mockBatch).events[0].Content.Private)
-                    wqu <- batch
+                    workQueue <- batch
                 }
             }()
@@ -156,7 +158,7 @@ func TestReplaceClientWorker(t *testing.T) {
             }
 
             client := ctor(blockingPublishFn)
-            worker := makeClientWorker(nilObserver, wqu, client, logger, nil)
+            worker := makeClientWorker(nilObserver, workQueue, client, logger, nil)
 
             // Allow the worker to make *some* progress before we close it
             timeout := 10 * time.Second
@@ -183,7 +185,7 @@ func TestReplaceClientWorker(t *testing.T) {
             }
 
             client = ctor(countingPublishFn)
-            makeClientWorker(nilObserver, wqu, client, logger, nil)
+            makeClientWorker(nilObserver, workQueue, client, logger, nil)
             wg.Wait()
 
             // Make sure that all events have eventually been published
@@ -214,8 +216,8 @@ func TestMakeClientTracer(t *testing.T) {
 
     logger := makeBufLogger(t)
 
-    wqu := makeWorkQueue()
-    retryer := newRetryer(logger, nilObserver, wqu, nil)
+    workQueue := make(chan publisher.Batch)
+    retryer := newStandaloneRetryer(workQueue)
     defer retryer.close()
 
     var published atomic.Uint
@@ -229,13 +231,13 @@ func TestMakeClientTracer(t *testing.T) {
     recorder := apmtest.NewRecordingTracer()
     defer recorder.Close()
 
-    worker := makeClientWorker(nilObserver, wqu, client, logger, recorder.Tracer)
+    worker := makeClientWorker(nilObserver, workQueue, client, logger, recorder.Tracer)
     defer worker.Close()
 
     for i := 0; i < numBatches; i++ {
         batch := randomBatch(10, 15).withRetryer(retryer)
         numEvents += uint(len(batch.Events()))
-        wqu <- batch
+        workQueue <- batch
     }
 
     // Give some time for events to be published
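The updated tests drive a worker by writing batches straight into the shared channel and counting what arrives on the other side. A simplified, runnable sketch of that counting-publish pattern — `fakeBatch` stands in for `publisher.Batch`, and the inline goroutine stands in for the mock client/worker pair:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

type fakeBatch struct{ events []int }

func main() {
	var published atomic.Uint64
	workQueue := make(chan *fakeBatch)
	workerDone := make(chan struct{})

	// Stand-in for the worker side: count each event as "published".
	go func() {
		defer close(workerDone)
		for batch := range workQueue {
			published.Add(uint64(len(batch.events)))
		}
	}()

	var want uint64
	for i := 0; i < 10; i++ {
		batch := &fakeBatch{events: make([]int, 5)}
		want += uint64(len(batch.events))
		workQueue <- batch
	}
	close(workQueue)
	<-workerDone

	fmt.Printf("published %d of %d events\n", published.Load(), want)
}
```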
- "github.com/elastic/beats/v7/libbeat/common/atomic" "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/libbeat/publisher" "github.com/elastic/beats/v7/libbeat/publisher/queue" ) // eventConsumer collects and forwards events from the queue to the outputs work queue. -// The eventConsumer is managed by the controller and receives additional pause signals -// from the retryer in case of too many events failing to be send or if retryer -// is receiving cancelled batches from outputs to be closed on output reloading. +// It accepts retry requests from batches it vends, which will resend them +// to the next available output. type eventConsumer struct { logger *logp.Logger - ctx *batchContext - pause atomic.Bool - wait atomic.Bool - sig chan consumerSignal - wg sync.WaitGroup + // eventConsumer calls the observer methods eventsRetry and eventsDropped. + observer outputObserver - queue queue.Queue - consumer queue.Consumer + // When the output changes, the new target is sent to the worker routine + // on this channel. Clients should call eventConsumer.setTarget(). + targetChan chan consumerTarget - out *outputGroup -} + // Failed batches are sent to this channel to retry. Clients should call + // eventConsumer.retry(). + retryChan chan retryRequest -type consumerSignal struct { - tag consumerEventTag - consumer queue.Consumer - out *outputGroup -} + // Closing this channel signals consumer shutdown. Clients should call + // eventConsumer.close(). + done chan struct{} -type consumerEventTag uint8 + // This waitgroup is released when this eventConsumer's worker + // goroutines return. + wg sync.WaitGroup -const ( - sigConsumerCheck consumerEventTag = iota - sigConsumerUpdateOutput - sigConsumerUpdateInput - sigStop -) + // The queue the eventConsumer will retrieve batches from. + queue queue.Queue +} -var errStopped = errors.New("stopped") +// consumerTarget specifies the output channel and parameters needed for +// eventConsumer to generate a batch. +type consumerTarget struct { + ch chan publisher.Batch + timeToLive int + batchSize int +} + +// retryRequest is used by ttlBatch to add itself back to the eventConsumer +// queue for distribution to an output. 
+type retryRequest struct { + batch *ttlBatch + decreaseTTL bool +} func newEventConsumer( log *logp.Logger, queue queue.Queue, - ctx *batchContext, + observer outputObserver, ) *eventConsumer { - consumer := queue.Consumer() c := &eventConsumer{ - logger: log, - sig: make(chan consumerSignal, 3), - out: nil, - + logger: log, + observer: observer, queue: queue, - consumer: consumer, - ctx: ctx, - } - c.pause.Store(true) + targetChan: make(chan consumerTarget), + retryChan: make(chan retryRequest), + done: make(chan struct{}), + } c.wg.Add(1) go func() { defer c.wg.Done() - c.loop(consumer) + c.run() }() return c } -func (c *eventConsumer) close() { - c.consumer.Close() - c.sig <- consumerSignal{tag: sigStop} - c.wg.Wait() -} +func (c *eventConsumer) run() { + log := c.logger -func (c *eventConsumer) sigWait() { - c.wait.Store(true) - c.sigHint() -} + log.Debug("start pipeline event consumer") -func (c *eventConsumer) sigUnWait() { - c.wait.Store(false) - c.sigHint() -} + // Create a queueReader to run our queue fetches in the background + c.wg.Add(1) + queueReader := makeQueueReader() + go func() { + defer c.wg.Done() + queueReader.run(log) + }() -func (c *eventConsumer) sigPause() { - c.pause.Store(true) - c.sigHint() -} + var ( + // Whether there's an outstanding request to queueReader + pendingRead bool -func (c *eventConsumer) sigContinue() { - c.pause.Store(false) - c.sigHint() -} + // The batches waiting to be retried. + retryBatches []*ttlBatch -func (c *eventConsumer) sigHint() { - // send signal to unblock a consumer trying to publish events. - // With flags being set atomically, multiple signals can be compressed into one - // signal -> drop if queue is not empty - select { - case c.sig <- consumerSignal{tag: sigConsumerCheck}: - default: - } -} + // The batch read from the queue and waiting to be sent, if any + queueBatch *ttlBatch -func (c *eventConsumer) updOutput(grp *outputGroup) { - // close consumer to break consumer worker from pipeline - c.consumer.Close() + // The output channel (and associated parameters) that will receive + // the batches we're loading. + target consumerTarget - // update output - c.sig <- consumerSignal{ - tag: sigConsumerUpdateOutput, - out: grp, - } + // The queue.Consumer we get the raw batches from. Reset whenever + // the target changes. + consumer queue.Consumer = c.queue.Consumer() + ) - // update eventConsumer with new queue connection - c.consumer = c.queue.Consumer() - c.sig <- consumerSignal{ - tag: sigConsumerUpdateInput, - consumer: c.consumer, - } -} +outerLoop: + for { + // If possible, start reading the next batch in the background. + if queueBatch == nil && !pendingRead { + pendingRead = true + queueReader.req <- queueReaderRequest{ + consumer: consumer, + retryer: c, + batchSize: target.batchSize, + timeToLive: target.timeToLive, + } + } -func (c *eventConsumer) loop(consumer queue.Consumer) { - log := c.logger + var active *ttlBatch + // Choose the active batch: if we have batches to retry, use the first + // one. Otherwise, use a new batch if we have one. + if len(retryBatches) > 0 { + active = retryBatches[0] + } else if queueBatch != nil { + active = queueBatch + } - log.Debug("start pipeline event consumer") + // If we have a batch, we'll point the output channel at the target + // and try to send to it. Otherwise, it will remain nil, and sends + // to it will always block, so the output case of the select below + // will be ignored. 
+ var outputChan chan publisher.Batch + if active != nil { + outputChan = target.ch + } - var ( - out workQueue - batch Batch - paused = true - ) + // Now we can block until the next state change. + select { + case outputChan <- active: + // Successfully sent a batch to the output workers + if len(retryBatches) > 0 { + // This was a retry, report it to the observer + c.observer.eventsRetry(len(active.Events())) + retryBatches = retryBatches[1:] + } else { + // This was directly from the queue, clear the value so we can + // fetch a new one + queueBatch = nil + } - handleSignal := func(sig consumerSignal) error { - switch sig.tag { - case sigStop: - return errStopped + case target = <-c.targetChan: - case sigConsumerCheck: + case queueBatch = <-queueReader.resp: + pendingRead = false - case sigConsumerUpdateOutput: - c.out = sig.out + case req := <-c.retryChan: + alive := true + if req.decreaseTTL { + countFailed := len(req.batch.Events()) - case sigConsumerUpdateInput: - consumer = sig.consumer - } + alive = req.batch.reduceTTL() - paused = c.paused() - if c.out != nil && batch != nil { - out = c.out.workQueue - } else { - out = nil - } - return nil - } + countDropped := countFailed - len(req.batch.Events()) + c.observer.eventsDropped(countDropped) - for { - if !paused && c.out != nil && consumer != nil && batch == nil { - out = c.out.workQueue - queueBatch, err := consumer.Get(c.out.batchSize) - if err != nil { - out = nil - consumer = nil - continue - } - if queueBatch != nil { - batch = newBatch(c.ctx, queueBatch, c.out.timeToLive) + if !alive { + log.Info("Drop batch") + req.batch.Drop() + continue + } } + retryBatches = append(retryBatches, req.batch) - paused = c.paused() - if paused || batch == nil { - out = nil - } + case <-c.done: + break outerLoop } + } - select { - case sig := <-c.sig: - if err := handleSignal(sig); err != nil { - return - } - continue - default: - } + // Close the queue.Consumer, otherwise queueReader can get blocked + // waiting on a read. + consumer.Close() - select { - case sig := <-c.sig: - if err := handleSignal(sig); err != nil { - return - } - case out <- batch: - batch = nil - if paused { - out = nil - } + // Close the queueReader request channel so it knows to shutdown. + close(queueReader.req) + + // If there's an outstanding request, we need to read the response + // to unblock it, but we won't pass on the value. + if pendingRead { + batch := <-queueReader.resp + if batch != nil { + // Inform any listeners that we couldn't deliver this batch. 
+ batch.Drop() } } } -func (c *eventConsumer) paused() bool { - return c.pause.Load() || c.wait.Load() +func (c *eventConsumer) setTarget(target consumerTarget) { + select { + case c.targetChan <- target: + case <-c.done: + } +} + +func (c *eventConsumer) retry(batch *ttlBatch, decreaseTTL bool) { + select { + case c.retryChan <- retryRequest{batch: batch, decreaseTTL: decreaseTTL}: + // The batch is back in eventConsumer's retry queue + case <-c.done: + // The consumer has already shut down, drop the batch + batch.Drop() + } +} + +func (c *eventConsumer) close() { + close(c.done) + c.wg.Wait() } diff --git a/libbeat/publisher/pipeline/controller.go b/libbeat/publisher/pipeline/controller.go index 2a15ea86e16..3a7089e0f81 100644 --- a/libbeat/publisher/pipeline/controller.go +++ b/libbeat/publisher/pipeline/controller.go @@ -36,25 +36,24 @@ type outputController struct { monitors Monitors observer outputObserver - queue queue.Queue - workQueue workQueue + workQueue chan publisher.Batch - retryer *retryer consumer *eventConsumer out *outputGroup } // outputGroup configures a group of load balanced outputs with shared work queue. type outputGroup struct { - workQueue workQueue + // workQueue is a channel that receives event batches that + // are ready to send. Each output worker in outputs reads from + // workQueue for events to send. + workQueue chan publisher.Batch outputs []outputWorker batchSize int timeToLive int // event lifetime } -type workQueue chan publisher.Batch - // outputWorker instances pass events from the shared workQueue to the outputs.Client // instances. type outputWorker interface { @@ -67,29 +66,17 @@ func newOutputController( observer outputObserver, queue queue.Queue, ) *outputController { - c := &outputController{ + return &outputController{ beat: beat, monitors: monitors, observer: observer, - queue: queue, - workQueue: makeWorkQueue(), + workQueue: make(chan publisher.Batch), + consumer: newEventConsumer(monitors.Logger, queue, observer), } - - ctx := &batchContext{} - c.consumer = newEventConsumer(monitors.Logger, queue, ctx) - c.retryer = newRetryer(monitors.Logger, observer, c.workQueue, c.consumer) - ctx.observer = observer - ctx.retryer = c.retryer - - c.consumer.sigContinue() - - return c } func (c *outputController) Close() error { - c.consumer.sigPause() c.consumer.close() - c.retryer.close() close(c.workQueue) if c.out != nil { @@ -97,11 +84,21 @@ func (c *outputController) Close() error { out.Close() } } - return nil } func (c *outputController) Set(outGrp outputs.Group) { + // Set consumer to empty target to pause it while we reload + c.consumer.setTarget(consumerTarget{}) + + // Close old outputWorkers, so they send their remaining events + // back to eventConsumer's retry channel + if c.out != nil { + for _, w := range c.out.outputs { + w.Close() + } + } + // create new output group with the shared work queue clients := outGrp.Clients worker := make([]outputWorker, len(clients)) @@ -116,35 +113,15 @@ func (c *outputController) Set(outGrp outputs.Group) { batchSize: outGrp.BatchSize, } - // update consumer and retryer - c.consumer.sigPause() - if c.out != nil { - for range c.out.outputs { - c.retryer.sigOutputRemoved() - } - } - for range clients { - c.retryer.sigOutputAdded() - } - c.consumer.updOutput(grp) - - // close old group, so events are send to new workQueue via retryer - if c.out != nil { - for _, w := range c.out.outputs { - w.Close() - } - } - c.out = grp - // restart consumer (potentially blocked by retryer) - c.consumer.sigContinue() - - 
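The rewritten `run()` loop relies on a standard Go idiom its own comments call out: a send on a nil channel blocks forever, so a `select` case whose channel variable is nil is effectively disabled until the loop assigns a real channel. A small runnable demonstration of the same shape — buffer incoming work, and enable the send case only when something is pending (names here are illustrative, not the pipeline's):

```go
package main

import "fmt"

func main() {
	out := make(chan int)
	in := make(chan int)
	done := make(chan struct{})

	// State loop: buffers values from in, forwards them on out when possible.
	go func() {
		var pending []int
		for {
			var outputChan chan int // nil disables the send case below
			var active int
			if len(pending) > 0 {
				outputChan = out
				active = pending[0]
			}
			select {
			case v := <-in:
				pending = append(pending, v)
			case outputChan <- active: // never fires while outputChan is nil
				pending = pending[1:]
			case <-done:
				return
			}
		}
	}()

	for i := 1; i <= 3; i++ {
		in <- i
	}
	for i := 0; i < 3; i++ {
		fmt.Println("received:", <-out)
	}
	close(done)
}
```

The same trick appears twice more in this patch: in `standaloneRetryer.run()` below, and implicitly whenever `eventConsumer` has an empty `consumerTarget` (nil `target.ch`), which is how pausing now works without dedicated pause/wait flags.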
diff --git a/libbeat/publisher/pipeline/controller.go b/libbeat/publisher/pipeline/controller.go
index 2a15ea86e16..3a7089e0f81 100644
--- a/libbeat/publisher/pipeline/controller.go
+++ b/libbeat/publisher/pipeline/controller.go
@@ -36,25 +36,24 @@ type outputController struct {
     monitors Monitors
     observer outputObserver
 
-    queue     queue.Queue
-    workQueue workQueue
+    workQueue chan publisher.Batch
 
-    retryer  *retryer
     consumer *eventConsumer
     out      *outputGroup
 }
 
 // outputGroup configures a group of load balanced outputs with shared work queue.
 type outputGroup struct {
-    workQueue workQueue
+    // workQueue is a channel that receives event batches that
+    // are ready to send. Each output worker in outputs reads from
+    // workQueue for events to send.
+    workQueue chan publisher.Batch
 
     outputs []outputWorker
 
     batchSize  int
     timeToLive int // event lifetime
 }
 
-type workQueue chan publisher.Batch
-
 // outputWorker instances pass events from the shared workQueue to the outputs.Client
 // instances.
 type outputWorker interface {
@@ -67,29 +66,17 @@ func newOutputController(
     observer outputObserver,
     queue queue.Queue,
 ) *outputController {
-    c := &outputController{
+    return &outputController{
         beat:      beat,
         monitors:  monitors,
         observer:  observer,
-        queue:     queue,
-        workQueue: makeWorkQueue(),
+        workQueue: make(chan publisher.Batch),
+        consumer:  newEventConsumer(monitors.Logger, queue, observer),
     }
-
-    ctx := &batchContext{}
-    c.consumer = newEventConsumer(monitors.Logger, queue, ctx)
-    c.retryer = newRetryer(monitors.Logger, observer, c.workQueue, c.consumer)
-    ctx.observer = observer
-    ctx.retryer = c.retryer
-
-    c.consumer.sigContinue()
-
-    return c
 }
 
 func (c *outputController) Close() error {
-    c.consumer.sigPause()
     c.consumer.close()
-    c.retryer.close()
     close(c.workQueue)
 
     if c.out != nil {
@@ -97,11 +84,21 @@ func (c *outputController) Set(outGrp outputs.Group) {
             out.Close()
         }
     }
-
     return nil
 }
 
 func (c *outputController) Set(outGrp outputs.Group) {
+    // Set consumer to empty target to pause it while we reload
+    c.consumer.setTarget(consumerTarget{})
+
+    // Close old outputWorkers, so they send their remaining events
+    // back to eventConsumer's retry channel
+    if c.out != nil {
+        for _, w := range c.out.outputs {
+            w.Close()
+        }
+    }
+
     // create new output group with the shared work queue
     clients := outGrp.Clients
     worker := make([]outputWorker, len(clients))
@@ -116,35 +113,15 @@ func (c *outputController) Set(outGrp outputs.Group) {
         batchSize:  outGrp.BatchSize,
     }
 
-    // update consumer and retryer
-    c.consumer.sigPause()
-    if c.out != nil {
-        for range c.out.outputs {
-            c.retryer.sigOutputRemoved()
-        }
-    }
-    for range clients {
-        c.retryer.sigOutputAdded()
-    }
-    c.consumer.updOutput(grp)
-
-    // close old group, so events are send to new workQueue via retryer
-    if c.out != nil {
-        for _, w := range c.out.outputs {
-            w.Close()
-        }
-    }
-
     c.out = grp
 
-    // restart consumer (potentially blocked by retryer)
-    c.consumer.sigContinue()
-
-    c.observer.updateOutputGroup()
-}
-
-func makeWorkQueue() workQueue {
-    return workQueue(make(chan publisher.Batch, 0))
+    // Resume consumer targeting the new work queue
+    c.consumer.setTarget(
+        consumerTarget{
+            ch:         c.workQueue,
+            batchSize:  grp.batchSize,
+            timeToLive: grp.timeToLive,
+        })
 }
 
 // Reload the output
diff --git a/libbeat/publisher/pipeline/monitoring.go b/libbeat/publisher/pipeline/monitoring.go
index edcfb75a535..754ea030855 100644
--- a/libbeat/publisher/pipeline/monitoring.go
+++ b/libbeat/publisher/pipeline/monitoring.go
@@ -30,7 +30,6 @@ type observer interface {
 
 type pipelineObserver interface {
     clientConnected()
-    clientClosing()
     clientClosed()
 }
 
@@ -47,12 +46,8 @@ type queueObserver interface {
 }
 
 type outputObserver interface {
-    updateOutputGroup()
-    eventsFailed(int)
     eventsDropped(int)
     eventsRetry(int)
-    outBatchSend(int)
-    outBatchACKed(int)
 }
 
 // metricsObserver is used by many component in the publisher pipeline, to report
@@ -119,9 +114,6 @@ func (o *metricsObserver) cleanup() {
 // (pipeline) pipeline did finish creating a new client instance
 func (o *metricsObserver) clientConnected() { o.vars.clients.Inc() }
 
-// (client) close being called on client
-func (o *metricsObserver) clientClosing() {}
-
 // (client) client finished processing close
 func (o *metricsObserver) clientClosed() { o.vars.clients.Dec() }
 
@@ -171,12 +163,6 @@ func (o *metricsObserver) queueMaxEvents(n int) {
 // pipeline output events
 //
 
-// (controller) new output group is about to be loaded
-func (o *metricsObserver) updateOutputGroup() {}
-
-// (retryer) new failed batch has been received
-func (o *metricsObserver) eventsFailed(int) {}
-
 // (retryer) number of events dropped by retryer
 func (o *metricsObserver) eventsDropped(n int) {
     o.vars.dropped.Add(uint64(n))
@@ -187,19 +173,12 @@ func (o *metricsObserver) eventsRetry(n int) {
     o.vars.retry.Add(uint64(n))
 }
 
-// (output) number of events to be forwarded to the output client
-func (o *metricsObserver) outBatchSend(int) {}
-
-// (output) number of events acked by the output batch
-func (o *metricsObserver) outBatchACKed(int) {}
-
 type emptyObserver struct{}
 
 var nilObserver observer = (*emptyObserver)(nil)
 
 func (*emptyObserver) cleanup()            {}
 func (*emptyObserver) clientConnected()    {}
-func (*emptyObserver) clientClosing()      {}
 func (*emptyObserver) clientClosed()       {}
 func (*emptyObserver) newEvent()           {}
 func (*emptyObserver) filteredEvent()      {}
@@ -207,9 +186,5 @@ func (*emptyObserver) publishedEvent() {}
 func (*emptyObserver) failedPublishEvent() {}
 func (*emptyObserver) queueACKed(n int)    {}
 func (*emptyObserver) queueMaxEvents(int)  {}
-func (*emptyObserver) updateOutputGroup()  {}
-func (*emptyObserver) eventsFailed(int)    {}
 func (*emptyObserver) eventsDropped(int)   {}
 func (*emptyObserver) eventsRetry(int)     {}
-func (*emptyObserver) outBatchSend(int)    {}
-func (*emptyObserver) outBatchACKed(int)   {}
diff --git a/libbeat/publisher/pipeline/pipeline.go b/libbeat/publisher/pipeline/pipeline.go
index be3a5e66362..c7e7f0b47fb 100644
--- a/libbeat/publisher/pipeline/pipeline.go
+++ b/libbeat/publisher/pipeline/pipeline.go
@@ -70,9 +70,6 @@ type Pipeline struct {
     waitCloseTimeout time.Duration
     waitCloser       *waitCloser
 
-    // pipeline ack
-    eventSema *sema
-
     // closeRef signal propagation support
     guardStartSigPropagation sync.Once
     sigNewClient             chan *client
@@ -185,7 +182,6 @@ func New(
         maxEvents = 64000
     }
     p.observer.queueMaxEvents(maxEvents)
-    p.eventSema = newSema(maxEvents)
 
     p.output = newOutputController(beat, monitors, p.observer, p.queue)
     p.output.Set(out)
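`outputController.Set` now sequences an output reload as: pause the consumer with an empty target, close the old workers so their in-flight batches return through the retry path, then retarget the consumer at the new work queue. A hedged sketch of that ordering — `workerStub`, `consumerStub`, and `targetStub` are stand-ins; only the call order mirrors the diff:

```go
package main

import "fmt"

type workerStub struct{ name string }

func (w *workerStub) Close() error {
	// In the real pipeline, closing a worker makes unpublished batches
	// flow back via Batch.Cancelled() -> eventConsumer.retry().
	fmt.Println("closed", w.name)
	return nil
}

type targetStub struct{ ch chan int }

type consumerStub struct{}

func (consumerStub) setTarget(t targetStub) {
	if t.ch == nil {
		fmt.Println("consumer paused (empty target)")
	} else {
		fmt.Println("consumer resumed on new work queue")
	}
}

func main() {
	var consumer consumerStub
	oldWorkers := []*workerStub{{"worker-1"}, {"worker-2"}}

	consumer.setTarget(targetStub{}) // 1. pause while we reload
	for _, w := range oldWorkers {   // 2. drain the old output group
		w.Close()
	}
	workQueue := make(chan int) // 3. fresh shared work queue
	consumer.setTarget(targetStub{ch: workQueue})
}
```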
diff --git a/libbeat/publisher/pipeline/queue_reader.go b/libbeat/publisher/pipeline/queue_reader.go
new file mode 100644
index 00000000000..10f724ce5b5
--- /dev/null
+++ b/libbeat/publisher/pipeline/queue_reader.go
@@ -0,0 +1,63 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pipeline
+
+import (
+    "github.com/elastic/beats/v7/libbeat/logp"
+    "github.com/elastic/beats/v7/libbeat/publisher/queue"
+)
+
+// queueReader is a standalone stateless helper goroutine to dispatch
+// reads of the queue without blocking eventConsumer's main loop.
+type queueReader struct {
+    req  chan queueReaderRequest // "give me a batch for this target"
+    resp chan *ttlBatch          // "here is your batch, or nil"
+}
+
+type queueReaderRequest struct {
+    consumer   queue.Consumer
+    retryer    retryer
+    batchSize  int
+    timeToLive int
+}
+
+func makeQueueReader() queueReader {
+    qr := queueReader{
+        req:  make(chan queueReaderRequest, 1),
+        resp: make(chan *ttlBatch),
+    }
+    return qr
+}
+
+func (qr *queueReader) run(logger *logp.Logger) {
+    logger.Debug("pipeline event consumer queue reader: start")
+    for {
+        req, ok := <-qr.req
+        if !ok {
+            // The request channel is closed, we're shutting down
+            logger.Debug("pipeline event consumer queue reader: stop")
+            return
+        }
+        queueBatch, _ := req.consumer.Get(req.batchSize)
+        var batch *ttlBatch
+        if queueBatch != nil {
+            batch = newBatch(req.retryer, queueBatch, req.timeToLive)
+        }
+        qr.resp <- batch
+    }
+}
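`queueReader` is an instance of a generic request/response helper-goroutine pattern: move a blocking call off the main select loop so the loop stays responsive, and shut the helper down by closing its request channel. The same shape with an unrelated blocking call (buffered line reads) — `lineReader` is illustrative only, not part of the pipeline:

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

type lineReader struct {
	req  chan *bufio.Reader // "fetch one line from this reader"
	resp chan string        // the fetched line
}

func makeLineReader() lineReader {
	return lineReader{
		req:  make(chan *bufio.Reader, 1), // buffered so requests never block the caller
		resp: make(chan string),
	}
}

func (lr lineReader) run() {
	for r := range lr.req { // closing req shuts the helper down
		line, _ := r.ReadString('\n') // the potentially blocking call
		lr.resp <- strings.TrimSpace(line)
	}
}

func main() {
	lr := makeLineReader()
	go lr.run()

	src := bufio.NewReader(strings.NewReader("hello\nworld\n"))
	lr.req <- src // issue an async read; a real loop could select on other events here

	fmt.Println("got:", <-lr.resp)
	close(lr.req)
}
```

Note the shutdown wrinkle handled at the end of `eventConsumer.run()` above: if a request is outstanding when the loop exits, the response must still be drained so the helper isn't left blocked on its send.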
diff --git a/libbeat/publisher/pipeline/retry.go b/libbeat/publisher/pipeline/retry.go
deleted file mode 100644
index 77f439f2fad..00000000000
--- a/libbeat/publisher/pipeline/retry.go
+++ /dev/null
@@ -1,231 +0,0 @@
-// Licensed to Elasticsearch B.V. under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Elasticsearch B.V. licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package pipeline
-
-import (
-    "sync"
-)
-
-// retryer is responsible for accepting and managing failed send attempts. It
-// will also accept not yet published events from outputs being dynamically closed
-// by the controller. Cancelled batches will be forwarded to the new workQueue,
-// without updating the events retry counters.
-// If too many batches (number of outputs/3) are stored in the retry buffer,
-// will the consumer be paused, until some batches have been processed by some
-// outputs.
-type retryer struct {
-    logger   logger
-    observer outputObserver
-
-    done chan struct{}
-
-    consumer interruptor
-
-    sig        chan retryerSignal
-    out        workQueue
-    in         retryQueue
-    doneWaiter sync.WaitGroup
-}
-
-type interruptor interface {
-    sigWait()
-    sigUnWait()
-}
-
-type retryQueue chan batchEvent
-
-type retryerSignal struct {
-    tag     retryerEventTag
-    channel workQueue
-}
-
-type batchEvent struct {
-    tag   retryerBatchTag
-    batch Batch
-}
-
-type retryerEventTag uint8
-
-const (
-    sigRetryerOutputAdded retryerEventTag = iota
-    sigRetryerOutputRemoved
-    sigRetryerUpdateOutput
-)
-
-type retryerBatchTag uint8
-
-const (
-    retryBatch retryerBatchTag = iota
-    cancelledBatch
-)
-
-func newRetryer(
-    log logger,
-    observer outputObserver,
-    out workQueue,
-    c interruptor,
-) *retryer {
-    r := &retryer{
-        logger:     log,
-        observer:   observer,
-        done:       make(chan struct{}),
-        sig:        make(chan retryerSignal, 3),
-        in:         retryQueue(make(chan batchEvent, 3)),
-        out:        out,
-        consumer:   c,
-        doneWaiter: sync.WaitGroup{},
-    }
-    r.doneWaiter.Add(1)
-    go r.loop()
-    return r
-}
-
-func (r *retryer) close() {
-    close(r.done)
-    //Block until loop() is properly closed
-    r.doneWaiter.Wait()
-}
-
-func (r *retryer) sigOutputAdded() {
-    r.sig <- retryerSignal{tag: sigRetryerOutputAdded}
-}
-
-func (r *retryer) sigOutputRemoved() {
-    r.sig <- retryerSignal{tag: sigRetryerOutputRemoved}
-}
-
-func (r *retryer) retry(b Batch) {
-    r.in <- batchEvent{tag: retryBatch, batch: b}
-}
-
-func (r *retryer) cancelled(b Batch) {
-    r.in <- batchEvent{tag: cancelledBatch, batch: b}
-}
-
-func (r *retryer) loop() {
-    defer r.doneWaiter.Done()
-    var (
-        out             workQueue
-        consumerBlocked bool
-
-        active     Batch
-        activeSize int
-        buffer     []Batch
-        numOutputs int
-
-        log = r.logger
-    )
-
-    for {
-        select {
-        case <-r.done:
-            return
-        case evt := <-r.in:
-            var (
-                countFailed  int
-                countDropped int
-                batch        = evt.batch
-                countRetry   = len(batch.Events())
-                alive        = true
-            )
-
-            if evt.tag == retryBatch {
-                countFailed = len(batch.Events())
-                r.observer.eventsFailed(countFailed)
-
-                alive = batch.reduceTTL()
-
-                countRetry = len(batch.Events())
-                countDropped = countFailed - countRetry
-                r.observer.eventsDropped(countDropped)
-            }
-
-            if !alive {
-                log.Info("Drop batch")
-                batch.Drop()
-            } else {
-                out = r.out
-                buffer = append(buffer, batch)
-                out = r.out
-                active = buffer[0]
-                activeSize = len(active.Events())
-                if !consumerBlocked {
-                    consumerBlocked = r.checkConsumerBlock(numOutputs, len(buffer))
-                }
-            }
-
-        case out <- active:
-            r.observer.eventsRetry(activeSize)
-
-            buffer = buffer[1:]
-            active, activeSize = nil, 0
-
-            if len(buffer) == 0 {
-                out = nil
-            } else {
-                active = buffer[0]
-                activeSize = len(active.Events())
-            }
-
-            if consumerBlocked {
-                consumerBlocked = r.checkConsumerBlock(numOutputs, len(buffer))
-            }
-
-        case sig := <-r.sig:
-            switch sig.tag {
-            case sigRetryerOutputAdded:
-                numOutputs++
-                if consumerBlocked {
-                    consumerBlocked = r.checkConsumerBlock(numOutputs, len(buffer))
-                }
-            case sigRetryerOutputRemoved:
-                numOutputs--
-                if !consumerBlocked {
-                    consumerBlocked = r.checkConsumerBlock(numOutputs, len(buffer))
-                }
-            }
-        }
-    }
-}
-
-func (r *retryer) checkConsumerBlock(numOutputs, numBatches int) bool {
-    consumerBlocked := blockConsumer(numOutputs, numBatches)
-    if r.consumer == nil {
-        return consumerBlocked
-    }
-
-    if consumerBlocked {
-        r.logger.Info("retryer: send wait signal to consumer")
-        if r.consumer != nil {
-            r.consumer.sigWait()
-        }
-        r.logger.Info(" done")
-    } else {
-        r.logger.Info("retryer: send unwait signal to consumer")
-        if r.consumer != nil {
-            r.consumer.sigUnWait()
-        }
-        r.logger.Info(" done")
-    }
-
-    return consumerBlocked
-}
-
-func blockConsumer(numOutputs, numBatches int) bool {
-    return numBatches/3 >= numOutputs
-}
diff --git a/libbeat/publisher/pipeline/testing.go b/libbeat/publisher/pipeline/testing.go
index 431b2a6e39e..75250ece0f1 100644
--- a/libbeat/publisher/pipeline/testing.go
+++ b/libbeat/publisher/pipeline/testing.go
@@ -25,7 +25,6 @@ import (
 
     "github.com/elastic/beats/v7/libbeat/outputs"
     "github.com/elastic/beats/v7/libbeat/publisher"
-    "github.com/elastic/beats/v7/libbeat/publisher/queue"
 )
 
 type mockPublishFn func(publisher.Batch) error
@@ -54,24 +53,6 @@ type mockNetworkClient struct {
 
 func (c *mockNetworkClient) Connect() error { return nil }
 
-type mockQueue struct{}
-
-func (q mockQueue) Close() error                                     { return nil }
-func (q mockQueue) BufferConfig() queue.BufferConfig                 { return queue.BufferConfig{} }
-func (q mockQueue) Producer(cfg queue.ProducerConfig) queue.Producer { return mockProducer{} }
-func (q mockQueue) Consumer() queue.Consumer                         { return mockConsumer{} }
-
-type mockProducer struct{}
-
-func (p mockProducer) Publish(event publisher.Event) bool    { return true }
-func (p mockProducer) TryPublish(event publisher.Event) bool { return true }
-func (p mockProducer) Cancel() int                           { return 0 }
-
-type mockConsumer struct{}
-
-func (c mockConsumer) Get(eventCount int) (queue.Batch, error) { return &batch{}, nil }
-func (c mockConsumer) Close() error                            { return nil }
-
 type mockBatch struct {
     mu     sync.Mutex
     events []publisher.Event
@@ -81,7 +62,6 @@ type mockBatch struct {
     onDrop      func()
     onRetry     func()
     onCancelled func()
-    onReduceTTL func() bool
 }
 
 func (b *mockBatch) Events() []publisher.Event {
@@ -95,23 +75,12 @@ func (b *mockBatch) ACK()       { signalFn(b.onACK) }
 func (b *mockBatch) Drop()      { signalFn(b.onDrop) }
 func (b *mockBatch) Retry()     { signalFn(b.onRetry) }
 func (b *mockBatch) Cancelled() { signalFn(b.onCancelled) }
+
 func (b *mockBatch) RetryEvents(events []publisher.Event) {
     b.updateEvents(events)
     signalFn(b.onRetry)
 }
 
-func (b *mockBatch) reduceTTL() bool {
-    if b.onReduceTTL != nil {
-        return b.onReduceTTL()
-    }
-    return true
-}
-
-func (b *mockBatch) CancelledEvents(events []publisher.Event) {
-    b.updateEvents(events)
-    signalFn(b.onCancelled)
-}
-
 func (b *mockBatch) updateEvents(events []publisher.Event) {
     b.mu.Lock()
     defer b.mu.Unlock()
@@ -124,17 +93,63 @@ func (b *mockBatch) Len() int {
     return len(b.events)
 }
 
-func (b *mockBatch) withRetryer(r *retryer) *mockBatch {
-    return &mockBatch{
-        events:      b.events,
-        onACK:       b.onACK,
-        onDrop:      b.onDrop,
-        onRetry:     func() { r.retry(b) },
-        onCancelled: func() { r.cancelled(b) },
-        onReduceTTL: b.onReduceTTL,
+func (b *mockBatch) withRetryer(r standaloneRetryer) *mockBatch {
+    wrapper := &mockBatch{
+        events: b.events,
+        onACK:  b.onACK,
+        onDrop: b.onDrop,
+    }
+    wrapper.onRetry = func() { r.retryChan <- wrapper }
+    wrapper.onCancelled = func() { r.retryChan <- wrapper }
+    return wrapper
+}
+
+// standaloneRetryer is a helper that can be used to simulate retry
+// behavior when unit testing outputWorker without a full pipeline. (In
+// a live pipeline the retry calls are handled by the eventConsumer).
+type standaloneRetryer struct {
+    workQueue chan publisher.Batch
+    retryChan chan publisher.Batch
+    done      chan struct{}
+}
+
+func newStandaloneRetryer(workQueue chan publisher.Batch) standaloneRetryer {
+    sr := standaloneRetryer{
+        workQueue: workQueue,
+        retryChan: make(chan publisher.Batch),
+        done:      make(chan struct{}),
+    }
+    go sr.run()
+    return sr
+}
+
+func (sr standaloneRetryer) run() {
+    var batches []publisher.Batch
+    for {
+        var active publisher.Batch
+        var outChan chan publisher.Batch
+        // If we have a batch to send, set the batch and output channel.
+        // Otherwise they'll be nil, and the select statement below will
+        // ignore them.
+        if len(batches) > 0 {
+            active = batches[0]
+            outChan = sr.workQueue
+        }
+        select {
+        case batch := <-sr.retryChan:
+            batches = append(batches, batch)
+        case outChan <- active:
+            batches = batches[1:]
+        case <-sr.done:
+            return
+        }
    }
 }
 
+func (sr standaloneRetryer) close() {
+    close(sr.done)
+}
+
 func signalFn(fn func()) {
     if fn != nil {
         fn()
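`mockBatch.withRetryer` builds a wrapper whose `Retry`/`Cancelled` hooks send the wrapper itself into the standaloneRetryer's retry channel. Here is that callback-closure trick in isolation, runnable with simplified stand-in types:

```go
package main

import "fmt"

type testBatch struct {
	events  []string
	onRetry func()
}

func (b *testBatch) Retry() {
	if b.onRetry != nil {
		b.onRetry()
	}
}

func main() {
	retryChan := make(chan *testBatch, 1)

	batch := &testBatch{events: []string{"e1", "e2"}}
	// The hook closes over the batch itself, so a Retry() call re-queues it.
	batch.onRetry = func() { retryChan <- batch }

	batch.Retry() // simulate an output failing the batch
	requeued := <-retryChan
	fmt.Println("requeued batch with", len(requeued.events), "events")
}
```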
diff --git a/libbeat/publisher/pipeline/batch.go b/libbeat/publisher/pipeline/ttl_batch.go
similarity index 63%
rename from libbeat/publisher/pipeline/batch.go
rename to libbeat/publisher/pipeline/ttl_batch.go
index 54ba2058d74..4b3bdb6489b 100644
--- a/libbeat/publisher/pipeline/batch.go
+++ b/libbeat/publisher/pipeline/ttl_batch.go
@@ -24,99 +24,81 @@ import (
     "github.com/elastic/beats/v7/libbeat/publisher/queue"
 )
 
-type Batch interface {
-    publisher.Batch
-
-    reduceTTL() bool
+type retryer interface {
+    retry(batch *ttlBatch, decreaseTTL bool)
 }
 
-type batch struct {
+type ttlBatch struct {
     original queue.Batch
-    ctx      *batchContext
-    ttl      int
-    events   []publisher.Event
-}
 
-type batchContext struct {
-    observer outputObserver
-    retryer  *retryer
+    // The internal hook back to the eventConsumer, used to implement the
+    // publisher.Batch retry interface.
+    retryer retryer
+
+    // How many retries until we drop this batch. -1 means it can't be dropped.
+    ttl int
+
+    // The cached events returned from original.Events(). If some but not
+    // all of the events are ACKed, those ones are removed from the list.
+    events []publisher.Event
 }
 
 var batchPool = sync.Pool{
     New: func() interface{} {
-        return &batch{}
+        return &ttlBatch{}
     },
 }
 
-func newBatch(ctx *batchContext, original queue.Batch, ttl int) *batch {
+func newBatch(retryer retryer, original queue.Batch, ttl int) *ttlBatch {
     if original == nil {
         panic("empty batch")
     }
 
-    b := batchPool.Get().(*batch)
-    *b = batch{
+    b := batchPool.Get().(*ttlBatch)
+    *b = ttlBatch{
         original: original,
-        ctx:      ctx,
+        retryer:  retryer,
         ttl:      ttl,
         events:   original.Events(),
     }
     return b
 }
 
-func releaseBatch(b *batch) {
-    *b = batch{} // clear batch
+func releaseBatch(b *ttlBatch) {
+    *b = ttlBatch{} // clear batch
     batchPool.Put(b)
 }
 
-func (b *batch) Events() []publisher.Event {
+func (b *ttlBatch) Events() []publisher.Event {
     return b.events
 }
 
-func (b *batch) ACK() {
-    if b.ctx != nil {
-        b.ctx.observer.outBatchACKed(len(b.events))
-    }
+func (b *ttlBatch) ACK() {
     b.original.ACK()
     releaseBatch(b)
 }
 
-func (b *batch) Drop() {
+func (b *ttlBatch) Drop() {
     b.original.ACK()
     releaseBatch(b)
 }
 
-func (b *batch) Retry() {
-    b.ctx.retryer.retry(b)
+func (b *ttlBatch) Retry() {
+    b.retryer.retry(b, true)
 }
 
-func (b *batch) Cancelled() {
-    b.ctx.retryer.cancelled(b)
+func (b *ttlBatch) Cancelled() {
+    b.retryer.retry(b, false)
 }
 
-func (b *batch) RetryEvents(events []publisher.Event) {
-    b.updEvents(events)
-    b.Retry()
-}
-
-func (b *batch) CancelledEvents(events []publisher.Event) {
-    b.updEvents(events)
-    b.Cancelled()
-}
-
-func (b *batch) updEvents(events []publisher.Event) {
-    l1 := len(b.events)
-    l2 := len(events)
-    if l1 > l2 {
-        // report subset of events not to be retried as ACKed
-        b.ctx.observer.outBatchACKed(l1 - l2)
-    }
-
+func (b *ttlBatch) RetryEvents(events []publisher.Event) {
     b.events = events
+    b.Retry()
 }
 
 // reduceTTL reduces the time to live for all events that have no 'guaranteed'
 // sending requirements. reduceTTL returns true if the batch is still alive.
-func (b *batch) reduceTTL() bool {
+func (b *ttlBatch) reduceTTL() bool {
     if b.ttl <= 0 {
         return true
     }
diff --git a/libbeat/publisher/pipeline/util.go b/libbeat/publisher/pipeline/util.go
deleted file mode 100644
index db656f4ae9e..00000000000
--- a/libbeat/publisher/pipeline/util.go
+++ /dev/null
@@ -1,55 +0,0 @@
-// Licensed to Elasticsearch B.V. under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Elasticsearch B.V. licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package pipeline
-
-import "sync"
-
-type sema struct {
-    // simulate cancellable counting semaphore using counter + mutex + cond
-    mutex      sync.Mutex
-    cond       sync.Cond
-    count, max int
-}
-
-func newSema(max int) *sema {
-    s := &sema{max: max}
-    s.cond.L = &s.mutex
-    return s
-}
-
-func (s *sema) inc() {
-    s.mutex.Lock()
-    for s.count == s.max {
-        s.cond.Wait()
-    }
-    s.mutex.Unlock()
-}
-
-func (s *sema) release(n int) {
-    s.mutex.Lock()
-    old := s.count
-    s.count -= n
-    if old == s.max {
-        if n == 1 {
-            s.cond.Signal()
-        } else {
-            s.cond.Broadcast()
-        }
-    }
-    s.mutex.Unlock()
-}
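For reference, the TTL bookkeeping that replaces the deleted retryer: `Retry()` re-enqueues with `decreaseTTL=true`, `Cancelled()` with `false`, and `reduceTTL()` decides whether the batch stays alive. The diff truncates `reduceTTL`'s body, so the expiry behavior sketched below (keeping only events that demand guaranteed delivery once the TTL runs out) is an assumption based on its doc comment, not a copy of the real implementation:

```go
package main

import "fmt"

type sketchEvent struct {
	name       string
	guaranteed bool // guaranteed events ignore the TTL
}

type sketchBatch struct {
	ttl    int // <= 0 means "no TTL configured; never drop"
	events []sketchEvent
}

// reduceTTL returns true if the batch should still be retried.
func (b *sketchBatch) reduceTTL() bool {
	if b.ttl <= 0 {
		return true // no TTL configured; retry forever
	}
	b.ttl--
	if b.ttl > 0 {
		return true
	}
	// TTL expired: keep only events that require guaranteed delivery.
	kept := b.events[:0]
	for _, e := range b.events {
		if e.guaranteed {
			kept = append(kept, e)
		}
	}
	b.events = kept
	return len(b.events) > 0
}

func main() {
	b := &sketchBatch{ttl: 2, events: []sketchEvent{
		{"a", false}, {"b", true},
	}}
	fmt.Println(b.reduceTTL(), len(b.events)) // true 2 (ttl now 1)
	fmt.Println(b.reduceTTL(), len(b.events)) // true 1 (only "b" survives expiry)
}
```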