Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(inputs.firehose): Add new plugin #15988

Merged
merged 9 commits into from
Jan 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions plugins/inputs/all/firehose.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
//go:build !custom || inputs || inputs.firehose

package all

import _ "github.com/influxdata/telegraf/plugins/inputs/firehose" // register plugin
122 changes: 122 additions & 0 deletions plugins/inputs/firehose/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
# AWS Data Firehose Input Plugin

This plugin listens for metrics sent via HTTP from [AWS Data Firehose][firehose]
in one of the supported [data formats][data_formats].
The plugin strictly follows the request-response schema as describe in the
official [documentation][response_spec].

⭐ Telegraf v1.34.0
🏷️ cloud, messaging
💻 all

[firehose]: https://aws.amazon.com/de/firehose/
[data_formats]: /docs/DATA_FORMATS_INPUT.md
[response_spec]: https://docs.aws.amazon.com/firehose/latest/dev/httpdeliveryrequestresponse.html

## Service Input <!-- @/docs/includes/service_input.md -->

This plugin is a service input. Normal plugins gather metrics determined by the
interval setting. Service plugins start a service to listens and waits for
metrics or events to occur. Service plugins have two key differences from
normal plugins:

1. The global or plugin specific `interval` setting may not apply
2. The CLI options of `--test`, `--test-wait`, and `--once` may not produce
output for this plugin

## Global configuration options <!-- @/docs/includes/plugin_config.md -->

In addition to the plugin-specific configuration settings, plugins support
additional global and plugin configuration settings. These settings are used to
modify metrics, tags, and field or create aliases and configure ordering, etc.
See the [CONFIGURATION.md][CONFIGURATION.md] for more details.

[CONFIGURATION.md]: ../../../docs/CONFIGURATION.md#plugins

## Configuration

```toml @sample.conf
# AWS Data Firehose listener
[[inputs.firehose]]
## Address and port to host HTTP listener on
service_address = ":8080"

## Paths to listen to.
# paths = ["/telegraf"]

## maximum duration before timing out read of the request
# read_timeout = "5s"
## maximum duration before timing out write of the response
# write_timeout = "5s"

## Set one or more allowed client CA certificate file names to
## enable mutually authenticated TLS connections
# tls_allowed_cacerts = ["/etc/telegraf/clientca.pem"]

## Add service certificate and key
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"

## Minimal TLS version accepted by the server
# tls_min_version = "TLS12"

## Optional access key to accept for authentication.
## AWS Data Firehose uses "x-amz-firehose-access-key" header to set the access key.
## If no access_key is provided (default), authentication is completely disabled and
## this plugin will accept all request ignoring the provided access-key in the request!
# access_key = "foobar"

## Optional setting to add parameters as tags
## If the http header "x-amz-firehose-common-attributes" is not present on the
## request, no corresponding tag will be added. The header value should be a
## json and should follow the schema as describe in the official documentation:
## https://docs.aws.amazon.com/firehose/latest/dev/httpdeliveryrequestresponse.html#requestformat
# parameter_tags = ["env"]

## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
# data_format = "influx"
```

## Metrics

Metrics are collected from the `records.[*].data` field in the request body.
The data must be base64 encoded and may be sent in any supported
[data format][data_formats].

## Example Output

When run with this configuration:

```toml
[[inputs.firehose]]
service_address = ":8080"
paths = ["/telegraf"]
data_format = "value"
data_type = "string"
```

the following curl command:

```sh
curl -i -XPOST 'localhost:8080/telegraf' \
--header 'x-amz-firehose-request-id: ed4acda5-034f-9f42-bba1-f29aea6d7d8f' \
--header 'Content-Type: application/json' \
--data '{
"requestId": "ed4acda5-034f-9f42-bba1-f29aea6d7d8f",
"timestamp": 1578090901599,
"records": [
{
"data": "aGVsbG8gd29ybGQK" // "hello world"
}
]
}'
```

produces:

```text
firehose,firehose_http_path=/telegraf value="hello world" 1725001851000000000
```
239 changes: 239 additions & 0 deletions plugins/inputs/firehose/firehose.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
//go:generate ../../../tools/readme_config_includer/generator
package firehose

import (
"context"
"crypto/tls"
_ "embed"
"encoding/json"
"errors"
"fmt"
"net"
"net/http"
"slices"
"sync"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
common_tls "github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/plugins/inputs"
)

//go:embed sample.conf
var sampleConfig string

type Firehose struct {
ServiceAddress string `toml:"service_address"`
Paths []string `toml:"paths"`
ReadTimeout config.Duration `toml:"read_timeout"`
WriteTimeout config.Duration `toml:"write_timeout"`
AccessKey config.Secret `toml:"access_key"`
ParameterTags []string `toml:"parameter_tags"`
Log telegraf.Logger `toml:"-"`

common_tls.ServerConfig
tlsConf *tls.Config

once sync.Once

listener net.Listener
server http.Server

parser telegraf.Parser
acc telegraf.Accumulator
}

func (*Firehose) SampleConfig() string {
return sampleConfig
}

func (f *Firehose) SetParser(parser telegraf.Parser) {
f.parser = parser
}

func (f *Firehose) Init() error {
if f.ServiceAddress == "" {
f.ServiceAddress = ":8080"
}
if len(f.Paths) == 0 {
f.Paths = []string{"/telegraf"}
}

var err error
f.tlsConf, err = f.ServerConfig.TLSConfig()
return err
}

