Skip to content

Commit

Permalink
Extracting the UDP Server logic from the UDP input (#6439)
Browse files Browse the repository at this point in the history
This commit extract the UDP Server logic outside of the UDP input, this
will to reuse this component and the configuration for the syslog input.
It now uses a callback instead of a forwarder instance.

Ref: #6361
  • Loading branch information
ph authored and ruflin committed Apr 5, 2018
1 parent 7a73968 commit 3efbb74
Show file tree
Hide file tree
Showing 9 changed files with 267 additions and 98 deletions.
1 change: 1 addition & 0 deletions filebeat/_meta/common.reference.p2.yml
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ filebeat.inputs:
#------------------------------ Udp input --------------------------------
# Experimental: Config options for the udp input
#- type: udp
#enabled: false

# Maximum size of the message received over UDP
#max_message_size: 10240
Expand Down
1 change: 1 addition & 0 deletions filebeat/filebeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,7 @@ filebeat.inputs:
#------------------------------ Udp input --------------------------------
# Experimental: Config options for the udp input
#- type: udp
#enabled: false

# Maximum size of the message received over UDP
#max_message_size: 10240
Expand Down
16 changes: 11 additions & 5 deletions filebeat/input/udp/config.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,26 @@
package udp

import (
"time"

"github.com/elastic/beats/filebeat/harvester"
"github.com/elastic/beats/filebeat/inputsource/udp"
)

var defaultConfig = config{
ForwarderConfig: harvester.ForwarderConfig{
Type: "udp",
},
MaxMessageSize: 10240,
// TODO: What should be default port?
Host: "localhost:8080",
Config: udp.Config{
MaxMessageSize: 10240,
// TODO: What should be default port?
Host: "localhost:8080",
// TODO: What should be the default timeout?
Timeout: time.Minute * 5,
},
}

type config struct {
udp.Config `config:",inline"`
harvester.ForwarderConfig `config:",inline"`
Host string `config:"host"`
MaxMessageSize int `config:"max_message_size"`
}
74 changes: 0 additions & 74 deletions filebeat/input/udp/harvester.go

This file was deleted.

70 changes: 52 additions & 18 deletions filebeat/input/udp/input.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
package udp

import (
"sync"
"time"

"github.com/elastic/beats/filebeat/channel"
"github.com/elastic/beats/filebeat/harvester"
"github.com/elastic/beats/filebeat/input"
"github.com/elastic/beats/filebeat/inputsource/udp"
"github.com/elastic/beats/filebeat/util"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/cfgwarn"
"github.com/elastic/beats/libbeat/logp"
Expand All @@ -16,11 +22,12 @@ func init() {
}
}

// Input define a udp input
// Input defines a udp input to receive event on a specific host:port.
type Input struct {
harvester *Harvester
started bool
outlet channel.Outleter
sync.Mutex
udp *udp.Server
started bool
outlet channel.Outleter
}

// NewInput creates a new udp input
Expand All @@ -36,33 +43,60 @@ func NewInput(
return nil, err
}

config := defaultConfig
if err = cfg.Unpack(&config); err != nil {
return nil, err
}

forwarder := harvester.NewForwarder(out)
callback := func(data []byte, metadata udp.Metadata) {
e := util.NewData()
e.Event = beat.Event{
Timestamp: time.Now(),
Meta: common.MapStr{
"truncated": metadata.Truncated,
},
Fields: common.MapStr{
"message": string(data),
"source": metadata.RemoteAddr.String(),
},
}
forwarder.Send(e)
}

udp := udp.New(&config.Config, callback)

return &Input{
outlet: out,
harvester: NewHarvester(forwarder, cfg),
started: false,
outlet: out,
udp: udp,
started: false,
}, nil
}

