Closed
Description
Relevant telegraf.conf:
[[inputs.socket_listener]]
service_address = "tcp://:7778"
data_format = "whatever"
System info:
Telegraf 1.11.2 with additional plugins
Steps to reproduce:
1. Start Telegraf with a socket_listener input listening on a TCP port.
2. Flood the socket_listener with connections (about one millisecond between connections).
3. Send SIGHUP to Telegraf while step 2 is still in progress.
Expected behavior:
socket_listener's Stop() does not return until all open connections have been closed
and all data already read from them has been parsed.
Actual behavior:
Telegraf panics because socket_listener's streamSocketListener read()
goroutine is still parsing data and adding metrics after Stop() has returned. By then the accumulator's metrics channel has been closed, so adding a new metric causes a panic.
Additional info:
This was difficult to reproduce: you need a parser that is relatively slow (protobuf, in my case) so that a parse is still running between read() and Close().
Below is a quick fix that mimics the approach used by the Google Pub/Sub input plugin. I am concerned that it makes Stop() block until every connection's read goroutine finishes; perhaps a cancellable context would be an improvement.
diff --git a/plugins/inputs/socket_listener/socket_listener.go b/plugins/inputs/socket_listener/socket_listener.go
index d29cff58..7aafc438 100644
--- a/plugins/inputs/socket_listener/socket_listener.go
+++ b/plugins/inputs/socket_listener/socket_listener.go
@@ -37,6 +37,7 @@ type streamSocketListener struct {
func (ssl *streamSocketListener) listen() {
ssl.connections = map[string]net.Conn{}
+ wg := &sync.WaitGroup{}
for {
c, err := ssl.Accept()
if err != nil {
@@ -67,7 +68,11 @@ func (ssl *streamSocketListener) listen() {
ssl.AddError(fmt.Errorf("unable to configure keep alive (%s): %s", ssl.ServiceAddress, err))
}
- go ssl.read(c)
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ ssl.read(c)
+ }()
}
ssl.connectionsMtx.Lock()
@@ -75,6 +80,8 @@ func (ssl *streamSocketListener) listen() {
c.Close()
}
ssl.connectionsMtx.Unlock()
+
+ wg.Wait()
}
func (ssl *streamSocketListener) setKeepAlive(c net.Conn) error {
@@ -169,6 +176,8 @@ type SocketListener struct {
SocketMode string `toml:"socket_mode"`
tlsint.ServerConfig
+ wg *sync.WaitGroup
+
parsers.Parser
telegraf.Accumulator
io.Closer
@@ -302,7 +311,12 @@ func (sl *SocketListener) Start(acc telegraf.Accumulator) error {
}
sl.Closer = ssl
- go ssl.listen()
+ sl.wg = &sync.WaitGroup{}
+ sl.wg.Add(1)
+ go func() {
+ defer sl.wg.Done()
+ ssl.listen()
+ }()
case "udp", "udp4", "udp6", "ip", "ip4", "ip6", "unixgram":
pc, err := udpListen(protocol, addr)
if err != nil {
@@ -336,7 +350,12 @@ func (sl *SocketListener) Start(acc telegraf.Accumulator) error {
}
sl.Closer = psl
- go psl.listen()
+ sl.wg = &sync.WaitGroup{}
+ sl.wg.Add(1)
+ go func() {
+ defer sl.wg.Done()
+ psl.listen()
+ }()
default:
return fmt.Errorf("unknown protocol '%s' in '%s'", protocol, sl.ServiceAddress)
}
@@ -378,6 +397,7 @@ func (sl *SocketListener) Stop() {
sl.Close()
sl.Closer = nil
}
+ sl.wg.Wait()
}
func newSocketListener() *SocketListener {
Edit: moved wg.Wait() to the end of ssl.listen().