-
Notifications
You must be signed in to change notification settings - Fork 0
/
options.go
310 lines (262 loc) · 8.52 KB
/
options.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
package gomdb
import (
"errors"
"math"
"time"
)
// Sentinel errors reported by the validation of stream and category read
// options. They are package-level values, so callers can match them with
// errors.Is.
var (
	// ErrInvalidReadStreamVersion is returned when the stream version inside a
	// read call is less than zero.
	ErrInvalidReadStreamVersion = errors.New("stream version cannot be less than 0")
	// ErrInvalidReadBatchSize is returned when the batch size inside a read
	// call is less than one.
	ErrInvalidReadBatchSize = errors.New("batch size must be greater than 0")
	// ErrInvalidReadPosition is returned when the stream position inside a
	// read call is less than zero.
	ErrInvalidReadPosition = errors.New("stream position cannot be less than 0")
	// ErrInvalidConsumerGroupMember is returned when the consumer group member
	// index is either less than zero or greater than or equal to the consumer
	// group size.
	ErrInvalidConsumerGroupMember = errors.New("consumer group member must be >= 0 < group size")
	// ErrInvalidConsumerGroupSize is returned when the consumer group size is
	// less than zero. A size of zero disables consumer groups.
	ErrInvalidConsumerGroupSize = errors.New("consumer group size must be 0 or greater (0 to disable consumer groups)")
)
// ClientOption is an option for modifying how the Message DB client operates.
// It is a function that receives the *Client it should configure.
type ClientOption func(*Client)
// WithDefaultPollingStrategy returns a ClientOption that installs strat as the
// client's default PollingStrategy factory, i.e. the one used as the default
// for new subscriptions.
func WithDefaultPollingStrategy(strat func() PollingStrategy) ClientOption {
	setDefault := func(client *Client) {
		client.defaultPollingStrat = strat
	}
	return setDefault
}
// PollingStrategy returns the delay duration before the next polling attempt
// based on how many messages were returned from the previous poll (retrieved)
// vs how many were expected (expected).
type PollingStrategy func(retrieved, expected int64) time.Duration
// ExpBackoffPolling returns a factory for an exponential backoff
// PollingStrategy. Every strategy produced by the factory keeps its own count
// of consecutive reads that returned no messages:
//
//   - a full batch (retrieved == expected) resets the count and returns 0;
//   - a partial batch (retrieved > 0) resets the count and returns min;
//   - an empty read returns min * multiplier^count, where count is the number
//     of empty reads directly before it. The result is never above max and,
//     below max, never under min.
//
// The product is computed in floating point so that fractional multipliers
// such as 1.5 are honoured instead of being truncated to an integer factor.
// The count stops growing once max has been reached, so a long idle period
// cannot push the value out of the range of time.Duration.
func ExpBackoffPolling(min, max time.Duration, multiplier float64) func() PollingStrategy {
	return func() PollingStrategy {
		noMessageCount := 0

		return func(retrieved, expected int64) time.Duration {
			if retrieved == expected {
				noMessageCount = 0
				return time.Duration(0)
			}
			if retrieved > 0 {
				noMessageCount = 0
				return min
			}

			backoff := math.Pow(multiplier, float64(noMessageCount)) * float64(min)
			if backoff >= float64(max) {
				// Ceiling reached: keep the count where it is so the power
				// cannot grow towards +Inf on later empty reads.
				return max
			}
			noMessageCount++

			// A multiplier below 1 shrinks the value and a NaN (NaN multiplier,
			// or +Inf * 0 when min is 0) carries no usable value; both fall
			// back to min.
			if math.IsNaN(backoff) || backoff <= float64(min) {
				return min
			}
			// Safe conversion: min < backoff < max at this point.
			return time.Duration(backoff)
		}
	}
}
// DynamicPolling returns a factory for a PollingStrategy that dynamically
// adjusts a subscription's polling delay by the step amount so as to hit a
// target read utilisation. Read utilisation is the number of messages
// retrieved divided by the number of messages expected.
//
// target must satisfy 0 < target <= 1; any other value, including NaN, makes
// DynamicPolling panic. Every strategy produced by the factory starts with a
// delay of min and keeps its own state:
//
//   - a full batch (retrieved == expected) lowers the stored delay by step and
//     returns 0;
//   - a read below target raises the delay by step and returns it;
//   - a read above target lowers the delay by step and returns it;
//   - a read exactly on target returns the delay unchanged.
//
// After every adjustment the stored delay is capped at max and otherwise
// floored at min.
func DynamicPolling(target float64, step, min, max time.Duration) func() PollingStrategy {
	// Written as a negated range check so that NaN, for which every
	// comparison is false, is rejected as well.
	if !(target > 0 && target <= 1) {
		panic("target percentage must be in the range: 0<n<=1")
	}

	return func() PollingStrategy {
		delay := min

		// adjust moves delay by delta and clamps it (max first, then min).
		adjust := func(delta time.Duration) {
			delay += delta
			if delay > max {
				delay = max
			} else if delay < min {
				delay = min
			}
		}

		return func(retrieved, expected int64) time.Duration {
			if retrieved == expected {
				// A full batch is the highest utilisation a read can have, so
				// it must shorten the delay like any other read above target.
				// Without this a target of 1 could only ever lengthen it.
				adjust(-step)
				return time.Duration(0)
			}

			actual := float64(retrieved) / float64(expected)
			switch {
			case actual < target:
				adjust(step)
			case actual > target:
				adjust(-step)
			default:
				adjust(0) // on target: only re-apply the clamp
			}
			return delay
		}
	}
}
// ConstantPolling returns a factory for a PollingStrategy with a fixed
// interval: the strategy returns 0 after a full batch
// (retrieved == expected) and interval after any other read.
func ConstantPolling(interval time.Duration) func() PollingStrategy {
	// The strategy holds no state, so one closure serves every factory call.
	fixed := func(retrieved, expected int64) time.Duration {
		if retrieved != expected {
			return interval
		}
		return 0
	}
	return func() PollingStrategy { return fixed }
}
// GetStreamOption is an option for modifying how to read from a stream. It is
// a function that receives the *streamConfig it should adjust.
type GetStreamOption func(*streamConfig)
// FromVersion returns a GetStreamOption that sets the inclusive stream version
// from which messages are read.
func FromVersion(version int64) GetStreamOption {
	setVersion := func(sc *streamConfig) { sc.version = version }
	return setVersion
}
// WithStreamBatchSize returns a GetStreamOption that sets the batch size used
// to read messages from a stream.
func WithStreamBatchSize(batchSize int64) GetStreamOption {
	setBatchSize := func(sc *streamConfig) { sc.batchSize = batchSize }
	return setBatchSize
}
// WithStreamCondition returns a GetStreamOption that sets the SQL condition to
// apply to the read request, for example:
// "messages.time::time >= current_time".
//
// NOTE(review): the condition is stored verbatim; how it reaches the database
// is not visible here, so confirm callers never build it from untrusted input.
func WithStreamCondition(condition string) GetStreamOption {
	setCondition := func(sc *streamConfig) { sc.condition = condition }
	return setCondition
}
// WithStreamPollingStrategy returns a GetStreamOption that sets the polling
// strategy for this stream subscription. Polling strategies are only used in
// subscriptions.
func WithStreamPollingStrategy(strat PollingStrategy) GetStreamOption {
	setStrategy := func(sc *streamConfig) { sc.pollingStrat = strat }
	return setStrategy
}
// streamConfig collects the settings that GetStreamOption functions apply to
// a stream read.
type streamConfig struct {
	version      int64           // inclusive version to start reading from
	batchSize    int64           // batch size used to read messages
	condition    string          // optional SQL condition; empty means none
	pollingStrat PollingStrategy // polling strategy for subscriptions
}

