Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extracting the UDP Server logic from the UDP input #6439

Merged

Conversation

ph
Copy link
Contributor

@ph ph commented Feb 21, 2018

This commit extract the UDP Server logic outside of the UDP input, this
will to reuse this component and the configuration for the syslog input.
It now uses a callback instead of a forwarder instance.

Ref: #6361

@ph ph added Filebeat Filebeat in progress Pull request is currently in progress. labels Feb 21, 2018
@@ -0,0 +1 @@
package input_source

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't use an underscore in package name

@ph
Copy link
Contributor Author

ph commented Feb 21, 2018

@urso I believe this is what we were intending to do for separating the Source and the Input, this is a shot at the UDP input, if that is correct that will unblock me for the syslog Input and will also apply the same strategy for the TCP input in another PR.

I am looking for naming of this package, We were thinking about: InputReader, InputStream, InputSource, I don't have a preference :)

Also with the extract of the filewatcher/udp and the tcp code, I don't think we need a common interface.

@ph ph force-pushed the refactor/make-udp-composable-with-other-input branch from b342ef3 to bda4f60 Compare February 21, 2018 21:07
@@ -0,0 +1 @@
package input_source

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't use an underscore in package name

@ph ph mentioned this pull request Feb 21, 2018
assert.Equal(t, test.expected, message)
})
}
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We only had python test..

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's really great to have these tests in.

@@ -2,19 +2,21 @@ package udp

import (
"github.com/elastic/beats/filebeat/harvester"
udp "github.com/elastic/beats/filebeat/inputsource/udp"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if your editor adds udp here?

@ph
Copy link
Contributor Author

ph commented Feb 22, 2018 via email

@ph ph added review and removed in progress Pull request is currently in progress. labels Feb 22, 2018
@ph
Copy link
Contributor Author

ph commented Feb 22, 2018

@ruflin updated.

@urso
Copy link

urso commented Feb 22, 2018

Looks reasonably simple.

+1 on adding short-lived go based tests.

Not sure if the simple callback interface is good enough for all TCP though.

How about passing some 'context' variable to the callback? This could be used to resolve some client metadata for example. Or close connection on parse/message errors -> TCP close client connection, UDP refuse processing messages from client X for some time?

@ph
Copy link
Contributor Author

ph commented Feb 22, 2018

How about passing some 'context' variable to the callback? This could be used to resolve some client metadata for example. Or close connection on parse/message errors -> TCP close client connection, UDP refuse processing messages from client X for some time?

I think it's still premature to add them, I think the TCP extraction will definitely need it. I would merge it as is and let the requirements evolve that interface.

@ph
Copy link
Contributor Author

ph commented Feb 22, 2018

Seems to have a hang on the suite, will check it out.

Copy link
Contributor

@ruflin ruflin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changes LGTM. Naming can still be solved later.

I think windows struggles with the udp server tests. Perhaps skip them on windows?

@@ -0,0 +1 @@
package inputsource
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a fan of the naming here but as it's not user facing we can figure out the "correct" name when we have multiple use cases. This will make it easier.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More I think about it.. using source instead of inputsource would make sense. @ruflin @andrewkroh

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we call this one source, what should we call this one then? https://github.com/elastic/beats/blob/master/filebeat/harvester/source.go

I suggest to get the PR with this name in an iterate on top of it. There will be quite a few more renames I assume.

assert.Equal(t, test.expected, message)
})
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's really great to have these tests in.

@ph
Copy link
Contributor Author

ph commented Feb 23, 2018 via email

@ruflin
Copy link
Contributor

ruflin commented Feb 26, 2018

@ph Seems like Windows still has some issues.

