Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ import (

// Config holds all configuration values for a single Kinsumer instance
type Config struct {
stats StatReceiver
logger Logger
stats StatReceiver
logger Logger
fanoutConsumerARN string

// ---------- [ Per Shard Worker ] ----------
// Time to sleep if no records are found
Expand Down Expand Up @@ -59,6 +60,12 @@ func NewConfig() Config {
}
}

// WithEnhancedFanoutConsumerARN returns a Config that uses the supplied consumer ARN to enable Kinesis's enhanced fanout feature
func (c Config) WithEnhancedFanoutConsumerARN(arn string) Config {
c.fanoutConsumerARN = arn
return c
}

// WithThrottleDelay returns a Config with a modified throttle delay
func (c Config) WithThrottleDelay(delay time.Duration) Config {
c.throttleDelay = delay
Expand Down
80 changes: 79 additions & 1 deletion shard_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,12 @@ func (k *Kinsumer) consume(shardID string) {

sequenceNumber := checkpointer.sequenceNumber

evtCh := k.consumePolling(ctx, shardID, sequenceNumber)
var evtCh <-chan consumeEvent
if k.config.fanoutConsumerARN != "" {
evtCh = k.consumeFanout(ctx, shardID, sequenceNumber)
} else {
evtCh = k.consumePolling(ctx, shardID, sequenceNumber)
}

mainloop:
// Continue processing until both the checkpointer and event goroutines are done
Expand Down Expand Up @@ -292,3 +297,76 @@ func (k *Kinsumer) consumePolling(ctx context.Context, shardID string, sequenceN

return ch
}

// consumeFanout is a blocking call that captures then consumes the given shard in a loop.
// It is also responsible for writing out the checkpoint updates to dynamo.
// It is a subfunction of consume and shouldn't be called directly
func (k *Kinsumer) consumeFanout(ctx context.Context, shardID string, sequenceNumber string) <-chan consumeEvent {
ch := make(chan consumeEvent)

go func() {
defer close(ch)

for {
shardIteratorType := kinesis.ShardIteratorTypeAfterSequenceNumber
ps := aws.String(sequenceNumber)
if sequenceNumber == "" {
shardIteratorType = kinesis.ShardIteratorTypeTrimHorizon
ps = nil
} else if sequenceNumber == "LATEST" {
shardIteratorType = kinesis.ShardIteratorTypeLatest
ps = nil
}

out, err := k.kinesis.SubscribeToShard(&kinesis.SubscribeToShardInput{
ConsumerARN: aws.String(k.config.fanoutConsumerARN),
ShardId: aws.String(shardID),
StartingPosition: &kinesis.StartingPosition{
Type: aws.String(shardIteratorType),
SequenceNumber: ps,
},
})
if err != nil {
k.shardErrors <- shardConsumerError{shardID: shardID, action: "SubscribeToShard", err: err}
return
}

// This is an unbuffered channel of kinesis.SubscribeToShardEventStreamEvent
// The channel will close when the connection is lost
events := out.EventStream.Events()

getrecordloop:
for {
select {
case <-ctx.Done():
return
case e, ok := <-events:
if !ok {
break getrecordloop
}
switch ev := e.(type) {
case *kinesis.SubscribeToShardEvent:
sequenceNumber = aws.StringValue(ev.ContinuationSequenceNumber)
evnt := consumeEvent{
Records: ev.Records,
Lag: time.Duration(aws.Int64Value(ev.MillisBehindLatest)) * time.Millisecond,
SequenceNumber: sequenceNumber,
// TODO: how do we know when a shard is finished in enhanced fanout?
}

select {
case <-ctx.Done():
return
case ch <- evnt:
}

default:
// TODO: what to do if this is an unknown type?
}
}
}
}
}()

return ch
}