Skip to content

Commit

Permalink
http_listener_v2: fix panic on close
Browse files Browse the repository at this point in the history
  • Loading branch information
pmalek committed Nov 22, 2021
1 parent 5f9bd0d commit a663a83
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 2 deletions.
2 changes: 2 additions & 0 deletions agent/accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type MetricMaker interface {
type accumulator struct {
maker MetricMaker
metrics chan<- telegraf.Metric
closed chan struct{}
precision time.Duration
}

Expand All @@ -26,6 +27,7 @@ func NewAccumulator(
acc := accumulator{
maker: maker,
metrics: metrics,
closed: make(chan struct{}),
precision: time.Nanosecond,
}
return &acc
Expand Down
18 changes: 16 additions & 2 deletions plugins/inputs/http_listener_v2/http_listener_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"compress/gzip"
"crypto/subtle"
"crypto/tls"
"errors"
"io"
"net"
"net/http"
Expand Down Expand Up @@ -55,7 +56,8 @@ type HTTPListenerV2 struct {
TimeFunc
Log telegraf.Logger

wg sync.WaitGroup
wg sync.WaitGroup
close chan struct{}

listener net.Listener

Expand Down Expand Up @@ -180,10 +182,14 @@ func (h *HTTPListenerV2) Start(acc telegraf.Accumulator) error {
h.Port = listener.Addr().(*net.TCPAddr).Port

h.wg.Add(1)

go func() {
defer h.wg.Done()
if err := server.Serve(h.listener); err != nil {
h.Log.Errorf("Serve failed: %v", err)
if !errors.Is(err, net.ErrClosed) {
h.Log.Errorf("Serve failed: %v", err)
}
close(h.close)
}
}()

Expand Down Expand Up @@ -213,6 +219,13 @@ func (h *HTTPListenerV2) ServeHTTP(res http.ResponseWriter, req *http.Request) {
}

func (h *HTTPListenerV2) serveWrite(res http.ResponseWriter, req *http.Request) {
select {
case <-h.close:
res.WriteHeader(http.StatusGone)
return
default:
}

// Check that the content length is not too large for us to handle.
if req.ContentLength > int64(h.MaxBodySize) {
if err := tooLarge(res); err != nil {
Expand Down Expand Up @@ -393,6 +406,7 @@ func init() {
Paths: []string{"/telegraf"},
Methods: []string{"POST", "PUT"},
DataSource: body,
close: make(chan struct{}),
}
})
}
5 changes: 5 additions & 0 deletions plugins/inputs/http_listener_v2/http_listener_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func newTestHTTPListenerV2() *HTTPListenerV2 {
TimeFunc: time.Now,
MaxBodySize: config.Size(70000),
DataSource: "body",
close: make(chan struct{}),
}
return listener
}
Expand All @@ -78,6 +79,7 @@ func newTestHTTPSListenerV2() *HTTPListenerV2 {
Parser: parser,
ServerConfig: *pki.TLSServerConfig(),
TimeFunc: time.Now,
close: make(chan struct{}),
}

return listener
Expand Down Expand Up @@ -117,6 +119,7 @@ func TestInvalidListenerConfig(t *testing.T) {
TimeFunc: time.Now,
MaxBodySize: config.Size(70000),
DataSource: "body",
close: make(chan struct{}),
}

acc := &testutil.Accumulator{}
Expand Down Expand Up @@ -319,6 +322,7 @@ func TestWriteHTTPExactMaxBodySize(t *testing.T) {
Parser: parser,
MaxBodySize: config.Size(len(hugeMetric)),
TimeFunc: time.Now,
close: make(chan struct{}),
}

acc := &testutil.Accumulator{}
Expand All @@ -342,6 +346,7 @@ func TestWriteHTTPVerySmallMaxBody(t *testing.T) {
Parser: parser,
MaxBodySize: config.Size(4096),
TimeFunc: time.Now,
close: make(chan struct{}),
}

acc := &testutil.Accumulator{}
Expand Down

0 comments on commit a663a83

Please sign in to comment.