-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpool.go
54 lines (49 loc) · 1.13 KB
/
pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
// Package pool implements a generic worker pool with shared channels.
package pool
// Pool contains channels for the generic worker() to run at limited concurrency.
type Pool[I, O any] struct {
in chan I
results chan I
out chan O
worker func(I) I
}
// New creates the channels and kicks off the input producer and the output consumer.
func New[I, O any](
input func(chan<- I),
worker func(I) I,
output func(<-chan I) O,
) (p Pool[I, O]) {
p.in = make(chan I)
go func() {
input(p.in)
close(p.in)
}()
p.out = make(chan O, 1)
p.worker = worker
p.results = make(chan I)
go func() {
p.out <- output(p.results)
}()
return
}
// Wait runs workers concurrently and returns the result of output.
// If there's no worker, input is sent directly to output.
func (p Pool[I, O]) Wait(concurrency int) O {
if p.worker == nil {
p.results <- <-p.in
} else {
sem := make(chan bool, concurrency)
for i := range p.in {
sem <- true
go func(i I) {
p.results <- p.worker(i)
<-sem
}(i)
}
for ; concurrency > 0; concurrency-- {
sem <- true
}
}
close(p.results)
return <-p.out // return the result of output()
}