This repository has been archived by the owner on Oct 2, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 28
/
Copy pathconfig.go
192 lines (169 loc) · 6.52 KB
/
config.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package kafka
import (
"crypto/tls"
"fmt"
"time"
"github.com/Shopify/sarama"
"go.uber.org/zap/zapcore"
)
const (
// OffsetOldest uses sequence number of oldest known message as the current offset
OffsetOldest = sarama.OffsetOldest
// OffsetNewest option uses sequence number of newest message as the current offset
OffsetNewest = sarama.OffsetNewest
)
type (
// Topic contains information for a topic.
// Our topics are uniquely defined by a Topic Name and Cluster pair.
Topic struct {
// Name for the topic
Name string
// Cluster is the logical name of the cluster to find this topic on.
Cluster string
// Delay is msg consumption delay applied on the topic.
Delay time.Duration
}
// ConsumerTopic contains information for a consumer topic.
ConsumerTopic struct {
Topic
RetryQ Topic
DLQ Topic
MaxRetries int64 // MaxRetries = -1 for infinite retries.
}
// ConsumerTopicList is a list of consumer topics
ConsumerTopicList []ConsumerTopic
// ConsumerConfig describes the config for a consumer group
ConsumerConfig struct {
// GroupName identifies your consumer group. Unless your application creates
// multiple consumer groups (in which case it's suggested to have application name as
// prefix of the group name), this should match your application name.
GroupName string
// TopicList is a list of consumer topics
TopicList ConsumerTopicList
// OffsetConfig is the offset-handling policy for this consumer group.
Offsets struct {
// Initial specifies the fallback offset configuration on consumer start.
// The consumer will use the offsets persisted from its last run unless \
// the offsets are too old or too new.
Initial struct {
// Offset is the initial offset to use if there is no previous offset
// committed. Use OffsetNewest for high watermark and OffsetOldest for
// low watermark. Defaults to OffsetOldest.
Offset int64
}
// Commits a policy for committing consumer offsets to Kafka.
Commits struct {
// Enabled if you want the library to commit offsets on your behalf.
// Defaults to true.
//
// The retry and dlq topic commit will always be committed for you since those topics are abstracted away from you.
Enabled bool
}
}
// Concurrency determines the number of concurrent messages to process.
// When using the handler based API, this corresponds to the number of concurrent go
// routines handler functions the library will run. Default is 1.
Concurrency int
// TLSConfig is the configuration to use for secure connections if
// enabled (not nil) (defaults to disabled, nil).
TLSConfig *tls.Config
}
)
// NewConsumerConfig returns ConsumerConfig with sane defaults.
func NewConsumerConfig(groupName string, topicList ConsumerTopicList) *ConsumerConfig {
cfg := new(ConsumerConfig)
cfg.GroupName = groupName
cfg.TopicList = topicList
cfg.Offsets.Initial.Offset = OffsetOldest
cfg.Offsets.Commits.Enabled = true
cfg.Concurrency = 1
return cfg
}
// MarshalLogObject implements zapcore.ObjectMarshaler for structured logging.
func (c ConsumerConfig) MarshalLogObject(e zapcore.ObjectEncoder) error {
e.AddString("groupName", c.GroupName)
e.AddArray("topicList", c.TopicList)
e.AddObject("offset", zapcore.ObjectMarshalerFunc(func(ee zapcore.ObjectEncoder) error {
ee.AddObject("initial", zapcore.ObjectMarshalerFunc(func(eee zapcore.ObjectEncoder) error {
eee.AddInt64("offset", c.Offsets.Initial.Offset)
return nil
}))
ee.AddObject("commits", zapcore.ObjectMarshalerFunc(func(eee zapcore.ObjectEncoder) error {
eee.AddBool("enabled", c.Offsets.Commits.Enabled)
return nil
}))
return nil
}))
e.AddInt("concurrency", c.Concurrency)
return nil
}
// MarshalLogArray implements zapcore.ArrayMarshaler for structured logging.
func (c ConsumerTopicList) MarshalLogArray(e zapcore.ArrayEncoder) error {
for _, topic := range c {
e.AppendObject(topic)
}
return nil
}
// MarshalLogObject implements zapcore.ObjectMarshaler for structured logging.
func (c ConsumerTopic) MarshalLogObject(e zapcore.ObjectEncoder) error {
e.AddObject("defaultQ", c.Topic)
e.AddObject("retryQ", c.RetryQ)
e.AddObject("DLQ", c.DLQ)
e.AddInt64("maxRetries", c.MaxRetries)
return nil
}
// MarshalLogObject implements zapcore.ObjectMarshaler for structured logging.
func (t Topic) MarshalLogObject(e zapcore.ObjectEncoder) error {
e.AddString("name", t.Name)
e.AddString("cluster", t.Cluster)
e.AddDuration("delay", t.Delay)
return nil
}
// TopicNames returns the list of topics to consume as a string array.
func (c ConsumerTopicList) TopicNames() []string {
output := make([]string, 0, len(c))
for _, topic := range c {
output = append(output, topic.Name)
}
return output
}
// GetConsumerTopicByClusterTopic returns the ConsumerTopic for the cluster, topic pair.
func (c ConsumerTopicList) GetConsumerTopicByClusterTopic(clusterName, topicName string) (ConsumerTopic, error) {
for _, topic := range c {
if topic.Cluster == clusterName && topic.Name == topicName {
return topic, nil
}
}
return ConsumerTopic{}, fmt.Errorf("unable to find TopicConfig with cluster %s and topic %s", clusterName, topicName)
}
// HashKey converts topic to a string for use as a map key
func (t Topic) HashKey() string {
output := t.Name + t.Cluster
return output
}
// DLQEnabled returns true if DLQ.Name and DLQ.Cluster are not empty.
func (c ConsumerTopic) DLQEnabled() bool {
if c.DLQ.Cluster != "" && c.DLQ.Name != "" {
return true
}
return false
}