batcher.go
package nakadi

import (
	"reflect"
	"time"
)

// publishAPI defines the interface used for publishing. It exists so that the
// underlying publisher can be mocked in unit tests.
type publishAPI interface {
	Publish(events interface{}) error
}

// BatchPublishAPI publishes events in a batched manner. The batcher collects single events into
// batches, respecting the batch collection timeout and the maximum batch size. Instead of issuing
// many separate requests to Nakadi, it aggregates single events and publishes them together.
type BatchPublishAPI struct {
	publishAPI             publishAPI
	batchCollectionTimeout time.Duration
	maxBatchSize           int
	eventsChannel          chan *eventToPublish
	dispatchFinished       chan int
}

// BatchOptions specifies the parameters used to collect events into batches.
type BatchOptions struct {
	// BatchCollectionTimeout is the maximum amount of time an event spends in the intermediate
	// queue before being published.
	BatchCollectionTimeout time.Duration
	// MaxBatchSize limits the batch size; it is guaranteed that no more than MaxBatchSize events
	// are sent within one batch.
	MaxBatchSize int
	// BatchQueueSize is the size of the intermediate queue in which events are stored before being
	// published. If the queue is full, the publishing call blocks until a batch can be assembled.
	BatchQueueSize int
}

func (o *BatchOptions) withDefaults() *BatchOptions {
	var copyOptions BatchOptions
	if o != nil {
		copyOptions = *o
	}
	if copyOptions.BatchCollectionTimeout == 0 {
		copyOptions.BatchCollectionTimeout = time.Second
	}
	if copyOptions.MaxBatchSize == 0 {
		copyOptions.MaxBatchSize = 10
	}
	if copyOptions.BatchQueueSize == 0 {
		copyOptions.BatchQueueSize = 1000
	}
	return &copyOptions
}

// NewBatchPublishAPI creates a batching proxy around the regular publish API from a client,
// publish options, and batch options.
func NewBatchPublishAPI(
	client *Client,
	eventType string,
	publishOptions *PublishOptions,
	batchOptions *BatchOptions,
) *BatchPublishAPI {
	publishOptions = publishOptions.withDefaults()
	api := NewPublishAPI(client, eventType, publishOptions)
	batchOptions = batchOptions.withDefaults()
	result := BatchPublishAPI{
		publishAPI:             api,
		batchCollectionTimeout: batchOptions.BatchCollectionTimeout,
		maxBatchSize:           batchOptions.MaxBatchSize,
		eventsChannel:          make(chan *eventToPublish, batchOptions.BatchQueueSize),
		dispatchFinished:       make(chan int),
	}
	go result.dispatchThread()
	return &result
}
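
// The sketch below shows one way the batch publisher might be wired up. It is illustrative only:
// the event type name, the payload variable, and the assumption that client was obtained from this
// package's client constructor are hypothetical, and the option values are arbitrary.
//
//	api := NewBatchPublishAPI(client, "example-event-type", nil, &BatchOptions{
//		BatchCollectionTimeout: 500 * time.Millisecond,
//		MaxBatchSize:           50,
//		BatchQueueSize:         1000,
//	})
//	defer api.Close()
//
//	if err := api.Publish(event); err != nil {
//		// handle the publishing error for this single event
//	}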

// Publish publishes the given data through the underlying publish API. If the data is a slice it
// is forwarded as is; a single event (anything that is not a slice) is added to a batch and
// published as part of that batch.
func (p *BatchPublishAPI) Publish(event interface{}) error {
	if reflect.TypeOf(event).Kind() == reflect.Slice {
		return p.publishAPI.Publish(event)
	}
	eventProxy := eventToPublish{
		requestedAt:   time.Now(),
		event:         event,
		publishResult: make(chan error, 1),
	}
	defer close(eventProxy.publishResult)
	p.eventsChannel <- &eventProxy
	return <-eventProxy.publishResult
}
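
// A hedged sketch of the two code paths described above: concurrent callers that publish single
// events share the same batches, while a slice bypasses batching entirely. The variables and the
// use of sync.WaitGroup are illustrative assumptions, not part of this file.
//
//	var wg sync.WaitGroup
//	for _, evt := range singleEvents {
//		wg.Add(1)
//		go func(e interface{}) {
//			defer wg.Done()
//			_ = api.Publish(e) // collected into batches by the dispatch goroutine
//		}(evt)
//	}
//	wg.Wait()
//
//	_ = api.Publish(sliceOfEvents) // a slice is published directly, without batching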

// eventToPublish wraps a single event together with the time it was handed to Publish and a
// channel on which the result of the batched publish is reported back to the caller.
type eventToPublish struct {
	requestedAt   time.Time
	event         interface{}
	publishResult chan error
}

// Close stops the batching goroutine and waits for it to confirm that it has finished.
func (p *BatchPublishAPI) Close() {
	close(p.eventsChannel)
	<-p.dispatchFinished
	close(p.dispatchFinished)
}

// publishBatchToNakadi publishes the collected events as one batch and reports the outcome back
// to every caller that is waiting on its publish result.
func (p *BatchPublishAPI) publishBatchToNakadi(events []*eventToPublish) {
	itemsToPublish := make([]interface{}, len(events))
	for idx, evt := range events {
		itemsToPublish[idx] = evt.event
	}
	err := p.publishAPI.Publish(itemsToPublish)
	for _, evt := range events {
		evt.publishResult <- err
	}
}

// dispatchThread runs in a dedicated goroutine. It collects events from the events channel into a
// batch and flushes the batch either when it reaches the maximum size or when the collection
// timeout, measured from the first event in the batch, has expired.
func (p *BatchPublishAPI) dispatchThread() {
	defer func() { p.dispatchFinished <- 1 }()
	batch := make([]*eventToPublish, 0, 1)
	var finishBatchCollectionAt *time.Time
	flush := func() {
		if len(batch) > 0 {
			p.publishBatchToNakadi(batch)
			batch = make([]*eventToPublish, 0, 1)
		}
		finishBatchCollectionAt = nil
	}
	for {
		if finishBatchCollectionAt == nil {
			// No batch is being collected: block until the next event arrives or the channel is closed.
			event, ok := <-p.eventsChannel
			if !ok {
				break
			}
			batch = append(batch, event)
			finishAt := event.requestedAt.Add(p.batchCollectionTimeout)
			finishBatchCollectionAt = &finishAt
		} else {
			if len(batch) >= p.maxBatchSize || time.Now().After(*finishBatchCollectionAt) {
				flush()
			} else {
				select {
				case <-time.After(time.Until(*finishBatchCollectionAt)):
					flush()
				case evt, ok := <-p.eventsChannel:
					if ok {
						batch = append(batch, evt)
						// break leaves the select only; batch collection continues in the outer loop.
						break
					}
					// The channel was closed: flush the remaining events before the outer loop exits.
					flush()
				}
			}
		}
	}
}
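
// A rough illustration of how the dispatcher behaves under the default options (1s collection
// timeout, maximum batch size 10). The timings are hypothetical and only show when flushes occur:
//
//	t=0.0s  Publish(e1)        -> batch = [e1], deadline = t=1.0s
//	t=0.3s  Publish(e2)        -> batch = [e1 e2]
//	t=1.0s  deadline reached   -> flush [e1 e2]
//	t=1.2s  Publish(e3)..(e12) -> batch reaches MaxBatchSize -> flush [e3 ... e12] immediately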