Skip to content

Commit

Permalink
Merge pull request #379 from JohnRoesler/kafkaStartOffset
Browse files Browse the repository at this point in the history
add option to set kafka's StartOffset on the consumer
  • Loading branch information
maxekman authored Feb 11, 2022
2 parents 58d34e6 + 30ded5e commit 9c37770
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 12 deletions.
33 changes: 24 additions & 9 deletions eventbus/kafka/eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type EventBus struct {
addr string
appID string
topic string
startOffset int64
client *kafka.Client
writer *kafka.Writer
registered map[eh.EventHandlerType]struct{}
Expand All @@ -51,14 +52,15 @@ func NewEventBus(addr, appID string, options ...Option) (*EventBus, error) {
ctx, cancel := context.WithCancel(context.Background())

b := &EventBus{
addr: addr,
appID: appID,
topic: appID + "_events",
registered: map[eh.EventHandlerType]struct{}{},
errCh: make(chan error, 100),
cctx: ctx,
cancel: cancel,
codec: &json.EventCodec{},
addr: addr,
appID: appID,
topic: appID + "_events",
startOffset: kafka.LastOffset, // Default: Don't read old messages.
registered: map[eh.EventHandlerType]struct{}{},
errCh: make(chan error, 100),
cctx: ctx,
cancel: cancel,
codec: &json.EventCodec{},
}

// Apply configuration options.
Expand Down Expand Up @@ -134,6 +136,19 @@ func WithCodec(codec eh.EventCodec) Option {
}
}

// WithStartOffset sets the consumer group's offset to start at
// Defaults to: LastOffset
// Per the kafka client documentation
// StartOffset determines from whence the consumer group should begin
// consuming when it finds a partition without a committed offset. If
// non-zero, it must be set to one of FirstOffset or LastOffset.
func WithStartOffset(startOffset int64) Option {
return func(b *EventBus) error {
b.startOffset = startOffset
return nil
}
}

// HandlerType implements the HandlerType method of the eventhorizon.EventHandler interface.
func (b *EventBus) HandlerType() eh.EventHandlerType {
return "eventbus"
Expand Down Expand Up @@ -197,7 +212,7 @@ func (b *EventBus) AddHandler(ctx context.Context, m eh.EventMatcher, h eh.Event
GroupID: groupID, // Send messages to only one subscriber per group.
MaxWait: time.Second, // Allow to exit readloop in max 1s.
WatchPartitionChanges: true,
StartOffset: kafka.LastOffset, // Don't read old messages.
StartOffset: b.startOffset,
})

req := &kafka.ListGroupsRequest{
Expand Down
33 changes: 31 additions & 2 deletions eventbus/kafka/eventbus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@ import (
"crypto/rand"
"encoding/hex"
"fmt"
"github.com/stretchr/testify/assert"
"os"
"testing"
"time"

eh "github.com/looplab/eventhorizon"
"github.com/looplab/eventhorizon/eventbus"
"github.com/segmentio/kafka-go"
"github.com/stretchr/testify/require"
)

func TestAddHandlerIntegration(t *testing.T) {
Expand Down Expand Up @@ -100,7 +103,7 @@ func BenchmarkEventBus(b *testing.B) {
eventbus.Benchmark(b, bus)
}

func newTestEventBus(appID string) (eh.EventBus, string, error) {
func newTestEventBus(appID string, options ...Option) (eh.EventBus, string, error) {
// Connect to localhost if not running inside docker
addr := os.Getenv("KAFKA_ADDR")
if addr == "" {
Expand All @@ -117,10 +120,36 @@ func newTestEventBus(appID string) (eh.EventBus, string, error) {
appID = "app-" + hex.EncodeToString(b)
}

bus, err := NewEventBus(addr, appID)
bus, err := NewEventBus(addr, appID, options...)
if err != nil {
return nil, "", fmt.Errorf("could not create event bus: %w", err)
}

return bus, appID, nil
}

func TestWithStartOffset(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test")
}

testCases := map[string]struct {
startOffset int64
}{
"FirstOffset": {kafka.FirstOffset},
"zero": {0},
}

for desc, tc := range testCases {
t.Run(desc, func(t *testing.T) {
eb, _, err := newTestEventBus("", WithStartOffset(tc.startOffset))
if err != nil {
t.Fatal("there should be no error:", err)
}
require.NoError(t, err)

underlyingEb := eb.(*EventBus)
assert.Equal(t, tc.startOffset, underlyingEb.startOffset)
})
}
}
7 changes: 6 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ require (
github.com/nats-io/nats.go v1.13.0
github.com/opentracing/opentracing-go v1.2.0
github.com/segmentio/kafka-go v0.4.25
github.com/stretchr/testify v1.7.0
github.com/uber/jaeger-client-go v2.29.1+incompatible
go.mongodb.org/mongo-driver v1.8.0
google.golang.org/api v0.61.0
Expand All @@ -27,6 +28,7 @@ require (
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403 // indirect
github.com/cncf/xds/go v0.0.0-20210312221358-fbca930ec8ed // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0 // indirect
github.com/envoyproxy/protoc-gen-validate v0.1.0 // indirect
Expand All @@ -42,10 +44,11 @@ require (
github.com/nats-io/nats-server/v2 v2.6.2 // indirect
github.com/nats-io/nkeys v0.3.0 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
github.com/pierrec/lz4 v2.6.0+incompatible // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rogpeppe/go-internal v1.6.1 // indirect
github.com/stretchr/testify v1.7.0 // indirect
github.com/uber/jaeger-lib v2.4.1+incompatible // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/scram v1.0.2 // indirect
Expand All @@ -63,4 +66,6 @@ require (
google.golang.org/genproto v0.0.0-20211118181313-81c1377c94b1 // indirect
google.golang.org/grpc v1.40.0 // indirect
google.golang.org/protobuf v1.27.1 // indirect
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect
)
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
Expand Down Expand Up @@ -699,6 +700,7 @@ google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+Rur
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU=
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
Expand Down

0 comments on commit 9c37770

Please sign in to comment.