// Run starts and execute the UDP server.
// Run starts and start the UDP server and read events from the socket
func (p *Input) Run() {
p.Lock()
defer p.Unlock()

if !p.started {
logp.Info("Starting udp input")
logp.Info("Starting UDP input")
err := p.udp.Start()
if err != nil {
logp.Err("Error running harvester: %v", err)
}
p.started = true
go func() {
defer p.outlet.Close()
err := p.harvester.Run()
if err != nil {
logp.Err("Error running harvester:: %v", err)
}
}()
}
}

// Stop stops the UDP input
func (p *Input) Stop() {
logp.Info("stopping UDP input")
p.harvester.Stop()
defer p.outlet.Close()
p.Lock()
defer p.Unlock()

logp.Info("Stopping UDP input")
p.udp.Stop()
p.started = false
}

// Wait suspends the UDP input
Expand Down
1 change: 1 addition & 0 deletions filebeat/inputsource/inputsource.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package inputsource
121 changes: 121 additions & 0 deletions filebeat/inputsource/udp/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package udp

import (
"net"
"runtime"
"strings"
"sync"
"time"

"github.com/elastic/beats/libbeat/logp"
)

const windowErrBuffer = "A message sent on a datagram socket was larger than the internal message" +
" buffer or some other network limit, or the buffer used to receive a datagram into was smaller" +
" than the datagram itself."

// Metadata contains formations about the packet.
type Metadata struct {
RemoteAddr net.Addr
Truncated bool
}

// Config options for the UDPServer
type Config struct {
Host string `config:"host"`
MaxMessageSize int `config:"max_message_size" validate:"positive,nonzero"`
Timeout time.Duration `config:"timeout"`
}

// Server creates a simple UDP Server and listen to a specific host:port and will send any
// event received to the callback method.
type Server struct {
config *Config
callback func(data []byte, mt Metadata)
Listener net.PacketConn
log *logp.Logger
wg sync.WaitGroup
done chan struct{}
}

// New returns a new UDPServer instance.
func New(config *Config, callback func(data []byte, mt Metadata)) *Server {
return &Server{
config: config,
callback: callback,
log: logp.NewLogger("udp").With("address", config.Host),
done: make(chan struct{}),
}
}

// Start starts the UDP Server and listen to incoming events.
func (u *Server) Start() error {
var err error
u.Listener, err = net.ListenPacket("udp", u.config.Host)
if err != nil {
return err
}
u.log.Info("Started listening for UDP connection")
u.wg.Add(1)
go func() {
defer u.wg.Done()
u.run()
}()
return nil
}

func (u *Server) run() {
for {
select {
case <-u.done:
return
default:
}

buffer := make([]byte, u.config.MaxMessageSize)
u.Listener.SetDeadline(time.Now().Add(u.config.Timeout))

// If you are using Windows and you are using a fixed buffer and you get a datagram which
// is bigger than the specified size of the buffer, it will return an `err` and the buffer will
// contains a subset of the data.
//
// On Unix based system, the buffer will be truncated but no error will be returned.
length, addr, err := u.Listener.ReadFrom(buffer)
if err != nil {
// don't log any deadline events.
e, ok := err.(net.Error)
if ok && e.Timeout() {
continue
}

u.log.Errorw("Error reading from the socket", "error", err)

// On Windows send the current buffer and mark it as truncated.
// The buffer will have content but length will return 0, addr will be nil.
if isLargerThanBuffer(err) {
u.callback(buffer, Metadata{RemoteAddr: addr, Truncated: true})
continue
}
}

if length > 0 {
u.callback(buffer[:length], Metadata{RemoteAddr: addr})
}
}
}

// Stop stops the current udp server.
func (u *Server) Stop() {
u.log.Info("Stopping UDP server")
u.Listener.Close()
close(u.done)
u.wg.Wait()
u.log.Info("UDP server stopped")
}

func isLargerThanBuffer(err error) bool {
if runtime.GOOS != "windows" {
return false
}
return strings.Contains(err.Error(), windowErrBuffer)
}
Loading

0 comments on commit 3efbb74

Please sign in to comment.