Skip to content

Commit

Permalink
libbeat publisher pipeline cleanups (#28206)
Browse files Browse the repository at this point in the history
  • Loading branch information
faec authored Dec 7, 2021
1 parent db9b410 commit 16d587b
Show file tree
Hide file tree
Showing 14 changed files with 363 additions and 626 deletions.
5 changes: 0 additions & 5 deletions libbeat/outputs/outest/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ const (
BatchRetry
BatchRetryEvents
BatchCancelled
BatchCancelledEvents
)

func NewBatch(in ...beat.Event) *Batch {
Expand Down Expand Up @@ -76,10 +75,6 @@ func (b *Batch) Cancelled() {
b.doSignal(BatchSignal{Tag: BatchCancelled})
}

func (b *Batch) CancelledEvents(events []publisher.Event) {
b.doSignal(BatchSignal{Tag: BatchCancelledEvents, Events: events})
}

func (b *Batch) doSignal(sig BatchSignal) {
b.Signals = append(b.Signals, sig)
if b.OnSignal != nil {
Expand Down
14 changes: 11 additions & 3 deletions libbeat/publisher/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,25 @@ import (

// Batch is used to pass a batch of events to the outputs and asynchronously listening
// for signals from these outpts. After a batch is processed (completed or
// errors), one of the signal methods must be called.
// errors), one of the signal methods must be called. In normal operation
// every batch will eventually receive an ACK() or a Drop().
type Batch interface {
Events() []Event

// signals
// All events have been acknowledged by the output.
ACK()

// Give up on these events permanently without sending.
Drop()

// Try sending this batch again
Retry()

// Try sending the events in this list again; all others are acknowledged.
RetryEvents(events []Event)

// Send was aborted, try again but don't decrease the batch's TTL counter.
Cancelled()
CancelledEvents(events []Event)
}

// Event is used by the publisher pipeline and broker to pass additional
Expand Down
1 change: 0 additions & 1 deletion libbeat/publisher/pipeline/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,6 @@ func (c *client) logger() *logp.Logger {
}

func (c *client) onClosing() {
c.pipeline.observer.clientClosing()
if c.eventer != nil {
c.eventer.Closing()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,8 @@ import (
)

type worker struct {
id uint
observer outputObserver
qu workQueue
qu chan publisher.Batch
done chan struct{}
}

Expand All @@ -46,14 +45,12 @@ type netClientWorker struct {
worker
client outputs.NetworkClient

batchSize int
batchSizer func() int
logger logger
logger logger

tracer *apm.Tracer
}

func makeClientWorker(observer outputObserver, qu workQueue, client outputs.Client, logger logger, tracer *apm.Tracer) outputWorker {
func makeClientWorker(observer outputObserver, qu chan publisher.Batch, client outputs.Client, logger logger, tracer *apm.Tracer) outputWorker {
w := worker{
observer: observer,
qu: qu,
Expand Down Expand Up @@ -102,7 +99,6 @@ func (w *clientWorker) run() {
if batch == nil {
continue
}
w.observer.outBatchSend(len(batch.Events()))
if err := w.client.Publish(context.TODO(), batch); err != nil {
return
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ func TestMakeClientWorker(t *testing.T) {

logger := makeBufLogger(t)

wqu := makeWorkQueue()
retryer := newRetryer(logger, nilObserver, wqu, nil)
workQueue := make(chan publisher.Batch)
retryer := newStandaloneRetryer(workQueue)
defer retryer.close()

var published atomic.Uint
Expand All @@ -64,13 +64,13 @@ func TestMakeClientWorker(t *testing.T) {

client := ctor(publishFn)

worker := makeClientWorker(nilObserver, wqu, client, logger, nil)
worker := makeClientWorker(nilObserver, workQueue, client, logger, nil)
defer worker.Close()

for i := uint(0); i < numBatches; i++ {
batch := randomBatch(50, 150).withRetryer(retryer)
numEvents += uint(len(batch.Events()))
wqu <- batch
workQueue <- batch
}

// Give some time for events to be published
Expand Down Expand Up @@ -114,14 +114,16 @@ func TestReplaceClientWorker(t *testing.T) {

logger := makeBufLogger(t)

wqu := makeWorkQueue()
retryer := newRetryer(logger, nilObserver, wqu, nil)
workQueue := make(chan publisher.Batch)
retryer := newStandaloneRetryer(workQueue)
defer retryer.close()

var batches []publisher.Batch
var numEvents int
for i := uint(0); i < numBatches; i++ {
batch := randomBatch(minEventsInBatch, maxEventsInBatch).withRetryer(retryer)
batch := randomBatch(
minEventsInBatch, maxEventsInBatch,
).withRetryer(retryer)
batch.events[0].Content.Private = i
numEvents += batch.Len()
batches = append(batches, batch)
Expand All @@ -133,7 +135,7 @@ func TestReplaceClientWorker(t *testing.T) {
defer wg.Done()
for _, batch := range batches {
t.Logf("publish batch: %v", batch.(*mockBatch).events[0].Content.Private)
wqu <- batch
workQueue <- batch
}
}()

Expand All @@ -156,7 +158,7 @@ func TestReplaceClientWorker(t *testing.T) {
}

client := ctor(blockingPublishFn)
worker := makeClientWorker(nilObserver, wqu, client, logger, nil)
worker := makeClientWorker(nilObserver, workQueue, client, logger, nil)

// Allow the worker to make *some* progress before we close it
timeout := 10 * time.Second
Expand All @@ -183,7 +185,7 @@ func TestReplaceClientWorker(t *testing.T) {
}

client = ctor(countingPublishFn)
makeClientWorker(nilObserver, wqu, client, logger, nil)
makeClientWorker(nilObserver, workQueue, client, logger, nil)
wg.Wait()

// Make sure that all events have eventually been published
Expand Down Expand Up @@ -214,8 +216,8 @@ func TestMakeClientTracer(t *testing.T) {

logger := makeBufLogger(t)

wqu := makeWorkQueue()
retryer := newRetryer(logger, nilObserver, wqu, nil)
workQueue := make(chan publisher.Batch)
retryer := newStandaloneRetryer(workQueue)
defer retryer.close()

var published atomic.Uint
Expand All @@ -229,13 +231,13 @@ func TestMakeClientTracer(t *testing.T) {
recorder := apmtest.NewRecordingTracer()
defer recorder.Close()

worker := makeClientWorker(nilObserver, wqu, client, logger, recorder.Tracer)
worker := makeClientWorker(nilObserver, workQueue, client, logger, recorder.Tracer)
defer worker.Close()

for i := 0; i < numBatches; i++ {
batch := randomBatch(10, 15).withRetryer(retryer)
numEvents += uint(len(batch.Events()))
wqu <- batch
workQueue <- batch
}

// Give some time for events to be published
Expand Down
Loading

0 comments on commit 16d587b

Please sign in to comment.