Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 24 additions & 7 deletions plugins/inputs/kinesis_consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,10 @@ func (c *consumer) updateShardConsumers(ctx context.Context) error {

// Filter out all shards actively consumed already
inactiveShards := make([]types.Shard, 0, len(availableShards))
availableShardIDs := make(map[string]bool, len(availableShards))
for _, shard := range availableShards {
id := *shard.ShardId
availableShardIDs[id] = true
if _, found := c.shardConsumers[id]; found {
c.log.Tracef("shard %s is actively consumed...", id)
continue
Expand Down Expand Up @@ -268,16 +270,31 @@ func (c *consumer) updateShardConsumers(ctx context.Context) error {
// by the call to `GetRecords` as a child later.
if shard.ParentShardId != nil && *shard.ParentShardId != "" {
pid := *shard.ParentShardId
if !c.shardsConsumed[pid] {
c.log.Tracef("shard %s has parent %s which is not fully consumed yet...", id, pid)
continue

// The parent shard might be expired and thus not available anymore.
// In those cases, we need to start consuming the child shard
// instead. Data in the parent shard is lost.
if availableShardIDs[pid] {
if !c.shardsConsumed[pid] {
c.log.Tracef("shard %s has parent %s which is not fully consumed yet...", id, pid)
continue
}
} else {
c.log.Tracef("shard %s has parent %s which is is expired...", id, pid)
}
}
if shard.AdjacentParentShardId != nil && *shard.AdjacentParentShardId != "" {
if shard.AdjacentParentShardId != nil && *shard.AdjacentParentShardId != "" && availableShardIDs[*shard.AdjacentParentShardId] {
pid := *shard.AdjacentParentShardId
if !c.shardsConsumed[pid] {
c.log.Tracef("shard %s has adjacent parent %s which is not fully consumed yet...", id, pid)
continue
// The parent shard might be expired and thus not available anymore.
// In those cases, we need to start consuming the child shard
// instead. Data in the parent shard is lost.
if availableShardIDs[pid] {
if !c.shardsConsumed[pid] {
c.log.Tracef("shard %s has adjacent parent %s which is not fully consumed yet...", id, pid)
continue
}
} else {
c.log.Tracef("shard %s has adjacent parent %s which is is expired...", id, pid)
}
}

Expand Down
Loading