Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 5 additions & 9 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,18 @@ jobs:
test:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Install Go
uses: actions/setup-go@v2
uses: actions/setup-go@v5
with:
go-version: 1.25.x
- name: Checkout code
uses: actions/checkout@v2
- name: Go mod cache
uses: actions/cache@v4
with:
path: ~/go/pkg/mod
key: ${{ runner.os }}-go1.25.x
cache: true
- name: Tools bin cache
uses: actions/cache@v4
with:
path: .bin
key: ${{ runner.os }}-go1.25.x-${{ hashFiles('Makefile') }}
key: ${{ runner.os }}-tools-${{ hashFiles('Makefile') }}
- name: Check
run: make check
- name: Test
Expand Down
12 changes: 9 additions & 3 deletions errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package errors

import (
"errors"
"sort"
"strings"
)

Expand All @@ -22,10 +23,15 @@ type ErrorTags struct {

// Error returns the error message with the tags attached.
func (e *ErrorTags) Error() string {
md := []string{}
keys := make([]string, 0, len(e.tags))
for k := range e.tags {
keys = append(keys, k)
}
sort.Strings(keys)

for k, v := range e.tags {
md = append(md, k+"="+v)
md := make([]string, 0, len(e.tags))
for _, k := range keys {
md = append(md, k+"="+e.tags[k])
}

return e.err.Error() + " [" + strings.Join(md, ", ") + "]"
Expand Down
2 changes: 1 addition & 1 deletion errors/errors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestWrap_MultipleWraps(t *testing.T) {
wrapped1 := errors.Wrap(err, "foo", "bar")
wrapped2 := errors.Wrap(wrapped1, "baz", "qux")

assert.Equal(t, "test error [foo=bar, baz=qux]", wrapped2.Error())
assert.Equal(t, "test error [baz=qux, foo=bar]", wrapped2.Error())
}

func TestWrap_PanicsOnOddNumberOfAttrs(t *testing.T) {
Expand Down
6 changes: 4 additions & 2 deletions xkafka/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,9 @@ func (b *Batch) GroupMaxOffset() []kafka.TopicPartition {
offsets := make(map[string]map[int32]int64)
for _, m := range b.Messages {
if _, ok := offsets[m.Topic]; !ok {
offsets[m.Topic] = make(map[int32]int64)
offsets[m.Topic] = map[int32]int64{
m.Partition: m.Offset,
}
}

if m.Offset > offsets[m.Topic][m.Partition] {
Expand All @@ -102,7 +104,7 @@ func (b *Batch) GroupMaxOffset() []kafka.TopicPartition {
tps = append(tps, kafka.TopicPartition{
Topic: &topic,
Partition: partition,
Offset: kafka.Offset(offset + 1),
Offset: kafka.Offset(offset),
})
}
}
Expand Down
102 changes: 96 additions & 6 deletions xkafka/batch_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"strings"
"sync"
"sync/atomic"
"time"

Expand All @@ -20,6 +21,10 @@ type BatchConsumer struct {
middlewares []BatchMiddlewarer
config *consumerConfig
stopOffset atomic.Bool

// partition tracking
mu sync.Mutex
activePartitions map[string]map[int32]struct{}
}

// NewBatchConsumer creates a new BatchConsumer instance.
Expand All @@ -44,10 +49,11 @@ func NewBatchConsumer(name string, handler BatchHandler, opts ...ConsumerOption)
}

return &BatchConsumer{
name: name,
config: cfg,
kafka: consumer,
handler: handler,
name: name,
config: cfg,
kafka: consumer,
handler: handler,
activePartitions: make(map[string]map[int32]struct{}),
}, nil
}

Expand Down Expand Up @@ -255,7 +261,24 @@ func (c *BatchConsumer) storeBatch(batch *Batch) error {
return nil
}

tps := batch.GroupMaxOffset()
allTps := batch.GroupMaxOffset()

// filter to only active partitions
tps := make([]kafka.TopicPartition, 0, len(allTps))
for _, tp := range allTps {
if tp.Topic != nil && c.isPartitionActive(*tp.Topic, tp.Partition) {
// similar to StoreMessage in confluent-kafka-go/consumer.go
// tp.Offset + 1 it ensures that the consumer starts with
// next message when it restarts
tp.Offset = kafka.Offset(tp.Offset + 1)

tps = append(tps, tp)
}
}

if len(tps) == 0 {
return nil
}

_, err := c.kafka.StoreOffsets(tps)
if err != nil {
Expand All @@ -281,7 +304,74 @@ func (c *BatchConsumer) concatMiddlewares(h BatchHandler) BatchHandler {
}

func (c *BatchConsumer) subscribe() error {
return c.kafka.SubscribeTopics(c.config.topics, nil)
return c.kafka.SubscribeTopics(c.config.topics, c.rebalanceCallback)
}

func (c *BatchConsumer) rebalanceCallback(_ *kafka.Consumer, event kafka.Event) error {
switch e := event.(type) {
case kafka.AssignedPartitions:
c.onPartitionsAssigned(e.Partitions)
return c.kafka.Assign(e.Partitions)

case kafka.RevokedPartitions:
if err := c.kafka.Unassign(); err != nil {
return err
}

c.onPartitionsRevoked(e.Partitions)
}

return nil
}

func (c *BatchConsumer) onPartitionsAssigned(partitions []kafka.TopicPartition) {
c.mu.Lock()
defer c.mu.Unlock()

for _, tp := range partitions {
if tp.Topic == nil {
continue
}

topic := *tp.Topic
if c.activePartitions[topic] == nil {
c.activePartitions[topic] = make(map[int32]struct{})
}

c.activePartitions[topic][tp.Partition] = struct{}{}
}
}

func (c *BatchConsumer) onPartitionsRevoked(partitions []kafka.TopicPartition) {
c.mu.Lock()
defer c.mu.Unlock()

for _, tp := range partitions {
if tp.Topic == nil {
continue
}

topic := *tp.Topic
if c.activePartitions[topic] != nil {
delete(c.activePartitions[topic], tp.Partition)

if len(c.activePartitions[topic]) == 0 {
delete(c.activePartitions, topic)
}
}
}
}

func (c *BatchConsumer) isPartitionActive(topic string, partition int32) bool {
c.mu.Lock()
defer c.mu.Unlock()

if partitions, ok := c.activePartitions[topic]; ok {
_, active := partitions[partition]
return active
}

return false
}

func (c *BatchConsumer) unsubscribe() error {
Expand Down
Loading