Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(inputs.kinesis_consumer): Cleanup code #16267

Merged
merged 2 commits into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions plugins/inputs/kinesis_consumer/encoding.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package kinesis_consumer

import (
"bytes"
"compress/gzip"
"compress/zlib"
"fmt"
"io"
)

type decodingFunc func([]byte) ([]byte, error)

func processGzip(data []byte) ([]byte, error) {
zipData, err := gzip.NewReader(bytes.NewReader(data))
if err != nil {
return nil, err
}
defer zipData.Close()
return io.ReadAll(zipData)
}

func processZlib(data []byte) ([]byte, error) {
zlibData, err := zlib.NewReader(bytes.NewReader(data))
if err != nil {
return nil, err
}
defer zlibData.Close()
return io.ReadAll(zlibData)
}

// processNoOp is the identity decoder used for uncompressed payloads
// ("none", "identity" or empty content encoding); it returns the input
// slice unchanged and never fails.
func processNoOp(data []byte) ([]byte, error) {
return data, nil
}

// getDecodingFunc maps a content-encoding name to the matching decoder.
// The empty string, "none" and "identity" all select the pass-through
// decoder; any other value is rejected with an error.
func getDecodingFunc(encoding string) (decodingFunc, error) {
	switch encoding {
	case "none", "identity", "":
		return processNoOp, nil
	case "gzip":
		return processGzip, nil
	case "zlib":
		return processZlib, nil
	default:
		return nil, fmt.Errorf("unknown content encoding %q", encoding)
	}
}
250 changes: 83 additions & 167 deletions plugins/inputs/kinesis_consumer/kinesis_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,15 @@
package kinesis_consumer

