Skip to content

Commit da48b69

Browse files
committed
chore: add more tests
1 parent 345d8af commit da48b69

File tree

3 files changed

+207
-19
lines changed

3 files changed

+207
-19
lines changed

kq/pusher.go

Lines changed: 4 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -143,23 +143,10 @@ func (p *Pusher) PushWithKey(ctx context.Context, key, v string) error {
143143
}
144144
}
145145

146-
// SetWriterBalancer set kafka-go custom writer balancer.
147-
func (p *Pusher) SetWriterBalancer(balancer kafka.Balancer) {
148-
if p.producer != nil {
149-
p.producer.Balancer = balancer
150-
}
151-
}
152-
153-
// PushWithKey sends a message to the Kafka topic with custom message key.
154-
func (p *Pusher) PushWithKey(k, v string) error {
155-
msg := kafka.Message{
156-
Key: []byte(k), // custom message key
157-
Value: []byte(v),
158-
}
159-
if p.executor != nil {
160-
return p.executor.Add(msg, len(v))
161-
} else {
162-
return p.producer.WriteMessages(context.Background(), msg)
146+
// WithAllowAutoTopicCreation allows the Pusher to create the given topic if it does not exist.
147+
func WithAllowAutoTopicCreation() PushOption {
148+
return func(options *pushOptions) {
149+
options.allowAutoTopicCreation = true
163150
}
164151
}
165152

kq/queue.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,12 @@ type (
4242
Consume(ctx context.Context, key, value string) error
4343
}
4444

45+
kafkaReader interface {
46+
FetchMessage(ctx context.Context) (kafka.Message, error)
47+
CommitMessages(ctx context.Context, msgs ...kafka.Message) error
48+
Close() error
49+
}
50+
4551
queueOptions struct {
4652
commitInterval time.Duration
4753
queueCapacity int
@@ -54,7 +60,7 @@ type (
5460

5561
kafkaQueue struct {
5662
c KqConf
57-
consumer *kafka.Reader
63+
consumer kafkaReader
5864
handler ConsumeHandler
5965
channel chan kafka.Message
6066
producerRoutines *threading.RoutineGroup
@@ -191,7 +197,7 @@ func (q *kafkaQueue) Start() {
191197
q.producerRoutines.Wait()
192198
close(q.channel)
193199
q.consumerRoutines.Wait()
194-
200+
195201
logx.Infof("Consumer %s is closed", q.c.Name)
196202
}
197203
}

kq/queue_test.go

Lines changed: 195 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,195 @@
1+
package kq
2+
3+
import (
4+
"context"
5+
"io"
6+
"testing"
7+
"time"
8+
9+
"github.com/segmentio/kafka-go"
10+
"github.com/stretchr/testify/assert"
11+
"github.com/stretchr/testify/mock"
12+
"github.com/zeromicro/go-zero/core/service"
13+
"github.com/zeromicro/go-zero/core/stat"
14+
"github.com/zeromicro/go-zero/core/threading"
15+
)
16+
17+
// mockKafkaReader is a mock for kafka.Reader
18+
type mockKafkaReader struct {
19+
mock.Mock
20+
}
21+
22+
func (m *mockKafkaReader) FetchMessage(ctx context.Context) (kafka.Message, error) {
23+
args := m.Called(ctx)
24+
return args.Get(0).(kafka.Message), args.Error(1)
25+
}
26+
27+
func (m *mockKafkaReader) CommitMessages(ctx context.Context, msgs ...kafka.Message) error {
28+
args := m.Called(ctx, msgs)
29+
return args.Error(0)
30+
}
31+
32+
func (m *mockKafkaReader) Close() error {
33+
args := m.Called()
34+
return args.Error(0)
35+
}
36+
37+
// mockConsumeHandler is a mock for ConsumeHandler
38+
type mockConsumeHandler struct {
39+
mock.Mock
40+
}
41+
42+
func (m *mockConsumeHandler) Consume(ctx context.Context, key, value string) error {
43+
args := m.Called(ctx, key, value)
44+
return args.Error(0)
45+
}
46+
47+
func TestNewQueue(t *testing.T) {
48+
c := KqConf{
49+
ServiceConf: service.ServiceConf{
50+
Name: "test-queue",
51+
},
52+
Brokers: []string{"localhost:9092"},
53+
Group: "test-group",
54+
Topic: "test-topic",
55+
Offset: "first",
56+
Conns: 1,
57+
}
58+
handler := &mockConsumeHandler{}
59+
60+
q, err := NewQueue(c, handler)
61+
assert.NoError(t, err)
62+
assert.NotNil(t, q)
63+
}
64+
65+
func TestKafkaQueue_consumeOne(t *testing.T) {
66+
handler := &mockConsumeHandler{}
67+
q := &kafkaQueue{
68+
handler: handler,
69+
metrics: stat.NewMetrics("test"),
70+
}
71+
72+
ctx := context.Background()
73+
key := "test-key"
74+
value := "test-value"
75+
76+
handler.On("Consume", ctx, key, value).Return(nil)
77+
78+
err := q.consumeOne(ctx, key, value)
79+
assert.NoError(t, err)
80+
handler.AssertExpectations(t)
81+
}
82+
83+
func TestKafkaQueue_consume(t *testing.T) {
84+
mockReader := &mockKafkaReader{}
85+
q := &kafkaQueue{
86+
consumer: mockReader,
87+
}
88+
89+
msg := kafka.Message{
90+
Key: []byte("test-key"),
91+
Value: []byte("test-value"),
92+
}
93+
94+
mockReader.On("FetchMessage", mock.Anything).Return(msg, nil).Once()
95+
mockReader.On("FetchMessage", mock.Anything).Return(kafka.Message{}, io.EOF).Once()
96+
97+
called := false
98+
err := q.consume(func(msg kafka.Message) {
99+
called = true
100+
assert.Equal(t, "test-key", string(msg.Key))
101+
assert.Equal(t, "test-value", string(msg.Value))
102+
})
103+
104+
assert.Error(t, err)
105+
assert.True(t, called)
106+
mockReader.AssertExpectations(t)
107+
}
108+
109+
func TestKafkaQueue_Start(t *testing.T) {
110+
mockReader := &mockKafkaReader{}
111+
handler := &mockConsumeHandler{}
112+
q := &kafkaQueue{
113+
c: KqConf{
114+
ServiceConf: service.ServiceConf{
115+
Name: "test-queue",
116+
},
117+
Processors: 1,
118+
Consumers: 1,
119+
},
120+
consumer: mockReader,
121+
handler: handler,
122+
consumerRoutines: threading.NewRoutineGroup(),
123+
producerRoutines: threading.NewRoutineGroup(),
124+
channel: make(chan kafka.Message, 1),
125+
metrics: stat.NewMetrics("test"),
126+
}
127+
128+
msg := kafka.Message{
129+
Key: []byte("test-key"),
130+
Value: []byte("test-value"),
131+
}
132+
133+
mockReader.On("FetchMessage", mock.Anything).Return(msg, nil).Once()
134+
mockReader.On("FetchMessage", mock.Anything).Return(kafka.Message{}, io.EOF).Once()
135+
handler.On("Consume", mock.Anything, "test-key", "test-value").Return(nil)
136+
mockReader.On("CommitMessages", mock.Anything, []kafka.Message{msg}).Return(nil)
137+
138+
go func() {
139+
time.Sleep(100 * time.Millisecond)
140+
q.Stop()
141+
}()
142+
143+
q.Start()
144+
145+
mockReader.AssertExpectations(t)
146+
handler.AssertExpectations(t)
147+
}
148+
149+
func TestKafkaQueue_Stop(t *testing.T) {
150+
mockReader := &mockKafkaReader{}
151+
q := &kafkaQueue{
152+
consumer: mockReader,
153+
}
154+
155+
mockReader.On("Close").Return(nil)
156+
157+
q.Stop()
158+
159+
mockReader.AssertExpectations(t)
160+
}
161+
162+
func TestWithCommitInterval(t *testing.T) {
163+
options := &queueOptions{}
164+
interval := time.Second * 5
165+
WithCommitInterval(interval)(options)
166+
assert.Equal(t, interval, options.commitInterval)
167+
}
168+
169+
func TestWithQueueCapacity(t *testing.T) {
170+
options := &queueOptions{}
171+
capacity := 100
172+
WithQueueCapacity(capacity)(options)
173+
assert.Equal(t, capacity, options.queueCapacity)
174+
}
175+
176+
func TestWithMaxWait(t *testing.T) {
177+
options := &queueOptions{}
178+
wait := time.Second * 2
179+
WithMaxWait(wait)(options)
180+
assert.Equal(t, wait, options.maxWait)
181+
}
182+
183+
func TestWithMetrics(t *testing.T) {
184+
options := &queueOptions{}
185+
metrics := stat.NewMetrics("test")
186+
WithMetrics(metrics)(options)
187+
assert.Equal(t, metrics, options.metrics)
188+
}
189+
190+
func TestWithErrorHandler(t *testing.T) {
191+
options := &queueOptions{}
192+
handler := func(ctx context.Context, msg kafka.Message, err error) {}
193+
WithErrorHandler(handler)(options)
194+
assert.NotNil(t, options.errorHandler)
195+
}

0 commit comments

Comments
 (0)