Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add support for queue settings under outputs #36788

Merged
merged 13 commits into from
Oct 19, 2023
Prev Previous commit
Next Next commit
support for reloading
  • Loading branch information
leehinman committed Oct 19, 2023
commit b1a86665fd5591d9b431670eda19ea26d56e95c7
4 changes: 2 additions & 2 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,8 @@ is collected by it.
- Add support for AWS external IDs. {issue}36321[36321] {pull}36322[36322]
- [Enhanncement for host.ip and host.mac] Disabling netinfo.enabled option of add-host-metadata processor {pull}36506[36506]
Setting environmental variable ELASTIC_NETINFO:false in Elastic Agent pod will disable the netinfo.enabled option of add_host_metadata processor
- allow `queue` configuration settings to be set under the output. {issue}35615[35615] {pull}36693[36693]
- elasticsearch output now supports `idle_connection_timeout`. {issue}35615[35615] {pull}36693[36693]
- allow `queue` configuration settings to be set under the output. {issue}35615[35615] {pull}36788[36788]
- elasticsearch output now supports `idle_connection_timeout`. {issue}35615[35615] {pull}36788[36788]

*Auditbeat*

Expand Down
6 changes: 5 additions & 1 deletion libbeat/outputs/console/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@

package console

import "github.com/elastic/beats/v7/libbeat/outputs/codec"
import (
"github.com/elastic/beats/v7/libbeat/outputs/codec"
"github.com/elastic/elastic-agent-libs/config"
)