import (
"bytes"
"compress/gzip"
"compress/zlib"
"context"
_ "embed"
"errors"
"fmt"
"io"
"math/big"
"strings"
"sync"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/kinesis"
"github.com/aws/smithy-go/logging"
consumer "github.com/harlow/kinesis-consumer"
"github.com/harlow/kinesis-consumer/store/ddb"

Expand All @@ -31,86 +23,85 @@ import (
//go:embed sample.conf
var sampleConfig string

var (
once sync.Once
// this is the largest sequence number allowed - https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SequenceNumberRange.html
maxSeq = strToBint(strings.Repeat("9", 129))
negOne *big.Int
)

const (
defaultMaxUndeliveredMessages = 1000
)

type (
KinesisConsumer struct {
StreamName string `toml:"streamname"`
ShardIteratorType string `toml:"shard_iterator_type"`
DynamoDB *dynamoDB `toml:"checkpoint_dynamodb"`
MaxUndeliveredMessages int `toml:"max_undelivered_messages"`
ContentEncoding string `toml:"content_encoding"`

Log telegraf.Logger `toml:"-"`

cons *consumer.Consumer
parser telegraf.Parser
cancel context.CancelFunc
acc telegraf.TrackingAccumulator
sem chan struct{}
var once sync.Once

type KinesisConsumer struct {
StreamName string `toml:"streamname"`
ShardIteratorType string `toml:"shard_iterator_type"`
DynamoDB *dynamoDB `toml:"checkpoint_dynamodb"`
MaxUndeliveredMessages int `toml:"max_undelivered_messages"`
ContentEncoding string `toml:"content_encoding"`
Log telegraf.Logger `toml:"-"`
common_aws.CredentialConfig

cons *consumer.Consumer
parser telegraf.Parser
cancel context.CancelFunc
acc telegraf.TrackingAccumulator
sem chan struct{}

checkpoint consumer.Store
checkpoints map[string]checkpoint
records map[telegraf.TrackingID]string
checkpointTex sync.Mutex
recordsTex sync.Mutex
wg sync.WaitGroup

contentDecodingFunc decodingFunc

lastSeqNum string
}

checkpoint consumer.Store
checkpoints map[string]checkpoint
records map[telegraf.TrackingID]string
checkpointTex sync.Mutex
recordsTex sync.Mutex
wg sync.WaitGroup
type dynamoDB struct {
AppName string `toml:"app_name"`
TableName string `toml:"table_name"`
}

processContentEncodingFunc processContent
type checkpoint struct {
streamName string
shardID string
}

lastSeqNum *big.Int
func (*KinesisConsumer) SampleConfig() string {
return sampleConfig
}

common_aws.CredentialConfig
func (k *KinesisConsumer) Init() error {
// Set defaults
if k.MaxUndeliveredMessages < 1 {
k.MaxUndeliveredMessages = 1000
}

dynamoDB struct {
AppName string `toml:"app_name"`
TableName string `toml:"table_name"`
if k.ShardIteratorType == "" {
k.ShardIteratorType = "TRIM_HORIZON"
}

checkpoint struct {
streamName string
shardID string
if k.ContentEncoding == "" {
k.ContentEncoding = "identity"
}
)

type processContent func([]byte) ([]byte, error)

func (*KinesisConsumer) SampleConfig() string {
return sampleConfig
}
f, err := getDecodingFunc(k.ContentEncoding)
if err != nil {
return err
}
k.contentDecodingFunc = f

func (k *KinesisConsumer) Init() error {
return k.configureProcessContentEncodingFunc()
return nil
}

func (k *KinesisConsumer) SetParser(parser telegraf.Parser) {
k.parser = parser
}

func (k *KinesisConsumer) Start(ac telegraf.Accumulator) error {
err := k.connect(ac)
if err != nil {
return err
}

return nil
func (k *KinesisConsumer) Start(acc telegraf.Accumulator) error {
return k.connect(acc)
}

func (k *KinesisConsumer) Gather(acc telegraf.Accumulator) error {
if k.cons == nil {
return k.connect(acc)
}
k.lastSeqNum = maxSeq
// Enforce writing of last received sequence number
k.lastSeqNum = ""

return nil
}
Expand Down Expand Up @@ -138,7 +129,7 @@ func (k *KinesisConsumer) SetCheckpoint(streamName, shardID, sequenceNumber stri
return nil
}

func (k *KinesisConsumer) connect(ac telegraf.Accumulator) error {
func (k *KinesisConsumer) connect(acc telegraf.Accumulator) error {
cfg, err := k.CredentialConfig.Credentials()
if err != nil {
return err
Expand Down Expand Up @@ -180,7 +171,7 @@ func (k *KinesisConsumer) connect(ac telegraf.Accumulator) error {

k.cons = cons

k.acc = ac.WithTracking(k.MaxUndeliveredMessages)
k.acc = acc.WithTracking(k.MaxUndeliveredMessages)
k.records = make(map[telegraf.TrackingID]string, k.MaxUndeliveredMessages)
k.checkpoints = make(map[string]checkpoint, k.MaxUndeliveredMessages)
k.sem = make(chan struct{}, k.MaxUndeliveredMessages)
Expand All @@ -204,8 +195,7 @@ func (k *KinesisConsumer) connect(ac telegraf.Accumulator) error {
case k.sem <- struct{}{}:
break
}
err := k.onMessage(k.acc, r)
if err != nil {
if err := k.onMessage(k.acc, r); err != nil {
<-k.sem
k.Log.Errorf("Scan parser error: %v", err)
}
Expand All @@ -223,7 +213,7 @@ func (k *KinesisConsumer) connect(ac telegraf.Accumulator) error {
}

func (k *KinesisConsumer) onMessage(acc telegraf.TrackingAccumulator, r *consumer.Record) error {
data, err := k.processContentEncodingFunc(r.Data)
data, err := k.contentDecodingFunc(r.Data)
if err != nil {
return err
}
Expand Down Expand Up @@ -262,111 +252,37 @@ func (k *KinesisConsumer) onDelivery(ctx context.Context) {
delete(k.records, info.ID())
k.recordsTex.Unlock()

if info.Delivered() {
k.checkpointTex.Lock()
chk, ok := k.checkpoints[sequenceNum]
if !ok {
k.checkpointTex.Unlock()
continue
}
delete(k.checkpoints, sequenceNum)
k.checkpointTex.Unlock()

// at least once
if strToBint(sequenceNum).Cmp(k.lastSeqNum) > 0 {
continue
}

k.lastSeqNum = strToBint(sequenceNum)
if err := k.checkpoint.SetCheckpoint(chk.streamName, chk.shardID, sequenceNum); err != nil {
k.Log.Debugf("Setting checkpoint failed: %v", err)
}
} else {
if !info.Delivered() {
k.Log.Debug("Metric group failed to process")
continue
}
}
}
}

func processGzip(data []byte) ([]byte, error) {
zipData, err := gzip.NewReader(bytes.NewReader(data))
if err != nil {
return nil, err
}
defer zipData.Close()
return io.ReadAll(zipData)
}

func processZlib(data []byte) ([]byte, error) {
zlibData, err := zlib.NewReader(bytes.NewReader(data))
if err != nil {
return nil, err
}
defer zlibData.Close()
return io.ReadAll(zlibData)
}

func processNoOp(data []byte) ([]byte, error) {
return data, nil
}

func strToBint(s string) *big.Int {
n, ok := new(big.Int).SetString(s, 10)
if !ok {
return negOne
}
return n
}

func (k *KinesisConsumer) configureProcessContentEncodingFunc() error {
switch k.ContentEncoding {
case "gzip":
k.processContentEncodingFunc = processGzip
case "zlib":
k.processContentEncodingFunc = processZlib
case "none", "identity", "":
k.processContentEncodingFunc = processNoOp
default:
return fmt.Errorf("unknown content encoding %q", k.ContentEncoding)
}
return nil
}

type telegrafLoggerWrapper struct {
telegraf.Logger
}
if k.lastSeqNum != "" {
continue
}

func (t *telegrafLoggerWrapper) Log(args ...interface{}) {
t.Trace(args...)
}
// Store the sequence number at least once per gather cycle using the checkpoint
// storage (usually DynamoDB).
k.checkpointTex.Lock()
chk, ok := k.checkpoints[sequenceNum]
if !ok {
k.checkpointTex.Unlock()
continue
}
delete(k.checkpoints, sequenceNum)
k.checkpointTex.Unlock()

func (t *telegrafLoggerWrapper) Logf(classification logging.Classification, format string, v ...interface{}) {
switch classification {
case logging.Debug:
format = "DEBUG " + format
case logging.Warn:
format = "WARN" + format
default:
format = "INFO " + format
k.Log.Tracef("persisting sequence number %q for stream %q and shard %q", sequenceNum)
k.lastSeqNum = sequenceNum
if err := k.checkpoint.SetCheckpoint(chk.streamName, chk.shardID, sequenceNum); err != nil {
k.Log.Errorf("Setting checkpoint failed: %v", err)
}
}
}
t.Logger.Tracef(format, v...)
}

// noopStore implements the checkpoint storage interface as a no-op:
// writes are discarded and reads always report an empty checkpoint.
// It is used when no DynamoDB checkpoint backend is configured.
type noopStore struct{}

// SetCheckpoint discards the given checkpoint and always succeeds.
func (noopStore) SetCheckpoint(_, _, _ string) error { return nil }

// GetCheckpoint always returns an empty sequence number without error.
func (noopStore) GetCheckpoint(_, _ string) (string, error) { return "", nil }

func init() {
negOne, _ = new(big.Int).SetString("-1", 10)

inputs.Add("kinesis_consumer", func() telegraf.Input {
return &KinesisConsumer{
ShardIteratorType: "TRIM_HORIZON",
MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
lastSeqNum: maxSeq,
ContentEncoding: "identity",
}
return &KinesisConsumer{}
})
}
Loading
Loading