Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(inputs.firehose): Add new plugin #15988

Merged
merged 9 commits into from
Jan 22, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Restructure code and improve unit-tests
  • Loading branch information
srebhan committed Dec 19, 2024
commit 72de753b4b91a8e09e9f8b793942484547532bf0
107 changes: 85 additions & 22 deletions plugins/inputs/firehose/firehose.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"crypto/tls"
_ "embed"
"encoding/json"
"errors"
"fmt"
"net"
Expand All @@ -23,7 +24,6 @@ import (
//go:embed sample.conf
var sampleConfig string

// Firehose is an input plugin that collects external metrics sent via HTTP from AWS Data Firhose
type Firehose struct {
ServiceAddress string `toml:"service_address"`
Paths []string `toml:"paths"`
Expand All @@ -49,10 +49,6 @@ func (*Firehose) SampleConfig() string {
return sampleConfig
}

func (*Firehose) Gather(telegraf.Accumulator) error {
return nil
}

func (f *Firehose) SetParser(parser telegraf.Parser) {
f.parser = parser
}
Expand Down Expand Up @@ -112,38 +108,104 @@ func (f *Firehose) Stop() {
}
}

func (*Firehose) Gather(telegraf.Accumulator) error {
return nil
}

func (f *Firehose) ServeHTTP(res http.ResponseWriter, req *http.Request) {
srebhan marked this conversation as resolved.
Show resolved Hide resolved
if !slices.Contains(f.Paths, req.URL.Path) {
res.WriteHeader(http.StatusNotFound)
return
}

if err := f.handleRequest(req).sendResponse(res); err != nil {
f.Log.Errorf("Sending response failed: %v", err)
msg, err := f.handleRequest(req)
if err != nil {
f.acc.AddError(err)
}
if err := msg.sendResponse(res); err != nil {
f.acc.AddError(fmt.Errorf("sending response failed: %w", err))
}
}

func (f *Firehose) handleRequest(req *http.Request) (r *request) {
var err error
if r, err = newFirehoseRequest(req); err != nil {
f.Log.Errorf("Creating request object failed: %v", err)
return r
func (f *Firehose) handleRequest(req *http.Request) (*message, error) {
// Create a request with a default response status code
msg := &message{
responseCode: http.StatusInternalServerError,
}

// Extract the request ID used to reference the request
msg.id = req.Header.Get("x-amz-firehose-request-id")
if msg.id == "" {
msg.responseCode = http.StatusBadRequest
return msg, errors.New("x-amz-firehose-request-id header is not set")
}

// Check the maximum body size which can be up to 64 MiB according to
// https://docs.aws.amazon.com/firehose/latest/dev/httpdeliveryrequestresponse.html
if req.ContentLength > int64(64*1024*1024) {
msg.responseCode = http.StatusRequestEntityTooLarge
return msg, errors.New("content length is too large")
}

// Check the HTTP method used
switch req.Method {
case http.MethodPost, http.MethodPut:
// Do nothing, those methods are allowed
default:
msg.responseCode = http.StatusMethodNotAllowed
return msg, fmt.Errorf("method %q is not allowed", req.Method)
}

if req.Header.Get("content-type") != "application/json" {
msg.responseCode = http.StatusUnsupportedMediaType
return msg, fmt.Errorf("content type %q is not allowed", req.Header.Get("content-type"))
}

// Decode the content if necessary and parse the JSON message
encoding := req.Header.Get("content-encoding")
body, err := internal.NewStreamContentDecoder(encoding, req.Body)
if err != nil {
msg.responseCode = http.StatusUnsupportedMediaType
return msg, fmt.Errorf("creating %q decoder for request %q failed: %w", encoding, msg.id, err)
}
defer req.Body.Close()

var reqbody requestBody
if err := json.NewDecoder(body).Decode(&reqbody); err != nil {
msg.responseCode = http.StatusBadRequest
return msg, fmt.Errorf("decode body for request %q failed: %w", msg.id, err)
}

// Validate the body content
if msg.id != reqbody.RequestID {
msg.responseCode = http.StatusBadRequest
return msg, fmt.Errorf("mismatch between request ID in header (%q) and body (%q)", msg.id, reqbody.RequestID)
}

// Authenticate the request
if err := msg.authenticate(req, f.AccessKey); err != nil {
return msg, fmt.Errorf("authentication for request %q failed: %w", msg.id, err)
}

// Extract the records and parameters for tagging
records, err := msg.decodeData(&reqbody)
if err != nil {
return msg, fmt.Errorf("decode base64 data from request %q failed: %w", msg.id, err)
}

records, paramTags, err := r.processRequest(f.AccessKey, f.ParameterTags)
tags, err := msg.extractTagsFromCommonAttributes(req, f.ParameterTags)
if err != nil {
f.Log.Errorf("Processing request failed: %v", err)
return r
return msg, fmt.Errorf("extracting parameter tags for request %q failed: %w", msg.id, err)
}

// Parse the metrics
var metrics []telegraf.Metric
for _, record := range records {
m, err := f.parser.Parse(record)
if err != nil {
// respond with bad request status code to inform firehose about the failure
r.res.statusCode = http.StatusBadRequest
f.Log.Errorf("Parse data from request %q failed: %v", r.body.RequestID, err)
return r
msg.responseCode = http.StatusBadRequest
return msg, fmt.Errorf("parsing data of request %q failed: %w", msg.id, err)
}
metrics = append(metrics, m...)
}
Expand All @@ -154,16 +216,17 @@ func (f *Firehose) handleRequest(req *http.Request) (r *request) {
})
}

// Add the extracted tags and the path
for _, m := range metrics {
for k, v := range paramTags {
for k, v := range tags {
m.AddTag(k, v)
}
m.AddTag("firehose_http_path", r.req.URL.Path)
m.AddTag("path", req.URL.Path)
f.acc.AddMetric(m)
}

r.res.statusCode = http.StatusOK
return r
msg.responseCode = http.StatusOK
return msg, nil
}

func init() {
Expand Down
Loading