Extracting the UDP Server logic from the UDP input #6439

Merged
1 change: 1 addition & 0 deletions filebeat/_meta/common.reference.p2.yml
@@ -236,6 +236,7 @@ filebeat.inputs:
#------------------------------ Udp input --------------------------------
# Experimental: Config options for the udp input
#- type: udp
#enabled: false

# Maximum size of the message received over UDP
#max_message_size: 10240
1 change: 1 addition & 0 deletions filebeat/filebeat.reference.yml
@@ -531,6 +531,7 @@ filebeat.inputs:
#------------------------------ Udp input --------------------------------
# Experimental: Config options for the udp input
#- type: udp
#enabled: false

# Maximum size of the message received over UDP
#max_message_size: 10240
16 changes: 11 additions & 5 deletions filebeat/input/udp/config.go
@@ -1,20 +1,26 @@
package udp

import (
"time"

"github.com/elastic/beats/filebeat/harvester"
"github.com/elastic/beats/filebeat/inputsource/udp"
)

var defaultConfig = config{
ForwarderConfig: harvester.ForwarderConfig{
Type: "udp",
},
MaxMessageSize: 10240,
// TODO: What should be default port?
Host: "localhost:8080",
Config: udp.Config{
MaxMessageSize: 10240,
// TODO: What should be default port?
Host: "localhost:8080",
Member

Perhaps there should be no default, so that the user has to state explicitly which port to listen on. The Logstash udp input does this.

Contributor Author

I agree with that, but we can only do it for 7.0, since this could break existing configurations.

Contributor Author

Note, the TODO was there before my changes. :)

Collaborator

+1 on the proposal from Andrew. Sorry about the TODO ...

Member

Is the udp input feature listed as experimental or beta? This could give us some leeway to make the change now.

Contributor Author

Let's do it in another PR, with a clear intent in the changelog.
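
As an illustration of the "no default port" idea discussed in this thread, here is a minimal sketch of a config check that rejects a missing or malformed host instead of falling back to `localhost:8080`. The `listenConfig` type and its `Validate` method are hypothetical names used only for this example; they are not part of the PR, and the sketch uses only the Go standard library.

```go
package main

import (
	"errors"
	"fmt"
	"net"
)

// listenConfig is a hypothetical stand-in for the input's settings;
// it is not the config type from this PR.
type listenConfig struct {
	Host string
}

// Validate rejects an empty or malformed host:port rather than
// silently falling back to a default address.
func (c listenConfig) Validate() error {
	if c.Host == "" {
		return errors.New("host is required, for example \"localhost:9000\"")
	}
	_, port, err := net.SplitHostPort(c.Host)
	if err != nil {
		return fmt.Errorf("invalid host %q: %v", c.Host, err)
	}
	if port == "" {
		return fmt.Errorf("invalid host %q: missing port", c.Host)
	}
	return nil
}

func main() {
	for _, host := range []string{"", "localhost", "localhost:9000"} {
		fmt.Printf("%q -> %v\n", host, listenConfig{Host: host}.Validate())
	}
}
```

With a check like this, only the fully specified `host:port` form passes, so a breaking change of the kind discussed above would surface as a configuration error at startup rather than as a silently different listening address.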

// TODO: What should be the default timeout?
Timeout: time.Minute * 5,
},
}

type config struct {
udp.Config `config:",inline"`
harvester.ForwarderConfig `config:",inline"`
Host string `config:"host"`
MaxMessageSize int `config:"max_message_size"`
}
74 changes: 0 additions & 74 deletions filebeat/input/udp/harvester.go

This file was deleted.

70 changes: 52 additions & 18 deletions filebeat/input/udp/input.go
@@ -1,9 +1,15 @@
package udp

import (
"sync"
"time"

"github.com/elastic/beats/filebeat/channel"
"github.com/elastic/beats/filebeat/harvester"
"github.com/elastic/beats/filebeat/input"
"github.com/elastic/beats/filebeat/inputsource/udp"
"github.com/elastic/beats/filebeat/util"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/cfgwarn"
"github.com/elastic/beats/libbeat/logp"
@@ -16,11 +22,12 @@ func init() {
}
}

// Input define a udp input
// Input defines a udp input to receive event on a specific host:port.
type Input struct {
harvester *Harvester
started bool
outlet channel.Outleter
sync.Mutex
udp *udp.Server
started bool
outlet channel.Outleter
}

// NewInput creates a new udp input
@@ -36,33 +43,60 @@ func NewInput(
return nil, err
}

config := defaultConfig
if err = cfg.Unpack(&config); err != nil {
return nil, err
}

forwarder := harvester.NewForwarder(out)
callback := func(data []byte, metadata udp.Metadata) {
e := util.NewData()
e.Event = beat.Event{
Timestamp: time.Now(),
Meta: common.MapStr{
"truncated": metadata.Truncated,
},
Fields: common.MapStr{
"message": string(data),
"source": metadata.RemoteAddr.String(),
},
}
forwarder.Send(e)
}

udp := udp.New(&config.Config, callback)

return &Input{
outlet: out,
harvester: NewHarvester(forwarder, cfg),
started: false,
outlet: out,
udp: udp,
started: false,
}, nil
}

// Run starts and execute the UDP server.
// Run starts and start the UDP server and read events from the socket
func (p *Input) Run() {
p.Lock()
defer p.Unlock()

if !p.started {
logp.Info("Starting udp input")
logp.Info("Starting UDP input")
err := p.udp.Start()
if err != nil {
logp.Err("Error running harvester: %v", err)
}
p.started = true
go func() {
defer p.outlet.Close()
err := p.harvester.Run()
if err != nil {
logp.Err("Error running harvester:: %v", err)
}
}()
}
}

// Stop stops the UDP input
func (p *Input) Stop() {
logp.Info("stopping UDP input")
p.harvester.Stop()
defer p.outlet.Close()
p.Lock()
defer p.Unlock()

logp.Info("Stopping UDP input")
p.udp.Stop()
p.started = false
}

// Wait suspends the UDP input
1 change: 1 addition & 0 deletions filebeat/inputsource/inputsource.go
@@ -0,0 +1 @@
package inputsource
Collaborator

Not a fan of the naming here but as it's not user facing we can figure out the "correct" name when we have multiple use cases. This will make it easier.

Contributor Author

The more I think about it, using source instead of inputsource would make sense. @ruflin @andrewkroh

Collaborator

If we call this one source, what should we call this one then? https://github.com/elastic/beats/blob/master/filebeat/harvester/source.go

I suggest getting the PR in with this name and iterating on top of it. I assume there will be quite a few more renames.

121 changes: 121 additions & 0 deletions filebeat/inputsource/udp/server.go
@@ -0,0 +1,121 @@
package udp

import (
"net"
"runtime"
"strings"
"sync"
"time"

"github.com/elastic/beats/libbeat/logp"
)

const windowErrBuffer = "A message sent on a datagram socket was larger than the internal message" +
" buffer or some other network limit, or the buffer used to receive a datagram into was smaller" +
" than the datagram itself."

// Metadata contains formations about the packet.
type Metadata struct {
RemoteAddr net.Addr
Truncated bool
}

// Config options for the UDPServer
type Config struct {
Host string `config:"host"`
MaxMessageSize int `config:"max_message_size" validate:"positive,nonzero"`
Timeout time.Duration `config:"timeout"`
}

// Server creates a simple UDP Server and listen to a specific host:port and will send any
// event received to the callback method.
type Server struct {
config *Config
callback func(data []byte, mt Metadata)
Listener net.PacketConn
log *logp.Logger
wg sync.WaitGroup
done chan struct{}
}

// New returns a new UDPServer instance.
func New(config *Config, callback func(data []byte, mt Metadata)) *Server {
return &Server{
config: config,
callback: callback,
log: logp.NewLogger("udp").With("address", config.Host),
done: make(chan struct{}),
}
}

// Start starts the UDP Server and listen to incoming events.
func (u *Server) Start() error {
Member

Typically a start method does not block like this does. It will "start" some kind of worker goroutine. This encapsulates the logic required to manage a goroutine which is good so that every user is not re-implementing the same thing. As a barebones example:

func (x *Thing) Start() {
    x.wg.Add(1)
    go func() {
        defer x.wg.Done()
        x.run()
    }()
}

func (x *Thing) Stop() {
    close(x.done)
    x.wg.Wait()
}

func (x *Thing) run() {
    for {
        select {
        case <-x.done:
            return
        default:
        }

        // Do a unit of work.
    }
}

In your case the socket would replace the done channel, and Start would initialize the listening socket before starting the worker goroutine. This way Start can return an error if it can't "listen", and when Start returns the Server is guaranteed to be in the "listening" state (which might remove the need to have IsRunning).
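
A minimal, self-contained sketch of the variant described above, in which closing the socket (rather than a done channel) is the stop signal. The `worker` type, its `handle` callback, and the `roundTrip` helper are hypothetical names for this illustration and use only the standard library; the code eventually merged in this PR keeps a done channel alongside the socket.

```go
package main

import (
	"errors"
	"fmt"
	"net"
	"sync"
	"time"
)

// worker is a hypothetical, stripped-down server; it is not the PR's Server type.
type worker struct {
	conn   net.PacketConn
	wg     sync.WaitGroup
	handle func(msg []byte, from net.Addr)
}

// Start binds the socket first, so a bind failure is returned to the caller,
// then hands the read loop to a goroutine and returns immediately.
func (w *worker) Start(addr string) error {
	conn, err := net.ListenPacket("udp", addr)
	if err != nil {
		return err
	}
	w.conn = conn
	w.wg.Add(1)
	go func() {
		defer w.wg.Done()
		w.run()
	}()
	return nil
}

// run reads datagrams until ReadFrom fails. Closing the socket makes the
// blocked ReadFrom return an error, so the socket itself is the stop signal.
func (w *worker) run() {
	buf := make([]byte, 1024)
	for {
		n, from, err := w.conn.ReadFrom(buf)
		if err != nil {
			return // This sketch treats every read error as "stop".
		}
		msg := append([]byte(nil), buf[:n]...) // Copy, because buf is reused.
		w.handle(msg, from)
	}
}

// Stop closes the socket and waits for the read loop to exit.
func (w *worker) Stop() {
	w.conn.Close()
	w.wg.Wait()
}

// roundTrip starts a worker on an ephemeral loopback port, sends it one
// datagram, and returns the payload the handler received.
func roundTrip(payload string) (string, error) {
	got := make(chan string, 1)
	w := &worker{handle: func(msg []byte, _ net.Addr) { got <- string(msg) }}
	if err := w.Start("127.0.0.1:0"); err != nil {
		return "", err
	}
	defer w.Stop()

	client, err := net.Dial("udp", w.conn.LocalAddr().String())
	if err != nil {
		return "", err
	}
	defer client.Close()
	if _, err := client.Write([]byte(payload)); err != nil {
		return "", err
	}

	select {
	case msg := <-got:
		return msg, nil
	case <-time.After(2 * time.Second):
		return "", errors.New("timed out waiting for the datagram")
	}
}

func main() {
	msg, err := roundTrip("hello")
	fmt.Println(msg, err)
}
```

Because `Start` only returns after `net.ListenPacket` has succeeded, a caller can send to the socket as soon as `Start` returns, which is the guarantee the comment refers to.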

Contributor Author

Thanks for the detailed input, it's really appreciated and it clarifies things.

var err error
u.Listener, err = net.ListenPacket("udp", u.config.Host)
if err != nil {
return err
}
u.log.Info("Started listening for UDP connection")
u.wg.Add(1)
go func() {
defer u.wg.Done()
u.run()
}()
return nil
}

func (u *Server) run() {
for {
select {
case <-u.done:
return
default:
}

buffer := make([]byte, u.config.MaxMessageSize)
u.Listener.SetDeadline(time.Now().Add(u.config.Timeout))

// If you are using Windows and you are using a fixed buffer and you get a datagram which
// is bigger than the specified size of the buffer, it will return an `err` and the buffer will
// contains a subset of the data.
//
// On Unix based system, the buffer will be truncated but no error will be returned.
length, addr, err := u.Listener.ReadFrom(buffer)
if err != nil {
// don't log any deadline events.
e, ok := err.(net.Error)
if ok && e.Timeout() {
continue
}

u.log.Errorw("Error reading from the socket", "error", err)

// On Windows send the current buffer and mark it as truncated.
// The buffer will have content but length will return 0, addr will be nil.
if isLargerThanBuffer(err) {
u.callback(buffer, Metadata{RemoteAddr: addr, Truncated: true})
continue
}
}

if length > 0 {
u.callback(buffer[:length], Metadata{RemoteAddr: addr})
}
}
}

// Stop stops the current udp server.
func (u *Server) Stop() {
u.log.Info("Stopping UDP server")
u.Listener.Close()
close(u.done)
u.wg.Wait()
u.log.Info("UDP server stopped")
}

func isLargerThanBuffer(err error) bool {
if runtime.GOOS != "windows" {
return false
}
return strings.Contains(err.Error(), windowErrBuffer)
}
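
The comments in `run` describe what happens when a datagram is larger than the receive buffer. The following standalone sketch, which assumes only the Go standard library and uses a hypothetical helper named `readWithBuffer`, makes it possible to observe that behaviour on a given platform. No output is shown here because the result depends on the operating system, as the comments in `run` note.

```go
package main

import (
	"fmt"
	"net"
	"time"
)

// readWithBuffer sends a datagram of payloadLen bytes over loopback and reads
// it back into a buffer of bufLen bytes, returning what ReadFrom reported.
func readWithBuffer(payloadLen, bufLen int) (int, error) {
	server, err := net.ListenPacket("udp", "127.0.0.1:0")
	if err != nil {
		return 0, err
	}
	defer server.Close()

	client, err := net.Dial("udp", server.LocalAddr().String())
	if err != nil {
		return 0, err
	}
	defer client.Close()
	if _, err := client.Write(make([]byte, payloadLen)); err != nil {
		return 0, err
	}

	// A deadline keeps the sketch from blocking forever if the datagram is lost.
	if err := server.SetReadDeadline(time.Now().Add(2 * time.Second)); err != nil {
		return 0, err
	}
	buf := make([]byte, bufLen)
	n, _, err := server.ReadFrom(buf)
	return n, err
}

func main() {
	// A 32-byte datagram read into an 8-byte buffer does not fit.
	n, err := readWithBuffer(32, 8)
	fmt.Printf("read %d bytes, err=%v\n", n, err)
}
```

When the read succeeds without an error, the returned length equals the buffer size and the rest of the datagram is discarded, which is why the server needs the separate `Truncated` flag in `Metadata` for the case where the platform reports the condition as an error.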