From 2b19e360462948362a07e473ea06b3c84514a908 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Patryk=20Ma=C5=82ek?= Date: Mon, 22 Nov 2021 11:24:39 +0100 Subject: [PATCH] http_listener_v2: add Init() --- agent/accumulator.go | 2 - .../http_listener_v2/http_listener_v2.go | 59 +++++++++++-------- .../http_listener_v2/http_listener_v2_test.go | 23 +++++++- 3 files changed, 57 insertions(+), 27 deletions(-) diff --git a/agent/accumulator.go b/agent/accumulator.go index cc5bed68d16dd..3683b6767d47f 100644 --- a/agent/accumulator.go +++ b/agent/accumulator.go @@ -16,7 +16,6 @@ type MetricMaker interface { type accumulator struct { maker MetricMaker metrics chan<- telegraf.Metric - closed chan struct{} precision time.Duration } @@ -27,7 +26,6 @@ func NewAccumulator( acc := accumulator{ maker: maker, metrics: metrics, - closed: make(chan struct{}), precision: time.Nanosecond, } return &acc diff --git a/plugins/inputs/http_listener_v2/http_listener_v2.go b/plugins/inputs/http_listener_v2/http_listener_v2.go index 3aef78f8d246d..0236d21c42a13 100644 --- a/plugins/inputs/http_listener_v2/http_listener_v2.go +++ b/plugins/inputs/http_listener_v2/http_listener_v2.go @@ -51,7 +51,9 @@ type HTTPListenerV2 struct { BasicUsername string `toml:"basic_username"` BasicPassword string `toml:"basic_password"` HTTPHeaderTags map[string]string `toml:"http_header_tags"` + tlsint.ServerConfig + tlsConf *tls.Config TimeFunc Log telegraf.Logger @@ -156,33 +158,12 @@ func (h *HTTPListenerV2) Start(acc telegraf.Accumulator) error { h.acc = acc - tlsConf, err := h.ServerConfig.TLSConfig() - if err != nil { - return err - } - - server := &http.Server{ - Addr: h.ServiceAddress, - Handler: h, - ReadTimeout: time.Duration(h.ReadTimeout), - WriteTimeout: time.Duration(h.WriteTimeout), - TLSConfig: tlsConf, - } - - var listener net.Listener - if tlsConf != nil { - listener, err = tls.Listen("tcp", h.ServiceAddress, tlsConf) - } else { - listener, err = net.Listen("tcp", h.ServiceAddress) - } + server, err := h.createHTTPServer() if err != nil { return err } - h.listener = listener - h.Port = listener.Addr().(*net.TCPAddr).Port h.wg.Add(1) - go func() { defer h.wg.Done() if err := server.Serve(h.listener); err != nil { @@ -193,11 +174,21 @@ func (h *HTTPListenerV2) Start(acc telegraf.Accumulator) error { } }() - h.Log.Infof("Listening on %s", listener.Addr().String()) + h.Log.Infof("Listening on %s", h.listener.Addr().String()) return nil } +func (h *HTTPListenerV2) createHTTPServer() (*http.Server, error) { + return &http.Server{ + Addr: h.ServiceAddress, + Handler: h, + ReadTimeout: time.Duration(h.ReadTimeout), + WriteTimeout: time.Duration(h.WriteTimeout), + TLSConfig: h.tlsConf, + }, nil +} + // Stop cleans up all resources func (h *HTTPListenerV2) Stop() { if h.listener != nil { @@ -208,6 +199,28 @@ func (h *HTTPListenerV2) Stop() { h.wg.Wait() } +func (h *HTTPListenerV2) Init() error { + tlsConf, err := h.ServerConfig.TLSConfig() + if err != nil { + return err + } + + var listener net.Listener + if tlsConf != nil { + listener, err = tls.Listen("tcp", h.ServiceAddress, tlsConf) + } else { + listener, err = net.Listen("tcp", h.ServiceAddress) + } + if err != nil { + return err + } + h.tlsConf = tlsConf + h.listener = listener + h.Port = listener.Addr().(*net.TCPAddr).Port + + return nil +} + func (h *HTTPListenerV2) ServeHTTP(res http.ResponseWriter, req *http.Request) { handler := h.serveWrite diff --git a/plugins/inputs/http_listener_v2/http_listener_v2_test.go b/plugins/inputs/http_listener_v2/http_listener_v2_test.go index 8f66c90ffd915..ddbb5be64ed52 100644 --- a/plugins/inputs/http_listener_v2/http_listener_v2_test.go +++ b/plugins/inputs/http_listener_v2/http_listener_v2_test.go @@ -122,8 +122,7 @@ func TestInvalidListenerConfig(t *testing.T) { close: make(chan struct{}), } - acc := &testutil.Accumulator{} - require.Error(t, listener.Start(acc)) + require.Error(t, listener.Init()) // Stop is called when any ServiceInput fails to start; it must succeed regardless of state listener.Stop() @@ -134,6 +133,7 @@ func TestWriteHTTPSNoClientAuth(t *testing.T) { listener.TLSAllowedCACerts = nil acc := &testutil.Accumulator{} + require.NoError(t, listener.Init()) require.NoError(t, listener.Start(acc)) defer listener.Stop() @@ -158,6 +158,7 @@ func TestWriteHTTPSWithClientAuth(t *testing.T) { listener := newTestHTTPSListenerV2() acc := &testutil.Accumulator{} + require.NoError(t, listener.Init()) require.NoError(t, listener.Start(acc)) defer listener.Stop() @@ -172,6 +173,7 @@ func TestWriteHTTPBasicAuth(t *testing.T) { listener := newTestHTTPAuthListener() acc := &testutil.Accumulator{} + require.NoError(t, listener.Init()) require.NoError(t, listener.Start(acc)) defer listener.Stop() @@ -190,6 +192,7 @@ func TestWriteHTTP(t *testing.T) { listener := newTestHTTPListenerV2() acc := &testutil.Accumulator{} + require.NoError(t, listener.Init()) require.NoError(t, listener.Start(acc)) defer listener.Stop() @@ -240,6 +243,7 @@ func TestWriteHTTPWithPathTag(t *testing.T) { listener.PathTag = true acc := &testutil.Accumulator{} + require.NoError(t, listener.Init()) require.NoError(t, listener.Start(acc)) defer listener.Stop() @@ -263,6 +267,7 @@ func TestWriteHTTPWithMultiplePaths(t *testing.T) { listener.PathTag = true acc := &testutil.Accumulator{} + require.NoError(t, listener.Init()) require.NoError(t, listener.Start(acc)) defer listener.Stop() @@ -295,6 +300,7 @@ func TestWriteHTTPNoNewline(t *testing.T) { listener := newTestHTTPListenerV2() acc := &testutil.Accumulator{} + require.NoError(t, listener.Init()) require.NoError(t, listener.Start(acc)) defer listener.Stop() @@ -326,6 +332,7 @@ func TestWriteHTTPExactMaxBodySize(t *testing.T) { } acc := &testutil.Accumulator{} + require.NoError(t, listener.Init()) require.NoError(t, listener.Start(acc)) defer listener.Stop() @@ -350,6 +357,7 @@ func TestWriteHTTPVerySmallMaxBody(t *testing.T) { } acc := &testutil.Accumulator{} + require.NoError(t, listener.Init()) require.NoError(t, listener.Start(acc)) defer listener.Stop() @@ -364,6 +372,8 @@ func TestWriteHTTPGzippedData(t *testing.T) { listener := newTestHTTPListenerV2() acc := &testutil.Accumulator{} + require.NoError(t, listener.Init()) + require.NoError(t, listener.Init()) require.NoError(t, listener.Start(acc)) defer listener.Stop() @@ -396,6 +406,7 @@ func TestWriteHTTPSnappyData(t *testing.T) { listener := newTestHTTPListenerV2() acc := &testutil.Accumulator{} + require.NoError(t, listener.Init()) require.NoError(t, listener.Start(acc)) defer listener.Stop() @@ -434,6 +445,7 @@ func TestWriteHTTPHighTraffic(t *testing.T) { listener := newTestHTTPListenerV2() acc := &testutil.Accumulator{} + require.NoError(t, listener.Init()) require.NoError(t, listener.Start(acc)) defer listener.Stop() @@ -469,6 +481,7 @@ func TestReceive404ForInvalidEndpoint(t *testing.T) { listener := newTestHTTPListenerV2() acc := &testutil.Accumulator{} + require.NoError(t, listener.Init()) require.NoError(t, listener.Start(acc)) defer listener.Stop() @@ -483,6 +496,7 @@ func TestWriteHTTPInvalid(t *testing.T) { listener := newTestHTTPListenerV2() acc := &testutil.Accumulator{} + require.NoError(t, listener.Init()) require.NoError(t, listener.Start(acc)) defer listener.Stop() @@ -497,6 +511,7 @@ func TestWriteHTTPEmpty(t *testing.T) { listener := newTestHTTPListenerV2() acc := &testutil.Accumulator{} + require.NoError(t, listener.Init()) require.NoError(t, listener.Start(acc)) defer listener.Stop() @@ -512,6 +527,7 @@ func TestWriteHTTPTransformHeaderValuesToTagsSingleWrite(t *testing.T) { listener.HTTPHeaderTags = map[string]string{"Present_http_header_1": "presentMeasurementKey1", "present_http_header_2": "presentMeasurementKey2", "NOT_PRESENT_HEADER": "notPresentMeasurementKey"} acc := &testutil.Accumulator{} + require.NoError(t, listener.Init()) require.NoError(t, listener.Start(acc)) defer listener.Stop() @@ -550,6 +566,7 @@ func TestWriteHTTPTransformHeaderValuesToTagsBulkWrite(t *testing.T) { listener.HTTPHeaderTags = map[string]string{"Present_http_header_1": "presentMeasurementKey1", "Present_http_header_2": "presentMeasurementKey2", "NOT_PRESENT_HEADER": "notPresentMeasurementKey"} acc := &testutil.Accumulator{} + require.NoError(t, listener.Init()) require.NoError(t, listener.Start(acc)) defer listener.Stop() @@ -581,6 +598,7 @@ func TestWriteHTTPQueryParams(t *testing.T) { listener.Parser = parser acc := &testutil.Accumulator{} + require.NoError(t, listener.Init()) require.NoError(t, listener.Start(acc)) defer listener.Stop() @@ -602,6 +620,7 @@ func TestWriteHTTPFormData(t *testing.T) { listener.Parser = parser acc := &testutil.Accumulator{} + require.NoError(t, listener.Init()) require.NoError(t, listener.Start(acc)) defer listener.Stop()