-
Notifications
You must be signed in to change notification settings - Fork 3
/
consumer_test.go
52 lines (47 loc) · 982 Bytes
/
consumer_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package consumer
import (
"fmt"
"testing"
)
// TestGetOffset calls Offset on a consumer built by getConsumer and prints
// the returned value to stdout. The test aborts if Offset returns an error.
//
// NOTE(review): the arguments look like (topic, partition, consumer group);
// confirm against the signature of Offset.
func TestGetOffset(t *testing.T) {
	c := getConsumer(t)

	off, err := c.Offset("test", 0, "test-consumergroup-b")
	if err != nil {
		t.Fatal(err)
	}

	fmt.Println("get offset: ", off)
}
// TestConsumeAll calls Consume for partitions 0, 1 and 2 of "test" and dumps
// every returned value to stdout, one per line, under a "Partition N" header
// and followed by a blank line. The first Consume error aborts the test.
//
// NOTE(review): the trailing 0 passed to Consume is presumably the starting
// offset; confirm against the signature of Consume.
func TestConsumeAll(t *testing.T) {
	c := getConsumer(t)

	for _, p := range []int32{0, 1, 2} {
		msgs, err := c.Consume("test", p, 0)
		if err != nil {
			t.Fatal(err)
		}

		fmt.Println("Partition", p)
		for _, m := range msgs {
			fmt.Println(string(m))
		}
		fmt.Println()
	}
}
// TestCommitOffset calls Commit on a consumer built by getConsumer and fails
// the test if Commit returns an error.
//
// NOTE(review): the arguments look like (topic, partition, consumer group,
// offset); confirm against the signature of Commit.
func TestCommitOffset(t *testing.T) {
	c := getConsumer(t)

	if err := c.Commit("test", 0, "test-consumergroup-b", 2); err != nil {
		t.Fatal(err)
	}
}
// getConsumer is a test helper that builds a consumer with
// New(DefaultConfig(...)) using five hard-coded "docker:<port>" addresses
// (ports 32776 through 32780). If New returns an error the calling test is
// aborted, so the returned value is always the one New produced on success.
//
// NOTE(review): the addresses are presumably the brokers of a local
// docker-based cluster; the tests using this helper need that environment.
func getConsumer(t *testing.T) *C {
	// Mark as a helper so a failure is reported at the calling test's line
	// instead of inside this function.
	t.Helper()

	c, err := New(DefaultConfig(
		"docker:32776",
		"docker:32777",
		"docker:32778",
		"docker:32779",
		"docker:32780",
	))
	if err != nil {
		t.Fatalf("creating consumer: %v", err)
	}
	return c
}