forked from influxdata/telegraf
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
add amazon kinesis as an output plugin
closes influxdata#428
- Loading branch information
1 parent
c89ef84
commit b91eab6
Showing
6 changed files
with
283 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,4 @@ | ||
tivan | ||
.vagrant | ||
telegraf | ||
.idea |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
## Amazon Kinesis Output for Telegraf | ||
|
||
This is an experimental plugin that is still in the early stages of development. It will batch up all of the Points | ||
in one Put request to Kinesis. This should save the number of API requests by a considerable level. | ||
|
||
## About Kinesis | ||
|
||
This is not the place to document all of the various Kinesis terms however it | ||
maybe useful for users to review Amazons official documentation which is available | ||
[here](http://docs.aws.amazon.com/kinesis/latest/dev/key-concepts.html). | ||
|
||
## Amazon Authentication | ||
|
||
This plugin uses a credential chain for Authentication with the Kinesis API endpoint. In the following order the plugin | ||
will attempt to authenticate. | ||
1. [IAMS Role](http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html) | ||
2. [Environment Variables](https://github.com/aws/aws-sdk-go/wiki/configuring-sdk) | ||
3. [Shared Credentials](https://github.com/aws/aws-sdk-go/wiki/configuring-sdk) | ||
|
||
|
||
## Config | ||
|
||
For this output plugin to function correctly the following variables must be configured. | ||
|
||
* region | ||
* streamname | ||
* partitionkey | ||
|
||
### region | ||
|
||
The region is the Amazon region that you wish to connect to. Examples include but are not limited to | ||
* us-west-1 | ||
* us-west-2 | ||
* us-east-1 | ||
* ap-southeast-1 | ||
* ap-southeast-2 | ||
|
||
### streamname | ||
|
||
The streamname is used by the plugin to ensure that data is sent to the correct Kinesis stream. It is important to | ||
note that the stream *MUST* be pre-configured for this plugin to function correctly. If the stream does not exist the | ||
plugin will result in telegraf exiting with an exit code of 1. | ||
|
||
### partitionkey | ||
|
||
This is used to group data within a stream. Currently this plugin only supports a single partitionkey. | ||
Manually configuring different hosts, or groups of hosts with manually selected partitionkeys might be a workable | ||
solution to scale out. | ||
|
||
### format | ||
|
||
The format configuration value has been designated to allow people to change the format of the Point as written to | ||
Kinesis. Right now there are two supported formats string and custom. | ||
|
||
#### string | ||
|
||
String is defined using the default Point.String() value and translated to []byte for the Kinesis stream. | ||
|
||
#### custom | ||
|
||
Custom is a string defined by a number of values in the FormatMetric() function. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,179 @@ | ||
package kinesis | ||
|
||
import ( | ||
"errors" | ||
"fmt" | ||
"log" | ||
"os" | ||
"sync/atomic" | ||
"time" | ||
|
||
"github.com/aws/aws-sdk-go/aws" | ||
"github.com/aws/aws-sdk-go/aws/credentials" | ||
"github.com/aws/aws-sdk-go/aws/credentials/ec2rolecreds" | ||
"github.com/aws/aws-sdk-go/aws/ec2metadata" | ||
"github.com/aws/aws-sdk-go/aws/session" | ||
"github.com/aws/aws-sdk-go/service/kinesis" | ||
|
||
"github.com/influxdb/influxdb/client/v2" | ||
"github.com/influxdb/telegraf/outputs" | ||
) | ||
|
||
type KinesisOutput struct { | ||
Region string `toml:"region"` | ||
StreamName string `toml:"streamname"` | ||
PartitionKey string `toml:"partitionkey"` | ||
Format string `toml:"format"` | ||
Debug bool `toml:"debug"` | ||
svc *kinesis.Kinesis | ||
} | ||
|
||
var sampleConfig = ` | ||
# Amazon REGION of kinesis endpoint. | ||
region = "ap-southeast-2" | ||
# Kinesis StreamName must exist prior to starting telegraf. | ||
streamname = "StreamName" | ||
# PartitionKey as used for sharding data. | ||
partitionkey = "PartitionKey" | ||
# format of the Data payload in the kinesis PutRecord, supported | ||
# String and Custom. | ||
format = "string" | ||
# debug will show upstream aws messages. | ||
debug = false | ||
` | ||
|
||
func (k *KinesisOutput) SampleConfig() string { | ||
return sampleConfig | ||
} | ||
|
||
func (k *KinesisOutput) Description() string { | ||
return "Configuration for the AWS Kinesis output." | ||
} | ||
|
||
func checkstream(l []*string, s string) bool { | ||
// Check if the StreamName exists in the slice returned from the ListStreams API request. | ||
for _, stream := range l { | ||
if *stream == s { | ||
return true | ||
} | ||
} | ||
return false | ||
} | ||
|
||
func (k *KinesisOutput) Connect() error { | ||
// We attempt first to create a session to Kinesis using an IAMS role, if that fails it will fall through to using | ||
// environment variables, and then Shared Credentials. | ||
if k.Debug { | ||
log.Printf("kinesis: Establishing a connection to Kinesis in %+v", k.Region) | ||
} | ||
Config := &aws.Config{ | ||
Region: aws.String(k.Region), | ||
Credentials: credentials.NewChainCredentials( | ||
[]credentials.Provider{ | ||
&ec2rolecreds.EC2RoleProvider{Client: ec2metadata.New(session.New())}, | ||
&credentials.EnvProvider{}, | ||
&credentials.SharedCredentialsProvider{}, | ||
}), | ||
} | ||
svc := kinesis.New(session.New(Config)) | ||
|
||
KinesisParams := &kinesis.ListStreamsInput{ | ||
Limit: aws.Int64(100), | ||
} | ||
|
||
resp, err := svc.ListStreams(KinesisParams) | ||
|
||
if err != nil { | ||
log.Printf("kinesis: Error in ListSteams API call : %+v \n", err) | ||
} | ||
|
||
if checkstream(resp.StreamNames, k.StreamName) { | ||
if k.Debug { | ||
log.Printf("kinesis: Stream Exists") | ||
} | ||
k.svc = svc | ||
return nil | ||
} else { | ||
log.Printf("kinesis : You have configured a StreamName %+v which does not exist. exiting.", k.StreamName) | ||
os.Exit(1) | ||
} | ||
return err | ||
} | ||
|
||
func (k *KinesisOutput) Close() error { | ||
return errors.New("Error") | ||
} | ||
|
||
func FormatMetric(k *KinesisOutput, point *client.Point) (string, error) { | ||
if k.Format == "string" { | ||
return point.String(), nil | ||
} else { | ||
m := fmt.Sprintf("%+v,%+v,%+v", | ||
point.Name(), | ||
point.Tags(), | ||
point.String()) | ||
return m, nil | ||
} | ||
} | ||
|
||
func writekinesis(k *KinesisOutput, r []*kinesis.PutRecordsRequestEntry) time.Duration { | ||
start := time.Now() | ||
payload := &kinesis.PutRecordsInput{ | ||
Records: r, | ||
StreamName: aws.String(k.StreamName), | ||
} | ||
|
||
if k.Debug { | ||
resp, err := k.svc.PutRecords(payload) | ||
if err != nil { | ||
log.Printf("kinesis: Unable to write to Kinesis : %+v \n", err.Error()) | ||
} | ||
log.Printf("%+v \n", resp) | ||
|
||
} else { | ||
_, err := k.svc.PutRecords(payload) | ||
if err != nil { | ||
log.Printf("kinesis: Unable to write to Kinesis : %+v \n", err.Error()) | ||
} | ||
} | ||
return time.Since(start) | ||
} | ||
|
||
func (k *KinesisOutput) Write(points []*client.Point) error { | ||
var sz uint32 = 0 | ||
|
||
if len(points) == 0 { | ||
return nil | ||
} | ||
|
||
r := []*kinesis.PutRecordsRequestEntry{} | ||
|
||
for _, p := range points { | ||
atomic.AddUint32(&sz, 1) | ||
|
||
metric, _ := FormatMetric(k, p) | ||
d := kinesis.PutRecordsRequestEntry{ | ||
Data: []byte(metric), | ||
PartitionKey: aws.String(k.PartitionKey), | ||
} | ||
r = append(r, &d) | ||
|
||
if sz == 500 { | ||
// Max Messages Per PutRecordRequest is 500 | ||
elapsed := writekinesis(k, r) | ||
log.Printf("Wrote a %+v point batch to Kinesis in %+v.\n", sz, elapsed) | ||
atomic.StoreUint32(&sz, 0) | ||
r = nil | ||
} | ||
} | ||
|
||
writekinesis(k, r) | ||
|
||
return nil | ||
} | ||
|
||
func init() { | ||
outputs.Add("kinesis", func() outputs.Output { | ||
return &KinesisOutput{} | ||
}) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
package kinesis | ||
|
||
import ( | ||
"testing" | ||
"github.com/influxdb/telegraf/testutil" | ||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
|
||
func TestFormatMetric(t *testing.T) { | ||
if testing.Short() { | ||
t.Skip("Skipping integration test in short mode") | ||
} | ||
|
||
k := &KinesisOutput{ | ||
Format: "string", | ||
} | ||
|
||
p := testutil.MockBatchPoints().Points()[0] | ||
|
||
valid_string := "test1,tag1=value1 value=1 1257894000000000000" | ||
func_string, err := FormatMetric(k, p) | ||
|
||
if func_string != valid_string { | ||
t.Error("Expected ", valid_string) | ||
} | ||
require.NoError(t, err) | ||
|
||
k = &KinesisOutput{ | ||
Format: "custom", | ||
} | ||
|
||
valid_custom := "test1,map[tag1:value1],test1,tag1=value1 value=1 1257894000000000000" | ||
func_custom, err := FormatMetric(k, p) | ||
|
||
if func_custom != valid_custom { | ||
t.Error("Expected ", valid_custom) | ||
} | ||
require.NoError(t, err) | ||
} |