Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

x-pack/filebeat/input/http_endpoint: add input metrics #36427

Merged
merged 2 commits into from
Sep 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- For request tracer logging in CEL and httpjson the request and response body are no longer included in `event.original`. The body is still present in `http.{request,response}.body.content`. {pull}36531[36531]
- Added support for Okta OAuth2 provider in the CEL input. {issue}36336[36336] {pull}36521[36521]
- Improve error logging in HTTPJSON input. {pull}36529[36529]
- Add input metrics to http_endpoint input. {issue}36402[36402] {pull}36427[36427]

*Auditbeat*

Expand Down
22 changes: 22 additions & 0 deletions x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,28 @@ This option defines the provider of the webhook that uses CRC (Challenge-Respons

The secret token provided by the webhook owner for the CRC validation. It is required when a `crc.provider` is set.

[float]
=== Metrics

This input exposes metrics under the <<http-endpoint, HTTP monitoring endpoint>>.
These metrics are exposed under the `/inputs` path. They can be used to
observe the activity of the input.

[options="header"]
|=======
| Metric | Description
| `bind_address` | Bind address of input.
| `route` | HTTP request route of the input.
| `is_tls_connection` | Whether the input is listening on a TLS connection.
| `api_errors_total` | Number of API errors.
| `batches_received_total` | Number of event arrays received.
| `batches_published_total` | Number of event arrays published.
| `events_published_total` | Number of events published.
| `size` | Histogram of request content lengths.
| `batch_size` | Histogram of the received event array length.
| `batch_processing_time` | Histogram of the elapsed successful batch processing times in nanoseconds (time of receipt to time of ACK for non-empty batches).
|=======

[id="{beatname_lc}-input-{type}-common-options"]
include::../../../../filebeat/docs/inputs/input-common-options.asciidoc[]

Expand Down
12 changes: 12 additions & 0 deletions x-pack/filebeat/input/http_endpoint/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ var (
type httpHandler struct {
log *logp.Logger
publisher stateless.Publisher
metrics *inputMetrics

messageField string
responseCode int
Expand All @@ -43,16 +44,21 @@ type httpHandler struct {

// Triggers if middleware validation returns successful
func (h *httpHandler) apiResponse(w http.ResponseWriter, r *http.Request) {
start := time.Now()
h.metrics.batchesReceived.Add(1)
h.metrics.contentLength.Update(r.ContentLength)
body, status, err := getBodyReader(r)
if err != nil {
sendAPIErrorResponse(w, r, h.log, status, err)
h.metrics.apiErrors.Add(1)
return
}
defer body.Close()

objs, _, status, err := httpReadJSON(body)
if err != nil {
sendAPIErrorResponse(w, r, h.log, status, err)
h.metrics.apiErrors.Add(1)
return
}

Expand All @@ -66,6 +72,7 @@ func (h *httpHandler) apiResponse(w http.ResponseWriter, r *http.Request) {
respBody string
)

h.metrics.batchSize.Update(int64(len(objs)))
for _, obj := range objs {
var err error
if h.crc != nil {
Expand All @@ -74,19 +81,24 @@ func (h *httpHandler) apiResponse(w http.ResponseWriter, r *http.Request) {
// CRC request processed
break
} else if !errors.Is(err, errNotCRC) {
h.metrics.apiErrors.Add(1)
sendAPIErrorResponse(w, r, h.log, http.StatusBadRequest, err)
return
}
}

if err = h.publishEvent(obj, headers); err != nil {
h.metrics.apiErrors.Add(1)
sendAPIErrorResponse(w, r, h.log, http.StatusInternalServerError, err)
return
}
h.metrics.eventsPublished.Add(1)
respCode, respBody = h.responseCode, h.responseBody
}

h.sendResponse(w, respCode, respBody)
h.metrics.batchProcessingTime.Update(time.Since(start).Nanoseconds())
h.metrics.batchesPublished.Add(1)
}

func (h *httpHandler) sendResponse(w http.ResponseWriter, status int, message string) {
Expand Down
4 changes: 3 additions & 1 deletion x-pack/filebeat/input/http_endpoint/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,9 @@ func Test_apiResponse(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
// Setup
pub := new(publisher)
apiHandler := newHandler(tc.conf, pub, logp.NewLogger("http_endpoint.test"))
metrics := newInputMetrics("")
defer metrics.Close()
apiHandler := newHandler(tc.conf, pub, logp.NewLogger("http_endpoint.test"), metrics)

// Execute handler.
respRec := httptest.NewRecorder()
Expand Down
106 changes: 98 additions & 8 deletions x-pack/filebeat/input/http_endpoint/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,22 @@ import (
"fmt"
"net"
"net/http"
"net/url"
"reflect"
"sync"
"time"

"github.com/rcrowley/go-metrics"

v2 "github.com/elastic/beats/v7/filebeat/input/v2"
stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless"
"github.com/elastic/beats/v7/libbeat/feature"
"github.com/elastic/beats/v7/libbeat/monitoring/inputmon"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-libs/monitoring"
"github.com/elastic/elastic-agent-libs/monitoring/adapter"
"github.com/elastic/elastic-agent-libs/transport/tlscommon"
"github.com/elastic/go-concert/ctxtool"
)
Expand Down Expand Up @@ -87,7 +93,9 @@ func (e *httpEndpoint) Test(_ v2.TestContext) error {
}

func (e *httpEndpoint) Run(ctx v2.Context, publisher stateless.Publisher) error {
err := servers.serve(ctx, e, publisher)
metrics := newInputMetrics(ctx.ID)
defer metrics.Close()
err := servers.serve(ctx, e, publisher, metrics)
if err != nil && !errors.Is(err, http.ErrServerClosed) {
return fmt.Errorf("unable to start server due to error: %w", err)
}
Expand All @@ -109,11 +117,17 @@ type pool struct {
// cancelled or the context of another end-point sharing the same address
// has had its context cancelled. If an end-point is re-registered with
// the same address and mux pattern, serve will return an error.
func (p *pool) serve(ctx v2.Context, e *httpEndpoint, pub stateless.Publisher) error {
func (p *pool) serve(ctx v2.Context, e *httpEndpoint, pub stateless.Publisher, metrics *inputMetrics) error {
log := ctx.Logger.With("address", e.addr)
pattern := e.config.URL

var err error
u, err := url.Parse(pattern)
if err != nil {
return err
}
metrics.route.Set(u.Path)
metrics.isTLS.Set(e.tlsConfig != nil)

p.mu.Lock()
s, ok := p.servers[e.addr]
if ok {
Expand All @@ -132,15 +146,15 @@ func (p *pool) serve(ctx v2.Context, e *httpEndpoint, pub stateless.Publisher) e
return err
}
log.Infof("Adding %s end point to server on %s", pattern, e.addr)
s.mux.Handle(pattern, newHandler(e.config, pub, log))
s.mux.Handle(pattern, newHandler(e.config, pub, log, metrics))
s.idOf[pattern] = ctx.ID
p.mu.Unlock()
<-s.ctx.Done()
return s.getErr()
}

mux := http.NewServeMux()
mux.Handle(pattern, newHandler(e.config, pub, log))
mux.Handle(pattern, newHandler(e.config, pub, log, metrics))
srv := &http.Server{Addr: e.addr, TLSConfig: e.tlsConfig, Handler: mux, ReadHeaderTimeout: 5 * time.Second}
s = &server{
idOf: map[string]string{pattern: ctx.ID},
Expand All @@ -156,10 +170,10 @@ func (p *pool) serve(ctx v2.Context, e *httpEndpoint, pub stateless.Publisher) e
log.Infof("Starting HTTPS server on %s with %s end point", srv.Addr, pattern)
// The certificate is already loaded so we do not need
// to pass the cert file and key file parameters.
err = s.srv.ListenAndServeTLS("", "")
err = listenAndServeTLS(s.srv, "", "", metrics)
} else {
log.Infof("Starting HTTP server on %s with %s end point", srv.Addr, pattern)
err = s.srv.ListenAndServe()
err = listenAndServe(s.srv, metrics)
}
p.mu.Lock()
delete(p.servers, e.addr)
Expand All @@ -169,6 +183,36 @@ func (p *pool) serve(ctx v2.Context, e *httpEndpoint, pub stateless.Publisher) e
return err
}

func listenAndServeTLS(srv *http.Server, certFile, keyFile string, metrics *inputMetrics) error {
addr := srv.Addr
if addr == "" {
addr = ":https"
}

ln, err := net.Listen("tcp", addr)
if err != nil {
return err
}
metrics.bindAddr.Set(ln.Addr().String())

defer ln.Close()

return srv.ServeTLS(ln, certFile, keyFile)
}

func listenAndServe(srv *http.Server, metrics *inputMetrics) error {
addr := srv.Addr
if addr == "" {
addr = ":http"
}
ln, err := net.Listen("tcp", addr)
if err != nil {
return err
}
metrics.bindAddr.Set(ln.Addr().String())
return srv.Serve(ln)
}

func checkTLSConsistency(addr string, old, new *tlscommon.ServerConfig) error {
if old == nil && new == nil {
return nil
Expand Down Expand Up @@ -240,7 +284,7 @@ func (s *server) getErr() error {
return s.err
}

func newHandler(c config, pub stateless.Publisher, log *logp.Logger) http.Handler {
func newHandler(c config, pub stateless.Publisher, log *logp.Logger, metrics *inputMetrics) http.Handler {
validator := &apiValidator{
basicAuth: c.BasicAuth,
username: c.Username,
Expand All @@ -258,6 +302,7 @@ func newHandler(c config, pub stateless.Publisher, log *logp.Logger) http.Handle
handler := &httpHandler{
log: log,
publisher: pub,
metrics: metrics,
messageField: c.Prefix,
responseCode: c.ResponseCode,
responseBody: c.ResponseBody,
Expand All @@ -268,3 +313,48 @@ func newHandler(c config, pub stateless.Publisher, log *logp.Logger) http.Handle

return newAPIValidationHandler(http.HandlerFunc(handler.apiResponse), validator, log)
}

// inputMetrics handles the input's metric reporting.
type inputMetrics struct {
unregister func()

bindAddr *monitoring.String // bind address of input
route *monitoring.String // request route
isTLS *monitoring.Bool // whether the input is listening on a TLS connection
apiErrors *monitoring.Uint // number of API errors
batchesReceived *monitoring.Uint // number of event arrays received
batchesPublished *monitoring.Uint // number of event arrays published
eventsPublished *monitoring.Uint // number of events published
contentLength metrics.Sample // histogram of request content lengths.
batchSize metrics.Sample // histogram of the received batch sizes.
batchProcessingTime metrics.Sample // histogram of the elapsed successful batch processing times in nanoseconds (time of handler start to time of ACK for non-empty batches).
}

func newInputMetrics(id string) *inputMetrics {
reg, unreg := inputmon.NewInputRegistry(inputName, id, nil)
out := &inputMetrics{
unregister: unreg,
bindAddr: monitoring.NewString(reg, "bind_address"),
route: monitoring.NewString(reg, "route"),
isTLS: monitoring.NewBool(reg, "is_tls_connection"),
apiErrors: monitoring.NewUint(reg, "api_errors_total"),
batchesReceived: monitoring.NewUint(reg, "batches_received_total"),
batchesPublished: monitoring.NewUint(reg, "batches_published_total"),
eventsPublished: monitoring.NewUint(reg, "events_published_total"),
contentLength: metrics.NewUniformSample(1024),
batchSize: metrics.NewUniformSample(1024),
batchProcessingTime: metrics.NewUniformSample(1024),
}
_ = adapter.NewGoMetrics(reg, "size", adapter.Accept).
Register("histogram", metrics.NewHistogram(out.contentLength))
_ = adapter.NewGoMetrics(reg, "batch_size", adapter.Accept).
Register("histogram", metrics.NewHistogram(out.batchSize))
_ = adapter.NewGoMetrics(reg, "batch_processing_time", adapter.Accept).
Register("histogram", metrics.NewHistogram(out.batchProcessingTime))

return out
}

func (m *inputMetrics) Close() {
m.unregister()
}
6 changes: 4 additions & 2 deletions x-pack/filebeat/input/http_endpoint/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,13 +217,15 @@ func TestServerPool(t *testing.T) {
fails = make(chan error, 1)
)
ctx, cancel := newCtx("server_pool_test", test.name)
metrics := newInputMetrics("")
defer metrics.Close()
var wg sync.WaitGroup
for _, cfg := range test.cfgs {
cfg := cfg
wg.Add(1)
go func() {
defer wg.Done()
err := servers.serve(ctx, cfg, &pub)
err := servers.serve(ctx, cfg, &pub, metrics)
if err != http.ErrServerClosed {
select {
case fails <- err:
Expand Down Expand Up @@ -274,7 +276,7 @@ func TestServerPool(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
err := servers.serve(ctx, cfg, &pub)
err := servers.serve(ctx, cfg, &pub, metrics)
if err != nil && err != http.ErrServerClosed && test.wantErr == nil {
t.Errorf("failed to re-register %v: %v", cfg.addr, err)
}
Expand Down