// validate reports the first invalid setting: ErrInvalidReadStreamVersion for
// a negative version, then ErrInvalidReadBatchSize for a batch size below one.
// It returns nil when both are acceptable.
func (cfg *streamConfig) validate() error {
	switch {
	case cfg.version < 0:
		return ErrInvalidReadStreamVersion
	case cfg.batchSize < 1:
		return ErrInvalidReadBatchSize
	default:
		return nil
	}
}

// getCondition returns the configured condition as a string, or an untyped nil
// when no condition has been set.
func (cfg *streamConfig) getCondition() interface{} {
	if cond := cfg.condition; cond != "" {
		return cond
	}
	return nil
}

// newDefaultStreamConfig returns a streamConfig that starts at version 0 with
// a batch size of 1000 and uses strat as its polling strategy.
func newDefaultStreamConfig(strat PollingStrategy) *streamConfig {
	cfg := new(streamConfig)
	cfg.batchSize = 1000
	cfg.pollingStrat = strat
	return cfg
}
// GetCategoryOption is an option for modifying how to read from a category.
// It is a function that receives the *categoryConfig it should adjust.
type GetCategoryOption func(*categoryConfig)
// FromPosition returns a GetCategoryOption that sets the inclusive global
// position from which messages are read.
func FromPosition(position int64) GetCategoryOption {
	setPosition := func(cc *categoryConfig) { cc.position = position }
	return setPosition
}
// WithCategoryBatchSize returns a GetCategoryOption that sets the batch size
// used to read messages from a category.
func WithCategoryBatchSize(batchSize int64) GetCategoryOption {
	setBatchSize := func(cc *categoryConfig) { cc.batchSize = batchSize }
	return setBatchSize
}
// AsConsumerGroup returns a GetCategoryOption that sets the consumer group
// options for this read. size is the number of consumers in the group and
// member identifies which consumer is currently reading. Message-db uses
// consistent hashing on stream names within a category and distributes the
// streams amongst the consumer group members.
func AsConsumerGroup(member, size int64) GetCategoryOption {
	return GetCategoryOption(func(cc *categoryConfig) {
		cc.consumerGroupMember, cc.consumerGroupSize = member, size
	})
}
// WithCorrelation returns a GetCategoryOption that sets the correlation value
// messages are filtered by. The value is compared against the
// correlationStreamName field of each message's metadata.
func WithCorrelation(correlation string) GetCategoryOption {
	setCorrelation := func(cc *categoryConfig) { cc.correlation = correlation }
	return setCorrelation
}
// WithCategoryCondition returns a GetCategoryOption that sets the SQL
// condition to apply to the read request, for example:
// "messages.time::time >= current_time".
//
// NOTE(review): the condition is stored verbatim; how it reaches the database
// is not visible here, so confirm callers never build it from untrusted input.
func WithCategoryCondition(condition string) GetCategoryOption {
	setCondition := func(cc *categoryConfig) { cc.condition = condition }
	return setCondition
}
// WithCategoryPollingStrategy returns a GetCategoryOption that sets the
// polling strategy for this category subscription. Polling strategies are only
// used in subscriptions.
func WithCategoryPollingStrategy(strat PollingStrategy) GetCategoryOption {
	setStrategy := func(cc *categoryConfig) { cc.pollingStrat = strat }
	return setStrategy
}
// categoryConfig collects the settings that GetCategoryOption functions apply
// to a category read.
type categoryConfig struct {
	position            int64           // inclusive global position to start reading from
	batchSize           int64           // batch size used to read messages
	correlation         string          // optional correlation filter; empty means none
	consumerGroupMember int64           // index of this consumer within its group
	consumerGroupSize   int64           // number of consumers; 0 disables consumer groups
	condition           string          // optional SQL condition; empty means none
	pollingStrat        PollingStrategy // polling strategy for subscriptions
}

