Skip to content

Commit

Permalink
feat: support aws managed service for prometheus (influxdata#10202)
Browse files Browse the repository at this point in the history
  • Loading branch information
baloo authored and powersj committed Jan 21, 2022
1 parent 60fdc27 commit 5650cd3
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 0 deletions.
21 changes: 21 additions & 0 deletions plugins/outputs/http/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,27 @@ batch format by default.
## Maximum amount of time before idle connection is closed.
## Zero means no limit.
# idle_conn_timeout = 0

## Amazon Region
#region = "us-east-1"

## Amazon Credentials
## Credentials are loaded in the following order
## 1) Web identity provider credentials via STS if role_arn and web_identity_token_file are specified
## 2) Assumed credentials via STS if role_arn is specified
## 3) explicit credentials from 'access_key' and 'secret_key'
## 4) shared profile from 'profile'
## 5) environment variables
## 6) shared credentials file
## 7) EC2 Instance Profile
#access_key = ""
#secret_key = ""
#token = ""
#role_arn = ""
#web_identity_token_file = ""
#role_session_name = ""
#profile = ""
#shared_credential_file = ""
```

### Optional Cookie Authentication Settings
Expand Down
69 changes: 69 additions & 0 deletions plugins/outputs/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,17 @@ import (
"bufio"
"bytes"
"context"
"crypto/sha256"
"fmt"
"io"
"net/http"
"strings"
"time"

awsV2 "github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/aws/signer/v4"
"github.com/influxdata/telegraf"
internalaws "github.com/influxdata/telegraf/config/aws"
"github.com/influxdata/telegraf/internal"
httpconfig "github.com/influxdata/telegraf/plugins/common/http"
"github.com/influxdata/telegraf/plugins/outputs"
Expand Down Expand Up @@ -81,6 +86,27 @@ var sampleConfig = `
## Maximum amount of time before idle connection is closed.
## Zero means no limit.
# idle_conn_timeout = 0
## Amazon Region
#region = "us-east-1"
## Amazon Credentials
## Credentials are loaded in the following order
## 1) Web identity provider credentials via STS if role_arn and web_identity_token_file are specified
## 2) Assumed credentials via STS if role_arn is specified
## 3) explicit credentials from 'access_key' and 'secret_key'
## 4) shared profile from 'profile'
## 5) environment variables
## 6) shared credentials file
## 7) EC2 Instance Profile
#access_key = ""
#secret_key = ""
#token = ""
#role_arn = ""
#web_identity_token_file = ""
#role_session_name = ""
#profile = ""
#shared_credential_file = ""
`

const (
Expand All @@ -97,18 +123,29 @@ type HTTP struct {
Headers map[string]string `toml:"headers"`
ContentEncoding string `toml:"content_encoding"`
UseBatchFormat bool `toml:"use_batch_format"`
AwsService string `toml:"aws_service"`
httpconfig.HTTPClientConfig
Log telegraf.Logger `toml:"-"`

client *http.Client
serializer serializers.Serializer

awsCfg *awsV2.Config
internalaws.CredentialConfig
}

func (h *HTTP) SetSerializer(serializer serializers.Serializer) {
h.serializer = serializer
}

func (h *HTTP) Connect() error {
if h.AwsService != "" {
cfg, err := h.CredentialConfig.Credentials()
if err == nil {
h.awsCfg = &cfg
}
}

if h.Method == "" {
h.Method = http.MethodPost
}
Expand Down Expand Up @@ -180,11 +217,43 @@ func (h *HTTP) writeMetric(reqBody []byte) error {
reqBodyBuffer = rc
}

var payloadHash *string
if h.awsCfg != nil {
// We need a local copy of the full buffer, the signature scheme requires a sha256 of the request body.
buf := new(bytes.Buffer)
_, err = io.Copy(buf, reqBodyBuffer)
if err != nil {
return err
}

sum := sha256.Sum256(buf.Bytes())
reqBodyBuffer = buf

// sha256 is hex encoded
hash := fmt.Sprintf("%x", sum)
payloadHash = &hash
}

req, err := http.NewRequest(h.Method, h.URL, reqBodyBuffer)
if err != nil {
return err
}

if h.awsCfg != nil {
signer := v4.NewSigner()
ctx := context.Background()

credentials, err := h.awsCfg.Credentials.Retrieve(ctx)
if err != nil {
return err
}

err = signer.SignHTTP(ctx, credentials, req, *payloadHash, h.AwsService, h.Region, time.Now().UTC())
if err != nil {
return err
}
}

if h.Username != "" || h.Password != "" {
req.SetBasicAuth(h.Username, h.Password)
}
Expand Down
51 changes: 51 additions & 0 deletions plugins/outputs/http/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/influxdata/telegraf"
internalaws "github.com/influxdata/telegraf/config/aws"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/metric"
httpconfig "github.com/influxdata/telegraf/plugins/common/http"
Expand Down Expand Up @@ -516,3 +517,53 @@ func TestBatchedUnbatched(t *testing.T) {
})
}
}

func TestAwsCredentials(t *testing.T) {
ts := httptest.NewServer(http.NotFoundHandler())
defer ts.Close()

u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String()))
require.NoError(t, err)

tests := []struct {
name string
plugin *HTTP
tokenHandler TestHandlerFunc
handler TestHandlerFunc
}{
{
name: "simple credentials",
plugin: &HTTP{
URL: u.String(),
AwsService: "aps",
CredentialConfig: internalaws.CredentialConfig{
Region: "us-east-1",
AccessKey: "dummy",
SecretKey: "dummy",
},
},
handler: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
require.Contains(t, r.Header["Authorization"][0], "AWS4-HMAC-SHA256")
require.Contains(t, r.Header["Authorization"][0], "=dummy/")
require.Contains(t, r.Header["Authorization"][0], "/us-east-1/")
w.WriteHeader(http.StatusOK)
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
tt.handler(t, w, r)
})

serializer := influx.NewSerializer()
tt.plugin.SetSerializer(serializer)
err = tt.plugin.Connect()
require.NoError(t, err)

err = tt.plugin.Write([]telegraf.Metric{getMetric()})
require.NoError(t, err)
})
}
}

0 comments on commit 5650cd3

Please sign in to comment.