Skip to content

Commit

Permalink
windows <3
Browse files Browse the repository at this point in the history
  • Loading branch information
ph committed Mar 29, 2018
1 parent 55aad8c commit 2057dad
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 12 deletions.
7 changes: 5 additions & 2 deletions filebeat/input/udp/input.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package udp

import (
"net"
"time"

"github.com/elastic/beats/filebeat/channel"
Expand Down Expand Up @@ -49,10 +48,14 @@ func NewInput(
}

forwarder := harvester.NewForwarder(out)
callback := func(data []byte, addr net.Addr) {
callback := func(data []byte, metadata udp.Metadata) {
e := util.NewData()
e.Event = beat.Event{
Timestamp: time.Now(),
Meta: common.MapStr{
"source": metadata.Source.String(),
"truncated": metadata.Truncated,
},
Fields: common.MapStr{
"message": string(data),
},
Expand Down
40 changes: 35 additions & 5 deletions filebeat/inputsource/udp/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,24 @@ package udp

import (
"net"
"runtime"
"strings"
"sync"
"time"

"github.com/elastic/beats/libbeat/logp"
)

const windowErrBuffer = "A message sent on a datagram socket was larger than the internal message" +
" buffer or some other network limit, or the buffer used to receive a datagram into was smaller" +
" than the datagram itself."

// Metadata contains formations about the packet.
type Metadata struct {
Source net.Addr
Truncated bool
}

// Config options for the UDPServer
type Config struct {
Host string `config:"host"`
Expand All @@ -19,15 +31,15 @@ type Config struct {
// event received to the callback method.
type Server struct {
config *Config
callback func(data []byte, addr net.Addr)
callback func(data []byte, mt Metadata)
Listener net.PacketConn
log *logp.Logger
wg sync.WaitGroup
done chan struct{}
}

// New returns a new UDPServer instance.
func New(config *Config, callback func(data []byte, addr net.Addr)) *Server {
func New(config *Config, callback func(data []byte, mt Metadata)) *Server {
return &Server{
config: config,
callback: callback,
Expand Down Expand Up @@ -62,8 +74,13 @@ func (u *Server) run() {

buffer := make([]byte, u.config.MaxMessageSize)
u.Listener.SetDeadline(time.Now().Add(u.config.Timeout))
length, addr, err := u.Listener.ReadFrom(buffer)

// If you are using Windows and you are using a fixed buffer and you get a datagram which
// is bigger than the specified size of the buffer, it will return an `err` and the buffer will
// contains a subset of the data.
//
// On Unix based system, the buffer will be truncated but no error will be returned.
length, addr, err := u.Listener.ReadFrom(buffer)
if err != nil {
// don't log any deadline events.
e, ok := err.(net.Error)
Expand All @@ -72,11 +89,17 @@ func (u *Server) run() {
}

u.log.Errorw("Error reading from the socket", "error", err)
continue

// On Windows send the current buffer and mark it as truncated.
// The buffer will have content but length will return 0, addr will be nil.
if isLargerThanBuffer(err) {
u.callback(buffer, Metadata{Source: addr, Truncated: true})
continue
}
}

if length > 0 {
u.callback(buffer[:length], addr)
u.callback(buffer[:length], Metadata{Source: addr})
}
}
}
Expand All @@ -89,3 +112,10 @@ func (u *Server) Stop() {
u.wg.Wait()
u.log.Info("UDP server stopped")
}

func isLargerThanBuffer(err error) bool {
if runtime.GOOS != "windows" {
return false
}
return strings.Contains(err.Error(), windowErrBuffer)
}
23 changes: 18 additions & 5 deletions filebeat/inputsource/udp/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package udp

import (
"net"
"runtime"
"testing"
"time"

Expand All @@ -13,7 +14,7 @@ const timeout = time.Second * 15

type info struct {
message []byte
addr net.Addr
mt Metadata
}

func TestReceiveEventFromUDP(t *testing.T) {
Expand All @@ -35,10 +36,10 @@ func TestReceiveEventFromUDP(t *testing.T) {
}

ch := make(chan info)
host := "127.0.0.1:"
host := "localhost:0"
config := &Config{Host: host, MaxMessageSize: maxMessageSize, Timeout: timeout}
fn := func(message []byte, addr net.Addr) {
ch <- info{message: message, addr: addr}
fn := func(message []byte, metadata Metadata) {
ch <- info{message: message, mt: metadata}
}
s := New(config, fn)
err := s.Start()
Expand All @@ -54,13 +55,25 @@ func TestReceiveEventFromUDP(t *testing.T) {
return
}
defer conn.Close()

_, err = conn.Write(test.message)
if !assert.NoError(t, err) {
return
}
info := <-ch
assert.Equal(t, test.expected, info.message)
assert.NotNil(t, info.addr)
if runtime.GOOS == "windows" {
if len(test.expected) < len(test.message) {
assert.Nil(t, info.mt.Source)
assert.True(t, info.mt.Truncated)
} else {
assert.NotNil(t, info.mt.Source)
assert.False(t, info.mt.Truncated)
}
} else {
assert.NotNil(t, info.mt.Source)
assert.False(t, info.mt.Truncated)
}
})
}
}

0 comments on commit 2057dad

Please sign in to comment.