type Config struct {
Codec codec.Config `config:"codec"`
Expand All @@ -26,6 +29,7 @@ type Config struct {
Pretty bool `config:"pretty"`

BatchSize int
Queue config.Namespace `config:"queue"`
}

var defaultConfig = Config{}
14 changes: 3 additions & 11 deletions libbeat/outputs/console/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"fmt"
"os"
"runtime"
"time"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/outputs"
Expand All @@ -43,13 +42,6 @@ type console struct {
index string
}

type consoleEvent struct {
Timestamp time.Time `json:"@timestamp" struct:"@timestamp"`

// Note: stdlib json doesn't support inlining :( -> use `codec: 2`, to generate proper event
Fields interface{} `struct:",inline"`
}

func init() {
outputs.RegisterType("console", makeConsole)
}
Expand Down Expand Up @@ -82,18 +74,18 @@ func makeConsole(
index := beat.Beat
c, err := newConsole(index, observer, enc)
if err != nil {
return outputs.Fail(fmt.Errorf("console output initialization failed with: %v", err))
return outputs.Fail(fmt.Errorf("console output initialization failed with: %w", err))
}

// check stdout actually being available
if runtime.GOOS != "windows" {
if _, err = c.out.Stat(); err != nil {
err = fmt.Errorf("console output initialization failed with: %v", err)
err = fmt.Errorf("console output initialization failed with: %w", err)
return outputs.Fail(err)
}
}

return outputs.Success(config.BatchSize, 0, c)
return outputs.Success(config.Queue, config.BatchSize, 0, c)
}

func newConsole(index string, observer outputs.Observer, codec codec.Codec) (*console, error) {
Expand Down
1 change: 1 addition & 0 deletions libbeat/outputs/elasticsearch/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type elasticsearchConfig struct {
AllowOlderVersion bool `config:"allow_older_versions"`

Transport httpcommon.HTTPTransportSettings `config:",inline"`
Queue config.Namespace `config:"queue"`
}

type Backoff struct {
Expand Down
34 changes: 17 additions & 17 deletions libbeat/outputs/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,12 @@ func makeES(
return outputs.Fail(err)
}

config := defaultConfig
if err := cfg.Unpack(&config); err != nil {
esConfig := defaultConfig
if err := cfg.Unpack(&esConfig); err != nil {
return outputs.Fail(err)
}

policy, err := newNonIndexablePolicy(config.NonIndexablePolicy)
policy, err := newNonIndexablePolicy(esConfig.NonIndexablePolicy)
if err != nil {
log.Errorf("error while creating file identifier: %v", err)
return outputs.Fail(err)
Expand All @@ -65,12 +65,12 @@ func makeES(
return outputs.Fail(err)
}

if proxyURL := config.Transport.Proxy.URL; proxyURL != nil && !config.Transport.Proxy.Disable {
if proxyURL := esConfig.Transport.Proxy.URL; proxyURL != nil && !esConfig.Transport.Proxy.Disable {
log.Debugf("breaking down proxy URL. Scheme: '%s', host[:port]: '%s', path: '%s'", proxyURL.Scheme, proxyURL.Host, proxyURL.Path)
log.Infof("Using proxy URL: %s", proxyURL)
}

params := config.Params
params := esConfig.Params
if len(params) == 0 {
params = nil
}
Expand All @@ -84,7 +84,7 @@ func makeES(

clients := make([]outputs.NetworkClient, len(hosts))
for i, host := range hosts {
esURL, err := common.MakeURL(config.Protocol, config.Path, host, 9200)
esURL, err := common.MakeURL(esConfig.Protocol, esConfig.Path, host, 9200)
if err != nil {
log.Errorf("Invalid host param set: %s, Error: %+v", host, err)
return outputs.Fail(err)
Expand All @@ -95,17 +95,17 @@ func makeES(
ConnectionSettings: eslegclient.ConnectionSettings{
URL: esURL,
Beatname: beat.Beat,
Kerberos: config.Kerberos,
Username: config.Username,
Password: config.Password,
APIKey: config.APIKey,
Kerberos: esConfig.Kerberos,
Username: esConfig.Username,
Password: esConfig.Password,
APIKey: esConfig.APIKey,
Parameters: params,
Headers: config.Headers,
CompressionLevel: config.CompressionLevel,
Headers: esConfig.Headers,
CompressionLevel: esConfig.CompressionLevel,
Observer: observer,
EscapeHTML: config.EscapeHTML,
Transport: config.Transport,
IdleConnTimeout: config.Transport.IdleConnTimeout,
EscapeHTML: esConfig.EscapeHTML,
Transport: esConfig.Transport,
IdleConnTimeout: esConfig.Transport.IdleConnTimeout,
},
Index: index,
Pipeline: pipeline,
Expand All @@ -116,11 +116,11 @@ func makeES(
return outputs.Fail(err)
}

client = outputs.WithBackoff(client, config.Backoff.Init, config.Backoff.Max)
client = outputs.WithBackoff(client, esConfig.Backoff.Init, esConfig.Backoff.Max)
clients[i] = client
}

return outputs.SuccessNet(config.LoadBalance, config.BulkMaxSize, config.MaxRetries, clients)
return outputs.SuccessNet(esConfig.Queue, esConfig.LoadBalance, esConfig.BulkMaxSize, esConfig.MaxRetries, clients)
}

func buildSelectors(
Expand Down
24 changes: 13 additions & 11 deletions libbeat/outputs/fileout/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,29 +21,31 @@ import (
"fmt"

"github.com/elastic/beats/v7/libbeat/outputs/codec"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/file"
)

type config struct {
Path string `config:"path"`
Filename string `config:"filename"`
RotateEveryKb uint `config:"rotate_every_kb" validate:"min=1"`
NumberOfFiles uint `config:"number_of_files"`
Codec codec.Config `config:"codec"`
Permissions uint32 `config:"permissions"`
RotateOnStartup bool `config:"rotate_on_startup"`
type fileOutConfig struct {
Path string `config:"path"`
Filename string `config:"filename"`
RotateEveryKb uint `config:"rotate_every_kb" validate:"min=1"`
NumberOfFiles uint `config:"number_of_files"`
Codec codec.Config `config:"codec"`
Permissions uint32 `config:"permissions"`
RotateOnStartup bool `config:"rotate_on_startup"`
Queue config.Namespace `config:"queue"`
}

func defaultConfig() config {
return config{
func defaultConfig() fileOutConfig {
return fileOutConfig{
NumberOfFiles: 7,
RotateEveryKb: 10 * 1024,
Permissions: 0600,
RotateOnStartup: true,
}
}

func (c *config) Validate() error {
func (c *fileOutConfig) Validate() error {
if c.NumberOfFiles < 2 || c.NumberOfFiles > file.MaxBackupsLimit {
return fmt.Errorf("the number_of_files to keep should be between 2 and %v",
file.MaxBackupsLimit)
Expand Down
10 changes: 5 additions & 5 deletions libbeat/outputs/fileout/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ func makeFileout(
observer outputs.Observer,
cfg *c.C,
) (outputs.Group, error) {
config := defaultConfig()
if err := cfg.Unpack(&config); err != nil {
foConfig := defaultConfig()
if err := cfg.Unpack(&foConfig); err != nil {
return outputs.Fail(err)
}

Expand All @@ -64,14 +64,14 @@ func makeFileout(
beat: beat,
observer: observer,
}
if err := fo.init(beat, config); err != nil {
if err := fo.init(beat, foConfig); err != nil {
return outputs.Fail(err)
}

return outputs.Success(-1, 0, fo)
return outputs.Success(foConfig.Queue, -1, 0, fo)
}

func (out *fileOutput) init(beat beat.Info, c config) error {
func (out *fileOutput) init(beat beat.Info, c fileOutConfig) error {
var path string
if c.Filename != "" {
path = filepath.Join(c.Path, c.Filename)
Expand Down
7 changes: 1 addition & 6 deletions libbeat/outputs/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ type kafkaConfig struct {
Codec codec.Config `config:"codec"`
Sasl kafka.SaslConfig `config:"sasl"`
EnableFAST bool `config:"enable_krb5_fast"`
Queue config.Namespace `config:"queue"`
}

type metaConfig struct {
Expand All @@ -101,12 +102,6 @@ var compressionModes = map[string]sarama.CompressionCodec{
"snappy": sarama.CompressionSnappy,
}

const (
saslTypePlaintext = sarama.SASLTypePlaintext
saslTypeSCRAMSHA256 = sarama.SASLTypeSCRAMSHA256
saslTypeSCRAMSHA512 = sarama.SASLTypeSCRAMSHA512
)

func defaultConfig() kafkaConfig {
return kafkaConfig{
Hosts: nil,
Expand Down
12 changes: 6 additions & 6 deletions libbeat/outputs/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func makeKafka(
log := logp.NewLogger(logSelector)
log.Debug("initialize kafka output")

config, err := readConfig(cfg)
kConfig, err := readConfig(cfg)
if err != nil {
return outputs.Fail(err)
}
Expand All @@ -57,7 +57,7 @@ func makeKafka(
return outputs.Fail(err)
}

libCfg, err := newSaramaConfig(log, config)
libCfg, err := newSaramaConfig(log, kConfig)
if err != nil {
return outputs.Fail(err)
}
Expand All @@ -67,21 +67,21 @@ func makeKafka(
return outputs.Fail(err)
}

codec, err := codec.CreateEncoder(beat, config.Codec)
codec, err := codec.CreateEncoder(beat, kConfig.Codec)
if err != nil {
return outputs.Fail(err)
}

client, err := newKafkaClient(observer, hosts, beat.IndexPrefix, config.Key, topic, config.Headers, codec, libCfg)
client, err := newKafkaClient(observer, hosts, beat.IndexPrefix, kConfig.Key, topic, kConfig.Headers, codec, libCfg)
if err != nil {
return outputs.Fail(err)
}

retry := 0
if config.MaxRetries < 0 {
if kConfig.MaxRetries < 0 {
retry = -1
}
return outputs.Success(config.BulkMaxSize, retry, client)
return outputs.Success(kConfig.Queue, kConfig.BulkMaxSize, retry, client)
}

func buildTopicSelector(cfg *config.C) (outil.Selector, error) {
Expand Down
1 change: 1 addition & 0 deletions libbeat/outputs/logstash/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type Config struct {
Proxy transport.ProxyConfig `config:",inline"`
Backoff Backoff `config:"backoff"`
EscapeHTML bool `config:"escape_html"`
Queue config.Namespace `config:"queue"`
}

type Backoff struct {
Expand Down
18 changes: 9 additions & 9 deletions libbeat/outputs/logstash/logstash.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func makeLogstash(
observer outputs.Observer,
cfg *conf.C,
) (outputs.Group, error) {
config, err := readConfig(cfg, beat)
lsConfig, err := readConfig(cfg, beat)
if err != nil {
return outputs.Fail(err)
}
Expand All @@ -51,14 +51,14 @@ func makeLogstash(
return outputs.Fail(err)
}

tls, err := tlscommon.LoadTLSConfig(config.TLS)
tls, err := tlscommon.LoadTLSConfig(lsConfig.TLS)
if err != nil {
return outputs.Fail(err)
}

transp := transport.Config{
Timeout: config.Timeout,
Proxy: &config.Proxy,
Timeout: lsConfig.Timeout,
Proxy: &lsConfig.Proxy,
TLS: tls,
Stats: observer,
}
Expand All @@ -72,18 +72,18 @@ func makeLogstash(
return outputs.Fail(err)
}

if config.Pipelining > 0 {
client, err = newAsyncClient(beat, conn, observer, config)
if lsConfig.Pipelining > 0 {
client, err = newAsyncClient(beat, conn, observer, lsConfig)
} else {
client, err = newSyncClient(beat, conn, observer, config)
client, err = newSyncClient(beat, conn, observer, lsConfig)
}
if err != nil {
return outputs.Fail(err)
}

client = outputs.WithBackoff(client, config.Backoff.Init, config.Backoff.Max)
client = outputs.WithBackoff(client, lsConfig.Backoff.Init, lsConfig.Backoff.Max)
clients[i] = client
}

return outputs.SuccessNet(config.LoadBalance, config.BulkMaxSize, config.MaxRetries, clients)
return outputs.SuccessNet(lsConfig.Queue, lsConfig.LoadBalance, lsConfig.BulkMaxSize, lsConfig.MaxRetries, clients)
}
2 changes: 2 additions & 0 deletions libbeat/outputs/redis/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"github.com/elastic/beats/v7/libbeat/outputs/codec"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/transport"
"github.com/elastic/elastic-agent-libs/transport/tlscommon"
)
Expand All @@ -40,6 +41,7 @@ type redisConfig struct {
Db int `config:"db"`
DataType string `config:"datatype"`
Backoff backoff `config:"backoff"`
Queue config.Namespace `config:"queue"`
}

type backoff struct {
Expand Down
Loading