Skip to content

Possible panic with socket_listener #6209

Closed
@sgtsquiggs

Description

Relevant telegraf.conf:

[[inputs.socket_listener]]
  service_address = "tcp://:7778"
  data_format = "whatever"

System info:

Telegraf 1.11.2 with additional plugins

Steps to reproduce:

  1. Start telegraf with socket_listener input using tcp port
  2. Flood the socket_listener with new connections (roughly one millisecond between connections)
  3. Send SIGHUP to telegraf while step 2 is still running

Expected behavior:

socket_listener does not return from Stop() until all open connections are closed and all pending data has been parsed.

Actual behavior:

Telegraf panics because the streamSocketListener read() goroutine in socket_listener is still parsing data and adding metrics after Stop() has returned. By that point the accumulator's metrics channel has been closed, so adding another metric panics.

Additional info:

This is difficult to reproduce. You need a relatively slow parser (protobuf in my case), so that a parse is still in progress between read() and close().

Below is a quick fix that mimics the approach of the Google PubSub input plugin. I am concerned that it can block Stop() for an unbounded time; a cancellable context might be a better solution.

diff --git a/plugins/inputs/socket_listener/socket_listener.go b/plugins/inputs/socket_listener/socket_listener.go
index d29cff58..7aafc438 100644
--- a/plugins/inputs/socket_listener/socket_listener.go
+++ b/plugins/inputs/socket_listener/socket_listener.go
@@ -37,6 +37,7 @@ type streamSocketListener struct {
 func (ssl *streamSocketListener) listen() {
 	ssl.connections = map[string]net.Conn{}
 
+	wg := &sync.WaitGroup{}
 	for {
 		c, err := ssl.Accept()
 		if err != nil {
@@ -67,7 +68,11 @@ func (ssl *streamSocketListener) listen() {
 			ssl.AddError(fmt.Errorf("unable to configure keep alive (%s): %s", ssl.ServiceAddress, err))
 		}
 
-		go ssl.read(c)
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			ssl.read(c)
+		}()
 	}
 
 	ssl.connectionsMtx.Lock()
@@ -75,6 +80,8 @@ func (ssl *streamSocketListener) listen() {
 		c.Close()
 	}
 	ssl.connectionsMtx.Unlock()
+
+	wg.Wait()
 }
 
 func (ssl *streamSocketListener) setKeepAlive(c net.Conn) error {
@@ -169,6 +176,8 @@ type SocketListener struct {
 	SocketMode      string             `toml:"socket_mode"`
 	tlsint.ServerConfig
 
+	wg *sync.WaitGroup
+
 	parsers.Parser
 	telegraf.Accumulator
 	io.Closer
@@ -302,7 +311,12 @@ func (sl *SocketListener) Start(acc telegraf.Accumulator) error {
 		}
 
 		sl.Closer = ssl
-		go ssl.listen()
+		sl.wg = &sync.WaitGroup{}
+		sl.wg.Add(1)
+		go func() {
+			defer sl.wg.Done()
+			ssl.listen()
+		}()
 	case "udp", "udp4", "udp6", "ip", "ip4", "ip6", "unixgram":
 		pc, err := udpListen(protocol, addr)
 		if err != nil {
@@ -336,7 +350,12 @@ func (sl *SocketListener) Start(acc telegraf.Accumulator) error {
 		}
 
 		sl.Closer = psl
-		go psl.listen()
+		sl.wg = &sync.WaitGroup{}
+		sl.wg.Add(1)
+		go func() {
+			defer sl.wg.Done()
+			psl.listen()
+		}()
 	default:
 		return fmt.Errorf("unknown protocol '%s' in '%s'", protocol, sl.ServiceAddress)
 	}
@@ -378,6 +397,7 @@ func (sl *SocketListener) Stop() {
 		sl.Close()
 		sl.Closer = nil
 	}
+	sl.wg.Wait()
 }
 
 func newSocketListener() *SocketListener {

Edit: moved wg.Wait() to the end of ssl.listen().

Metadata

Assignees

No one assigned

    Labels

    bug (unexpected problem or unintended behavior), panic (issue that results in panics from Telegraf)

    Type

    No type

    Projects

    No projects

    Milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions