-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.go
84 lines (65 loc) · 2.13 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
package main
import (
"flag"
"fmt"
"github.com/bradhe/epee"
"github.com/golang/protobuf/proto"
)
const (
	// TopicName is the topic that the stream processor will listen to.
	TopicName = "my-topic"
	// Partition is the partition that the processor will listen on.
	Partition = 1
	// DefaultClientID is the client ID passed to epee when the stream is
	// created.
	// NOTE: This must be unique to the topic and partition the stream processor
	// is going to consume.
	DefaultClientID = "my-client-1"
)
var (
	// stream is the shared epee stream. It is created during start-up and then
	// used by main to start and wait on the processor.
	stream *epee.Stream
	// ZookeeperHost parameterizes where Zookeeper lives. It is set by the
	// -zookeeper-host flag and defaults to "localhost:2181".
	ZookeeperHost = flag.String("zookeeper-host", "localhost:2181", "zookeeper host")
)
// MyStreamProcessor encapsulates the stream processor and implements the
// epee.StreamProcessor interface.
type MyStreamProcessor struct {
	// Total is the running sum of the counts carried by the messages processed
	// so far.
	Total int64
}
// Process is called once for each message in the queue. If the message is
// successfully processed the related offset will be marked as "processed" so
// that when clients resume later this message doesn't get re-processed.
//
// The message must be a *MyCounter; its count is added to sp.Total. Any other
// message type is rejected with an error that names the offset and the type
// actually received, so the failure can be traced to a concrete message.
func (sp *MyStreamProcessor) Process(offset int64, message proto.Message) error {
	counter, ok := message.(*MyCounter)
	if !ok {
		return fmt.Errorf("offset %d: cannot convert message of type %T to application-native type *MyCounter", offset, message)
	}

	sp.Total += counter.GetCount()
	return nil
}
// Flush will be periodically called (once every 10 seconds by default). It is
// meant to flush the processor's state so the jobs can be resumed if something
// goes wrong.
//
// The current implementation is a placeholder: it persists nothing and always
// returns nil.
func (sp *MyStreamProcessor) Flush() error {
	// TODO: Flush the total to something here.
	return nil
}
// main parses the command line, connects to Zookeeper, builds the package-level
// stream, registers the message type for TopicName and then runs the stream
// processor until the stream finishes.
//
// All start-up work lives here rather than in an init function: calling
// flag.Parse during package initialization runs before flags registered later
// (for example the testing flags of a test binary) exist, and it would make
// merely linking this package open network connections.
func main() {
	// Parse CLI flags before ZookeeperHost is read.
	flag.Parse()

	zk, err := epee.NewZookeeperClient([]string{*ZookeeperHost})
	if err != nil {
		panic(fmt.Errorf("connecting to zookeeper at %q: %v", *ZookeeperHost, err))
	}

	// Assuming your Kafka brokers are registered in Zookeeper...
	stream, err = epee.NewStreamFromZookeeper(DefaultClientID, zk)
	if err != nil {
		panic(fmt.Errorf("creating stream for client %q: %v", DefaultClientID, err))
	}

	// This tells the stream how to deserialize the message in Kafka.
	stream.Register(TopicName, &MyCounter{})

	// NOTE(review): if Stream reports start-up failures through a return
	// value, it should be checked here — confirm against the epee API.
	stream.Stream(TopicName, Partition, &MyStreamProcessor{})

	// The stream processor is now running in a goroutine in the background. The
	// main thread can continue doing whatever, or we can just sit here and wait.
	stream.Wait()
}