Skip to content

Commit

Permalink
fix(inputs.kinesis_consumer): Store last delivered message, not first
Browse files: browse the repository at this point in the history
  • Loading branch information
srebhan committed Dec 13, 2024
1 parent d2e032e commit d79538b
Showing 1 changed file with 34 additions and 21 deletions.
55 changes: 34 additions & 21 deletions plugins/inputs/kinesis_consumer/kinesis_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
_ "embed"
"errors"
"fmt"
"sync"
"time"

Expand Down Expand Up @@ -49,7 +50,8 @@ type KinesisConsumer struct {

contentDecodingFunc decodingFunc

lastSeqNum string
lastSeqNum string
lastSeqNumTex sync.Mutex
}

type dynamoDB struct {
Expand Down Expand Up @@ -100,8 +102,34 @@ func (k *KinesisConsumer) Gather(acc telegraf.Accumulator) error {
if k.cons == nil {
return k.connect(acc)
}
// Enforce writing of last received sequence number
// Enforce writing of sequence number for the latest metric successfully
// delivered to the outputs. If no metric was delivered, skip that step.
k.lastSeqNumTex.Lock()
sequenceNum := k.lastSeqNum
k.lastSeqNumTex.Unlock()

if sequenceNum == "" {
return nil
}

// Store the sequence number at least once per gather cycle using the checkpoint
// storage (usually DynamoDB).
k.checkpointTex.Lock()
chk, ok := k.checkpoints[sequenceNum]
if !ok {
k.checkpointTex.Unlock()
return nil
}
delete(k.checkpoints, sequenceNum)
k.checkpointTex.Unlock()

k.Log.Tracef("persisting sequence number %q for stream %q and shard %q", sequenceNum, chk.streamName, chk.shardID)
if err := k.checkpoint.SetCheckpoint(chk.streamName, chk.shardID, sequenceNum); err != nil {
return fmt.Errorf("setting checkpoint failed: %w", err)
}
k.lastSeqNumTex.Lock()
k.lastSeqNum = ""
k.lastSeqNumTex.Unlock()

return nil
}
Expand Down Expand Up @@ -212,6 +240,7 @@ func (k *KinesisConsumer) connect(acc telegraf.Accumulator) error {
return nil
}

// onMessage is called for new messages consumed from Kinesis
func (k *KinesisConsumer) onMessage(acc telegraf.TrackingAccumulator, r *consumer.Record) error {
data, err := k.contentDecodingFunc(r.Data)
if err != nil {
Expand All @@ -236,6 +265,7 @@ func (k *KinesisConsumer) onMessage(acc telegraf.TrackingAccumulator, r *consume
return nil
}

// onDelivery is called for every metric successfully delivered to the outputs
func (k *KinesisConsumer) onDelivery(ctx context.Context) {
for {
select {
Expand All @@ -257,26 +287,9 @@ func (k *KinesisConsumer) onDelivery(ctx context.Context) {
continue
}

if k.lastSeqNum != "" {
continue
}

// Store the sequence number at least once per gather cycle using the checkpoint
// storage (usually DynamoDB).
k.checkpointTex.Lock()
chk, ok := k.checkpoints[sequenceNum]
if !ok {
k.checkpointTex.Unlock()
continue
}
delete(k.checkpoints, sequenceNum)
k.checkpointTex.Unlock()

k.Log.Tracef("persisting sequence number %q for stream %q and shard %q", sequenceNum)
k.lastSeqNumTex.Lock()
k.lastSeqNum = sequenceNum
if err := k.checkpoint.SetCheckpoint(chk.streamName, chk.shardID, sequenceNum); err != nil {
k.Log.Errorf("Setting checkpoint failed: %v", err)
}
k.lastSeqNumTex.Unlock()
}
}
}
Expand Down

0 comments on commit d79538b

Please sign in to comment.