Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow Beats to increase publisher internal queue size #22650

Merged
merged 8 commits into from
Dec 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -876,14 +876,10 @@ same journal. {pull}18467[18467]
- Add initial SIP protocol support {pull}21221[21221]
- Add support for overriding the published index on a per-protocol/flow basis. {pull}22134[22134]
- Change build process for x-pack distribution {pull}21979[21979]

*Packetbeat*

`host` metadata fields when processing network data from network tap or mirror
port. {pull}19209[19209]

- Tuned the internal queue size to reduce the chances of events being dropped. {pull}22650[22650]

*Functionbeat*

- Add basic ECS categorization and `cloud` fields. {pull}19174[19174]
- Add support for parallelization factor for kinesis. {pull}20727[20727]

Expand All @@ -899,6 +895,7 @@ port. {pull}19209[19209]
- Add file.pe and process.pe fields to ProcessCreate & LoadImage events in Sysmon module. {issue}17335[17335] {pull}22217[22217]

*Elastic Log Driver*

- Add support for `docker logs` command {pull}19531[19531]

==== Deprecated
Expand Down
38 changes: 25 additions & 13 deletions libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ type Beat struct {

keystore keystore.Keystore
processing processing.Supporter

InputQueueSize int // Size of the producer queue used by most queues.
}

type beatConfig struct {
Expand Down Expand Up @@ -334,29 +336,37 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) {
return nil, errors.New(msg)
}
}
pipeline, err := pipeline.Load(b.Info,
pipeline.Monitors{
Metrics: reg,
Telemetry: monitoring.GetNamespace("state").GetRegistry(),
Logger: logp.L().Named("publisher"),
Tracer: b.Instrumentation.Tracer(),
},
b.Config.Pipeline,
b.processing,
b.makeOutputFactory(b.Config.Output),
)

var publisher *pipeline.Pipeline
monitors := pipeline.Monitors{
Metrics: reg,
Telemetry: monitoring.GetNamespace("state").GetRegistry(),
Logger: logp.L().Named("publisher"),
Tracer: b.Instrumentation.Tracer(),
}
outputFactory := b.makeOutputFactory(b.Config.Output)
settings := pipeline.Settings{
WaitClose: 0,
WaitCloseMode: pipeline.NoWaitOnClose,
Processors: b.processing,
InputQueueSize: b.InputQueueSize,
}
if settings.InputQueueSize > 0 {
publisher, err = pipeline.LoadWithSettings(b.Info, monitors, b.Config.Pipeline, outputFactory, settings)
} else {
publisher, err = pipeline.Load(b.Info, monitors, b.Config.Pipeline, b.processing, outputFactory)
}
if err != nil {
return nil, fmt.Errorf("error initializing publisher: %+v", err)
}

reload.Register.MustRegister("output", b.makeOutputReloader(pipeline.OutputReloader()))
reload.Register.MustRegister("output", b.makeOutputReloader(publisher.OutputReloader()))

// TODO: some beats race on shutdown with publisher.Stop -> do not call Stop yet,
// but refine publisher to disconnect clients on stop automatically
// defer pipeline.Close()

