From a98c553d124a90bc7f89d952a350db52ea1b7ced Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Patryk=20Ma=C5=82ek?= Date: Thu, 18 Nov 2021 22:39:19 +0100 Subject: [PATCH] http_listener_v2: fix panic on close --- agent/accumulator.go | 2 ++ .../inputs/http_listener_v2/http_listener_v2.go | 14 ++++++++++++-- .../http_listener_v2/http_listener_v2_test.go | 5 +++++ 3 files changed, 19 insertions(+), 2 deletions(-) diff --git a/agent/accumulator.go b/agent/accumulator.go index 3683b6767d47f..cc5bed68d16dd 100644 --- a/agent/accumulator.go +++ b/agent/accumulator.go @@ -16,6 +16,7 @@ type MetricMaker interface { type accumulator struct { maker MetricMaker metrics chan<- telegraf.Metric + closed chan struct{} precision time.Duration } @@ -26,6 +27,7 @@ func NewAccumulator( acc := accumulator{ maker: maker, metrics: metrics, + closed: make(chan struct{}), precision: time.Nanosecond, } return &acc diff --git a/plugins/inputs/http_listener_v2/http_listener_v2.go b/plugins/inputs/http_listener_v2/http_listener_v2.go index d2a2e5f35214e..5b66cbb0268ff 100644 --- a/plugins/inputs/http_listener_v2/http_listener_v2.go +++ b/plugins/inputs/http_listener_v2/http_listener_v2.go @@ -55,7 +55,8 @@ type HTTPListenerV2 struct { TimeFunc Log telegraf.Logger - wg sync.WaitGroup + wg sync.WaitGroup + close chan struct{} listener net.Listener @@ -180,10 +181,11 @@ func (h *HTTPListenerV2) Start(acc telegraf.Accumulator) error { h.Port = listener.Addr().(*net.TCPAddr).Port h.wg.Add(1) + go func() { defer h.wg.Done() if err := server.Serve(h.listener); err != nil { - h.Log.Errorf("Serve failed: %v", err) + close(h.close) } }() @@ -213,6 +215,13 @@ func (h *HTTPListenerV2) ServeHTTP(res http.ResponseWriter, req *http.Request) { } func (h *HTTPListenerV2) serveWrite(res http.ResponseWriter, req *http.Request) { + select { + case <-h.close: + res.WriteHeader(http.StatusGone) + return + default: + } + // Check that the content length is not too large for us to handle. if req.ContentLength > int64(h.MaxBodySize) { if err := tooLarge(res); err != nil { @@ -393,6 +402,7 @@ func init() { Paths: []string{"/telegraf"}, Methods: []string{"POST", "PUT"}, DataSource: body, + close: make(chan struct{}), } }) } diff --git a/plugins/inputs/http_listener_v2/http_listener_v2_test.go b/plugins/inputs/http_listener_v2/http_listener_v2_test.go index bf320d6f05174..8f66c90ffd915 100644 --- a/plugins/inputs/http_listener_v2/http_listener_v2_test.go +++ b/plugins/inputs/http_listener_v2/http_listener_v2_test.go @@ -56,6 +56,7 @@ func newTestHTTPListenerV2() *HTTPListenerV2 { TimeFunc: time.Now, MaxBodySize: config.Size(70000), DataSource: "body", + close: make(chan struct{}), } return listener } @@ -78,6 +79,7 @@ func newTestHTTPSListenerV2() *HTTPListenerV2 { Parser: parser, ServerConfig: *pki.TLSServerConfig(), TimeFunc: time.Now, + close: make(chan struct{}), } return listener @@ -117,6 +119,7 @@ func TestInvalidListenerConfig(t *testing.T) { TimeFunc: time.Now, MaxBodySize: config.Size(70000), DataSource: "body", + close: make(chan struct{}), } acc := &testutil.Accumulator{} @@ -319,6 +322,7 @@ func TestWriteHTTPExactMaxBodySize(t *testing.T) { Parser: parser, MaxBodySize: config.Size(len(hugeMetric)), TimeFunc: time.Now, + close: make(chan struct{}), } acc := &testutil.Accumulator{} @@ -342,6 +346,7 @@ func TestWriteHTTPVerySmallMaxBody(t *testing.T) { Parser: parser, MaxBodySize: config.Size(4096), TimeFunc: time.Now, + close: make(chan struct{}), } acc := &testutil.Accumulator{}