// Config options for the UDPServer
type Config struct {
Host string `config:"host"`
MaxMessageSize int `config:"max_message_size"`
Copy link
Member

@andrewkroh andrewkroh Mar 7, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there any bounds on these values? If so you should add a validate tags like validate:"min:0 max:65535" or validate:"required" (double check the exact ucfg syntax)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, there was bounds before, but for now we can do the following: required and > 0, enforcing a top level bound might not be a good idea in a 6.x release.

type Server struct {
running atomic.Bool
config *Config
callback func(data []byte)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the callback should include the source address (ip/port). This will allow for source metadata to be added to the event. Or in the future it could allow for per source buffering.

}

//IsRunning returns true when the UDP server is accepting new data.
func (u *Server) IsRunning() bool {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I recommend having Start() if you have a Stop() to keep the interface balanced and remove the responsibility of managing a goroutine from the user.

return &Server{
running: atomic.MakeBool(false),
config: config,
callback: callback,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could create a Logger instance for the server to use.

  log: logp.NewLogger("udp").With("address", config.Host),

Then anywhere within this object that you do logging you would use s.log.Xyz(). This gives you a named logger with context plus gives you the ability to use the structured logging methods if needed.

buffer := make([]byte, u.config.MaxMessageSize)

for u.running.Load() {
u.listener.SetDeadline(time.Now().Add(u.config.Timeout))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the deadline only needs to be set once after initialization.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sadly this is not how it works, SetDeadline is an absolute time. But it does make sense, let say that you are waiting for a large computation you could give him a bigger deadline than a simple req/response.

	// A deadline is an absolute time after which I/O operations
	// fail with a timeout (see type Error) instead of
	// blocking. The deadline applies to all future and pending
	// I/O, not just the immediately following call to ReadFrom or
	// WriteTo. After a deadline has been exceeded, the connection
	// can be refreshed by setting a deadline in the future.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was incorrectly thinking this was a read timeout with a time.Duration.

// Stop stops the current udp server.
func (u *Server) Stop() {
logp.Info("Stopping UDP server on: %s", u.config.Host)
u.running.Swap(false)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer for Stop to block until the goroutine exits. This should Close the socket and wait for run to exit (via sync.WaitGroup).

Signaling to stop via a flag means that the socket won't actually be closed until the a packet is received to unblock the recvfrom or a timeout is reached (max 5 minutes).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct, you are right we need to get out of the loop and wait until the processing is done:

  1. Close the Socket.
  2. Set false, so we are not running (mostly used in tests)
  3. Block until goroutine exits.

if err != nil {
// don't log any deadline events.
e, ok := err.(net.Error)
if !ok && !e.Timeout() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic looks incorrect. I think it should be if !ok || !e.Timeout(), but personally I would probably special case the timeout and continue for that.

if ok && e.Timeout() {
  continue
}
u.log.Errorw("Error receiving from socket", "error", err)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes its more clear that way.

Config: udp.Config{
MaxMessageSize: 10240,
// TODO: What should be default port?
Host: "localhost:8080",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps there should not be a default and make the user explicitly state on which port to listen. The logstash udp input does this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with that, but can only do in for 7.0 since this could break existing configuration.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note, the TODO was there before, my changes. :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 on the proposal from Andrew. Sorry about the TODO ...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the udp input feature listed as experimental or beta? This could give us some leeway to make the change now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets do it in another PR, with a clear intend in the changelog.

@ph ph force-pushed the refactor/make-udp-composable-with-other-input branch from 59828be to 5db8ec2 Compare March 26, 2018 20:27
@ph
Copy link
Contributor Author

ph commented Mar 26, 2018

I think windows struggles with the udp server tests. Perhaps skip them on windows?

I will let it run, but I might disable them for this platform.

@ph
Copy link
Contributor Author

ph commented Mar 27, 2018 via email

@ph ph force-pushed the refactor/make-udp-composable-with-other-input branch from 5db8ec2 to 0a93fc0 Compare March 27, 2018 14:30
@ph
Copy link
Contributor Author

ph commented Mar 27, 2018

I've updated this PR with comments from @ruflin and @andrewkroh, I've skipped the udp test on windows as suggested by @ruflin.

Also there seems to be a flaky test with metricbeats that is not related to this PRs.

@ph
Copy link
Contributor Author

ph commented Mar 27, 2018 via email

p.started = true
go func() {
defer p.outlet.Close()
err := p.harvester.Run()
err := p.udp.Start()
if err != nil {
logp.Err("Error running harvester:: %v", err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Double colon in error message.

}

// Start starts the UDP Server and listen to incoming events.
func (u *Server) Start() error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typically a start method does not block like this does. It will "start" some kind of worker goroutine. This encapsulates the logic required to manage a goroutine which is good so that every user is not re-implementing the same thing. As a barebones example:

func (x *Thing) Start() {
    x.wg.Add(1)
    go func() {
        defer r.wg.Done()
        x.run()
    }() 
}

func (x *Thing) Stop() {
    close(x.done) 
    x.wg.Wait()
}

func (x *Thing) run() {
    for {
        select {
        case <-r.done:
            return
        default:
        }

        // Do a unit work.
    }
}

In your case the socket would replace the done channel. And Start would initialize listening socket before starting the worker goroutine. This way Start can return an error if it can't "listen" and when start returns the Server is guaranteed to be in the "listening" state (which might remove the need to have IsRunning).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the long input, really appreciated and it clarify things.

}

ch := make(chan info)
host := "0.0.0.0:10000"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I recommend listening only on localhost to avoid opening the port up to the world. Secondly, since this is a test that shouldn't fail if 10000 is already in use you can use localhost:0 and it will bind to an ephemeral port. You'll need to expose the local address from the server so you know what port that it is bound to.

If the port in the address parameter is empty or "0", as in "127.0.0.1:" or "[::1]:0", a port number is automatically chosen. The LocalAddr method of PacketConn can be used to discover the chosen port.

https://golang.org/pkg/net/#ListenPacket


func TestReceiveEventFromUDP(t *testing.T) {
if runtime.GOOS == "window" {
t.Skipf("Skipped on windows")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What was the failure that led to this? I'm not a fan of doing this because it could be masking issues with the code or the test.

ph added 2 commits March 29, 2018 10:54
This commit extract the UDP Server logic outside of the UDP input, this
will to reuse this component and the configuration for the syslog input.
It now uses a callback instead of a forwarder instance.

Ref: #6361
@ph ph force-pushed the refactor/make-udp-composable-with-other-input branch from 4e357c6 to 2057dad Compare March 29, 2018 14:54
@ph
Copy link
Contributor Author

ph commented Mar 29, 2018

There was some subtility in our UDP implementation for some time and there is some intrisinc difference between windows and Linux.

If you do ReadFrom with UDP with a fixed buffer on Linux and the original datagram is bigger than the read limit from the buffer, no error will be returned, addr will be correctly set, length will be set to the maximun size of the buffer.

If you do a ReadFrom for UDP with a fixed buffer on Windows and the original data is bigger than the datagram received the following will happen:

  • Err will be : A message sent on a datagram socket was larger than the internal message buffer
  • Addr with be NIL
  • length will be set to 0
  • buffer will have the truncated data.

@andrewkroh conserving the buffer send by windows makes the code a bit ugly I don't think I have other choice dans matching the original messages. we are a bit on our own..

Right now I do track the truncated, but its not exposed in the event.

Windows Reference:
https://msdn.microsoft.com/en-us/library/windows/desktop/ms741686(v=vs.85).aspx

} else {
assert.NotNil(t, info.mt.Source)
assert.False(t, info.mt.Truncated)
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because of the platform differences we have to deal that here. I've decided to just use predicates to route the correct logic. If the tests become more complex it will make sense to use tags to target them.

@ph ph mentioned this pull request Mar 29, 2018
@ph
Copy link
Contributor Author

ph commented Apr 4, 2018

blocked by discussion on the field of the event in #6700 (comment)

Copy link
Member

@andrewkroh andrewkroh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WFG

logp.Err("Error running harvester:: %v", err)
}
}()
if !p.started.Load() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we talked about this and you were going to open an issue for it. If you did/do can you add a link back to here.

There's a race condition problem with using an atomic.Bool to protect the underlying state of the server. A mutex needs to be used instead to protect it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct, let me switch that to a mutex.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will do the same change in the TCP.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've moved it to a mutex instead and will do a followup issue, I need to get a bit more insight about was is managing the input before proposing something. I did the same changes in the TCP.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

@andrewkroh andrewkroh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants