diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index b0692fcaf056..25934dcdbc85 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -795,6 +795,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Add ECS fields for x509 certs, event categorization, and related IP info. {pull}19167[19167] - Add 100-continue support {issue}15830[15830] {pull}19349[19349] - Add initial SIP protocol support {pull}21221[21221] +- Add support for overriding the published index on a per-protocol/flow basis. {pull}22134[22134] - Change build process for x-pack distribution {pull}21979[21979] diff --git a/libbeat/cfgfile/list.go b/libbeat/cfgfile/list.go index 9b62d95f6a99..38193ef52049 100644 --- a/libbeat/cfgfile/list.go +++ b/libbeat/cfgfile/list.go @@ -157,7 +157,9 @@ func (r *RunnerList) Has(hash uint64) bool { // HashConfig hashes a given common.Config func HashConfig(c *common.Config) (uint64, error) { var config map[string]interface{} - c.Unpack(&config) + if err := c.Unpack(&config); err != nil { + return 0, err + } return hashstructure.Hash(config, nil) } diff --git a/packetbeat/_meta/config/beat.reference.yml.tmpl b/packetbeat/_meta/config/beat.reference.yml.tmpl index 722c47102dc8..5ccc9bf5a92f 100644 --- a/packetbeat/_meta/config/beat.reference.yml.tmpl +++ b/packetbeat/_meta/config/beat.reference.yml.tmpl @@ -63,6 +63,9 @@ packetbeat.flows: # Set to true to publish fields with null values in events. #keep_null: false + # Overrides where flow events are indexed. + #index: my-custom-flow-index + {{header "Transaction protocols"}} packetbeat.protocols: @@ -73,6 +76,9 @@ packetbeat.protocols: # Set to true to publish fields with null values in events. #keep_null: false + # Overrides where this protocol's events are indexed. + #index: my-custom-icmp-index + - type: amqp # Enable AMQP monitoring. Default: true #enabled: true @@ -113,6 +119,9 @@ packetbeat.protocols: # incoming responses, but sent to Elasticsearch immediately. #transaction_timeout: 10s + # Overrides where this protocol's events are indexed. + #index: my-custom-amqp-index + - type: cassandra #Cassandra port for traffic monitoring. ports: [9042] @@ -143,6 +152,9 @@ packetbeat.protocols: # This option indicates which Operator/Operators will be ignored. #ignored_ops: ["SUPPORTED","OPTIONS"] + # Overrides where this protocol's events are indexed. + #index: my-custom-cassandra-index + - type: dhcpv4 # Configure the DHCP for IPv4 ports. ports: [67, 68] @@ -183,6 +195,9 @@ packetbeat.protocols: # incoming responses, but sent to Elasticsearch immediately. #transaction_timeout: 10s + # Overrides where this protocol's events are indexed. + #index: my-custom-dhcpv4-index + - type: http # Enable HTTP monitoring. Default: true #enabled: true @@ -257,6 +272,9 @@ packetbeat.protocols: # be trimmed to this size. Default is 10 MB. #max_message_size: 10485760 + # Overrides where this protocol's events are indexed. + #index: my-custom-http-index + - type: memcache # Enable memcache monitoring. Default: true #enabled: true @@ -309,6 +327,9 @@ packetbeat.protocols: # incoming responses, but sent to Elasticsearch immediately. #transaction_timeout: 10s + # Overrides where this protocol's events are indexed. + #index: my-custom-memcache-index + - type: mysql # Enable mysql monitoring. Default: true #enabled: true @@ -332,6 +353,9 @@ packetbeat.protocols: # incoming responses, but sent to Elasticsearch immediately. #transaction_timeout: 10s + # Overrides where this protocol's events are indexed. + #index: my-custom-mysql-index + - type: pgsql # Enable pgsql monitoring. Default: true #enabled: true @@ -355,6 +379,9 @@ packetbeat.protocols: # incoming responses, but sent to Elasticsearch immediately. #transaction_timeout: 10s + # Overrides where this protocol's events are indexed. + #index: my-custom-pgsql-index + - type: redis # Enable redis monitoring. Default: true #enabled: true @@ -387,6 +414,9 @@ packetbeat.protocols: # large enough to allow for pipelining. #queue_max_messages: 20000 + # Overrides where this protocol's events are indexed. + #index: my-custom-redis-index + - type: thrift # Enable thrift monitoring. Default: true #enabled: true @@ -445,6 +475,9 @@ packetbeat.protocols: # incoming responses, but sent to Elasticsearch immediately. #transaction_timeout: 10s + # Overrides where this protocol's events are indexed. + #index: my-custom-thrift-index + - type: mongodb # Enable mongodb monitoring. Default: true #enabled: true @@ -478,6 +511,9 @@ packetbeat.protocols: # incoming responses, but sent to Elasticsearch immediately. #transaction_timeout: 10s + # Overrides where this protocol's events are indexed. + #index: my-custom-mongodb-index + - type: nfs # Enable NFS monitoring. Default: true #enabled: true @@ -501,6 +537,9 @@ packetbeat.protocols: # incoming responses, but sent to Elasticsearch immediately. #transaction_timeout: 10s + # Overrides where this protocol's events are indexed. + #index: my-custom-nfs-index + - type: tls # Enable TLS monitoring. Default: true #enabled: true @@ -531,6 +570,9 @@ packetbeat.protocols: # Set to true to publish fields with null values in events. #keep_null: false + # Overrides where this protocol's events are indexed. + #index: my-custom-tls-index + - type: sip # Configure the ports where to listen for SIP traffic. You can disable the SIP protocol by commenting out the list of ports. ports: [5060] @@ -544,6 +586,9 @@ packetbeat.protocols: # Preserve original contents in event.original keep_original: true + # Overrides where this protocol's events are indexed. + #index: my-custom-sip-index + {{header "Monitored processes"}} # Packetbeat can enrich events with information about the process associated diff --git a/packetbeat/beater/packetbeat.go b/packetbeat/beater/packetbeat.go index b862bd3ee112..d72a98d4a5f9 100644 --- a/packetbeat/beater/packetbeat.go +++ b/packetbeat/beater/packetbeat.go @@ -18,46 +18,32 @@ package beater import ( - "errors" "flag" - "fmt" - "sync" "time" - "github.com/tsg/gopacket/layers" - "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/common/reload" "github.com/elastic/beats/v7/libbeat/logp" - "github.com/elastic/beats/v7/libbeat/processors" + "github.com/elastic/beats/v7/libbeat/management" "github.com/elastic/beats/v7/libbeat/service" "github.com/elastic/beats/v7/packetbeat/config" - "github.com/elastic/beats/v7/packetbeat/decoder" - "github.com/elastic/beats/v7/packetbeat/flows" - "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" - "github.com/elastic/beats/v7/packetbeat/protos/icmp" - "github.com/elastic/beats/v7/packetbeat/protos/tcp" - "github.com/elastic/beats/v7/packetbeat/protos/udp" - "github.com/elastic/beats/v7/packetbeat/publish" - "github.com/elastic/beats/v7/packetbeat/sniffer" // Add packetbeat default processors _ "github.com/elastic/beats/v7/packetbeat/processor/add_kubernetes_metadata" ) -// Beater object. Contains all objects needed to run the beat -type packetbeat struct { - config config.Config - cmdLineArgs flags - sniff *sniffer.Sniffer - - // publisher/pipeline - pipeline beat.Pipeline - transPub *publish.TransactionPublisher - flows *flows.Flows -} +// this is mainly a limitation to ensure that we never deadlock +// after exiting the main select loop in centrally managed packetbeat +// in order to ensure we don't block on a channel write we make sure +// that the errors channel propagated back from the sniffers has a buffer +// that's equal to the number of sniffers that we can run, that way, if +// exiting and we throw a whole bunch of errors for some reason, each +// sniffer can write out the error even though the main loop has already +// exited with the result of the first error +var maxSniffers = 100 type flags struct { file *string @@ -79,8 +65,8 @@ func init() { } } -func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) { - config := config.Config{ +func initialConfig() config.Config { + return config.Config{ Interfaces: config.InterfacesConfig{ File: *cmdLineArgs.file, Loop: *cmdLineArgs.loop, @@ -89,111 +75,31 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) { Dumpfile: *cmdLineArgs.dumpfile, }, } - err := rawConfig.Unpack(&config) - if err != nil { - logp.Err("fails to read the beat config: %v, %v", err, config) - return nil, err - } - - pb := &packetbeat{ - config: config, - cmdLineArgs: cmdLineArgs, - } - err = pb.init(b) - if err != nil { - return nil, err - } - - return pb, nil } -// init packetbeat components -func (pb *packetbeat) init(b *beat.Beat) error { - var err error - cfg := &pb.config - // Enable the process watcher only if capturing live traffic - if cfg.Interfaces.File == "" { - err = procs.ProcWatcher.Init(cfg.Procs) - if err != nil { - logp.Critical(err.Error()) - return err - } - } else { - logp.Info("Process watcher disabled when file input is used") - } - - pb.pipeline = b.Publisher - pb.transPub, err = publish.NewTransactionPublisher( - b.Info.Name, - b.Publisher, - pb.config.IgnoreOutgoing, - pb.config.Interfaces.File == "", - ) - if err != nil { - return err - } - - logp.Debug("main", "Initializing protocol plugins") - err = protos.Protos.Init(false, pb.transPub, cfg.Protocols, cfg.ProtocolsList) - if err != nil { - return fmt.Errorf("Initializing protocol analyzers failed: %v", err) - } - - if err := pb.setupFlows(); err != nil { - return err - } - - return pb.setupSniffer() -} - -func (pb *packetbeat) setupSniffer() error { - config := &pb.config - - icmp, err := pb.icmpConfig() - if err != nil { - return err - } - - withVlans := config.Interfaces.WithVlans - withICMP := icmp.Enabled() - - filter := config.Interfaces.BpfFilter - if filter == "" && !config.Flows.IsEnabled() { - filter = protos.Protos.BpfFilter(withVlans, withICMP) - } - - pb.sniff, err = sniffer.New(false, filter, pb.createWorker, config.Interfaces) - return err +// Beater object. Contains all objects needed to run the beat +type packetbeat struct { + config *common.Config + factory *processorFactory + done chan struct{} } -func (pb *packetbeat) setupFlows() error { - config := &pb.config - if !config.Flows.IsEnabled() { - return nil - } - - processors, err := processors.New(config.Flows.Processors) - if err != nil { - return err - } - - client, err := pb.pipeline.ConnectWith(beat.ClientConfig{ - Processing: beat.ProcessingConfig{ - EventMetadata: config.Flows.EventMetadata, - Processor: processors, - KeepNull: config.Flows.KeepNull, - }, - }) - if err != nil { - return err +func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) { + configurator := config.NewAgentConfig + if !b.Manager.Enabled() { + configurator = initialConfig().FromStatic } - pb.flows, err = flows.NewFlows(client.PublishAll, config.Flows) - if err != nil { - return err + factory := newProcessorFactory(b.Info.Name, make(chan error, maxSniffers), b, configurator) + if err := factory.CheckConfig(rawConfig); err != nil { + return nil, err } - return nil + return &packetbeat{ + config: rawConfig, + factory: factory, + done: make(chan struct{}), + }, nil } func (pb *packetbeat) Run(b *beat.Beat) error { @@ -205,114 +111,56 @@ func (pb *packetbeat) Run(b *beat.Beat) error { } }() - defer pb.transPub.Stop() - - timeout := pb.config.ShutdownTimeout - if timeout > 0 { - defer time.Sleep(timeout) + if !b.Manager.Enabled() { + return pb.runStatic(b, pb.factory) } + return pb.runManaged(b, pb.factory) +} - if pb.flows != nil { - pb.flows.Start() - defer pb.flows.Stop() +func (pb *packetbeat) runStatic(b *beat.Beat, factory *processorFactory) error { + runner, err := factory.Create(b.Publisher, pb.config) + if err != nil { + return err } + runner.Start() + defer runner.Stop() - var wg sync.WaitGroup - errC := make(chan error, 1) + logp.Debug("main", "Waiting for the runner to finish") - // Run the sniffer in background - wg.Add(1) - go func() { - defer wg.Done() - - err := pb.sniff.Run() - if err != nil { - errC <- fmt.Errorf("Sniffer main loop failed: %v", err) - } - }() - - logp.Debug("main", "Waiting for the sniffer to finish") - wg.Wait() select { - default: - case err := <-errC: + case <-pb.done: + case err := <-factory.err: + close(pb.done) return err } - return nil } -// Called by the Beat stop function -func (pb *packetbeat) Stop() { - logp.Info("Packetbeat send stop signal") - pb.sniff.Stop() -} - -func (pb *packetbeat) createWorker(dl layers.LinkType) (sniffer.Worker, error) { - var icmp4 icmp.ICMPv4Processor - var icmp6 icmp.ICMPv6Processor - cfg, err := pb.icmpConfig() - if err != nil { - return nil, err - } - if cfg.Enabled() { - reporter, err := pb.transPub.CreateReporter(cfg) - if err != nil { - return nil, err - } - - icmp, err := icmp.New(false, reporter, cfg) - if err != nil { - return nil, err +func (pb *packetbeat) runManaged(b *beat.Beat, factory *processorFactory) error { + runner := newReloader(management.DebugK, factory, b.Publisher) + reload.Register.MustRegisterList("inputs", runner) + defer runner.Stop() + + logp.Debug("main", "Waiting for the runner to finish") + + for { + select { + case <-pb.done: + return nil + case err := <-factory.err: + // when we're managed we don't want + // to stop if the sniffer(s) exited without an error + // this would happen during a configuration reload + if err != nil { + close(pb.done) + return err + } } - - icmp4 = icmp - icmp6 = icmp - } - - tcp, err := tcp.NewTCP(&protos.Protos) - if err != nil { - return nil, err } - - udp, err := udp.NewUDP(&protos.Protos) - if err != nil { - return nil, err - } - - worker, err := decoder.New(pb.flows, dl, icmp4, icmp6, tcp, udp) - if err != nil { - return nil, err - } - - return worker, nil } -func (pb *packetbeat) icmpConfig() (*common.Config, error) { - var icmp *common.Config - if pb.config.Protocols["icmp"].Enabled() { - icmp = pb.config.Protocols["icmp"] - } - - for _, cfg := range pb.config.ProtocolsList { - info := struct { - Type string `config:"type" validate:"required"` - }{} - - if err := cfg.Unpack(&info); err != nil { - return nil, err - } - - if info.Type != "icmp" { - continue - } - - if icmp != nil { - return nil, errors.New("More then one icmp configurations found") - } - - icmp = cfg - } - - return icmp, nil +// Called by the Beat stop function +func (pb *packetbeat) Stop() { + logp.Info("Packetbeat send stop signal") + close(pb.done) } diff --git a/packetbeat/beater/processor.go b/packetbeat/beater/processor.go new file mode 100644 index 000000000000..d6aafbb1a7fc --- /dev/null +++ b/packetbeat/beater/processor.go @@ -0,0 +1,163 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package beater + +import ( + "fmt" + "sync" + "time" + + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/cfgfile" + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/libbeat/publisher/pipeline" + + "github.com/elastic/beats/v7/packetbeat/config" + "github.com/elastic/beats/v7/packetbeat/flows" + "github.com/elastic/beats/v7/packetbeat/procs" + "github.com/elastic/beats/v7/packetbeat/protos" + "github.com/elastic/beats/v7/packetbeat/publish" + "github.com/elastic/beats/v7/packetbeat/sniffer" +) + +type processor struct { + wg sync.WaitGroup + publisher *publish.TransactionPublisher + flows *flows.Flows + sniffer *sniffer.Sniffer + shutdownTimeout time.Duration + err chan error +} + +func newProcessor(shutdownTimeout time.Duration, publisher *publish.TransactionPublisher, flows *flows.Flows, sniffer *sniffer.Sniffer, err chan error) *processor { + return &processor{ + publisher: publisher, + flows: flows, + sniffer: sniffer, + err: err, + shutdownTimeout: shutdownTimeout, + } +} + +func (p *processor) String() string { + return "packetbeat.processor" +} + +func (p *processor) Start() { + if p.flows != nil { + p.flows.Start() + } + p.wg.Add(1) + go func() { + defer p.wg.Done() + + err := p.sniffer.Run() + if err != nil { + p.err <- fmt.Errorf("sniffer loop failed: %v", err) + return + } + p.err <- nil + }() +} + +func (p *processor) Stop() { + p.sniffer.Stop() + if p.flows != nil { + p.flows.Stop() + } + p.wg.Wait() + // wait for shutdownTimeout to let the publisher flush + // whatever pending events + if p.shutdownTimeout > 0 { + time.Sleep(p.shutdownTimeout) + } + p.publisher.Stop() +} + +type processorFactory struct { + name string + err chan error + beat *beat.Beat + configurator func(*common.Config) (config.Config, error) +} + +func newProcessorFactory(name string, err chan error, beat *beat.Beat, configurator func(*common.Config) (config.Config, error)) *processorFactory { + return &processorFactory{ + name: name, + err: err, + beat: beat, + configurator: configurator, + } +} + +func (p *processorFactory) Create(pipeline beat.PipelineConnector, cfg *common.Config) (cfgfile.Runner, error) { + config, err := p.configurator(cfg) + if err != nil { + logp.Err("Failed to read the beat config: %v, %v", err, config) + return nil, err + } + + publisher, err := publish.NewTransactionPublisher( + p.beat.Info.Name, + p.beat.Publisher, + config.IgnoreOutgoing, + config.Interfaces.File == "", + ) + if err != nil { + return nil, err + } + + watcher := procs.ProcessesWatcher{} + // Enable the process watcher only if capturing live traffic + if config.Interfaces.File == "" { + err = watcher.Init(config.Procs) + if err != nil { + logp.Critical(err.Error()) + return nil, err + } + } else { + logp.Info("Process watcher disabled when file input is used") + } + + logp.Debug("main", "Initializing protocol plugins") + protocols := protos.NewProtocols() + err = protocols.Init(false, publisher, watcher, config.Protocols, config.ProtocolsList) + if err != nil { + return nil, fmt.Errorf("Initializing protocol analyzers failed: %v", err) + } + flows, err := setupFlows(pipeline, watcher, config) + if err != nil { + return nil, err + } + sniffer, err := setupSniffer(config, protocols, workerFactory(publisher, protocols, watcher, flows, config)) + if err != nil { + return nil, err + } + + return newProcessor(config.ShutdownTimeout, publisher, flows, sniffer, p.err), nil +} + +func (p *processorFactory) CheckConfig(config *common.Config) error { + runner, err := p.Create(pipeline.NewNilPipeline(), config) + if err != nil { + return err + } + runner.Stop() + return nil +} diff --git a/packetbeat/beater/reloader.go b/packetbeat/beater/reloader.go new file mode 100644 index 000000000000..c6925e2fa95f --- /dev/null +++ b/packetbeat/beater/reloader.go @@ -0,0 +1,43 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package beater + +import ( + "fmt" + + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/cfgfile" + "github.com/elastic/beats/v7/libbeat/common/reload" +) + +type reloader struct { + *cfgfile.RunnerList +} + +func newReloader(name string, factory *processorFactory, pipeline beat.PipelineConnector) *reloader { + return &reloader{ + RunnerList: cfgfile.NewRunnerList(name, factory, pipeline), + } +} + +func (r *reloader) Reload(configs []*reload.ConfigWithMeta) error { + if len(configs) > maxSniffers { + return fmt.Errorf("only %d inputs are currently supported", maxSniffers) + } + return r.RunnerList.Reload(configs) +} diff --git a/packetbeat/beater/setup.go b/packetbeat/beater/setup.go new file mode 100644 index 000000000000..f8ed8b0aea67 --- /dev/null +++ b/packetbeat/beater/setup.go @@ -0,0 +1,72 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package beater + +import ( + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/processors" + "github.com/elastic/beats/v7/packetbeat/config" + "github.com/elastic/beats/v7/packetbeat/flows" + "github.com/elastic/beats/v7/packetbeat/procs" + "github.com/elastic/beats/v7/packetbeat/protos" + "github.com/elastic/beats/v7/packetbeat/sniffer" +) + +func setupSniffer(cfg config.Config, protocols *protos.ProtocolsStruct, workerFactory sniffer.WorkerFactory) (*sniffer.Sniffer, error) { + icmp, err := cfg.ICMP() + if err != nil { + return nil, err + } + + filter := cfg.Interfaces.BpfFilter + if filter == "" && !cfg.Flows.IsEnabled() { + filter = protocols.BpfFilter(cfg.Interfaces.WithVlans, icmp.Enabled()) + } + + return sniffer.New(false, filter, workerFactory, cfg.Interfaces) +} + +func setupFlows(pipeline beat.Pipeline, watcher procs.ProcessesWatcher, cfg config.Config) (*flows.Flows, error) { + if !cfg.Flows.IsEnabled() { + return nil, nil + } + + processors, err := processors.New(cfg.Flows.Processors) + if err != nil { + return nil, err + } + + clientConfig := beat.ClientConfig{ + Processing: beat.ProcessingConfig{ + EventMetadata: cfg.Flows.EventMetadata, + Processor: processors, + KeepNull: cfg.Flows.KeepNull, + }, + } + if cfg.Flows.Index != "" { + clientConfig.Processing.Meta = common.MapStr{"index": cfg.Flows.Index} + } + + client, err := pipeline.ConnectWith(clientConfig) + if err != nil { + return nil, err + } + + return flows.NewFlows(client.PublishAll, watcher, cfg.Flows) +} diff --git a/packetbeat/beater/worker.go b/packetbeat/beater/worker.go new file mode 100644 index 000000000000..5dd6a5144541 --- /dev/null +++ b/packetbeat/beater/worker.go @@ -0,0 +1,75 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package beater + +import ( + "github.com/tsg/gopacket/layers" + + "github.com/elastic/beats/v7/packetbeat/config" + "github.com/elastic/beats/v7/packetbeat/decoder" + "github.com/elastic/beats/v7/packetbeat/flows" + "github.com/elastic/beats/v7/packetbeat/procs" + "github.com/elastic/beats/v7/packetbeat/protos" + "github.com/elastic/beats/v7/packetbeat/protos/icmp" + "github.com/elastic/beats/v7/packetbeat/protos/tcp" + "github.com/elastic/beats/v7/packetbeat/protos/udp" + "github.com/elastic/beats/v7/packetbeat/publish" + "github.com/elastic/beats/v7/packetbeat/sniffer" +) + +func workerFactory(publisher *publish.TransactionPublisher, protocols *protos.ProtocolsStruct, watcher procs.ProcessesWatcher, flows *flows.Flows, cfg config.Config) func(dl layers.LinkType) (sniffer.Worker, error) { + return func(dl layers.LinkType) (sniffer.Worker, error) { + var icmp4 icmp.ICMPv4Processor + var icmp6 icmp.ICMPv6Processor + config, err := cfg.ICMP() + if err != nil { + return nil, err + } + if config.Enabled() { + reporter, err := publisher.CreateReporter(config) + if err != nil { + return nil, err + } + + icmp, err := icmp.New(false, reporter, watcher, config) + if err != nil { + return nil, err + } + + icmp4 = icmp + icmp6 = icmp + } + + tcp, err := tcp.NewTCP(protocols) + if err != nil { + return nil, err + } + + udp, err := udp.NewUDP(protocols) + if err != nil { + return nil, err + } + + worker, err := decoder.New(flows, dl, icmp4, icmp6, tcp, udp) + if err != nil { + return nil, err + } + + return worker, nil + } +} diff --git a/packetbeat/config/agent.go b/packetbeat/config/agent.go new file mode 100644 index 000000000000..30f64630d44a --- /dev/null +++ b/packetbeat/config/agent.go @@ -0,0 +1,139 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package config + +import ( + "fmt" + "runtime" + + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/go-ucfg" +) + +type datastream struct { + Namespace string `config:"namespace"` + Dataset string `config:"dataset"` + Type string `config:"type"` +} + +type agentInput struct { + Type string `config:"type"` + Datastream datastream `config:"data_stream"` + Processors []common.MapStr `config:"processors"` + Streams []map[string]interface{} `config:"streams"` +} + +var osDefaultDevices = map[string]string{ + "darwin": "en0", + "linux": "any", +} + +func defaultDevice() string { + if device, found := osDefaultDevices[runtime.GOOS]; found { + return device + } + return "0" +} + +func (i agentInput) addProcessorsAndIndex(cfg *common.Config) (*common.Config, error) { + namespace := i.Datastream.Namespace + if namespace == "" { + namespace = "default" + } + datastreamConfig := struct { + Datastream datastream `config:"data_stream"` + }{} + if err := cfg.Unpack(&datastreamConfig); err != nil { + return nil, err + } + mergeConfig, err := common.NewConfigFrom(common.MapStr{ + "index": datastreamConfig.Datastream.Type + "-" + datastreamConfig.Datastream.Dataset + "-" + namespace, + "processors": append([]common.MapStr{ + common.MapStr{ + "add_fields": common.MapStr{ + "target": "data_stream", + "fields": common.MapStr{ + "type": datastreamConfig.Datastream.Type, + "dataset": datastreamConfig.Datastream.Dataset, + "namespace": namespace, + }, + }, + }, + common.MapStr{ + "add_fields": common.MapStr{ + "target": "event", + "fields": common.MapStr{ + "dataset": datastreamConfig.Datastream.Dataset, + }, + }, + }, + }, i.Processors...), + }) + if err != nil { + return nil, err + } + if err := cfg.MergeWithOpts(mergeConfig, ucfg.FieldAppendValues("processors")); err != nil { + return nil, err + } + return cfg, nil +} + +// NewAgentConfig allows the packetbeat configuration to understand +// agent semantics +func NewAgentConfig(cfg *common.Config) (Config, error) { + logp.Debug("agent", "Normalizing agent configuration") + var input agentInput + config := Config{ + Interfaces: InterfacesConfig{ + // TODO: make this configurable rather than just using the default device + Device: defaultDevice(), + }, + } + if err := cfg.Unpack(&input); err != nil { + return config, err + } + + logp.Debug("agent", fmt.Sprintf("Found %d inputs", len(input.Streams))) + for _, stream := range input.Streams { + if rawStreamType, ok := stream["type"]; ok { + streamType, ok := rawStreamType.(string) + if !ok { + return config, fmt.Errorf("invalid input type of: '%T'", rawStreamType) + } + logp.Debug("agent", fmt.Sprintf("Found agent configuration for %v", streamType)) + cfg, err := common.NewConfigFrom(stream) + if err != nil { + return config, err + } + cfg, err = input.addProcessorsAndIndex(cfg) + if err != nil { + return config, err + } + switch streamType { + case "flow": + if err := cfg.Unpack(&config.Flows); err != nil { + return config, err + } + default: + config.ProtocolsList = append(config.ProtocolsList, cfg) + } + } + } + return config, nil +} diff --git a/packetbeat/config/agent_test.go b/packetbeat/config/agent_test.go new file mode 100644 index 000000000000..2612423e0cb5 --- /dev/null +++ b/packetbeat/config/agent_test.go @@ -0,0 +1,64 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package config + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/libbeat/common" +) + +func TestAgentInputNormalization(t *testing.T) { + cfg, err := common.NewConfigFrom(` +type: packet +data_stream: + namespace: default +processors: + - add_fields: + target: 'elastic_agent' + fields: + id: agent-id + version: 8.0.0 + snapshot: false +streams: + - type: flow + timeout: 10s + period: 10s + keep_null: false + data_stream: + dataset: packet.flow + type: logs + - type: icmp + data_stream: + dataset: packet.icmp + type: logs +`) + require.NoError(t, err) + config, err := NewAgentConfig(cfg) + require.NoError(t, err) + + require.Equal(t, config.Flows.Timeout, "10s") + require.Equal(t, config.Flows.Index, "logs-packet.flow-default") + require.Len(t, config.ProtocolsList, 1) + + var protocol map[string]interface{} + require.NoError(t, config.ProtocolsList[0].Unpack(&protocol)) + require.Len(t, protocol["processors"].([]interface{}), 3) +} diff --git a/packetbeat/config/config.go b/packetbeat/config/config.go index 893e4828ab8f..9d3818d4aa4d 100644 --- a/packetbeat/config/config.go +++ b/packetbeat/config/config.go @@ -18,6 +18,7 @@ package config import ( + "errors" "time" "github.com/elastic/beats/v7/libbeat/common" @@ -35,6 +36,44 @@ type Config struct { ShutdownTimeout time.Duration `config:"shutdown_timeout"` } +// FromStatic initializes a configuration given a common.Config +func (c Config) FromStatic(cfg *common.Config) (Config, error) { + err := cfg.Unpack(&c) + if err != nil { + return c, err + } + return c, nil +} + +// ICMP returns the ICMP configuration +func (c Config) ICMP() (*common.Config, error) { + var icmp *common.Config + if c.Protocols["icmp"].Enabled() { + icmp = c.Protocols["icmp"] + } + + for _, cfg := range c.ProtocolsList { + info := struct { + Type string `config:"type" validate:"required"` + }{} + + if err := cfg.Unpack(&info); err != nil { + return nil, err + } + + if info.Type != "icmp" { + continue + } + + if icmp != nil { + return nil, errors.New("more than one icmp configuration found") + } + + icmp = cfg + } + return icmp, nil +} + type InterfacesConfig struct { Device string `config:"device"` Type string `config:"type"` @@ -57,6 +96,8 @@ type Flows struct { EventMetadata common.EventMetadata `config:",inline"` Processors processors.PluginConfig `config:"processors"` KeepNull bool `config:"keep_null"` + // Index is used to overwrite the index where flows are published + Index string `config:"index"` } type ProtocolCommon struct { diff --git a/packetbeat/docs/packetbeat-options.asciidoc b/packetbeat/docs/packetbeat-options.asciidoc index 32d9c4730540..d3777c594c4f 100644 --- a/packetbeat/docs/packetbeat-options.asciidoc +++ b/packetbeat/docs/packetbeat-options.asciidoc @@ -431,6 +431,11 @@ processors in your config. If this option is set to true, fields with `null` values will be published in the output document. By default, `keep_null` is set to `false`. +[float] +==== `index` + +Overrides the index that flow events are published to. + [[configuration-protocols]] == Configure which transaction protocols to monitor @@ -554,6 +559,12 @@ custom fields as top-level fields, set the `fields_under_root` option to true. If a duplicate field is declared in the general configuration, then its value will be overwritten by the value declared here. +[float] +[[packetbeat-configuration-index]] +==== `index` + +Overrides the index that events for the given protocol are published to. + [source,yaml] -------------------------------------------------------------------------------- packetbeat.protocols: diff --git a/packetbeat/flows/flows.go b/packetbeat/flows/flows.go index fb292b92bf2e..d58a2c45987f 100644 --- a/packetbeat/flows/flows.go +++ b/packetbeat/flows/flows.go @@ -23,6 +23,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/packetbeat/config" + "github.com/elastic/beats/v7/packetbeat/procs" ) type Flows struct { @@ -41,7 +42,7 @@ const ( defaultPeriod = 10 * time.Second ) -func NewFlows(pub Reporter, config *config.Flows) (*Flows, error) { +func NewFlows(pub Reporter, watcher procs.ProcessesWatcher, config *config.Flows) (*Flows, error) { duration := func(s string, d time.Duration) (time.Duration, error) { if s == "" { return d, nil @@ -67,7 +68,7 @@ func NewFlows(pub Reporter, config *config.Flows) (*Flows, error) { counter := &counterReg{} - worker, err := newFlowsWorker(pub, table, counter, timeout, period) + worker, err := newFlowsWorker(pub, watcher, table, counter, timeout, period) if err != nil { logp.Err("failed to configure flows processing intervals: %v", err) return nil, err diff --git a/packetbeat/flows/flows_test.go b/packetbeat/flows/flows_test.go index b1c3a5b83b93..e56b37773749 100644 --- a/packetbeat/flows/flows_test.go +++ b/packetbeat/flows/flows_test.go @@ -30,6 +30,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/packetbeat/config" + "github.com/elastic/beats/v7/packetbeat/procs" ) type flowsChan struct { @@ -50,7 +51,7 @@ func TestFlowsCounting(t *testing.T) { port1 := []byte{0, 1} port2 := []byte{0, 2} - module, err := NewFlows(nil, &config.Flows{}) + module, err := NewFlows(nil, procs.ProcessesWatcher{}, &config.Flows{}) assert.NoError(t, err) uint1, err := module.NewUint("uint1") diff --git a/packetbeat/flows/worker.go b/packetbeat/flows/worker.go index 564458017812..2a9ca482ed31 100644 --- a/packetbeat/flows/worker.go +++ b/packetbeat/flows/worker.go @@ -32,6 +32,7 @@ import ( type flowsProcessor struct { spool spool + watcher procs.ProcessesWatcher table *flowMetaTable counters *counterReg timeout time.Duration @@ -44,6 +45,7 @@ var ( func newFlowsWorker( pub Reporter, + watcher procs.ProcessesWatcher, table *flowMetaTable, counters *counterReg, timeout, period time.Duration, @@ -84,6 +86,7 @@ func newFlowsWorker( defaultBatchSize := 1024 processor := &flowsProcessor{ table: table, + watcher: watcher, counters: counters, timeout: timeout, } @@ -194,13 +197,14 @@ func (fw *flowsProcessor) report( isOver bool, intNames, uintNames, floatNames []string, ) { - event := createEvent(ts, flow, isOver, intNames, uintNames, floatNames) + event := createEvent(fw.watcher, ts, flow, isOver, intNames, uintNames, floatNames) debugf("add event: %v", event) fw.spool.publish(event) } func createEvent( + watcher procs.ProcessesWatcher, ts time.Time, f *biFlow, isOver bool, intNames, uintNames, floatNames []string, @@ -386,7 +390,7 @@ func createEvent( // Set process information if it's available if tuple.IPLength != 0 && tuple.SrcPort != 0 { - if proc := procs.ProcWatcher.FindProcessesTuple(&tuple, proto); proc != nil { + if proc := watcher.FindProcessesTuple(&tuple, proto); proc != nil { if proc.Src.PID > 0 { p := common.MapStr{ "pid": proc.Src.PID, diff --git a/packetbeat/flows/worker_test.go b/packetbeat/flows/worker_test.go index 15cef57cc250..3bec75f2fe31 100644 --- a/packetbeat/flows/worker_test.go +++ b/packetbeat/flows/worker_test.go @@ -30,6 +30,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/packetbeat/procs" ) var ( @@ -66,7 +67,7 @@ func TestCreateEvent(t *testing.T) { } bif.stats[0] = &flowStats{uintFlags: []uint8{1, 1}, uints: []uint64{10, 1}} bif.stats[1] = &flowStats{uintFlags: []uint8{1, 1}, uints: []uint64{460, 2}} - event := createEvent(time.Now(), bif, true, nil, []string{"bytes", "packets"}, nil) + event := createEvent(procs.ProcessesWatcher{}, time.Now(), bif, true, nil, []string{"bytes", "packets"}, nil) // Validate the contents of the event. validate := lookslike.MustCompile(map[string]interface{}{ diff --git a/packetbeat/packetbeat.reference.yml b/packetbeat/packetbeat.reference.yml index 3abfb9d4d61a..42888c1ec536 100644 --- a/packetbeat/packetbeat.reference.yml +++ b/packetbeat/packetbeat.reference.yml @@ -63,6 +63,9 @@ packetbeat.flows: # Set to true to publish fields with null values in events. #keep_null: false + # Overrides where flow events are indexed. + #index: my-custom-flow-index + # =========================== Transaction protocols ============================ packetbeat.protocols: @@ -73,6 +76,9 @@ packetbeat.protocols: # Set to true to publish fields with null values in events. #keep_null: false + # Overrides where this protocol's events are indexed. + #index: my-custom-icmp-index + - type: amqp # Enable AMQP monitoring. Default: true #enabled: true @@ -113,6 +119,9 @@ packetbeat.protocols: # incoming responses, but sent to Elasticsearch immediately. #transaction_timeout: 10s + # Overrides where this protocol's events are indexed. + #index: my-custom-amqp-index + - type: cassandra #Cassandra port for traffic monitoring. ports: [9042] @@ -143,6 +152,9 @@ packetbeat.protocols: # This option indicates which Operator/Operators will be ignored. #ignored_ops: ["SUPPORTED","OPTIONS"] + # Overrides where this protocol's events are indexed. + #index: my-custom-cassandra-index + - type: dhcpv4 # Configure the DHCP for IPv4 ports. ports: [67, 68] @@ -183,6 +195,9 @@ packetbeat.protocols: # incoming responses, but sent to Elasticsearch immediately. #transaction_timeout: 10s + # Overrides where this protocol's events are indexed. + #index: my-custom-dhcpv4-index + - type: http # Enable HTTP monitoring. Default: true #enabled: true @@ -257,6 +272,9 @@ packetbeat.protocols: # be trimmed to this size. Default is 10 MB. #max_message_size: 10485760 + # Overrides where this protocol's events are indexed. + #index: my-custom-http-index + - type: memcache # Enable memcache monitoring. Default: true #enabled: true @@ -309,6 +327,9 @@ packetbeat.protocols: # incoming responses, but sent to Elasticsearch immediately. #transaction_timeout: 10s + # Overrides where this protocol's events are indexed. + #index: my-custom-memcache-index + - type: mysql # Enable mysql monitoring. Default: true #enabled: true @@ -332,6 +353,9 @@ packetbeat.protocols: # incoming responses, but sent to Elasticsearch immediately. #transaction_timeout: 10s + # Overrides where this protocol's events are indexed. + #index: my-custom-mysql-index + - type: pgsql # Enable pgsql monitoring. Default: true #enabled: true @@ -355,6 +379,9 @@ packetbeat.protocols: # incoming responses, but sent to Elasticsearch immediately. #transaction_timeout: 10s + # Overrides where this protocol's events are indexed. + #index: my-custom-pgsql-index + - type: redis # Enable redis monitoring. Default: true #enabled: true @@ -387,6 +414,9 @@ packetbeat.protocols: # large enough to allow for pipelining. #queue_max_messages: 20000 + # Overrides where this protocol's events are indexed. + #index: my-custom-redis-index + - type: thrift # Enable thrift monitoring. Default: true #enabled: true @@ -445,6 +475,9 @@ packetbeat.protocols: # incoming responses, but sent to Elasticsearch immediately. #transaction_timeout: 10s + # Overrides where this protocol's events are indexed. + #index: my-custom-thrift-index + - type: mongodb # Enable mongodb monitoring. Default: true #enabled: true @@ -478,6 +511,9 @@ packetbeat.protocols: # incoming responses, but sent to Elasticsearch immediately. #transaction_timeout: 10s + # Overrides where this protocol's events are indexed. + #index: my-custom-mongodb-index + - type: nfs # Enable NFS monitoring. Default: true #enabled: true @@ -501,6 +537,9 @@ packetbeat.protocols: # incoming responses, but sent to Elasticsearch immediately. #transaction_timeout: 10s + # Overrides where this protocol's events are indexed. + #index: my-custom-nfs-index + - type: tls # Enable TLS monitoring. Default: true #enabled: true @@ -531,6 +570,9 @@ packetbeat.protocols: # Set to true to publish fields with null values in events. #keep_null: false + # Overrides where this protocol's events are indexed. + #index: my-custom-tls-index + - type: sip # Configure the ports where to listen for SIP traffic. You can disable the SIP protocol by commenting out the list of ports. ports: [5060] @@ -544,6 +586,9 @@ packetbeat.protocols: # Preserve original contents in event.original keep_original: true + # Overrides where this protocol's events are indexed. + #index: my-custom-sip-index + # ============================ Monitored processes ============================= # Packetbeat can enrich events with information about the process associated diff --git a/packetbeat/procs/procs.go b/packetbeat/procs/procs.go index dfead47d93cb..bf3daab9ff24 100644 --- a/packetbeat/procs/procs.go +++ b/packetbeat/procs/procs.go @@ -83,8 +83,6 @@ type ProcessesWatcher struct { impl processWatcherImpl } -var ProcWatcher ProcessesWatcher - func (proc *ProcessesWatcher) Init(config ProcsConfig) error { return proc.initWithImpl(config, proc) } diff --git a/packetbeat/protos/amqp/amqp.go b/packetbeat/protos/amqp/amqp.go index c361c3e7fe69..1113d4ee6dfb 100644 --- a/packetbeat/protos/amqp/amqp.go +++ b/packetbeat/protos/amqp/amqp.go @@ -27,6 +27,7 @@ import ( "github.com/elastic/beats/v7/libbeat/monitoring" "github.com/elastic/beats/v7/packetbeat/pb" + "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" "github.com/elastic/beats/v7/packetbeat/protos/tcp" ) @@ -47,6 +48,7 @@ type amqpPlugin struct { transactions *common.Cache transactionTimeout time.Duration results protos.Reporter + watcher procs.ProcessesWatcher //map containing functions associated with different method numbers methodMap map[codeClass]map[codeMethod]amqpMethod @@ -64,6 +66,7 @@ func init() { func New( testMode bool, results protos.Reporter, + watcher procs.ProcessesWatcher, cfg *common.Config, ) (protos.Plugin, error) { p := &amqpPlugin{} @@ -74,13 +77,13 @@ func New( } } - if err := p.init(results, &config); err != nil { + if err := p.init(results, watcher, &config); err != nil { return nil, err } return p, nil } -func (amqp *amqpPlugin) init(results protos.Reporter, config *amqpConfig) error { +func (amqp *amqpPlugin) init(results protos.Reporter, watcher procs.ProcessesWatcher, config *amqpConfig) error { amqp.initMethodMap() amqp.setFromConfig(config) @@ -92,6 +95,7 @@ func (amqp *amqpPlugin) init(results protos.Reporter, config *amqpConfig) error protos.DefaultTransactionHashSize) amqp.transactions.StartJanitor(amqp.transactionTimeout) amqp.results = results + amqp.watcher = watcher return nil } diff --git a/packetbeat/protos/amqp/amqp_parser.go b/packetbeat/protos/amqp/amqp_parser.go index eeaaf2a94646..6ab15ec81591 100644 --- a/packetbeat/protos/amqp/amqp_parser.go +++ b/packetbeat/protos/amqp/amqp_parser.go @@ -23,7 +23,6 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" - "github.com/elastic/beats/v7/packetbeat/procs" ) func (amqp *amqpPlugin) amqpMessageParser(s *amqpStream) (ok bool, complete bool) { @@ -336,7 +335,7 @@ func (amqp *amqpPlugin) handleAmqp(m *amqpMessage, tcptuple *common.TCPTuple, di debugf("A message is ready to be handled") m.tcpTuple = *tcptuple m.direction = dir - m.cmdlineTuple = procs.ProcWatcher.FindProcessesTupleTCP(tcptuple.IPPort()) + m.cmdlineTuple = amqp.watcher.FindProcessesTupleTCP(tcptuple.IPPort()) if m.method == "basic.publish" { amqp.handlePublishing(m) diff --git a/packetbeat/protos/amqp/amqp_test.go b/packetbeat/protos/amqp/amqp_test.go index 37be71c571d3..19002b609415 100644 --- a/packetbeat/protos/amqp/amqp_test.go +++ b/packetbeat/protos/amqp/amqp_test.go @@ -28,6 +28,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" "github.com/elastic/beats/v7/packetbeat/publish" ) @@ -45,7 +46,7 @@ func amqpModForTests() (*eventStore, *amqpPlugin) { var amqp amqpPlugin results := &eventStore{} config := defaultConfig - amqp.init(results.publish, &config) + amqp.init(results.publish, procs.ProcessesWatcher{}, &config) return results, &amqp } diff --git a/packetbeat/protos/cassandra/cassandra.go b/packetbeat/protos/cassandra/cassandra.go index ed0f48e91a44..7d3001a51595 100644 --- a/packetbeat/protos/cassandra/cassandra.go +++ b/packetbeat/protos/cassandra/cassandra.go @@ -23,6 +23,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" "github.com/elastic/beats/v7/packetbeat/protos/tcp" @@ -34,6 +35,7 @@ type cassandra struct { ports protos.PortsConfig parserConfig parserConfig transConfig transactionConfig + watcher procs.ProcessesWatcher pub transPub } @@ -60,6 +62,7 @@ func init() { func New( testMode bool, results protos.Reporter, + watcher procs.ProcessesWatcher, cfg *common.Config, ) (protos.Plugin, error) { p := &cassandra{} @@ -70,17 +73,18 @@ func New( } } - if err := p.init(results, &config); err != nil { + if err := p.init(results, watcher, &config); err != nil { return nil, err } return p, nil } -func (cassandra *cassandra) init(results protos.Reporter, config *cassandraConfig) error { +func (cassandra *cassandra) init(results protos.Reporter, watcher procs.ProcessesWatcher, config *cassandraConfig) error { if err := cassandra.setFromConfig(config); err != nil { return err } cassandra.pub.results = results + cassandra.watcher = watcher return nil } @@ -193,7 +197,7 @@ func (cassandra *cassandra) ensureConnection(private protos.ProtocolData) *conne conn := getConnection(private) if conn == nil { conn = &connection{} - conn.trans.init(&cassandra.transConfig, cassandra.pub.onTransaction) + conn.trans.init(&cassandra.transConfig, cassandra.watcher, cassandra.pub.onTransaction) } return conn } diff --git a/packetbeat/protos/cassandra/trans.go b/packetbeat/protos/cassandra/trans.go index 62d36ee36955..9b055d22c884 100644 --- a/packetbeat/protos/cassandra/trans.go +++ b/packetbeat/protos/cassandra/trans.go @@ -33,6 +33,8 @@ type transactions struct { responses messageList onTransaction transactionHandler + + watcher procs.ProcessesWatcher } type transactionConfig struct { @@ -46,8 +48,9 @@ type messageList struct { head, tail *message } -func (trans *transactions) init(c *transactionConfig, cb transactionHandler) { +func (trans *transactions) init(c *transactionConfig, watcher procs.ProcessesWatcher, cb transactionHandler) { trans.config = c + trans.watcher = watcher trans.onTransaction = cb } @@ -59,7 +62,7 @@ func (trans *transactions) onMessage( var err error msg.Tuple = *tuple msg.Transport = applayer.TransportTCP - msg.CmdlineTuple = procs.ProcWatcher.FindProcessesTupleTCP(&msg.Tuple) + msg.CmdlineTuple = trans.watcher.FindProcessesTupleTCP(&msg.Tuple) if msg.IsRequest { if isDebug { diff --git a/packetbeat/protos/dhcpv4/dhcpv4.go b/packetbeat/protos/dhcpv4/dhcpv4.go index 10d299aea769..323f46f2a545 100644 --- a/packetbeat/protos/dhcpv4/dhcpv4.go +++ b/packetbeat/protos/dhcpv4/dhcpv4.go @@ -28,6 +28,7 @@ import ( "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/monitoring" "github.com/elastic/beats/v7/packetbeat/pb" + "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" "github.com/elastic/ecs/code/go/ecs" ) @@ -45,12 +46,13 @@ func init() { func New( testMode bool, results protos.Reporter, + watcher procs.ProcessesWatcher, cfg *common.Config, ) (protos.Plugin, error) { - return newPlugin(testMode, results, cfg) + return newPlugin(testMode, results, watcher, cfg) } -func newPlugin(testMode bool, results protos.Reporter, cfg *common.Config) (*dhcpv4Plugin, error) { +func newPlugin(testMode bool, results protos.Reporter, watcher procs.ProcessesWatcher, cfg *common.Config) (*dhcpv4Plugin, error) { config := defaultConfig if !testMode { @@ -62,14 +64,16 @@ func newPlugin(testMode bool, results protos.Reporter, cfg *common.Config) (*dhc return &dhcpv4Plugin{ dhcpv4Config: config, report: results, + watcher: watcher, log: logp.NewLogger("dhcpv4"), }, nil } type dhcpv4Plugin struct { dhcpv4Config - report protos.Reporter - log *logp.Logger + report protos.Reporter + watcher procs.ProcessesWatcher + log *logp.Logger } func (p *dhcpv4Plugin) GetPorts() []int { diff --git a/packetbeat/protos/dhcpv4/dhcpv4_test.go b/packetbeat/protos/dhcpv4/dhcpv4_test.go index 1f7d416248a4..e695ecf00cf9 100644 --- a/packetbeat/protos/dhcpv4/dhcpv4_test.go +++ b/packetbeat/protos/dhcpv4/dhcpv4_test.go @@ -29,6 +29,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" "github.com/elastic/beats/v7/packetbeat/publish" ) @@ -81,7 +82,7 @@ var ( func TestParseDHCPRequest(t *testing.T) { logp.TestingSetup() - p, err := newPlugin(true, nil, nil) + p, err := newPlugin(true, nil, procs.ProcessesWatcher{}, nil) if err != nil { t.Fatal(err) } @@ -165,7 +166,7 @@ func TestParseDHCPRequest(t *testing.T) { } func TestParseDHCPACK(t *testing.T) { - p, err := newPlugin(true, nil, nil) + p, err := newPlugin(true, nil, procs.ProcessesWatcher{}, nil) if err != nil { t.Fatal(err) } diff --git a/packetbeat/protos/dns/dns.go b/packetbeat/protos/dns/dns.go index 8fbf402b6b45..15aa154276fc 100644 --- a/packetbeat/protos/dns/dns.go +++ b/packetbeat/protos/dns/dns.go @@ -38,6 +38,7 @@ import ( "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/monitoring" "github.com/elastic/beats/v7/packetbeat/pb" + "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" ) @@ -55,6 +56,7 @@ type dnsPlugin struct { transactionTimeout time.Duration results protos.Reporter // Channel where results are pushed. + watcher procs.ProcessesWatcher } var ( @@ -220,6 +222,7 @@ func init() { func New( testMode bool, results protos.Reporter, + watcher procs.ProcessesWatcher, cfg *common.Config, ) (protos.Plugin, error) { p := &dnsPlugin{} @@ -230,13 +233,13 @@ func New( } } - if err := p.init(results, &config); err != nil { + if err := p.init(results, watcher, &config); err != nil { return nil, err } return p, nil } -func (dns *dnsPlugin) init(results protos.Reporter, config *dnsConfig) error { +func (dns *dnsPlugin) init(results protos.Reporter, watcher procs.ProcessesWatcher, config *dnsConfig) error { dns.setFromConfig(config) dns.transactions = common.NewCacheWithRemovalListener( dns.transactionTimeout, @@ -252,6 +255,7 @@ func (dns *dnsPlugin) init(results protos.Reporter, config *dnsConfig) error { dns.transactions.StartJanitor(dns.transactionTimeout) dns.results = results + dns.watcher = watcher return nil } diff --git a/packetbeat/protos/dns/dns_tcp.go b/packetbeat/protos/dns/dns_tcp.go index 310cf43553e3..bbf7e7369260 100644 --- a/packetbeat/protos/dns/dns_tcp.go +++ b/packetbeat/protos/dns/dns_tcp.go @@ -23,7 +23,6 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" - "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" "github.com/elastic/beats/v7/packetbeat/protos/tcp" @@ -150,7 +149,7 @@ func (dns *dnsPlugin) handleDNS(conn *dnsConnectionData, tcpTuple *common.TCPTup message := conn.data[dir].message dnsTuple := dnsTupleFromIPPort(&message.tuple, transportTCP, decodedData.Id) - message.cmdlineTuple = procs.ProcWatcher.FindProcessesTupleTCP(tcpTuple.IPPort()) + message.cmdlineTuple = dns.watcher.FindProcessesTupleTCP(tcpTuple.IPPort()) message.data = decodedData message.length += decodeOffset diff --git a/packetbeat/protos/dns/dns_test.go b/packetbeat/protos/dns/dns_test.go index c5ee52eb5ebd..17303783fca7 100644 --- a/packetbeat/protos/dns/dns_test.go +++ b/packetbeat/protos/dns/dns_test.go @@ -34,6 +34,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" "github.com/elastic/beats/v7/packetbeat/publish" ) @@ -111,7 +112,7 @@ func newDNS(store *eventStore, verbose bool) *dnsPlugin { "send_request": true, "send_response": true, }) - dns, err := New(false, callback, cfg) + dns, err := New(false, callback, procs.ProcessesWatcher{}, cfg) if err != nil { panic(err) } diff --git a/packetbeat/protos/dns/dns_udp.go b/packetbeat/protos/dns/dns_udp.go index 652e03bb7174..c1a22c7536f5 100644 --- a/packetbeat/protos/dns/dns_udp.go +++ b/packetbeat/protos/dns/dns_udp.go @@ -20,7 +20,6 @@ package dns import ( "github.com/elastic/beats/v7/libbeat/logp" - "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" ) @@ -47,7 +46,7 @@ func (dns *dnsPlugin) ParseUDP(pkt *protos.Packet) { dnsMsg := &dnsMessage{ ts: pkt.Ts, tuple: pkt.Tuple, - cmdlineTuple: procs.ProcWatcher.FindProcessesTupleUDP(&pkt.Tuple), + cmdlineTuple: dns.watcher.FindProcessesTupleUDP(&pkt.Tuple), data: dnsPkt, length: packetSize, } diff --git a/packetbeat/protos/http/http.go b/packetbeat/protos/http/http.go index 4b2367c02397..3dd7484822ef 100644 --- a/packetbeat/protos/http/http.go +++ b/packetbeat/protos/http/http.go @@ -97,6 +97,7 @@ type httpPlugin struct { transactionTimeout time.Duration results protos.Reporter + watcher procs.ProcessesWatcher } var ( @@ -111,6 +112,7 @@ func init() { func New( testMode bool, results protos.Reporter, + watcher procs.ProcessesWatcher, cfg *common.Config, ) (protos.Plugin, error) { p := &httpPlugin{} @@ -121,19 +123,20 @@ func New( } } - if err := p.init(results, &config); err != nil { + if err := p.init(results, watcher, &config); err != nil { return nil, err } return p, nil } // Init initializes the HTTP protocol analyser. -func (http *httpPlugin) init(results protos.Reporter, config *httpConfig) error { +func (http *httpPlugin) init(results protos.Reporter, watcher procs.ProcessesWatcher, config *httpConfig) error { http.setFromConfig(config) isDebug = logp.IsDebug("http") isDetailed = logp.IsDebug("httpdetailed") http.results = results + http.watcher = watcher return nil } @@ -435,7 +438,7 @@ func (http *httpPlugin) handleHTTP( m.tcpTuple = *tcptuple m.direction = dir - m.cmdlineTuple = procs.ProcWatcher.FindProcessesTupleTCP(tcptuple.IPPort()) + m.cmdlineTuple = http.watcher.FindProcessesTupleTCP(tcptuple.IPPort()) http.hideHeaders(m) if m.isRequest { diff --git a/packetbeat/protos/http/http_test.go b/packetbeat/protos/http/http_test.go index 2e2995ff4638..53f552148512 100644 --- a/packetbeat/protos/http/http_test.go +++ b/packetbeat/protos/http/http_test.go @@ -33,6 +33,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" "github.com/elastic/beats/v7/packetbeat/publish" ) @@ -88,7 +89,7 @@ func httpModForTests(store *eventStore) *httpPlugin { callback = store.publish } - http, err := New(false, callback, common.NewConfig()) + http, err := New(false, callback, procs.ProcessesWatcher{}, common.NewConfig()) if err != nil { panic(err) } diff --git a/packetbeat/protos/icmp/icmp.go b/packetbeat/protos/icmp/icmp.go index 6fb210fd8713..f86dd291886c 100644 --- a/packetbeat/protos/icmp/icmp.go +++ b/packetbeat/protos/icmp/icmp.go @@ -28,6 +28,7 @@ import ( "github.com/elastic/beats/v7/packetbeat/flows" "github.com/elastic/beats/v7/packetbeat/pb" + "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" "github.com/tsg/gopacket/layers" @@ -45,6 +46,7 @@ type icmpPlugin struct { transactionTimeout time.Duration results protos.Reporter + watcher procs.ProcessesWatcher } type ICMPv4Processor interface { @@ -74,7 +76,7 @@ var ( duplicateRequests = monitoring.NewInt(nil, "icmp.duplicate_requests") ) -func New(testMode bool, results protos.Reporter, cfg *common.Config) (*icmpPlugin, error) { +func New(testMode bool, results protos.Reporter, watcher procs.ProcessesWatcher, cfg *common.Config) (*icmpPlugin, error) { p := &icmpPlugin{} config := defaultConfig if !testMode { @@ -83,13 +85,13 @@ func New(testMode bool, results protos.Reporter, cfg *common.Config) (*icmpPlugi } } - if err := p.init(results, &config); err != nil { + if err := p.init(results, watcher, &config); err != nil { return nil, err } return p, nil } -func (icmp *icmpPlugin) init(results protos.Reporter, config *icmpConfig) error { +func (icmp *icmpPlugin) init(results protos.Reporter, watcher procs.ProcessesWatcher, config *icmpConfig) error { icmp.setFromConfig(config) var err error @@ -112,6 +114,7 @@ func (icmp *icmpPlugin) init(results protos.Reporter, config *icmpConfig) error icmp.transactions.StartJanitor(icmp.transactionTimeout) icmp.results = results + icmp.watcher = watcher return nil } diff --git a/packetbeat/protos/icmp/icmp_test.go b/packetbeat/protos/icmp/icmp_test.go index 3ad537fa7d4a..fc9508fbcdc9 100644 --- a/packetbeat/protos/icmp/icmp_test.go +++ b/packetbeat/protos/icmp/icmp_test.go @@ -28,6 +28,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" "github.com/tsg/gopacket" @@ -60,7 +61,7 @@ func TestIcmpDirection(t *testing.T) { func BenchmarkIcmpProcessICMPv4(b *testing.B) { logp.TestingSetup(logp.WithSelectors("icmp", "icmpdetailed")) - icmp, err := New(true, func(beat.Event) {}, common.NewConfig()) + icmp, err := New(true, func(beat.Event) {}, procs.ProcessesWatcher{}, common.NewConfig()) if err != nil { b.Error("Failed to create ICMP processor") return diff --git a/packetbeat/protos/memcache/memcache.go b/packetbeat/protos/memcache/memcache.go index e59550287a56..39bfccd255ad 100644 --- a/packetbeat/protos/memcache/memcache.go +++ b/packetbeat/protos/memcache/memcache.go @@ -30,6 +30,7 @@ import ( "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/monitoring" + "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" "github.com/elastic/beats/v7/packetbeat/protos/applayer" ) @@ -38,6 +39,7 @@ import ( type memcache struct { ports protos.PortsConfig results protos.Reporter + watcher procs.ProcessesWatcher config parserConfig udpMemcache @@ -131,6 +133,7 @@ func init() { func New( testMode bool, results protos.Reporter, + watcher procs.ProcessesWatcher, cfg *common.Config, ) (protos.Plugin, error) { p := &memcache{} @@ -141,14 +144,14 @@ func New( } } - if err := p.init(results, &config); err != nil { + if err := p.init(results, watcher, &config); err != nil { return nil, err } return p, nil } // Called to initialize the Plugin -func (mc *memcache) init(results protos.Reporter, config *memcacheConfig) error { +func (mc *memcache) init(results protos.Reporter, watcher procs.ProcessesWatcher, config *memcacheConfig) error { debug("init memcache plugin") mc.handler = mc @@ -158,6 +161,7 @@ func (mc *memcache) init(results protos.Reporter, config *memcacheConfig) error mc.udpConnections = make(map[common.HashableIPPortTuple]*udpConnection) mc.results = results + mc.watcher = watcher return nil } diff --git a/packetbeat/protos/memcache/memcache_test.go b/packetbeat/protos/memcache/memcache_test.go index b36483770c1f..641dec070e16 100644 --- a/packetbeat/protos/memcache/memcache_test.go +++ b/packetbeat/protos/memcache/memcache_test.go @@ -26,6 +26,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/packetbeat/procs" ) type memcacheTest struct { @@ -36,7 +37,7 @@ type memcacheTest struct { func newMemcacheTest(config memcacheConfig) *memcacheTest { mct := &memcacheTest{} mc := &memcache{} - mc.init(nil, &config) + mc.init(nil, procs.ProcessesWatcher{}, &config) mc.handler = mct mct.mc = mc return mct diff --git a/packetbeat/protos/memcache/plugin_tcp.go b/packetbeat/protos/memcache/plugin_tcp.go index e9dded17dd65..830a0cd64a55 100644 --- a/packetbeat/protos/memcache/plugin_tcp.go +++ b/packetbeat/protos/memcache/plugin_tcp.go @@ -25,7 +25,6 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" - "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" "github.com/elastic/beats/v7/packetbeat/protos/applayer" "github.com/elastic/beats/v7/packetbeat/protos/tcp" @@ -191,7 +190,7 @@ func (mc *memcache) onTCPMessage( ) error { msg.Tuple = *tuple msg.Transport = applayer.TransportTCP - msg.CmdlineTuple = procs.ProcWatcher.FindProcessesTupleTCP(tuple) + msg.CmdlineTuple = mc.watcher.FindProcessesTupleTCP(tuple) if msg.IsRequest { return mc.onTCPRequest(conn, tuple, dir, msg) diff --git a/packetbeat/protos/memcache/plugin_udp.go b/packetbeat/protos/memcache/plugin_udp.go index 850c6e421fb2..441b286a49e4 100644 --- a/packetbeat/protos/memcache/plugin_udp.go +++ b/packetbeat/protos/memcache/plugin_udp.go @@ -27,7 +27,6 @@ import ( "github.com/elastic/beats/v7/libbeat/common/streambuf" "github.com/elastic/beats/v7/libbeat/logp" - "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" "github.com/elastic/beats/v7/packetbeat/protos/applayer" ) @@ -184,7 +183,7 @@ func (mc *memcache) onUDPMessage( } msg.Tuple = *tuple msg.Transport = applayer.TransportUDP - msg.CmdlineTuple = procs.ProcWatcher.FindProcessesTupleUDP(tuple) + msg.CmdlineTuple = mc.watcher.FindProcessesTupleUDP(tuple) done := false var err error diff --git a/packetbeat/protos/mongodb/mongodb.go b/packetbeat/protos/mongodb/mongodb.go index ac3e66dca5ee..28a9350840ee 100644 --- a/packetbeat/protos/mongodb/mongodb.go +++ b/packetbeat/protos/mongodb/mongodb.go @@ -47,6 +47,7 @@ type mongodbPlugin struct { transactionTimeout time.Duration results protos.Reporter + watcher procs.ProcessesWatcher } type transactionKey struct { @@ -65,6 +66,7 @@ func init() { func New( testMode bool, results protos.Reporter, + watcher procs.ProcessesWatcher, cfg *common.Config, ) (protos.Plugin, error) { p := &mongodbPlugin{} @@ -75,13 +77,13 @@ func New( } } - if err := p.init(results, &config); err != nil { + if err := p.init(results, watcher, &config); err != nil { return nil, err } return p, nil } -func (mongodb *mongodbPlugin) init(results protos.Reporter, config *mongodbConfig) error { +func (mongodb *mongodbPlugin) init(results protos.Reporter, watcher procs.ProcessesWatcher, config *mongodbConfig) error { debugf("Init a MongoDB protocol parser") mongodb.setFromConfig(config) @@ -94,6 +96,7 @@ func (mongodb *mongodbPlugin) init(results protos.Reporter, config *mongodbConfi protos.DefaultTransactionHashSize) mongodb.responses.StartJanitor(mongodb.transactionTimeout) mongodb.results = results + mongodb.watcher = watcher return nil } @@ -218,7 +221,7 @@ func (mongodb *mongodbPlugin) handleMongodb( m.tcpTuple = *tcptuple m.direction = dir - m.cmdlineTuple = procs.ProcWatcher.FindProcessesTupleTCP(tcptuple.IPPort()) + m.cmdlineTuple = mongodb.watcher.FindProcessesTupleTCP(tcptuple.IPPort()) if m.isResponse { debugf("MongoDB response message") diff --git a/packetbeat/protos/mongodb/mongodb_test.go b/packetbeat/protos/mongodb/mongodb_test.go index 639a2ee7e780..2debae92dff0 100644 --- a/packetbeat/protos/mongodb/mongodb_test.go +++ b/packetbeat/protos/mongodb/mongodb_test.go @@ -29,6 +29,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" ) @@ -46,7 +47,7 @@ func mongodbModForTests() (*eventStore, *mongodbPlugin) { var mongodb mongodbPlugin results := &eventStore{} config := defaultConfig - mongodb.init(results.publish, &config) + mongodb.init(results.publish, procs.ProcessesWatcher{}, &config) return results, &mongodb } diff --git a/packetbeat/protos/mysql/mysql.go b/packetbeat/protos/mysql/mysql.go index 4d08debf9767..506b6c30ca8e 100644 --- a/packetbeat/protos/mysql/mysql.go +++ b/packetbeat/protos/mysql/mysql.go @@ -158,6 +158,7 @@ type mysqlPlugin struct { prepareStatementTimeout time.Duration results protos.Reporter + watcher procs.ProcessesWatcher // function pointer for mocking handleMysql func(mysql *mysqlPlugin, m *mysqlMessage, tcp *common.TCPTuple, @@ -171,6 +172,7 @@ func init() { func New( testMode bool, results protos.Reporter, + watcher procs.ProcessesWatcher, cfg *common.Config, ) (protos.Plugin, error) { p := &mysqlPlugin{} @@ -181,13 +183,13 @@ func New( } } - if err := p.init(results, &config); err != nil { + if err := p.init(results, watcher, &config); err != nil { return nil, err } return p, nil } -func (mysql *mysqlPlugin) init(results protos.Reporter, config *mysqlConfig) error { +func (mysql *mysqlPlugin) init(results protos.Reporter, watcher procs.ProcessesWatcher, config *mysqlConfig) error { mysql.setFromConfig(config) mysql.transactions = common.NewCache( @@ -203,6 +205,7 @@ func (mysql *mysqlPlugin) init(results protos.Reporter, config *mysqlConfig) err mysql.handleMysql = handleMysql mysql.results = results + mysql.watcher = watcher return nil } @@ -651,7 +654,7 @@ func handleMysql(mysql *mysqlPlugin, m *mysqlMessage, tcptuple *common.TCPTuple, m.tcpTuple = *tcptuple m.direction = dir - m.cmdlineTuple = procs.ProcWatcher.FindProcessesTupleTCP(tcptuple.IPPort()) + m.cmdlineTuple = mysql.watcher.FindProcessesTupleTCP(tcptuple.IPPort()) m.raw = rawMsg if m.isRequest { diff --git a/packetbeat/protos/mysql/mysql_test.go b/packetbeat/protos/mysql/mysql_test.go index d55917114eec..4e9123c56173 100644 --- a/packetbeat/protos/mysql/mysql_test.go +++ b/packetbeat/protos/mysql/mysql_test.go @@ -31,6 +31,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" "github.com/elastic/beats/v7/packetbeat/protos/tcp" "github.com/elastic/beats/v7/packetbeat/publish" @@ -60,7 +61,7 @@ func mysqlModForTests(store *eventStore) *mysqlPlugin { var mysql mysqlPlugin config := defaultConfig config.Ports = []int{serverPort} - mysql.init(callback, &config) + mysql.init(callback, procs.ProcessesWatcher{}, &config) return &mysql } diff --git a/packetbeat/protos/nfs/rpc.go b/packetbeat/protos/nfs/rpc.go index f115cf19fba0..9cde7ab5aac8 100644 --- a/packetbeat/protos/nfs/rpc.go +++ b/packetbeat/protos/nfs/rpc.go @@ -29,6 +29,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" "github.com/elastic/beats/v7/packetbeat/protos/tcp" ) @@ -70,6 +71,7 @@ func init() { func New( testMode bool, results protos.Reporter, + _ procs.ProcessesWatcher, cfg *common.Config, ) (protos.Plugin, error) { p := &rpc{} diff --git a/packetbeat/protos/pgsql/pgsql.go b/packetbeat/protos/pgsql/pgsql.go index 69bfb4688879..5ad6f6e305a8 100644 --- a/packetbeat/protos/pgsql/pgsql.go +++ b/packetbeat/protos/pgsql/pgsql.go @@ -49,6 +49,7 @@ type pgsqlPlugin struct { transactionTimeout time.Duration results protos.Reporter + watcher procs.ProcessesWatcher // function pointer for mocking handlePgsql func(pgsql *pgsqlPlugin, m *pgsqlMessage, tcp *common.TCPTuple, @@ -140,6 +141,7 @@ func init() { func New( testMode bool, results protos.Reporter, + watcher procs.ProcessesWatcher, cfg *common.Config, ) (protos.Plugin, error) { p := &pgsqlPlugin{} @@ -150,13 +152,13 @@ func New( } } - if err := p.init(results, &config); err != nil { + if err := p.init(results, watcher, &config); err != nil { return nil, err } return p, nil } -func (pgsql *pgsqlPlugin) init(results protos.Reporter, config *pgsqlConfig) error { +func (pgsql *pgsqlPlugin) init(results protos.Reporter, watcher procs.ProcessesWatcher, config *pgsqlConfig) error { pgsql.setFromConfig(config) pgsql.log = logp.NewLogger("pgsql") @@ -170,6 +172,7 @@ func (pgsql *pgsqlPlugin) init(results protos.Reporter, config *pgsqlConfig) err pgsql.transactions.StartJanitor(pgsql.transactionTimeout) pgsql.handlePgsql = handlePgsql pgsql.results = results + pgsql.watcher = watcher return nil } @@ -379,7 +382,7 @@ var handlePgsql = func(pgsql *pgsqlPlugin, m *pgsqlMessage, tcptuple *common.TCP m.tcpTuple = *tcptuple m.direction = dir - m.cmdlineTuple = procs.ProcWatcher.FindProcessesTupleTCP(tcptuple.IPPort()) + m.cmdlineTuple = pgsql.watcher.FindProcessesTupleTCP(tcptuple.IPPort()) if m.isRequest { pgsql.receivedPgsqlRequest(m) diff --git a/packetbeat/protos/pgsql/pgsql_test.go b/packetbeat/protos/pgsql/pgsql_test.go index db735c64a5db..737f2905a917 100644 --- a/packetbeat/protos/pgsql/pgsql_test.go +++ b/packetbeat/protos/pgsql/pgsql_test.go @@ -31,6 +31,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" "github.com/elastic/beats/v7/packetbeat/publish" ) @@ -56,7 +57,7 @@ func pgsqlModForTests(store *eventStore) *pgsqlPlugin { var pgsql pgsqlPlugin config := defaultConfig - pgsql.init(callback, &config) + pgsql.init(callback, procs.ProcessesWatcher{}, &config) return &pgsql } diff --git a/packetbeat/protos/protos.go b/packetbeat/protos/protos.go index 9991458eb2b8..e0343a0ee87c 100644 --- a/packetbeat/protos/protos.go +++ b/packetbeat/protos/protos.go @@ -28,6 +28,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/cfgwarn" "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/packetbeat/procs" ) const ( @@ -92,11 +93,12 @@ type ProtocolsStruct struct { udp map[Protocol]UDPPlugin } -// Singleton of Protocols type. -var Protos = ProtocolsStruct{ - all: map[Protocol]protocolInstance{}, - tcp: map[Protocol]TCPPlugin{}, - udp: map[Protocol]UDPPlugin{}, +func NewProtocols() *ProtocolsStruct { + return &ProtocolsStruct{ + all: map[Protocol]protocolInstance{}, + tcp: map[Protocol]TCPPlugin{}, + udp: map[Protocol]UDPPlugin{}, + } } type protocolInstance struct { @@ -111,6 +113,7 @@ type reporterFactory interface { func (s ProtocolsStruct) Init( testMode bool, pub reporterFactory, + watcher procs.ProcessesWatcher, configs map[string]*common.Config, listConfigs []*common.Config, ) error { @@ -123,7 +126,7 @@ func (s ProtocolsStruct) Init( } for name, config := range configs { - if err := s.configureProtocol(testMode, pub, name, config); err != nil { + if err := s.configureProtocol(testMode, pub, watcher, name, config); err != nil { return err } } @@ -136,7 +139,7 @@ func (s ProtocolsStruct) Init( return err } - if err := s.configureProtocol(testMode, pub, module.Name, config); err != nil { + if err := s.configureProtocol(testMode, pub, watcher, module.Name, config); err != nil { return err } } @@ -147,6 +150,7 @@ func (s ProtocolsStruct) Init( func (s ProtocolsStruct) configureProtocol( testMode bool, pub reporterFactory, + watcher procs.ProcessesWatcher, name string, config *common.Config, ) error { @@ -182,7 +186,7 @@ func (s ProtocolsStruct) configureProtocol( } } - inst, err := plugin(testMode, results, config) + inst, err := plugin(testMode, results, watcher, config) if err != nil { logp.Err("Failed to register protocol plugin: %v", err) return err diff --git a/packetbeat/protos/redis/redis.go b/packetbeat/protos/redis/redis.go index bf23e94836f9..23dd1ad86966 100644 --- a/packetbeat/protos/redis/redis.go +++ b/packetbeat/protos/redis/redis.go @@ -55,6 +55,7 @@ type redisPlugin struct { transactionTimeout time.Duration queueConfig MessageQueueConfig + watcher procs.ProcessesWatcher results protos.Reporter } @@ -75,6 +76,7 @@ func init() { func New( testMode bool, results protos.Reporter, + watcher procs.ProcessesWatcher, cfg *common.Config, ) (protos.Plugin, error) { p := &redisPlugin{} @@ -85,16 +87,17 @@ func New( } } - if err := p.init(results, &config); err != nil { + if err := p.init(results, watcher, &config); err != nil { return nil, err } return p, nil } -func (redis *redisPlugin) init(results protos.Reporter, config *redisConfig) error { +func (redis *redisPlugin) init(results protos.Reporter, watcher procs.ProcessesWatcher, config *redisConfig) error { redis.setFromConfig(config) redis.results = results + redis.watcher = watcher isDebug = logp.IsDebug("redis") return nil @@ -247,7 +250,7 @@ func (redis *redisPlugin) handleRedis( ) { m.tcpTuple = *tcptuple m.direction = dir - m.cmdlineTuple = procs.ProcWatcher.FindProcessesTupleTCP(tcptuple.IPPort()) + m.cmdlineTuple = redis.watcher.FindProcessesTupleTCP(tcptuple.IPPort()) if m.isRequest { // wait for response diff --git a/packetbeat/protos/registry.go b/packetbeat/protos/registry.go index f1fc17b70740..1d1bd2c7b88c 100644 --- a/packetbeat/protos/registry.go +++ b/packetbeat/protos/registry.go @@ -22,11 +22,14 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" + + "github.com/elastic/beats/v7/packetbeat/procs" ) type ProtocolPlugin func( testMode bool, results Reporter, + watcher procs.ProcessesWatcher, cfg *common.Config, ) (Plugin, error) diff --git a/packetbeat/protos/sip/parser.go b/packetbeat/protos/sip/parser.go index 55e66045e956..7ee5a10bb5b3 100644 --- a/packetbeat/protos/sip/parser.go +++ b/packetbeat/protos/sip/parser.go @@ -87,6 +87,7 @@ const ( ) type parser struct { + watcher procs.ProcessesWatcher } type parsingInfo struct { @@ -117,15 +118,17 @@ var ( nameVia = []byte("via") ) -func newParser() *parser { - return &parser{} +func newParser(watcher procs.ProcessesWatcher) *parser { + return &parser{ + watcher: watcher, + } } func (parser *parser) parse(pi *parsingInfo) (*message, error) { m := &message{ ts: pi.pkt.Ts, ipPortTuple: pi.pkt.Tuple, - cmdlineTuple: procs.ProcWatcher.FindProcessesTupleTCP(&pi.pkt.Tuple), + cmdlineTuple: parser.watcher.FindProcessesTupleTCP(&pi.pkt.Tuple), rawData: pi.data, } for pi.parseOffset < len(pi.data) { diff --git a/packetbeat/protos/sip/plugin.go b/packetbeat/protos/sip/plugin.go index 14b56aeda452..0c28b5eb3a4f 100644 --- a/packetbeat/protos/sip/plugin.go +++ b/packetbeat/protos/sip/plugin.go @@ -29,6 +29,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common/cfgwarn" "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/packetbeat/pb" + "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" ) @@ -46,6 +47,7 @@ type plugin struct { keepOriginal bool results protos.Reporter + watcher procs.ProcessesWatcher } var ( @@ -60,6 +62,7 @@ func init() { func New( testMode bool, results protos.Reporter, + watcher procs.ProcessesWatcher, cfg *common.Config, ) (protos.Plugin, error) { cfgwarn.Beta("packetbeat SIP protocol is used") @@ -72,19 +75,20 @@ func New( } } - if err := p.init(results, &config); err != nil { + if err := p.init(results, watcher, &config); err != nil { return nil, err } return p, nil } // Init initializes the HTTP protocol analyser. -func (p *plugin) init(results protos.Reporter, config *config) error { +func (p *plugin) init(results protos.Reporter, watcher procs.ProcessesWatcher, config *config) error { p.setFromConfig(config) isDebug = logp.IsDebug("sip") isDetailed = logp.IsDebug("sipdetailed") p.results = results + p.watcher = watcher return nil } @@ -112,7 +116,7 @@ func (p *plugin) doParse(pkt *protos.Packet) error { detailedf("Payload received: [%s]", pkt.Payload) } - parser := newParser() + parser := newParser(p.watcher) pi := newParsingInfo(pkt) m, err := parser.parse(pi) diff --git a/packetbeat/protos/sip/plugin_test.go b/packetbeat/protos/sip/plugin_test.go index d8c09f5b307b..5b09f522aff6 100644 --- a/packetbeat/protos/sip/plugin_test.go +++ b/packetbeat/protos/sip/plugin_test.go @@ -27,6 +27,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" ) @@ -114,7 +115,7 @@ func TestParseUDP(t *testing.T) { gotEvent = &evt } const data = "INVITE sip:test@10.0.2.15:5060 SIP/2.0\r\nVia: SIP/2.0/UDP 10.0.2.20:5060;branch=z9hG4bK-2187-1-0\r\nFrom: \"DVI4/8000\" ;tag=1\r\nTo: test \r\nCall-ID: 1-2187@10.0.2.20\r\nCSeq: 1 INVITE\r\nContact: sip:sipp@10.0.2.20:5060\r\nMax-Forwards: 70\r\nContent-Type: application/sdp\r\nContent-Length: 123\r\n\r\nv=0\r\no=- 42 42 IN IP4 10.0.2.20\r\ns=-\r\nc=IN IP4 10.0.2.20\r\nt=0 0\r\nm=audio 6000 RTP/AVP 5\r\na=rtpmap:5 DVI4/8000\r\na=recvonly\r\n" - p, _ := New(true, reporter, nil) + p, _ := New(true, reporter, procs.ProcessesWatcher{}, nil) plugin := p.(*plugin) plugin.ParseUDP(&protos.Packet{ Ts: time.Now(), diff --git a/packetbeat/protos/tcp/tcp_test.go b/packetbeat/protos/tcp/tcp_test.go index 092d1f6310cf..c014e870c6b9 100644 --- a/packetbeat/protos/tcp/tcp_test.go +++ b/packetbeat/protos/tcp/tcp_test.go @@ -26,6 +26,7 @@ import ( "time" "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" "github.com/stretchr/testify/assert" @@ -44,7 +45,7 @@ var ( ) func init() { - new := func(_ bool, _ protos.Reporter, _ *common.Config) (protos.Plugin, error) { + new := func(_ bool, _ protos.Reporter, _ procs.ProcessesWatcher, _ *common.Config) (protos.Plugin, error) { return &TestProtocol{}, nil } diff --git a/packetbeat/protos/thrift/thrift.go b/packetbeat/protos/thrift/thrift.go index 8c15b9bdf9c6..d9778031d766 100644 --- a/packetbeat/protos/thrift/thrift.go +++ b/packetbeat/protos/thrift/thrift.go @@ -57,6 +57,7 @@ type thriftPlugin struct { publishQueue chan *thriftTransaction results protos.Reporter + watcher procs.ProcessesWatcher idl *thriftIdl } @@ -182,6 +183,7 @@ func init() { func New( testMode bool, results protos.Reporter, + watcher procs.ProcessesWatcher, cfg *common.Config, ) (protos.Plugin, error) { p := &thriftPlugin{} @@ -192,7 +194,7 @@ func New( } } - if err := p.init(testMode, results, &config); err != nil { + if err := p.init(testMode, results, watcher, &config); err != nil { return nil, err } return p, nil @@ -201,6 +203,7 @@ func New( func (thrift *thriftPlugin) init( testMode bool, results protos.Reporter, + watcher procs.ProcessesWatcher, config *thriftConfig, ) error { thrift.InitDefaults() @@ -218,6 +221,7 @@ func (thrift *thriftPlugin) init( if !testMode { thrift.publishQueue = make(chan *thriftTransaction, 1000) thrift.results = results + thrift.watcher = watcher go thrift.publishTransactions() } @@ -894,7 +898,7 @@ func (thrift *thriftPlugin) messageComplete(tcptuple *common.TCPTuple, dir uint8 // all ok, go to next level stream.message.tcpTuple = *tcptuple stream.message.direction = dir - stream.message.cmdlineTuple = procs.ProcWatcher.FindProcessesTupleTCP(tcptuple.IPPort()) + stream.message.cmdlineTuple = thrift.watcher.FindProcessesTupleTCP(tcptuple.IPPort()) if stream.message.frameSize == 0 { stream.message.frameSize = uint32(stream.parseOffset - stream.message.start) } diff --git a/packetbeat/protos/thrift/thrift_test.go b/packetbeat/protos/thrift/thrift_test.go index 2c6618bab770..e1eca793e42a 100644 --- a/packetbeat/protos/thrift/thrift_test.go +++ b/packetbeat/protos/thrift/thrift_test.go @@ -26,13 +26,14 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" ) func thriftForTests() *thriftPlugin { t := &thriftPlugin{} config := defaultConfig - t.init(true, nil, &config) + t.init(true, nil, procs.ProcessesWatcher{}, &config) return t } diff --git a/packetbeat/protos/tls/tls.go b/packetbeat/protos/tls/tls.go index 74034c4afaff..e91c78d69b85 100644 --- a/packetbeat/protos/tls/tls.go +++ b/packetbeat/protos/tls/tls.go @@ -60,6 +60,7 @@ type tlsPlugin struct { fingerprints []*FingerprintAlgorithm transactionTimeout time.Duration results protos.Reporter + watcher procs.ProcessesWatcher } var ( @@ -78,6 +79,7 @@ func init() { func New( testMode bool, results protos.Reporter, + watcher procs.ProcessesWatcher, cfg *common.Config, ) (protos.Plugin, error) { p := &tlsPlugin{} @@ -88,18 +90,19 @@ func New( } } - if err := p.init(results, &config); err != nil { + if err := p.init(results, watcher, &config); err != nil { return nil, err } return p, nil } -func (plugin *tlsPlugin) init(results protos.Reporter, config *tlsConfig) error { +func (plugin *tlsPlugin) init(results protos.Reporter, watcher procs.ProcessesWatcher, config *tlsConfig) error { if err := plugin.setFromConfig(config); err != nil { return err } plugin.results = results + plugin.watcher = watcher isDebug = logp.IsDebug("tls") return nil @@ -178,7 +181,7 @@ func (plugin *tlsPlugin) doParse( st := conn.streams[dir] if st == nil { st = newStream(tcptuple) - st.cmdlineTuple = procs.ProcWatcher.FindProcessesTupleTCP(tcptuple.IPPort()) + st.cmdlineTuple = plugin.watcher.FindProcessesTupleTCP(tcptuple.IPPort()) conn.streams[dir] = st } diff --git a/packetbeat/protos/tls/tls_test.go b/packetbeat/protos/tls/tls_test.go index 512294f2d4f1..90aadb07a953 100644 --- a/packetbeat/protos/tls/tls_test.go +++ b/packetbeat/protos/tls/tls_test.go @@ -30,6 +30,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" "github.com/elastic/beats/v7/packetbeat/publish" ) @@ -66,7 +67,7 @@ func testInit() (*eventStore, *tlsPlugin) { logp.TestingSetup(logp.WithSelectors("tls", "tlsdetailed")) results := &eventStore{} - tls, err := New(true, results.publish, nil) + tls, err := New(true, results.publish, procs.ProcessesWatcher{}, nil) if err != nil { return nil, nil } diff --git a/packetbeat/publish/publish.go b/packetbeat/publish/publish.go index 7890a1c173d9..bd904030b598 100644 --- a/packetbeat/publish/publish.go +++ b/packetbeat/publish/publish.go @@ -84,6 +84,7 @@ func (p *TransactionPublisher) CreateReporter( // load and register the module it's fields, tags and processors settings meta := struct { + Index string `config:"index"` Event common.EventMetadata `config:",inline"` Processors processors.PluginConfig `config:"processors"` KeepNull bool `config:"keep_null"` @@ -107,6 +108,9 @@ func (p *TransactionPublisher) CreateReporter( if p.canDrop { clientConfig.PublishMode = beat.DropIfFull } + if meta.Index != "" { + clientConfig.Processing.Meta = common.MapStr{"index": meta.Index} + } client, err := p.pipeline.ConnectWith(clientConfig) if err != nil { diff --git a/packetbeat/scripts/tcp-protocol/{protocol}/trans.go.tmpl b/packetbeat/scripts/tcp-protocol/{protocol}/trans.go.tmpl index 4f7ad362bfed..c25d06f0d657 100644 --- a/packetbeat/scripts/tcp-protocol/{protocol}/trans.go.tmpl +++ b/packetbeat/scripts/tcp-protocol/{protocol}/trans.go.tmpl @@ -16,6 +16,8 @@ type transactions struct { responses messageList onTransaction transactionHandler + + watcher procs.ProcessesWatcher } type transactionConfig struct { @@ -29,8 +31,9 @@ type messageList struct { head, tail *message } -func (trans *transactions) init(c *transactionConfig, cb transactionHandler) { +func (trans *transactions) init(c *transactionConfig, watcher procs.ProcessesWatcher, cb transactionHandler) { trans.config = c + trans.watcher = watcher trans.onTransaction = cb } @@ -43,7 +46,7 @@ func (trans *transactions) onMessage( msg.Tuple = *tuple msg.Transport = applayer.TransportTCP - msg.CmdlineTuple = procs.ProcWatcher.FindProcessesTuple(&msg.Tuple) + msg.CmdlineTuple = trans.watcher.FindProcessesTuple(&msg.Tuple) if msg.IsRequest { if isDebug { diff --git a/packetbeat/scripts/tcp-protocol/{protocol}/{protocol}.go.tmpl b/packetbeat/scripts/tcp-protocol/{protocol}/{protocol}.go.tmpl index f783e8403018..8af08842fd4e 100644 --- a/packetbeat/scripts/tcp-protocol/{protocol}/{protocol}.go.tmpl +++ b/packetbeat/scripts/tcp-protocol/{protocol}/{protocol}.go.tmpl @@ -6,6 +6,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" "github.com/elastic/beats/v7/packetbeat/protos/tcp" ) @@ -15,6 +16,7 @@ type {plugin_type} struct { ports protos.PortsConfig parserConfig parserConfig transConfig transactionConfig + watcher procs.ProcessesWatcher pub transPub } @@ -45,6 +47,7 @@ func init() { func New( testMode bool, results protos.Reporter, + watcher procs.ProcessesWatcher, cfg *common.Config, ) (protos.Plugin, error) { p := &{plugin_type}{} @@ -55,17 +58,18 @@ func New( } } - if err := p.init(results, &config); err != nil { + if err := p.init(results, watcher, &config); err != nil { return nil, err } return p, nil } -func ({plugin_var} *{plugin_type}) init(results protos.Reporter, config *{protocol}Config) error { +func ({plugin_var} *{plugin_type}) init(results protos.Reporter, watcher procs.ProcessesWatcher, config *{protocol}Config) error { if err := {plugin_var}.setFromConfig(config); err != nil { return err } {plugin_var}.pub.results = results + {plugin_var}.watcher = watcher isDebug = logp.IsDebug("http") return nil @@ -162,7 +166,7 @@ func ({plugin_var} *{plugin_type}) ensureConnection(private protos.ProtocolData) conn := getConnection(private) if conn == nil { conn = &connection{} - conn.trans.init(&{plugin_var}.transConfig, {plugin_var}.pub.onTransaction) + conn.trans.init(&{plugin_var}.transConfig, {plugin_var}.watcher, {plugin_var}.pub.onTransaction) } return conn } diff --git a/x-pack/packetbeat/packetbeat.reference.yml b/x-pack/packetbeat/packetbeat.reference.yml index 3abfb9d4d61a..42888c1ec536 100644 --- a/x-pack/packetbeat/packetbeat.reference.yml +++ b/x-pack/packetbeat/packetbeat.reference.yml @@ -63,6 +63,9 @@ packetbeat.flows: # Set to true to publish fields with null values in events. #keep_null: false + # Overrides where flow events are indexed. + #index: my-custom-flow-index + # =========================== Transaction protocols ============================ packetbeat.protocols: @@ -73,6 +76,9 @@ packetbeat.protocols: # Set to true to publish fields with null values in events. #keep_null: false + # Overrides where this protocol's events are indexed. + #index: my-custom-icmp-index + - type: amqp # Enable AMQP monitoring. Default: true #enabled: true @@ -113,6 +119,9 @@ packetbeat.protocols: # incoming responses, but sent to Elasticsearch immediately. #transaction_timeout: 10s + # Overrides where this protocol's events are indexed. + #index: my-custom-amqp-index + - type: cassandra #Cassandra port for traffic monitoring. ports: [9042] @@ -143,6 +152,9 @@ packetbeat.protocols: # This option indicates which Operator/Operators will be ignored. #ignored_ops: ["SUPPORTED","OPTIONS"] + # Overrides where this protocol's events are indexed. + #index: my-custom-cassandra-index + - type: dhcpv4 # Configure the DHCP for IPv4 ports. ports: [67, 68] @@ -183,6 +195,9 @@ packetbeat.protocols: # incoming responses, but sent to Elasticsearch immediately. #transaction_timeout: 10s + # Overrides where this protocol's events are indexed. + #index: my-custom-dhcpv4-index + - type: http # Enable HTTP monitoring. Default: true #enabled: true @@ -257,6 +272,9 @@ packetbeat.protocols: # be trimmed to this size. Default is 10 MB. #max_message_size: 10485760 + # Overrides where this protocol's events are indexed. + #index: my-custom-http-index + - type: memcache # Enable memcache monitoring. Default: true #enabled: true @@ -309,6 +327,9 @@ packetbeat.protocols: # incoming responses, but sent to Elasticsearch immediately. #transaction_timeout: 10s + # Overrides where this protocol's events are indexed. + #index: my-custom-memcache-index + - type: mysql # Enable mysql monitoring. Default: true #enabled: true @@ -332,6 +353,9 @@ packetbeat.protocols: # incoming responses, but sent to Elasticsearch immediately. #transaction_timeout: 10s + # Overrides where this protocol's events are indexed. + #index: my-custom-mysql-index + - type: pgsql # Enable pgsql monitoring. Default: true #enabled: true @@ -355,6 +379,9 @@ packetbeat.protocols: # incoming responses, but sent to Elasticsearch immediately. #transaction_timeout: 10s + # Overrides where this protocol's events are indexed. + #index: my-custom-pgsql-index + - type: redis # Enable redis monitoring. Default: true #enabled: true @@ -387,6 +414,9 @@ packetbeat.protocols: # large enough to allow for pipelining. #queue_max_messages: 20000 + # Overrides where this protocol's events are indexed. + #index: my-custom-redis-index + - type: thrift # Enable thrift monitoring. Default: true #enabled: true @@ -445,6 +475,9 @@ packetbeat.protocols: # incoming responses, but sent to Elasticsearch immediately. #transaction_timeout: 10s + # Overrides where this protocol's events are indexed. + #index: my-custom-thrift-index + - type: mongodb # Enable mongodb monitoring. Default: true #enabled: true @@ -478,6 +511,9 @@ packetbeat.protocols: # incoming responses, but sent to Elasticsearch immediately. #transaction_timeout: 10s + # Overrides where this protocol's events are indexed. + #index: my-custom-mongodb-index + - type: nfs # Enable NFS monitoring. Default: true #enabled: true @@ -501,6 +537,9 @@ packetbeat.protocols: # incoming responses, but sent to Elasticsearch immediately. #transaction_timeout: 10s + # Overrides where this protocol's events are indexed. + #index: my-custom-nfs-index + - type: tls # Enable TLS monitoring. Default: true #enabled: true @@ -531,6 +570,9 @@ packetbeat.protocols: # Set to true to publish fields with null values in events. #keep_null: false + # Overrides where this protocol's events are indexed. + #index: my-custom-tls-index + - type: sip # Configure the ports where to listen for SIP traffic. You can disable the SIP protocol by commenting out the list of ports. ports: [5060] @@ -544,6 +586,9 @@ packetbeat.protocols: # Preserve original contents in event.original keep_original: true + # Overrides where this protocol's events are indexed. + #index: my-custom-sip-index + # ============================ Monitored processes ============================= # Packetbeat can enrich events with information about the process associated