Skip to content

Commit e76abe1

Browse files
author
Steffen Siering
authored
Small pipeline enhancements (#12996)
Note: these changes are required to prepare for some refactoring in filebeat. Introduce OutputChooses publisher mode. Normally this is equivalent to the default mode for most Beats. But we introduce a new mode here, as filebeat sets the default mode to GuaranteedSend. This allows custom inputs to overwrite the default mode in filebeat in the future. Using beats.ProcessorList and the processors list to combine processors from different configurations is a little tricky. To simplify the task, we make sure that beat.ProcessorList implements processors.Processor. We also export processors.NewList, that can be used to compose a set of custom processors.Processor and beats.ProcessorList. The list returned by processors.NewList also implements beats.ProcessorList. It's not nice, but at least makes processors composable.
1 parent ea6869d commit e76abe1

File tree

5 files changed

+28
-7
lines changed

5 files changed

+28
-7
lines changed

CHANGELOG-developer.next.asciidoc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,3 +44,5 @@ The list below covers the major changes between 7.0.0-rc2 and master only.
4444
- New ReporterV2 interfaces that can receive a context on `Fetch(ctx, reporter)`, or `Run(ctx, reporter)`. {pull}11981[11981]
4545
- Generate configuration from `mage` for all Beats. {pull}12618[12618]
4646
- Add ClientFactory to TCP input source to add SplitFunc/NetworkFuncs per client. {pull}8543[8543]
47+
- Introduce beat.OutputChooses publisher mode. {pull}12996[12996]
48+
- Ensure that beat.Processor, beat.ProcessorList, and processors.ProcessorList are compatible and can be composed more easily. {pull}12996[12996]

libbeat/beat/pipeline.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@ type PipelineACKHandler struct {
129129
}
130130

131131
type ProcessorList interface {
132+
Processor
132133
All() []Processor
133134
}
134135

@@ -144,10 +145,16 @@ type Processor interface {
144145
type PublishMode uint8
145146

146147
const (
147-
// DefaultGuarantees are up to the pipeline configuration, as configured by the
148-
// operator.
148+
// DefaultGuarantees are up to the pipeline configuration itself.
149149
DefaultGuarantees PublishMode = iota
150150

151+
// OutputChooses mode fully depends on the output and its configuration.
152+
// Events might be dropped based on the users output configuration.
153+
// In this mode no events are dropped within the pipeline. Events are only removed
154+
// after the output has ACKed the events to the pipeline, even if the output
155+
// did drop the events.
156+
OutputChooses
157+
151158
// GuaranteedSend ensures events are retried until acknowledged by the output.
152159
// Normally guaranteed sending should be used with some client ACK-handling
153160
// to update state keeping track of the sending status.

libbeat/processors/processor.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,18 @@ type Processor interface {
4040
String() string
4141
}
4242

43-
func New(config PluginConfig) (*Processors, error) {
44-
procs := &Processors{
45-
log: logp.NewLogger(logName),
43+
// NewList creates a new empty processor list.
44+
// Additional processors can be added to the List field.
45+
func NewList(log *logp.Logger) *Processors {
46+
if log == nil {
47+
log = logp.NewLogger(logName)
4648
}
49+
return &Processors{log: log}
50+
}
51+
52+
// New creates a list of processors from a list of free user configurations.
53+
func New(config PluginConfig) (*Processors, error) {
54+
procs := NewList(nil)
4755

4856
for _, procConfig := range config {
4957
// Handle if/then/else processor which has multiple top-level keys.

libbeat/publisher/pipeline/config.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ func validateClientConfig(c *beat.ClientConfig) error {
4141
withDrop := false
4242

4343
switch m := c.PublishMode; m {
44-
case beat.DefaultGuarantees, beat.GuaranteedSend:
44+
case beat.DefaultGuarantees, beat.GuaranteedSend, beat.OutputChooses:
4545
case beat.DropIfFull:
4646
withDrop = true
4747
default:

libbeat/publisher/pipeline/pipeline.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,9 @@ import (
4949
// the output clients using a shared work queue for the active outputs.Group.
5050
// Processors in the pipeline are executed in the clients go-routine, before
5151
// entering the queue. No filtering/processing will occur on the output side.
52+
//
53+
// For client connecting to this pipeline, the default PublishMode is
54+
// OutputChooses.
5255
type Pipeline struct {
5356
beatInfo beat.Info
5457

@@ -273,14 +276,15 @@ func (p *Pipeline) Close() error {
273276
return nil
274277
}
275278

276-
// Connect creates a new client with default settings
279+
// Connect creates a new client with default settings.
277280
func (p *Pipeline) Connect() (beat.Client, error) {
278281
return p.ConnectWith(beat.ClientConfig{})
279282
}
280283

281284
// ConnectWith create a new Client for publishing events to the pipeline.
282285
// The client behavior on close and ACK handling can be configured by setting
283286
// the appropriate fields in the passed ClientConfig.
287+
// If not set otherwise the defaut publish mode is OutputChooses.
284288
func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) {
285289
var (
286290
canDrop bool

0 commit comments

Comments
 (0)