Skip to content

Commit

Permalink
http_listener_v2: add Init()
Browse files Browse the repository at this point in the history
  • Loading branch information
pmalek committed Dec 3, 2021
1 parent a663a83 commit 2b19e36
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 27 deletions.
2 changes: 0 additions & 2 deletions agent/accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ type MetricMaker interface {
type accumulator struct {
maker MetricMaker
metrics chan<- telegraf.Metric
closed chan struct{}
precision time.Duration
}

Expand All @@ -27,7 +26,6 @@ func NewAccumulator(
acc := accumulator{
maker: maker,
metrics: metrics,
closed: make(chan struct{}),
precision: time.Nanosecond,
}
return &acc
Expand Down
59 changes: 36 additions & 23 deletions plugins/inputs/http_listener_v2/http_listener_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ type HTTPListenerV2 struct {
BasicUsername string `toml:"basic_username"`
BasicPassword string `toml:"basic_password"`
HTTPHeaderTags map[string]string `toml:"http_header_tags"`

tlsint.ServerConfig
tlsConf *tls.Config

TimeFunc
Log telegraf.Logger
Expand Down Expand Up @@ -156,33 +158,12 @@ func (h *HTTPListenerV2) Start(acc telegraf.Accumulator) error {

h.acc = acc

tlsConf, err := h.ServerConfig.TLSConfig()
if err != nil {
return err
}

server := &http.Server{
Addr: h.ServiceAddress,
Handler: h,
ReadTimeout: time.Duration(h.ReadTimeout),
WriteTimeout: time.Duration(h.WriteTimeout),
TLSConfig: tlsConf,
}

var listener net.Listener
if tlsConf != nil {
listener, err = tls.Listen("tcp", h.ServiceAddress, tlsConf)
} else {
listener, err = net.Listen("tcp", h.ServiceAddress)
}
server, err := h.createHTTPServer()
if err != nil {
return err
}
h.listener = listener
h.Port = listener.Addr().(*net.TCPAddr).Port

h.wg.Add(1)

go func() {
defer h.wg.Done()
if err := server.Serve(h.listener); err != nil {
Expand All @@ -193,11 +174,21 @@ func (h *HTTPListenerV2) Start(acc telegraf.Accumulator) error {
}
}()

h.Log.Infof("Listening on %s", listener.Addr().String())
h.Log.Infof("Listening on %s", h.listener.Addr().String())

return nil
}

func (h *HTTPListenerV2) createHTTPServer() (*http.Server, error) {
return &http.Server{
Addr: h.ServiceAddress,
Handler: h,
ReadTimeout: time.Duration(h.ReadTimeout),
WriteTimeout: time.Duration(h.WriteTimeout),
TLSConfig: h.tlsConf,
}, nil
}

// Stop cleans up all resources
func (h *HTTPListenerV2) Stop() {
if h.listener != nil {
Expand All @@ -208,6 +199,28 @@ func (h *HTTPListenerV2) Stop() {
h.wg.Wait()
}

func (h *HTTPListenerV2) Init() error {
tlsConf, err := h.ServerConfig.TLSConfig()
if err != nil {
return err
}

var listener net.Listener
if tlsConf != nil {
listener, err = tls.Listen("tcp", h.ServiceAddress, tlsConf)
} else {
listener, err = net.Listen("tcp", h.ServiceAddress)
}
if err != nil {
return err
}
h.tlsConf = tlsConf
h.listener = listener
h.Port = listener.Addr().(*net.TCPAddr).Port

return nil
}

func (h *HTTPListenerV2) ServeHTTP(res http.ResponseWriter, req *http.Request) {
handler := h.serveWrite

Expand Down
23 changes: 21 additions & 2 deletions plugins/inputs/http_listener_v2/http_listener_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,7 @@ func TestInvalidListenerConfig(t *testing.T) {
close: make(chan struct{}),
}

acc := &testutil.Accumulator{}
require.Error(t, listener.Start(acc))
require.Error(t, listener.Init())

// Stop is called when any ServiceInput fails to start; it must succeed regardless of state
listener.Stop()
Expand All @@ -134,6 +133,7 @@ func TestWriteHTTPSNoClientAuth(t *testing.T) {
listener.TLSAllowedCACerts = nil

acc := &testutil.Accumulator{}
require.NoError(t, listener.Init())
require.NoError(t, listener.Start(acc))
defer listener.Stop()

Expand All @@ -158,6 +158,7 @@ func TestWriteHTTPSWithClientAuth(t *testing.T) {
listener := newTestHTTPSListenerV2()

acc := &testutil.Accumulator{}
require.NoError(t, listener.Init())
require.NoError(t, listener.Start(acc))
defer listener.Stop()

Expand All @@ -172,6 +173,7 @@ func TestWriteHTTPBasicAuth(t *testing.T) {
listener := newTestHTTPAuthListener()

acc := &testutil.Accumulator{}
require.NoError(t, listener.Init())
require.NoError(t, listener.Start(acc))
defer listener.Stop()

Expand All @@ -190,6 +192,7 @@ func TestWriteHTTP(t *testing.T) {
listener := newTestHTTPListenerV2()

acc := &testutil.Accumulator{}
require.NoError(t, listener.Init())
require.NoError(t, listener.Start(acc))
defer listener.Stop()

Expand Down Expand Up @@ -240,6 +243,7 @@ func TestWriteHTTPWithPathTag(t *testing.T) {
listener.PathTag = true

acc := &testutil.Accumulator{}
require.NoError(t, listener.Init())
require.NoError(t, listener.Start(acc))
defer listener.Stop()

Expand All @@ -263,6 +267,7 @@ func TestWriteHTTPWithMultiplePaths(t *testing.T) {
listener.PathTag = true

acc := &testutil.Accumulator{}
require.NoError(t, listener.Init())
require.NoError(t, listener.Start(acc))
defer listener.Stop()

Expand Down Expand Up @@ -295,6 +300,7 @@ func TestWriteHTTPNoNewline(t *testing.T) {
listener := newTestHTTPListenerV2()

acc := &testutil.Accumulator{}
require.NoError(t, listener.Init())
require.NoError(t, listener.Start(acc))
defer listener.Stop()

Expand Down Expand Up @@ -326,6 +332,7 @@ func TestWriteHTTPExactMaxBodySize(t *testing.T) {
}

acc := &testutil.Accumulator{}
require.NoError(t, listener.Init())
require.NoError(t, listener.Start(acc))
defer listener.Stop()

Expand All @@ -350,6 +357,7 @@ func TestWriteHTTPVerySmallMaxBody(t *testing.T) {
}

acc := &testutil.Accumulator{}
require.NoError(t, listener.Init())
require.NoError(t, listener.Start(acc))
defer listener.Stop()

Expand All @@ -364,6 +372,8 @@ func TestWriteHTTPGzippedData(t *testing.T) {
listener := newTestHTTPListenerV2()

acc := &testutil.Accumulator{}
require.NoError(t, listener.Init())
require.NoError(t, listener.Init())
require.NoError(t, listener.Start(acc))
defer listener.Stop()

Expand Down Expand Up @@ -396,6 +406,7 @@ func TestWriteHTTPSnappyData(t *testing.T) {
listener := newTestHTTPListenerV2()

acc := &testutil.Accumulator{}
require.NoError(t, listener.Init())
require.NoError(t, listener.Start(acc))
defer listener.Stop()

Expand Down Expand Up @@ -434,6 +445,7 @@ func TestWriteHTTPHighTraffic(t *testing.T) {
listener := newTestHTTPListenerV2()

acc := &testutil.Accumulator{}
require.NoError(t, listener.Init())
require.NoError(t, listener.Start(acc))
defer listener.Stop()

Expand Down Expand Up @@ -469,6 +481,7 @@ func TestReceive404ForInvalidEndpoint(t *testing.T) {
listener := newTestHTTPListenerV2()

acc := &testutil.Accumulator{}
require.NoError(t, listener.Init())
require.NoError(t, listener.Start(acc))
defer listener.Stop()

Expand All @@ -483,6 +496,7 @@ func TestWriteHTTPInvalid(t *testing.T) {
listener := newTestHTTPListenerV2()

acc := &testutil.Accumulator{}
require.NoError(t, listener.Init())
require.NoError(t, listener.Start(acc))
defer listener.Stop()

Expand All @@ -497,6 +511,7 @@ func TestWriteHTTPEmpty(t *testing.T) {
listener := newTestHTTPListenerV2()

acc := &testutil.Accumulator{}
require.NoError(t, listener.Init())
require.NoError(t, listener.Start(acc))
defer listener.Stop()

Expand All @@ -512,6 +527,7 @@ func TestWriteHTTPTransformHeaderValuesToTagsSingleWrite(t *testing.T) {
listener.HTTPHeaderTags = map[string]string{"Present_http_header_1": "presentMeasurementKey1", "present_http_header_2": "presentMeasurementKey2", "NOT_PRESENT_HEADER": "notPresentMeasurementKey"}

acc := &testutil.Accumulator{}
require.NoError(t, listener.Init())
require.NoError(t, listener.Start(acc))
defer listener.Stop()

Expand Down Expand Up @@ -550,6 +566,7 @@ func TestWriteHTTPTransformHeaderValuesToTagsBulkWrite(t *testing.T) {
listener.HTTPHeaderTags = map[string]string{"Present_http_header_1": "presentMeasurementKey1", "Present_http_header_2": "presentMeasurementKey2", "NOT_PRESENT_HEADER": "notPresentMeasurementKey"}

acc := &testutil.Accumulator{}
require.NoError(t, listener.Init())
require.NoError(t, listener.Start(acc))
defer listener.Stop()

Expand Down Expand Up @@ -581,6 +598,7 @@ func TestWriteHTTPQueryParams(t *testing.T) {
listener.Parser = parser

acc := &testutil.Accumulator{}
require.NoError(t, listener.Init())
require.NoError(t, listener.Start(acc))
defer listener.Stop()

Expand All @@ -602,6 +620,7 @@ func TestWriteHTTPFormData(t *testing.T) {
listener.Parser = parser

acc := &testutil.Accumulator{}
require.NoError(t, listener.Init())
require.NoError(t, listener.Start(acc))
defer listener.Stop()

Expand Down

0 comments on commit 2b19e36

Please sign in to comment.