workerpool.go
// Copyright 2021 Authors of Cilium
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package workerpool implements a concurrency limiting worker pool.
// Worker routines are spawned on demand as tasks are submitted.
package workerpool

import (
	"errors"
	"fmt"
	"sync"
)

var (
	// ErrDraining is returned when an operation is not possible because
	// draining is in progress.
	ErrDraining = errors.New("drain operation in progress")

	// ErrClosed is returned when operations are attempted after a call to Close.
	ErrClosed = errors.New("worker pool is closed")
)

// WorkerPool spawns, on demand, a number of worker routines to process
// submitted tasks concurrently. The number of concurrent routines never
// exceeds the specified limit.
type WorkerPool struct {
	workers  chan struct{}
	tasks    chan *task
	results  []*task
	wg       sync.WaitGroup
	mu       sync.Mutex
	draining bool
	closed   bool
}
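
// Note: the unexported task type and the Task interface are defined in
// task.go, which is not shown on this page. Based solely on how they are
// used in this file, a minimal sketch would look roughly like the following;
// the exact Task methods are an assumption, only the id, run and err fields
// and the fact that *task satisfies Task are visible here:
//
//	type Task interface {
//		// exposes at least the task id and the error returned by run
//	}
//
//	type task struct {
//		id  string
//		run func() error
//		err error
//	}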

// New creates a new pool of workers where at most n workers process submitted
// tasks concurrently. New panics if n is not greater than 0.
func New(n int) *WorkerPool {
	if n <= 0 {
		panic(fmt.Sprintf("workerpool.New: n must be > 0, got %d", n))
	}
	wp := &WorkerPool{
		workers: make(chan struct{}, n),
		tasks:   make(chan *task),
	}
	go wp.run()
	return wp
}

// Cap returns the concurrent workers capacity; see New().
func (wp *WorkerPool) Cap() int {
	return cap(wp.workers)
}

// Submit submits f for processing by a worker. The given id is useful for
// identifying the task once it is completed.
// Submit blocks until a routine starts processing the task.
// If a drain operation is in progress, ErrDraining is returned and the task
// is not submitted for processing.
// If the worker pool is closed, ErrClosed is returned and the task is not
// submitted for processing.
func (wp *WorkerPool) Submit(id string, f func() error) error {
	wp.mu.Lock()
	if wp.closed {
		wp.mu.Unlock()
		return ErrClosed
	}
	if wp.draining {
		wp.mu.Unlock()
		return ErrDraining
	}
	wp.mu.Unlock()
	wp.wg.Add(1)
	wp.tasks <- &task{
		id:  id,
		run: f,
	}
	return nil
}

// Drain waits until all tasks are completed. This operation prevents
// submitting new tasks to the worker pool. Drain returns the results of the
// tasks that have been processed.
// If a drain operation is already in progress, ErrDraining is returned.
// If the worker pool is closed, ErrClosed is returned.
func (wp *WorkerPool) Drain() ([]Task, error) {
	wp.mu.Lock()
	if wp.closed {
		wp.mu.Unlock()
		return nil, ErrClosed
	}
	if wp.draining {
		wp.mu.Unlock()
		return nil, ErrDraining
	}
	wp.draining = true
	wp.mu.Unlock()
	wp.wg.Wait()
	// It's not necessary to hold a lock when reading wp.results as no other
	// routine is running at this point besides the "run" routine, which should
	// be waiting on the tasks channel.
	res := make([]Task, len(wp.results))
	for i, t := range wp.results {
		res[i] = t
	}
	wp.results = nil
	wp.mu.Lock()
	wp.draining = false
	wp.mu.Unlock()
	return res, nil
}

// Close closes the worker pool, rendering it unable to process new tasks.
// It should be called after a call to Drain, once the worker pool is no
// longer needed.
func (wp *WorkerPool) Close() error {
	wp.mu.Lock()
	if wp.closed {
		wp.mu.Unlock()
		return nil
	}
	wp.closed = true
	wp.mu.Unlock()
	wp.wg.Wait()
	// At this point, all routines have returned. This means that Submit is not
	// pending to write to the tasks channel and it is thus safe to close it.
	close(wp.tasks)
	// Wait for the "run" routine to terminate (it closes wp.workers on exit).
	<-wp.workers
	return nil
}

// run loops over the tasks channel and starts processing routines. It should
// only be called once during the lifetime of a WorkerPool.
func (wp *WorkerPool) run() {
	for t := range wp.tasks {
		t := t // capture the loop variable for the goroutine below
		wp.results = append(wp.results, t)
		wp.workers <- struct{}{}
		go func() {
			defer wp.wg.Done()
			t.err = t.run()
			<-wp.workers
		}()
	}
	close(wp.workers)
}
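
For reference, here is a minimal usage sketch of the API above. It is not part of this file: it assumes the module path github.com/cilium/workerpool and a hypothetical example_test.go, and it exercises only the exported functions shown here (New, Submit, Drain, Close).

package workerpool_test

import (
	"fmt"

	"github.com/cilium/workerpool"
)

func Example() {
	// At most 3 tasks run concurrently; additional Submit calls block until
	// a worker routine picks the task up.
	wp := workerpool.New(3)

	for i := 0; i < 10; i++ {
		id := fmt.Sprintf("task-%d", i)
		if err := wp.Submit(id, func() error {
			// Do the actual work here; the error returned is recorded on the
			// task by the worker routine.
			return nil
		}); err != nil {
			// ErrDraining or ErrClosed: the task was not submitted.
			fmt.Println(err)
		}
	}

	// Drain blocks until every submitted task has completed and returns the
	// processed tasks.
	results, err := wp.Drain()
	if err != nil {
		fmt.Println(err)
	}
	_ = results // inspect the completed tasks here (see the Task interface in task.go)

	// Close the pool once it is no longer needed, after Drain.
	_ = wp.Close()
}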