-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathstream_producer.go
62 lines (49 loc) · 1.67 KB
/
stream_producer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package epee
import (
"github.com/Shopify/sarama"
"github.com/golang/protobuf/proto"
)
// StreamProducer publishes protocol buffer messages to a Kafka stream on a
// topic chosen by the caller. By default, epee partitions messages with a
// Hash partitioner, so every message sharing a key lands in the same Kafka
// partition. That property is the primary mechanism by which epee guarantees
// worker affinity for messages.
type StreamProducer struct {
	// client is the sarama client; NewStreamProducer derives producer from it.
	client sarama.Client

	// producer is the synchronous producer that Publish sends messages through.
	producer sarama.SyncProducer
}
// Publish sends message to the given topic under the given key using the
// underlying synchronous producer.
//
// The key is encoded as a plain string and the payload is wrapped with
// newProtobufMessageEncoder so it is published as a serialized protocol
// buffer. The Hash partitioner used during publishing picks the destination
// partition from the supplied key.
//
// The error reported by SendMessage is returned unchanged; the partition and
// offset it reports are discarded.
func (sp *StreamProducer) Publish(topic, key string, message proto.Message) error {
	outgoing := &sarama.ProducerMessage{
		Topic: topic,
		Key:   sarama.StringEncoder(key),
		Value: newProtobufMessageEncoder(message),
	}

	// Callers only care whether the send succeeded, not where it landed.
	_, _, err := sp.producer.SendMessage(outgoing)
	return err
}
// NewStreamProducer builds a StreamProducer identified by clientID.
//
// The broker addresses are looked up in Zookeeper via zk; a failed lookup
// (including brokers not being found) is returned as an error. A sarama
// client is then created against those brokers using the configuration from
// getConfig(clientID), and a synchronous producer is created on top of that
// client. If the producer cannot be created, the client is closed before the
// error is returned. On any failure the returned *StreamProducer is nil.
func NewStreamProducer(clientID string, zk ZookeeperClient) (*StreamProducer, error) {
	addrs, err := findRegisteredBrokers(zk)
	if err != nil {
		return nil, err
	}

	kafkaClient, err := sarama.NewClient(addrs, getConfig(clientID))
	if err != nil {
		return nil, err
	}

	syncProducer, err := sarama.NewSyncProducerFromClient(kafkaClient)
	if err != nil {
		// Best-effort cleanup: the producer error is the one worth reporting,
		// so any error from Close is intentionally dropped.
		_ = kafkaClient.Close()
		return nil, err
	}

	return &StreamProducer{
		client:   kafkaClient,
		producer: syncProducer,
	}, nil
}