Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ package kinsumer

import (
"time"

"github.com/aws/aws-sdk-go/service/kinesis"
)

//TODO: Update documentation to include the defaults
Expand Down Expand Up @@ -41,11 +43,17 @@ type Config struct {
dynamoWriteCapacity int64
// Time to wait between attempts to verify tables were created/deleted completely
dynamoWaiterDelay time.Duration

// ---------- [ For the Stream Starting Point ] ----------
shardIteratorType string
atTimestamp *time.Time
sequenceNumber string
}

// NewConfig returns a default Config struct
func NewConfig() Config {
return Config{
shardIteratorType: kinesis.ShardIteratorTypeAfterSequenceNumber,
throttleDelay: 250 * time.Millisecond,
commitFrequency: 1000 * time.Millisecond,
shardCheckFrequency: 1 * time.Minute,
Expand Down Expand Up @@ -119,6 +127,39 @@ func (c Config) WithLogger(logger Logger) Config {
return c
}

// WithShardIteratorAtTimestamp returns a Config with a modified at timestamp and sets shardIteratorType to AT_TIMESTAMP
func (c Config) WithShardIteratorAtTimestamp(t time.Time) Config {
c.shardIteratorType = kinesis.ShardIteratorTypeAtTimestamp
c.atTimestamp = &t
return c
}

// WithShardIteratorLatest returns a Config that sets shardIteratorType to LATEST
func (c Config) WithShardIteratorLatest() Config {
c.shardIteratorType = kinesis.ShardIteratorTypeLatest
return c
}

// WithShardIteratorLatest returns a Config that sets shardIteratorType to AT_SEQUENCE_NUMBER
func (c Config) WithShardIteratorAtSequenceNumber(sequenceNumber string) Config {
c.shardIteratorType = kinesis.ShardIteratorTypeLatest
c.sequenceNumber = sequenceNumber
return c
}

// WithShardIteratorAfterSequenceNumber returns a Config that sets shardIteratorType to AFTER_SEQUENCE_NUMBER
func (c Config) WithShardIteratorAfterSequenceNumber(sequenceNumber string) Config {
c.shardIteratorType = kinesis.ShardIteratorTypeAfterSequenceNumber
c.sequenceNumber = sequenceNumber
return c
}

// WithShardIteratorTrimHorizon returns a Config that sets shardIteratorType to TRIM_HORIZON
func (c Config) WithShardIteratorTrimHorizon() Config {
c.shardIteratorType = kinesis.ShardIteratorTypeTrimHorizon
return c
}

// Verify that a config struct has sane and valid values
func validateConfig(c *Config) error {
if c.throttleDelay < 200*time.Millisecond {
Expand Down
3 changes: 3 additions & 0 deletions kinsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type Kinsumer struct {
leaderWG sync.WaitGroup // waitGroup for the leader loop
maxAgeForClientRecord time.Duration // Cutoff for client/checkpoint records we read from dynamodb before we assume the record is stale
maxAgeForLeaderRecord time.Duration // Cutoff for leader/shard cache records we read from dynamodb before we assume the record is stale
fromCheckpoint bool // if there is already a consumer from the shard, we should move on from the checkpoint
}

// New returns a Kinsumer Interface with default kinesis and dynamodb instances, to be used in ec2 instances to get default auth and config
Expand Down Expand Up @@ -203,6 +204,8 @@ func (k *Kinsumer) startConsumers() error {
assigned := false

if k.thisClient >= len(k.shardIDs) {
// as there is already a consumer running, we should move from checkpoint onwards
k.fromCheckpoint = true
return nil
}

Expand Down
56 changes: 45 additions & 11 deletions shard_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,25 +26,47 @@ const (
)

// getShardIterator gets a shard iterator after the last sequence number we read or at the start of the stream
func getShardIterator(k kinesisiface.KinesisAPI, streamName string, shardID string, sequenceNumber string) (string, error) {
shardIteratorType := kinesis.ShardIteratorTypeAfterSequenceNumber

// If we do not have a sequenceNumber yet we need to get a shardIterator
// from the horizon
func getShardIterator(
k kinesisiface.KinesisAPI,
streamName, shardID, shardIteratorType, sequenceNumber string,
timestamp *time.Time,
fromCheckpoint bool,
) (string, error) {
ps := aws.String(sequenceNumber)
if sequenceNumber == "" {
shardIteratorType = kinesis.ShardIteratorTypeTrimHorizon
ps = nil
} else if sequenceNumber == "LATEST" {
shardIteratorType = kinesis.ShardIteratorTypeLatest

var ts *time.Time

switch shardIteratorType {
case kinesis.ShardIteratorTypeAfterSequenceNumber, kinesis.ShardIteratorTypeAtSequenceNumber:
// If we do not have a sequenceNumber yet we need to get a shardIterator
// from the horizon
if sequenceNumber == "" {
shardIteratorType = kinesis.ShardIteratorTypeTrimHorizon
ps = nil
} else if sequenceNumber == "LATEST" {
shardIteratorType = kinesis.ShardIteratorTypeLatest
ps = nil
}

case kinesis.ShardIteratorTypeLatest, kinesis.ShardIteratorTypeTrimHorizon, kinesis.ShardIteratorTypeAtTimestamp:
if timestamp != nil {
ts = timestamp
}
ps = nil
}
if fromCheckpoint && sequenceNumber != "" && sequenceNumber != "LATEST" {
// we have a sequence number and se should from it
shardIteratorType = kinesis.ShardIteratorTypeAfterSequenceNumber
ps = aws.String(sequenceNumber)
ts = nil
}

resp, err := k.GetShardIterator(&kinesis.GetShardIteratorInput{
ShardId: aws.String(shardID),
ShardIteratorType: &shardIteratorType,
StartingSequenceNumber: ps,
StreamName: aws.String(streamName),
Timestamp: ts,
})
return aws.StringValue(resp.ShardIterator), err
}
Expand Down Expand Up @@ -137,8 +159,20 @@ func (k *Kinsumer) consume(shardID string) {
}
}()

if k.config.sequenceNumber != "" {
sequenceNumber = k.config.sequenceNumber
}

// Get the starting shard iterator
iterator, err := getShardIterator(k.kinesis, k.streamName, shardID, sequenceNumber)
iterator, err := getShardIterator(
k.kinesis,
k.streamName,
shardID,
k.config.shardIteratorType,
sequenceNumber,
k.config.atTimestamp,
k.fromCheckpoint,
)
if err != nil {
k.shardErrors <- shardConsumerError{shardID: shardID, action: "getShardIterator", err: err}
return
Expand Down