Skip to content

Commit

Permalink
Fix socket_listener setting ReadBufferSize on TCP sockets (influxdata…
Browse files Browse the repository at this point in the history
  • Loading branch information
phemmer authored and danielnelson committed Mar 9, 2018
1 parent 0e14e31 commit 07dbbb2
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
- [#1896](https://github.com/influxdata/telegraf/issues/1896): Fix various mysql data type conversions.
- [#3810](https://github.com/influxdata/telegraf/issues/3810): Fix metric buffer limit in internal plugin after reload.
- [#3801](https://github.com/influxdata/telegraf/issues/3801): Fix panic in http_response on invalid regex.
- [#3973](https://github.com/influxdata/telegraf/issues/3873): Fix socket_listener setting ReadBufferSize on tcp sockets.

## v1.5.3 [unreleased]

Expand Down
19 changes: 11 additions & 8 deletions plugins/inputs/socket_listener/socket_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ type streamSocketListener struct {
net.Listener
*SocketListener

sockType string

connections map[string]net.Conn
connectionsMtx sync.Mutex
}
Expand All @@ -42,6 +44,14 @@ func (ssl *streamSocketListener) listen() {
break
}

if ssl.ReadBufferSize > 0 {
if srb, ok := c.(setReadBufferer); ok {
srb.SetReadBuffer(ssl.ReadBufferSize)
} else {
log.Printf("W! Unable to set read buffer on a %s socket", ssl.sockType)
}
}

ssl.connectionsMtx.Lock()
if ssl.MaxConnections > 0 && len(ssl.connections) >= ssl.MaxConnections {
ssl.connectionsMtx.Unlock()
Expand Down Expand Up @@ -237,17 +247,10 @@ func (sl *SocketListener) Start(acc telegraf.Accumulator) error {
return err
}

if sl.ReadBufferSize > 0 {
if srb, ok := l.(setReadBufferer); ok {
srb.SetReadBuffer(sl.ReadBufferSize)
} else {
log.Printf("W! Unable to set read buffer on a %s socket", spl[0])
}
}

ssl := &streamSocketListener{
Listener: l,
SocketListener: sl,
sockType: spl[0],
}

sl.Closer = ssl
Expand Down
26 changes: 26 additions & 0 deletions plugins/inputs/socket_listener/socket_listener_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package socket_listener

import (
"bytes"
"log"
"net"
"os"
"testing"
Expand All @@ -11,9 +13,24 @@ import (
"github.com/stretchr/testify/require"
)

// testEmptyLog is a helper function to ensure no data is written to log.
// Should be called at the start of the test, and returns a function which should run at the end.
func testEmptyLog(t *testing.T) func() {
buf := bytes.NewBuffer(nil)
log.SetOutput(buf)

return func() {
log.SetOutput(os.Stderr)
assert.Empty(t, string(buf.Bytes()), "log not empty")
}
}

func TestSocketListener_tcp(t *testing.T) {
defer testEmptyLog(t)()

sl := newSocketListener()
sl.ServiceAddress = "tcp://127.0.0.1:0"
sl.ReadBufferSize = 1024

acc := &testutil.Accumulator{}
err := sl.Start(acc)
Expand All @@ -27,8 +44,11 @@ func TestSocketListener_tcp(t *testing.T) {
}

func TestSocketListener_udp(t *testing.T) {
defer testEmptyLog(t)()

sl := newSocketListener()
sl.ServiceAddress = "udp://127.0.0.1:0"
sl.ReadBufferSize = 1024

acc := &testutil.Accumulator{}
err := sl.Start(acc)
Expand All @@ -42,9 +62,12 @@ func TestSocketListener_udp(t *testing.T) {
}

func TestSocketListener_unix(t *testing.T) {
defer testEmptyLog(t)()

os.Create("/tmp/telegraf_test.sock")
sl := newSocketListener()
sl.ServiceAddress = "unix:///tmp/telegraf_test.sock"
sl.ReadBufferSize = 1024

acc := &testutil.Accumulator{}
err := sl.Start(acc)
Expand All @@ -58,9 +81,12 @@ func TestSocketListener_unix(t *testing.T) {
}

func TestSocketListener_unixgram(t *testing.T) {
defer testEmptyLog(t)()

os.Create("/tmp/telegraf_test.sock")
sl := newSocketListener()
sl.ServiceAddress = "unixgram:///tmp/telegraf_test.sock"
sl.ReadBufferSize = 1024

acc := &testutil.Accumulator{}
err := sl.Start(acc)
Expand Down

0 comments on commit 07dbbb2

Please sign in to comment.