-
Notifications
You must be signed in to change notification settings - Fork 4.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Extracting the UDP Server logic from the UDP input #6439
Changes from 5 commits
0a5d06c
fbecae7
af5648d
55aad8c
2057dad
59677a7
78b2d43
d8456e2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,20 +1,26 @@ | ||
package udp | ||
|
||
import ( | ||
"time" | ||
|
||
"github.com/elastic/beats/filebeat/harvester" | ||
"github.com/elastic/beats/filebeat/inputsource/udp" | ||
) | ||
|
||
var defaultConfig = config{ | ||
ForwarderConfig: harvester.ForwarderConfig{ | ||
Type: "udp", | ||
}, | ||
MaxMessageSize: 10240, | ||
// TODO: What should be default port? | ||
Host: "localhost:8080", | ||
Config: udp.Config{ | ||
MaxMessageSize: 10240, | ||
// TODO: What should be default port? | ||
Host: "localhost:8080", | ||
// TODO: What should be the default timeout? | ||
Timeout: time.Minute * 5, | ||
}, | ||
} | ||
|
||
type config struct { | ||
udp.Config `config:",inline"` | ||
harvester.ForwarderConfig `config:",inline"` | ||
Host string `config:"host"` | ||
MaxMessageSize int `config:"max_message_size"` | ||
} |
This file was deleted.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,10 +1,16 @@ | ||
package udp | ||
|
||
import ( | ||
"time" | ||
|
||
"github.com/elastic/beats/filebeat/channel" | ||
"github.com/elastic/beats/filebeat/harvester" | ||
"github.com/elastic/beats/filebeat/input" | ||
"github.com/elastic/beats/filebeat/inputsource/udp" | ||
"github.com/elastic/beats/filebeat/util" | ||
"github.com/elastic/beats/libbeat/beat" | ||
"github.com/elastic/beats/libbeat/common" | ||
"github.com/elastic/beats/libbeat/common/atomic" | ||
"github.com/elastic/beats/libbeat/common/cfgwarn" | ||
"github.com/elastic/beats/libbeat/logp" | ||
) | ||
|
@@ -16,11 +22,11 @@ func init() { | |
} | ||
} | ||
|
||
// Input define a udp input | ||
// Input defines a udp input to receive event on a specific host:port. | ||
type Input struct { | ||
harvester *Harvester | ||
started bool | ||
outlet channel.Outleter | ||
udp *udp.Server | ||
started atomic.Bool | ||
outlet channel.Outleter | ||
} | ||
|
||
// NewInput creates a new udp input | ||
|
@@ -36,33 +42,54 @@ func NewInput( | |
return nil, err | ||
} | ||
|
||
config := defaultConfig | ||
if err = cfg.Unpack(&config); err != nil { | ||
return nil, err | ||
} | ||
|
||
forwarder := harvester.NewForwarder(out) | ||
callback := func(data []byte, metadata udp.Metadata) { | ||
e := util.NewData() | ||
e.Event = beat.Event{ | ||
Timestamp: time.Now(), | ||
Meta: common.MapStr{ | ||
"source": metadata.Source.String(), | ||
"truncated": metadata.Truncated, | ||
}, | ||
Fields: common.MapStr{ | ||
"message": string(data), | ||
}, | ||
} | ||
forwarder.Send(e) | ||
} | ||
|
||
udp := udp.New(&config.Config, callback) | ||
|
||
return &Input{ | ||
outlet: out, | ||
harvester: NewHarvester(forwarder, cfg), | ||
started: false, | ||
outlet: out, | ||
udp: udp, | ||
started: atomic.MakeBool(false), | ||
}, nil | ||
} | ||
|
||
// Run starts and execute the UDP server. | ||
// Run starts and start the UDP server and read events from the socket | ||
func (p *Input) Run() { | ||
if !p.started { | ||
logp.Info("Starting udp input") | ||
p.started = true | ||
go func() { | ||
defer p.outlet.Close() | ||
err := p.harvester.Run() | ||
if err != nil { | ||
logp.Err("Error running harvester:: %v", err) | ||
} | ||
}() | ||
if !p.started.Load() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we talked about this and you were going to open an issue for it. If you did/do can you add a link back to here. There's a race condition problem with using an There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Correct, let me switch that to a mutex. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I will do the same change in the TCP. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've moved it to a mutex instead and will do a followup issue, I need to get a bit more insight about was is managing the input before proposing something. I did the same changes in the TCP. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
logp.Info("Starting UDP input") | ||
err := p.udp.Start() | ||
if err != nil { | ||
logp.Err("Error running harvester: %v", err) | ||
} | ||
p.started.Swap(true) | ||
} | ||
} | ||
|
||
// Stop stops the UDP input | ||
func (p *Input) Stop() { | ||
logp.Info("stopping UDP input") | ||
p.harvester.Stop() | ||
defer p.outlet.Close() | ||
defer p.started.Swap(false) | ||
logp.Info("Stopping UDP input") | ||
p.udp.Stop() | ||
} | ||
|
||
// Wait suspends the UDP input | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
package inputsource | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not a fan of the naming here but as it's not user facing we can figure out the "correct" name when we have multiple use cases. This will make it easier. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. More I think about it.. using source instead of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we call this one source, what should we call this one then? https://github.com/elastic/beats/blob/master/filebeat/harvester/source.go I suggest to get the PR with this name in an iterate on top of it. There will be quite a few more renames I assume. |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,121 @@ | ||
package udp | ||
|
||
import ( | ||
"net" | ||
"runtime" | ||
"strings" | ||
"sync" | ||
"time" | ||
|
||
"github.com/elastic/beats/libbeat/logp" | ||
) | ||
|
||
const windowErrBuffer = "A message sent on a datagram socket was larger than the internal message" + | ||
" buffer or some other network limit, or the buffer used to receive a datagram into was smaller" + | ||
" than the datagram itself." | ||
|
||
// Metadata contains formations about the packet. | ||
type Metadata struct { | ||
Source net.Addr | ||
Truncated bool | ||
} | ||
|
||
// Config options for the UDPServer | ||
type Config struct { | ||
Host string `config:"host"` | ||
MaxMessageSize int `config:"max_message_size" validate:"positive,nonzero"` | ||
Timeout time.Duration `config:"timeout"` | ||
} | ||
|
||
// Server creates a simple UDP Server and listen to a specific host:port and will send any | ||
// event received to the callback method. | ||
type Server struct { | ||
config *Config | ||
callback func(data []byte, mt Metadata) | ||
Listener net.PacketConn | ||
log *logp.Logger | ||
wg sync.WaitGroup | ||
done chan struct{} | ||
} | ||
|
||
// New returns a new UDPServer instance. | ||
func New(config *Config, callback func(data []byte, mt Metadata)) *Server { | ||
return &Server{ | ||
config: config, | ||
callback: callback, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You could create a
Then anywhere within this object that you do logging you would use |
||
log: logp.NewLogger("udp").With("address", config.Host), | ||
done: make(chan struct{}), | ||
} | ||
} | ||
|
||
// Start starts the UDP Server and listen to incoming events. | ||
func (u *Server) Start() error { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Typically a start method does not block like this does. It will "start" some kind of worker goroutine. This encapsulates the logic required to manage a goroutine which is good so that every user is not re-implementing the same thing. As a barebones example:
In your case the socket would replace the done channel. And There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for the long input, really appreciated and it clarify things. |
||
var err error | ||
u.Listener, err = net.ListenPacket("udp", u.config.Host) | ||
if err != nil { | ||
return err | ||
} | ||
u.log.Info("Started listening for UDP connection") | ||
u.wg.Add(1) | ||
go func() { | ||
defer u.wg.Done() | ||
u.run() | ||
}() | ||
return nil | ||
} | ||
|
||
func (u *Server) run() { | ||
for { | ||
select { | ||
case <-u.done: | ||
return | ||
default: | ||
} | ||
|
||
buffer := make([]byte, u.config.MaxMessageSize) | ||
u.Listener.SetDeadline(time.Now().Add(u.config.Timeout)) | ||
|
||
// If you are using Windows and you are using a fixed buffer and you get a datagram which | ||
// is bigger than the specified size of the buffer, it will return an `err` and the buffer will | ||
// contains a subset of the data. | ||
// | ||
// On Unix based system, the buffer will be truncated but no error will be returned. | ||
length, addr, err := u.Listener.ReadFrom(buffer) | ||
if err != nil { | ||
// don't log any deadline events. | ||
e, ok := err.(net.Error) | ||
if ok && e.Timeout() { | ||
continue | ||
} | ||
|
||
u.log.Errorw("Error reading from the socket", "error", err) | ||
|
||
// On Windows send the current buffer and mark it as truncated. | ||
// The buffer will have content but length will return 0, addr will be nil. | ||
if isLargerThanBuffer(err) { | ||
u.callback(buffer, Metadata{Source: addr, Truncated: true}) | ||
continue | ||
} | ||
} | ||
|
||
if length > 0 { | ||
u.callback(buffer[:length], Metadata{Source: addr}) | ||
} | ||
} | ||
} | ||
|
||
// Stop stops the current udp server. | ||
func (u *Server) Stop() { | ||
u.log.Info("Stopping UDP server") | ||
u.Listener.Close() | ||
close(u.done) | ||
u.wg.Wait() | ||
u.log.Info("UDP server stopped") | ||
} | ||
|
||
func isLargerThanBuffer(err error) bool { | ||
if runtime.GOOS != "windows" { | ||
return false | ||
} | ||
return strings.Contains(err.Error(), windowErrBuffer) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps there should not be a default and make the user explicitly state on which port to listen. The logstash udp input does this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with that, but can only do in for 7.0 since this could break existing configuration.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note, the
TODO
was there before, my changes. :)There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 on the proposal from Andrew. Sorry about the TODO ...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the udp input feature listed as experimental or beta? This could give us some leeway to make the change now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lets do it in another PR, with a clear intend in the changelog.