pond.go

package pond

import (
	"fmt"
	"runtime/debug"
	"sync"
	"sync/atomic"
	"time"
)

const (
	defaultIdleTimeout = 5 * time.Second
)

// defaultPanicHandler prints the panic value and a stack trace to standard output
func defaultPanicHandler(panic interface{}) {
	fmt.Printf("Worker exits from a panic: %v\nStack trace: %s\n", panic, string(debug.Stack()))
}

// linearGrowthFn decides how many workers to start: it returns minWorkers while the
// current count is below the minimum, 1 while below the maximum, and 0 once the
// maximum has been reached
func linearGrowthFn(workerCount, minWorkers, maxWorkers int) int {
	if workerCount < minWorkers {
		return minWorkers
	}
	if workerCount < maxWorkers {
		return 1
	}
	return 0
}

// Option represents an option that can be passed when building a worker pool to customize it
type Option func(*WorkerPool)

// IdleTimeout allows changing the idle timeout for a worker pool
func IdleTimeout(idleTimeout time.Duration) Option {
	return func(pool *WorkerPool) {
		pool.idleTimeout = idleTimeout
	}
}

// PanicHandler allows changing the panic handler function for a worker pool
func PanicHandler(panicHandler func(interface{})) Option {
	return func(pool *WorkerPool) {
		pool.panicHandler = panicHandler
	}
}

// MinWorkers allows changing the minimum number of workers of a worker pool
func MinWorkers(minWorkers int) Option {
	return func(pool *WorkerPool) {
		pool.minWorkers = minWorkers
	}
}

// WorkerPool models a pool of workers
type WorkerPool struct {
	minWorkers      int
	maxWorkers      int
	maxCapacity     int
	idleTimeout     time.Duration
	workerCount     int32
	tasks           chan func() // buffered queue of submitted tasks
	dispatchedTasks chan func() // tasks handed off to workers by the dispatcher
	purgerQuit      chan struct{}
	stopOnce        sync.Once
	waitGroup       sync.WaitGroup
	panicHandler    func(interface{})
	growthFn        func(int, int, int) int
}

// New creates a worker pool that can scale up to the given number of workers and capacity
func New(maxWorkers, maxCapacity int, options ...Option) *WorkerPool {
	pool := &WorkerPool{
		maxWorkers:      maxWorkers,
		maxCapacity:     maxCapacity,
		idleTimeout:     defaultIdleTimeout,
		tasks:           make(chan func(), maxCapacity),
		dispatchedTasks: make(chan func(), maxWorkers),
		purgerQuit:      make(chan struct{}),
		panicHandler:    defaultPanicHandler,
		growthFn:        linearGrowthFn,
	}

	// Apply all options
	for _, opt := range options {
		opt(pool)
	}

	// Make sure options are consistent
	if pool.maxWorkers <= 0 {
		pool.maxWorkers = 1
	}
	if pool.minWorkers > pool.maxWorkers {
		pool.minWorkers = pool.maxWorkers
	}

	// Start dispatcher goroutine
	pool.waitGroup.Add(1)
	go func() {
		defer pool.waitGroup.Done()
		pool.dispatch()
	}()

	// Start purger goroutine
	pool.waitGroup.Add(1)
	go func() {
		defer pool.waitGroup.Done()
		pool.purge()
	}()

	// Start minWorkers workers
	if pool.minWorkers > 0 {
		pool.startWorkers()
	}

	return pool
}
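
// Example (illustrative sketch, not part of the original file, assuming the package
// is imported as pond): creating a pool that scales up to 10 workers, buffers up to
// 1000 tasks, keeps at least 2 workers alive and uses a custom idle timeout and
// panic handler.
//
//	pool := pond.New(10, 1000,
//		pond.MinWorkers(2),
//		pond.IdleTimeout(10*time.Second),
//		pond.PanicHandler(func(p interface{}) {
//			log.Printf("task panicked: %v", p)
//		}),
//	)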

// Running returns the number of running workers
func (p *WorkerPool) Running() int {
	return int(atomic.LoadInt32(&p.workerCount))
}

// Submit sends a task to this worker pool for execution. If the queue is full,
// it will wait until the task can be enqueued.
func (p *WorkerPool) Submit(task func()) {
	if task == nil {
		return
	}
	// Submit the task to the task channel
	p.tasks <- task
}

// SubmitAndWait sends a task to this worker pool for execution and waits for it to complete
// before returning.
func (p *WorkerPool) SubmitAndWait(task func()) {
	if task == nil {
		return
	}
	done := make(chan struct{})
	p.Submit(func() {
		defer close(done)
		task()
	})
	<-done
}
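
// Example (illustrative sketch): fire-and-forget submission versus waiting for a
// single task to finish.
//
//	pool.Submit(func() { fmt.Println("runs asynchronously") })
//	pool.SubmitAndWait(func() { fmt.Println("blocks until done") })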

// Stop causes this pool to stop accepting tasks, without waiting for goroutines to exit
func (p *WorkerPool) Stop() {
	p.stop(false)
}

// StopAndWait causes this pool to stop accepting tasks, waiting for all tasks in the queue to complete
func (p *WorkerPool) StopAndWait() {
	p.stop(true)
}
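
// Example (illustrative sketch): a typical lifecycle, using StopAndWait to drain
// the queue before the program exits. Stop would return immediately without
// waiting for queued tasks to complete.
//
//	pool := pond.New(5, 100)
//	pool.Submit(func() { /* ... */ })
//	pool.StopAndWait()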

// dispatch represents the work done by the dispatcher goroutine
func (p *WorkerPool) dispatch() {
	for task := range p.tasks {
		if task == nil {
			// Received the signal to exit gracefully
			break
		}
		select {
		// Attempt to submit the task to a worker without blocking
		case p.dispatchedTasks <- task:
			if p.Running() == 0 {
				p.startWorkers()
			}
		default:
			// Create a new worker if we haven't reached the limit yet
			if p.Running() < p.maxWorkers {
				p.startWorkers()
			}
			// Block until a worker accepts this task
			p.dispatchedTasks <- task
		}
	}

	// Send signal to stop all workers
	close(p.dispatchedTasks)

	// Send signal to stop the purger
	close(p.purgerQuit)
}

// purge represents the work done by the purger goroutine
func (p *WorkerPool) purge() {
	ticker := time.NewTicker(p.idleTimeout)
	defer ticker.Stop()

	for {
		select {
		// Timed out waiting for any activity to happen, attempt to stop an idle worker
		case <-ticker.C:
			if p.Running() > p.minWorkers {
				select {
				case p.dispatchedTasks <- nil:
				default:
					// If the dispatchedTasks channel is full, no need to kill a worker
				}
			}
		// Received the signal to exit
		case <-p.purgerQuit:
			return
		}
	}
}

// startWorkers launches as many new workers as the pool's growth function dictates
func (p *WorkerPool) startWorkers() {
	count := p.growthFn(p.Running(), p.minWorkers, p.maxWorkers)
	if count == 0 {
		return
	}

	// Increment worker count
	atomic.AddInt32(&p.workerCount, int32(count))

	// Increment waiting group semaphore
	p.waitGroup.Add(count)

	for i := 0; i < count; i++ {
		worker(p.dispatchedTasks, func() {
			// Decrement worker count
			atomic.AddInt32(&p.workerCount, -1)

			// Decrement waiting group semaphore
			p.waitGroup.Done()
		}, p.panicHandler)
	}
}

// stop causes this pool to stop accepting tasks, optionally waiting for all queued
// tasks to complete and all goroutines to exit
func (p *WorkerPool) stop(wait bool) {
	p.stopOnce.Do(func() {
		if wait {
			// Make sure all queued tasks complete before stopping the dispatcher
			p.tasks <- nil
			// Close the tasks channel to prevent receiving new tasks
			close(p.tasks)
			// Wait for all goroutines to exit
			p.waitGroup.Wait()
		} else {
			// Close the tasks channel to prevent receiving new tasks
			close(p.tasks)
		}
	})
}

// Group creates a new task group
func (p *WorkerPool) Group() *TaskGroup {
	return &TaskGroup{
		pool: p,
	}
}

// worker launches a worker goroutine
func worker(tasks chan func(), exitHandler func(), panicHandler func(interface{})) {
	go func() {
		defer func() {
			if panic := recover(); panic != nil {
				// Handle panic
				panicHandler(panic)

				// Restart goroutine
				worker(tasks, exitHandler, panicHandler)
			} else {
				// Handle exit
				exitHandler()
			}
		}()

		for task := range tasks {
			if task == nil {
				// We have received a signal to quit
				return
			}

			// We have received a task, execute it
			task()
		}
	}()
}

// TaskGroup represents a group of related tasks
type TaskGroup struct {
	pool      *WorkerPool
	waitGroup sync.WaitGroup
}

// Submit adds a task to this group and sends it to the worker pool to be executed.
func (g *TaskGroup) Submit(task func()) {
	g.waitGroup.Add(1)
	g.pool.Submit(func() {
		defer g.waitGroup.Done()
		task()
	})
}

// Wait waits until all the tasks in this group have completed.
func (g *TaskGroup) Wait() {
	// Wait for all tasks to complete
	g.waitGroup.Wait()
}
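
// Example (illustrative sketch): submitting a batch of related tasks as a group
// and waiting for the whole batch to finish.
//
//	group := pool.Group()
//	for i := 0; i < 20; i++ {
//		i := i
//		group.Submit(func() { fmt.Println("task", i) })
//	}
//	group.Wait()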