b.Publisher = pipeline
b.Publisher = publisher
beater, err := bt(&b.Beat, sub)
if err != nil {
return nil, err
Expand Down Expand Up @@ -579,6 +589,8 @@ func (b *Beat) handleFlags() error {
func (b *Beat) configure(settings Settings) error {
var err error

b.InputQueueSize = settings.InputQueueSize

cfg, err := cfgfile.Load("", settings.ConfigOverrides)
if err != nil {
return fmt.Errorf("error loading config file: %v", err)
Expand Down
5 changes: 5 additions & 0 deletions libbeat/cmd/instance/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,9 @@ type Settings struct {
Processing processing.SupportFactory

Umask *int

// InputQueueSize is the size for the internal publisher queue in the
// publisher pipeline. This is only useful when the Beat plans to use
// beat.DropIfFull PublishMode. Leave as zero for default.
InputQueueSize int
}
5 changes: 3 additions & 2 deletions libbeat/publisher/pipeline/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func LoadWithSettings(

name := beatInfo.Name

queueBuilder, err := createQueueBuilder(config.Queue, monitors)
queueBuilder, err := createQueueBuilder(config.Queue, monitors, settings.InputQueueSize)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -169,6 +169,7 @@ func loadOutput(
func createQueueBuilder(
config common.ConfigNamespace,
monitors Monitors,
inQueueSize int,
) (func(queue.ACKListener) (queue.Queue, error), error) {
queueType := defaultQueueType
if b := config.Name(); b != "" {
Expand All @@ -191,6 +192,6 @@ func createQueueBuilder(
}

return func(ackListener queue.ACKListener) (queue.Queue, error) {
return queueFactory(ackListener, monitors.Logger, queueConfig)
return queueFactory(ackListener, monitors.Logger, queueConfig, inQueueSize)
}, nil
}
2 changes: 2 additions & 0 deletions libbeat/publisher/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ type Settings struct {
WaitCloseMode WaitCloseMode

Processors processing.Supporter

InputQueueSize int
}

// WaitCloseMode enumerates the possible behaviors of WaitClose in a pipeline.
Expand Down
2 changes: 1 addition & 1 deletion libbeat/publisher/queue/diskqueue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func init() {
// queueFactory matches the queue.Factory interface, and is used to add the
// disk queue to the registry.
func queueFactory(
ackListener queue.ACKListener, logger *logp.Logger, cfg *common.Config,
ackListener queue.ACKListener, logger *logp.Logger, cfg *common.Config, _ int, // input queue size param is unused.
) (queue.Queue, error) {
settings, err := SettingsForUserConfig(cfg)
if err != nil {
Expand Down
27 changes: 22 additions & 5 deletions libbeat/publisher/queue/memqueue/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ import (
"github.com/elastic/beats/v7/libbeat/publisher/queue"
)

const (
minInputQueueSize = 20
maxInputQueueSizeRatio = 0.1
)

type broker struct {
done chan struct{}

Expand Down Expand Up @@ -56,6 +61,7 @@ type Settings struct {
FlushMinEvents int
FlushTimeout time.Duration
WaitOnClose bool
InputQueueSize int
}

type ackChan struct {
Expand All @@ -82,7 +88,7 @@ func init() {
}

func create(
ackListener queue.ACKListener, logger *logp.Logger, cfg *common.Config,
ackListener queue.ACKListener, logger *logp.Logger, cfg *common.Config, inQueueSize int,
) (queue.Queue, error) {
config := defaultConfig
if err := cfg.Unpack(&config); err != nil {
Expand All @@ -98,6 +104,7 @@ func create(
Events: config.Events,
FlushMinEvents: config.FlushMinEvents,
FlushTimeout: config.FlushTimeout,
InputQueueSize: inQueueSize,
}), nil
}

Expand All @@ -108,16 +115,14 @@ func NewQueue(
logger logger,
settings Settings,
) queue.Queue {
// define internal channel size for producer/client requests
// to the broker
chanSize := 20

var (
sz = settings.Events
minEvents = settings.FlushMinEvents
flushTimeout = settings.FlushTimeout
)

chanSize := AdjustInputQueueSize(settings.InputQueueSize, sz)

if minEvents < 1 {
minEvents = 1
}
Expand Down Expand Up @@ -298,3 +303,15 @@ func (l *chanList) reverse() {
l.prepend(tmp.pop())
}
}

// AdjustInputQueueSize decides the size for the input queue.
func AdjustInputQueueSize(requested, mainQueueSize int) (actual int) {
actual = requested
if max := int(float64(mainQueueSize) * maxInputQueueSizeRatio); mainQueueSize > 0 && actual > max {
actual = max
}
if actual < minInputQueueSize {
actual = minInputQueueSize
}
return actual
}
29 changes: 29 additions & 0 deletions libbeat/publisher/queue/memqueue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@ package memqueue

import (
"flag"
"math"
"math/rand"
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/v7/libbeat/publisher/queue"
"github.com/elastic/beats/v7/libbeat/publisher/queue/queuetest"
)
Expand Down Expand Up @@ -82,3 +85,29 @@ func makeTestQueue(sz, minEvents int, flushTimeout time.Duration) queuetest.Queu
})
}
}

func TestAdjustInputQueueSize(t *testing.T) {
t.Run("zero yields default value (main queue size=0)", func(t *testing.T) {
assert.Equal(t, minInputQueueSize, AdjustInputQueueSize(0, 0))
})
t.Run("zero yields default value (main queue size=10)", func(t *testing.T) {
assert.Equal(t, minInputQueueSize, AdjustInputQueueSize(0, 10))
})
t.Run("can't go below min", func(t *testing.T) {
assert.Equal(t, minInputQueueSize, AdjustInputQueueSize(1, 0))
})
t.Run("can set any value within bounds", func(t *testing.T) {
for q, mainQueue := minInputQueueSize+1, 4096; q < int(float64(mainQueue)*maxInputQueueSizeRatio); q += 10 {
assert.Equal(t, q, AdjustInputQueueSize(q, mainQueue))
}
})
t.Run("can set any value if no upper bound", func(t *testing.T) {
for q := minInputQueueSize + 1; q < math.MaxInt32; q *= 2 {
assert.Equal(t, q, AdjustInputQueueSize(q, 0))
}
})
t.Run("can't go above upper bound", func(t *testing.T) {
mainQueue := 4096
assert.Equal(t, int(float64(mainQueue)*maxInputQueueSizeRatio), AdjustInputQueueSize(mainQueue, mainQueue))
})
}
2 changes: 1 addition & 1 deletion libbeat/publisher/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
)

// Factory for creating a queue used by a pipeline instance.
type Factory func(ACKListener, *logp.Logger, *common.Config) (Queue, error)
type Factory func(ACKListener, *logp.Logger, *common.Config, int) (Queue, error)

// ACKListener listens to special events to be send by queue implementations.
type ACKListener interface {
Expand Down
2 changes: 1 addition & 1 deletion libbeat/publisher/queue/spool/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func init() {
}

func create(
ackListener queue.ACKListener, logp *logp.Logger, cfg *common.Config,
ackListener queue.ACKListener, logp *logp.Logger, cfg *common.Config, inQueueSize int,
) (queue.Queue, error) {
cfgwarn.Beta("Spooling to disk is beta")

Expand Down
9 changes: 5 additions & 4 deletions packetbeat/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,11 @@ func PacketbeatSettings() instance.Settings {
runFlags.AddGoFlag(flag.CommandLine.Lookup("dump"))

return instance.Settings{
RunFlags: runFlags,
Name: Name,
HasDashboards: true,
Processing: processing.MakeDefaultSupport(true, withECSVersion, processing.WithHost, processing.WithAgentMeta()),
RunFlags: runFlags,
Name: Name,
HasDashboards: true,
Processing: processing.MakeDefaultSupport(true, withECSVersion, processing.WithHost, processing.WithAgentMeta()),
InputQueueSize: 400,
}
}

Expand Down