-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathstream_consumer.go
81 lines (67 loc) · 1.68 KB
/
stream_consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
package epee
import (
"github.com/Shopify/sarama"
"log"
"sync"
)
// streamConsumer forwards messages from a single sarama partition consumer
// onto a destination channel of *Message values.
type streamConsumer struct {
// Indicates whether or not we should kill this. Close sets it to true and
// run checks it at the top of every loop iteration.
// NOTE(review): the flag is written by Close and read by the run goroutine
// without any synchronization — confirm whether this needs an atomic or a
// signalling channel.
closing bool
// Tracks the run goroutine: Start adds one, run marks it done on exit, and
// Close waits on it.
wg sync.WaitGroup
// The partition consumer that is backing this whoooole thing.
partitionConsumer sarama.PartitionConsumer
// The channel to deliver messages to. run closes it when it exits.
dst chan *Message
}
// run pumps messages from the backing partition consumer into sc.dst until
// the consumer is asked to close, Kafka reports that the client has been
// closed, or one of the partition consumer's channels is closed.
//
// It is launched as a goroutine by Start. On exit it closes sc.dst, closes
// the partition consumer, and marks the wait group done so Close can return.
func (sc *streamConsumer) run() {
	// Always make sure these guys get taken care of when we exit this function.
	defer func() {
		close(sc.dst)
		if err := sc.partitionConsumer.Close(); err != nil {
			log.Printf("ERROR: Issue closing partition consumer. %v\n", err)
		}
		sc.wg.Done()
	}()

	messages := sc.partitionConsumer.Messages()
	errs := sc.partitionConsumer.Errors()

	for {
		// The flag is only observed between channel operations, so a stop
		// request takes effect once the current receive/send completes.
		if sc.closing {
			return
		}

		select {
		case message, ok := <-messages:
			if !ok {
				// A closed channel yields nil forever; stop instead of
				// dereferencing a nil message.
				return
			}
			// We'll instantiate one of our own messages and keep it 'round
			// incase we want to keep goin'.
			sc.dst <- &Message{
				Offset: message.Offset,
				Value:  message.Value,
				Topic:  message.Topic,
			}
		case err, ok := <-errs:
			if !ok {
				// Same as above: a closed error channel only produces nil.
				return
			}
			if err.Err == sarama.ErrClosedClient {
				log.Printf("ERROR: Kafka connection is closed. Stopping consumer. %v\n", err.Err)
				// A bare break here would only leave the select, not the
				// loop, so return to actually stop consuming.
				return
			}
			log.Printf("ERROR: Issue pulling from Kafka. %v\n", err.Err)
		}
	}
}
// Messages returns a receive-only view of the destination channel that run
// delivers messages to. run closes this channel when it exits.
func (sc *streamConsumer) Messages() <-chan *Message {
return sc.dst
}
// Start launches the run loop in its own goroutine.
func (sc *streamConsumer) Start() {
// Register the goroutine before launching it; run calls wg.Done when it
// exits, which is what Close waits on.
sc.wg.Add(1)
go sc.run()
}
// Close asks the run loop to stop and blocks until it has exited.
// NOTE(review): run only checks the closing flag at the top of its loop, so
// this presumably keeps waiting while run is blocked on a channel receive or
// on a send to the destination channel — confirm against callers.
func (sc *streamConsumer) Close() {
// Signal that it's time for this guy to stop.
// NOTE(review): plain write to a field that the run goroutine reads; no
// synchronization is visible here.
sc.closing = true
// Wait for it to stop now...
sc.wg.Wait()
}
// newStreamConsumer builds a streamConsumer that will deliver messages read
// from partitionConsumer onto ch. The returned consumer is idle until Start
// is called on it.
func newStreamConsumer(ch chan *Message, partitionConsumer sarama.PartitionConsumer) *streamConsumer {
	return &streamConsumer{
		dst:               ch,
		partitionConsumer: partitionConsumer,
	}
}