Extracting the UDP Server logic from the UDP input #6439
@@ -0,0 +1 @@
package input_source
don't use an underscore in package name
@urso I believe this is what we intended for separating the Source from the Input. This is a first shot at the UDP input; if the approach is correct, it will unblock me for the syslog input, and I will apply the same strategy to the TCP input in another PR. I am looking for a name for this package. We were thinking about InputReader, InputStream, or InputSource; I don't have a preference :) Also, with the filewatcher/udp and tcp code extracted, I don't think we need a common interface.
b342ef3
to
bda4f60
Compare
filebeat/inputsource/input_source.go
Outdated
@@ -0,0 +1 @@
package input_source
don't use an underscore in package name
			assert.Equal(t, test.expected, message)
		})
	}
}
We only had Python tests.
It's really great to have these tests in.
filebeat/input/udp/config.go
Outdated
@@ -2,19 +2,21 @@ package udp

import (
	"github.com/elastic/beats/filebeat/harvester"
	udp "github.com/elastic/beats/filebeat/inputsource/udp"
Not sure if your editor adds udp here?
I did a GoRename on a package, but it didn't update the import..
@ruflin updated. |
Looks reasonably simple. +1 on adding short-lived Go-based tests. I'm not sure the simple callback interface is good enough for all of TCP, though. How about passing some 'context' variable to the callback? It could be used to resolve client metadata, for example, or to react to parse/message errors: for TCP, close the client connection; for UDP, refuse to process messages from client X for some time?
I think it's still premature to add them, although the TCP extraction will definitely need it. I would merge this as is and let the requirements evolve that interface.
The test suite seems to hang; I will check it out.
Changes LGTM. Naming can still be solved later.
I think windows struggles with the udp server tests. Perhaps skip them on windows?
@@ -0,0 +1 @@
package inputsource
Not a fan of the naming here but as it's not user facing we can figure out the "correct" name when we have multiple use cases. This will make it easier.
The more I think about it, using source instead of inputsource would make sense. @ruflin @andrewkroh
If we call this one source, what should we call this one then? https://github.com/elastic/beats/blob/master/filebeat/harvester/source.go
I suggest getting the PR in with this name and iterating on top of it. There will be quite a few more renames, I assume.
I will check Windows; the tests should be solid.
@ph Seems like Windows still has some issues.
filebeat/inputsource/udp/server.go
Outdated
// Config options for the UDPServer
type Config struct {
	Host           string `config:"host"`
	MaxMessageSize int    `config:"max_message_size"`
Are there any bounds on these values? If so, you should add validate tags like validate:"min:0 max:65535" or validate:"required" (double-check the exact ucfg syntax).
Again, there was bounds before, but for now we can do the following: required and > 0. Enforcing a top-level bound might not be a good idea in a 6.x release.
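The "required and > 0" rule from the reply above can also be expressed as an explicit check in plain Go, which avoids depending on the exact struct-tag syntax. This is a minimal sketch, not the code from this PR: the `Validate` method and its error messages are assumptions made for illustration, and whether the config loader calls such a method automatically should be checked against the ucfg documentation.

```go
package main

import (
	"errors"
	"fmt"
)

// Config mirrors the two fields shown in the diff above.
type Config struct {
	Host           string
	MaxMessageSize int
}

// Validate is a hypothetical check: it rejects an empty host and a
// non-positive message size. No upper bound is enforced, in line with
// the concern about tightening limits in a 6.x release.
func (c *Config) Validate() error {
	if c.Host == "" {
		return errors.New("host is required")
	}
	if c.MaxMessageSize <= 0 {
		return fmt.Errorf("max_message_size must be > 0, got %d", c.MaxMessageSize)
	}
	return nil
}
```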
filebeat/inputsource/udp/server.go
Outdated
type Server struct {
	running  atomic.Bool
	config   *Config
	callback func(data []byte)
I think the callback should include the source address (ip/port). This will allow for source metadata to be added to the event. Or in the future it could allow for per source buffering.
filebeat/inputsource/udp/server.go
Outdated
}

// IsRunning returns true when the UDP server is accepting new data.
func (u *Server) IsRunning() bool {
I recommend having Start() if you have a Stop() to keep the interface balanced and remove the responsibility of managing a goroutine from the user.
filebeat/inputsource/udp/server.go
Outdated
return &Server{
	running:  atomic.MakeBool(false),
	config:   config,
	callback: callback,
You could create a Logger instance for the server to use.
log: logp.NewLogger("udp").With("address", config.Host),
Then anywhere within this object that you do logging you would use s.log.Xyz(). This gives you a named logger with context plus gives you the ability to use the structured logging methods if needed.
filebeat/inputsource/udp/server.go
Outdated
buffer := make([]byte, u.config.MaxMessageSize)

for u.running.Load() {
	u.listener.SetDeadline(time.Now().Add(u.config.Timeout))
I think the deadline only needs to be set once after initialization.
Sadly, this is not how it works: SetDeadline takes an absolute time. But it does make sense. Say you are waiting for a large computation: you could give it a longer deadline than a simple request/response.
// A deadline is an absolute time after which I/O operations
// fail with a timeout (see type Error) instead of
// blocking. The deadline applies to all future and pending
// I/O, not just the immediately following call to ReadFrom or
// WriteTo. After a deadline has been exceeded, the connection
// can be refreshed by setting a deadline in the future.
I was incorrectly thinking this was a read timeout with a time.Duration.
filebeat/inputsource/udp/server.go
Outdated
// Stop stops the current udp server.
func (u *Server) Stop() {
	logp.Info("Stopping UDP server on: %s", u.config.Host)
	u.running.Swap(false)
I'd prefer for Stop to block until the goroutine exits. This should Close the socket and wait for run to exit (via sync.WaitGroup).
Signaling to stop via a flag means that the socket won't actually be closed until a packet is received to unblock the recvfrom or a timeout is reached (max 5 minutes).
Correct, you are right we need to get out of the loop and wait until the processing is done:
- Close the Socket.
- Set false, so we are not running (mostly used in tests)
- Block until goroutine exits.
filebeat/inputsource/udp/server.go
Outdated
if err != nil {
	// don't log any deadline events.
	e, ok := err.(net.Error)
	if !ok && !e.Timeout() {
This logic looks incorrect. I think it should be if !ok || !e.Timeout(), but personally I would probably special-case the timeout and continue for that.
if ok && e.Timeout() {
continue
}
u.log.Errorw("Error receiving from socket", "error", err)
Yes, it's clearer that way.
Config: udp.Config{
	MaxMessageSize: 10240,
	// TODO: What should be default port?
	Host: "localhost:8080",
Perhaps there should not be a default and make the user explicitly state on which port to listen. The logstash udp input does this.
I agree with that, but we can only do it for 7.0 since this could break existing configurations.
Note, the TODO was there before my changes. :)
+1 on the proposal from Andrew. Sorry about the TODO ...
Is the udp input feature listed as experimental or beta? This could give us some leeway to make the change now.
Let's do it in another PR, with a clear intent in the changelog.
59828be
to
5db8ec2
Compare
I will let the tests run, but I might disable them for this platform.
+1 for 7.0 I’ll do the same for TCP
5db8ec2
to
0a93fc0
Compare
I've updated this PR to address the comments from @ruflin and @andrewkroh, and I've skipped the UDP test on Windows as suggested by @ruflin. There also seems to be a flaky Metricbeat test that is not related to this PR.
Oh right!
filebeat/input/udp/input.go
Outdated
p.started = true
go func() {
	defer p.outlet.Close()
	err := p.harvester.Run()
	err := p.udp.Start()
	if err != nil {
		logp.Err("Error running harvester:: %v", err)
Double colon in error message.
}

// Start starts the UDP Server and listen to incoming events.
func (u *Server) Start() error {
Typically a start method does not block like this does. It will "start" some kind of worker goroutine. This encapsulates the logic required to manage a goroutine which is good so that every user is not re-implementing the same thing. As a barebones example:
func (x *Thing) Start() {
	x.wg.Add(1)
	go func() {
		defer x.wg.Done()
		x.run()
	}()
}

func (x *Thing) Stop() {
	close(x.done)
	x.wg.Wait()
}

func (x *Thing) run() {
	for {
		select {
		case <-x.done:
			return
		default:
		}
		// Do a unit of work.
	}
}
In your case the socket would replace the done channel. And Start would initialize the listening socket before starting the worker goroutine. This way Start can return an error if it can't "listen", and when Start returns the Server is guaranteed to be in the "listening" state (which might remove the need to have IsRunning).
Thanks for the detailed input; it's really appreciated and it clarifies things.
}

ch := make(chan info)
host := "0.0.0.0:10000"
I recommend listening only on localhost to avoid opening the port up to the world. Secondly, since this is a test that shouldn't fail if port 10000 is already in use, you can use localhost:0 and it will bind to an ephemeral port. You'll need to expose the local address from the server so you know which port it is bound to.
If the port in the address parameter is empty or "0", as in "127.0.0.1:" or "[::1]:0", a port number is automatically chosen. The LocalAddr method of PacketConn can be used to discover the chosen port.
func TestReceiveEventFromUDP(t *testing.T) {
	if runtime.GOOS == "window" {
		t.Skipf("Skipped on windows")
What was the failure that led to this? I'm not a fan of doing this because it could be masking issues with the code or the test.
This commit extracts the UDP Server logic out of the UDP input, which will allow this component and its configuration to be reused for the syslog input. It now uses a callback instead of a forwarder instance. Ref: #6361
4e357c6
to
2057dad
Compare
There has been some subtlety in our UDP implementation for some time, and there is an intrinsic difference between Windows and Linux. If you do a
@andrewkroh Preserving the buffer sent by Windows makes the code a bit ugly, but I don't think I have any choice other than matching the original messages. We are a bit on our own. Right now I do track the Windows Reference:
} else {
	assert.NotNil(t, info.mt.Source)
	assert.False(t, info.mt.Truncated)
}
Because of the platform differences, we have to deal with that here. I've decided to just use predicates to route to the correct logic. If the tests become more complex, it will make sense to use tags to target them.
Blocked by the discussion on the event fields in #6700 (comment)
WFG
filebeat/input/udp/input.go
Outdated
		logp.Err("Error running harvester:: %v", err)
	}
}()
if !p.started.Load() {
I think we talked about this and you were going to open an issue for it. If you did/do, can you add a link back to here?
There's a race condition problem with using an atomic.Bool to protect the underlying state of the server. A mutex needs to be used instead to protect it.
Correct, let me switch that to a mutex.
I will do the same change in the TCP.
I've moved it to a mutex instead and will open a follow-up issue; I need to get a bit more insight into what is managing the input before proposing something. I made the same changes in the TCP input.
LGTM