Skip to content

Commit

Permalink
Use DescribeStreamSummary in place of ListStreams in kinesis output (i…
Browse files Browse the repository at this point in the history
  • Loading branch information
tpounds authored and Mathieu Lecarme committed Apr 17, 2020
1 parent bf3a8ee commit 0b67395
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 37 deletions.
6 changes: 3 additions & 3 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

[[constraint]]
name = "github.com/aws/aws-sdk-go"
version = "1.14.8"
version = "1.15.54"

[[constraint]]
name = "github.com/bsm/sarama-cluster"
Expand Down
40 changes: 7 additions & 33 deletions plugins/outputs/kinesis/kinesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package kinesis

import (
"log"
"os"
"time"

"github.com/aws/aws-sdk-go/aws"
Expand Down Expand Up @@ -115,17 +114,11 @@ func (k *KinesisOutput) Description() string {
return "Configuration for the AWS Kinesis output."
}

func checkstream(l []*string, s string) bool {
// Check if the StreamName exists in the slice returned from the ListStreams API request.
for _, stream := range l {
if *stream == s {
return true
}
func (k *KinesisOutput) Connect() error {
if k.Partition == nil {
log.Print("E! kinesis : Deprecated paritionkey configuration in use, please consider using outputs.kinesis.partition")
}
return false
}

func (k *KinesisOutput) Connect() error {
// We attempt first to create a session to Kinesis using an IAMS role, if that fails it will fall through to using
// environment variables, and then Shared Credentials.
if k.Debug {
Expand All @@ -145,29 +138,10 @@ func (k *KinesisOutput) Connect() error {
configProvider := credentialConfig.Credentials()
svc := kinesis.New(configProvider)

KinesisParams := &kinesis.ListStreamsInput{
Limit: aws.Int64(100),
}

resp, err := svc.ListStreams(KinesisParams)

if err != nil {
log.Printf("E! kinesis: Error in ListSteams API call : %+v \n", err)
}

if checkstream(resp.StreamNames, k.StreamName) {
if k.Debug {
log.Printf("E! kinesis: Stream Exists")
}
k.svc = svc
return nil
} else {
log.Printf("E! kinesis : You have configured a StreamName %+v which does not exist. exiting.", k.StreamName)
os.Exit(1)
}
if k.Partition == nil {
log.Print("E! kinesis : Deprecated paritionkey configuration in use, please consider using outputs.kinesis.partition")
}
_, err := svc.DescribeStreamSummary(&kinesis.DescribeStreamSummaryInput{
StreamName: aws.String(k.StreamName),
})
k.svc = svc
return err
}

Expand Down

0 comments on commit 0b67395

Please sign in to comment.