// Start starts the http listener service.
func (f *Firehose) Start(acc telegraf.Accumulator) error {
f.acc = acc

var err error
if f.tlsConf != nil {
f.listener, err = tls.Listen("tcp", f.ServiceAddress, f.tlsConf)
} else {
f.listener, err = net.Listen("tcp", f.ServiceAddress)
}
if err != nil {
return fmt.Errorf("creating listener failed: %w", err)
}

f.server = http.Server{
Addr: f.ServiceAddress,
Handler: f,
ReadTimeout: time.Duration(f.ReadTimeout),
WriteTimeout: time.Duration(f.WriteTimeout),
TLSConfig: f.tlsConf,
}

go func() {
if err := f.server.Serve(f.listener); err != nil {
if !errors.Is(err, net.ErrClosed) {
f.Log.Errorf("Server failed: %v", err)
}
}
}()

f.Log.Infof("Listening on %s", f.listener.Addr().String())

return nil
}

// Stop cleans up all resources
func (f *Firehose) Stop() {
if err := f.server.Shutdown(context.Background()); err != nil {
f.Log.Errorf("Shutting down server failed: %v", err)
}
}

func (*Firehose) Gather(telegraf.Accumulator) error {
return nil
}

func (f *Firehose) ServeHTTP(res http.ResponseWriter, req *http.Request) {
srebhan marked this conversation as resolved.
Show resolved Hide resolved
if !slices.Contains(f.Paths, req.URL.Path) {
res.WriteHeader(http.StatusNotFound)
return
}

msg, err := f.handleRequest(req)
if err != nil {
f.acc.AddError(err)
}
if err := msg.sendResponse(res); err != nil {
f.acc.AddError(fmt.Errorf("sending response failed: %w", err))
}
}

func (f *Firehose) handleRequest(req *http.Request) (*message, error) {
// Create a request with a default response status code
msg := &message{
responseCode: http.StatusInternalServerError,
}

// Extract the request ID used to reference the request
msg.id = req.Header.Get("x-amz-firehose-request-id")
if msg.id == "" {
msg.responseCode = http.StatusBadRequest
return msg, errors.New("x-amz-firehose-request-id header is not set")
}

// Check the maximum body size which can be up to 64 MiB according to
// https://docs.aws.amazon.com/firehose/latest/dev/httpdeliveryrequestresponse.html
if req.ContentLength > int64(64*1024*1024) {
msg.responseCode = http.StatusRequestEntityTooLarge
return msg, errors.New("content length is too large")
}

// Check the HTTP method used
switch req.Method {
case http.MethodPost, http.MethodPut:
// Do nothing, those methods are allowed
default:
msg.responseCode = http.StatusMethodNotAllowed
return msg, fmt.Errorf("method %q is not allowed", req.Method)
}

if req.Header.Get("content-type") != "application/json" {
msg.responseCode = http.StatusUnsupportedMediaType
return msg, fmt.Errorf("content type %q is not allowed", req.Header.Get("content-type"))
}

// Decode the content if necessary and parse the JSON message
encoding := req.Header.Get("content-encoding")
body, err := internal.NewStreamContentDecoder(encoding, req.Body)
if err != nil {
msg.responseCode = http.StatusUnsupportedMediaType
return msg, fmt.Errorf("creating %q decoder for request %q failed: %w", encoding, msg.id, err)
}
defer req.Body.Close()

var reqbody requestBody
if err := json.NewDecoder(body).Decode(&reqbody); err != nil {
msg.responseCode = http.StatusBadRequest
return msg, fmt.Errorf("decode body for request %q failed: %w", msg.id, err)
}

// Validate the body content
if msg.id != reqbody.RequestID {
msg.responseCode = http.StatusBadRequest
return msg, fmt.Errorf("mismatch between request ID in header (%q) and body (%q)", msg.id, reqbody.RequestID)
}

// Authenticate the request
if err := msg.authenticate(req, f.AccessKey); err != nil {
return msg, fmt.Errorf("authentication for request %q failed: %w", msg.id, err)
}

// Extract the records and parameters for tagging
records, err := msg.decodeData(&reqbody)
if err != nil {
return msg, fmt.Errorf("decode base64 data from request %q failed: %w", msg.id, err)
}

tags, err := msg.extractTagsFromCommonAttributes(req, f.ParameterTags)
if err != nil {
return msg, fmt.Errorf("extracting parameter tags for request %q failed: %w", msg.id, err)
}

// Parse the metrics
var metrics []telegraf.Metric
for _, record := range records {
m, err := f.parser.Parse(record)
if err != nil {
// respond with bad request status code to inform firehose about the failure
msg.responseCode = http.StatusBadRequest
return msg, fmt.Errorf("parsing data of request %q failed: %w", msg.id, err)
}
metrics = append(metrics, m...)
}

if len(metrics) == 0 {
f.once.Do(func() {
f.Log.Info(internal.NoMetricsCreatedMsg)
})
}

// Add the extracted tags and the path
for _, m := range metrics {
for k, v := range tags {
m.AddTag(k, v)
}
m.AddTag("path", req.URL.Path)
f.acc.AddMetric(m)
}

msg.responseCode = http.StatusOK
return msg, nil
}

func init() {
inputs.Add("firehose", func() telegraf.Input {
return &Firehose{
ReadTimeout: config.Duration(time.Second * 5),
WriteTimeout: config.Duration(time.Second * 5),
}
})
}
Loading
Loading