// newDefaultCategoryConfig returns a categoryConfig that starts at position 0
// with a batch size of 1000 and uses strat as its polling strategy. Every
// other field is left at its zero value.
func newDefaultCategoryConfig(strat PollingStrategy) *categoryConfig {
	cfg := new(categoryConfig)
	cfg.batchSize = 1000
	cfg.pollingStrat = strat
	return cfg
}

// validate reports the first invalid setting, checked in this order: a
// negative position, a batch size below one, a consumer group member that is
// negative or (for a positive group size) not below the group size, and
// finally a negative group size. It returns nil when every check passes.
func (cfg *categoryConfig) validate() error {
	member, size := cfg.consumerGroupMember, cfg.consumerGroupSize

	switch {
	case cfg.position < 0:
		return ErrInvalidReadPosition
	case cfg.batchSize < 1:
		return ErrInvalidReadBatchSize
	case member < 0, size > 0 && member >= size:
		return ErrInvalidConsumerGroupMember
	case size < 0:
		return ErrInvalidConsumerGroupSize
	}
	return nil
}

// getConsumerGroupMember returns the consumer group member as an int64, or an
// untyped nil when the consumer group size is zero.
func (cfg *categoryConfig) getConsumerGroupMember() interface{} {
	if cfg.consumerGroupSize != 0 {
		return cfg.consumerGroupMember
	}
	return nil
}

// getConsumerGroupSize returns the consumer group size as an int64, or an
// untyped nil when the size is zero.
func (cfg *categoryConfig) getConsumerGroupSize() interface{} {
	if size := cfg.consumerGroupSize; size != 0 {
		return size
	}
	return nil
}

// getCorrelation returns the correlation value as a string, or an untyped nil
// when no correlation has been set.
func (cfg *categoryConfig) getCorrelation() interface{} {
	if corr := cfg.correlation; corr != "" {
		return corr
	}
	return nil
}

// getCondition returns the configured condition as a string, or an untyped nil
// when no condition has been set.
func (cfg *categoryConfig) getCondition() interface{} {
	if cond := cfg.condition; cond != "" {
		return cond
	}
	return nil
}