-
Notifications
You must be signed in to change notification settings - Fork 48
/
Copy pathasync.go
112 lines (93 loc) · 2.41 KB
/
async.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package sparta
import (
"sync"
)
// workResult describes the outcome of a single worker task. A result whose
// Error method returns a non-nil error is treated as a failure by the pool;
// otherwise the value returned by Result is collected.
type workResult interface {
	// Result returns the value carried by this outcome.
	Result() interface{}
	// Error returns the error carried by this outcome, if any.
	Error() error
}
// taskResult is a convenience workResult implementation for a task poll
// return value: it pairs the value of a task with the error it reported.
type taskResult struct {
	// result is the value handed back by Result.
	result interface{}
	// err is the error handed back by Error.
	err error
}
// Result returns the value stored in the task result.
func (tr *taskResult) Result() interface{} {
	return tr.result
}
// Error returns the error stored in the task result, or nil if there is none.
func (tr *taskResult) Error() error {
	return tr.err
}
// newTaskResult packages a task's value and error into a workResult backed by
// a *taskResult.
func newTaskResult(taskValue interface{}, err error) workResult {
	tr := new(taskResult)
	tr.result = taskValue
	tr.err = err
	return tr
}
// taskFunc is a unit of work: a function that takes no arguments and returns
// the workResult describing its outcome.
type taskFunc func() workResult
// workTask is a single work item destined for a worker pool. It couples the
// function to execute with the slot that receives that function's outcome.
type workTask struct {
	// Result holds the outcome of the work function. It is nil until Run has
	// stored a value in it.
	Result workResult
	// task is the work function invoked by Run.
	task taskFunc
}
// Run executes the task's work function, stores its outcome in t.Result, and
// does the matching accounting on the given sync.WaitGroup by calling Done
// exactly once.
//
// Done is deferred so the WaitGroup is decremented even if the work function
// panics. Without that, a caller that recovers from the panic would be left
// with a WaitGroup whose counter can never reach zero. The panic itself is not
// recovered here and keeps propagating to the caller.
func (t *workTask) Run(wg *sync.WaitGroup) {
	defer wg.Done()
	t.Result = t.task()
}
// newWorkTask builds a workTask around the given work function. The returned
// task has no Result yet.
func newWorkTask(f taskFunc) *workTask {
	wt := new(workTask)
	wt.task = f
	return wt
}
// workerPool runs a fixed set of tasks using a configured number of worker
// goroutines.
type workerPool struct {
	// Tasks is the full set of work items owned by the pool.
	Tasks []*workTask
	// concurrency is the number of worker goroutines requested for the pool.
	concurrency int
	// tasksChan carries tasks to the worker goroutines.
	tasksChan chan *workTask
	// wg tracks tasks that have not finished yet.
	wg sync.WaitGroup
}
// newWorkerPool creates a pool that owns the given tasks and is configured
// with the given concurrency. The pool's task channel is created unbuffered.
func newWorkerPool(tasks []*workTask, concurrency int) *workerPool {
	pool := new(workerPool)
	pool.Tasks = tasks
	pool.concurrency = concurrency
	pool.tasksChan = make(chan *workTask)
	return pool
}
// workResults partitions the outcomes of the pool's tasks into result values
// and errors, each in task order. A task whose Result reports a non-nil error
// contributes only that error; any other task contributes its result value.
//
// Tasks with no recorded Result (for example because Run has not been called
// yet, or because the work function returned nil) are skipped instead of being
// dereferenced, so the output is only meaningful after Run has completed.
//
// Both returned slices are always non-nil, even when empty.
func (p *workerPool) workResults() ([]interface{}, []error) {
	results := []interface{}{}
	errs := []error{}
	for _, task := range p.Tasks {
		if task == nil || task.Result == nil {
			continue
		}
		// Query the error once so a single outcome is classified consistently.
		if err := task.Result.Error(); err != nil {
			errs = append(errs, err)
			continue
		}
		results = append(results, task.Result.Result())
	}
	return results, errs
}
// Run runs all work within the pool and blocks until it's finished, then
// returns the collected result values and errors as reported by workResults.
//
// The number of worker goroutines is the configured concurrency, bounded to
// the range [1, len(Tasks)] whenever there is at least one task. The lower
// bound matters: tasks are handed to workers over the pool's task channel, and
// with a non-positive concurrency no worker would ever receive from it, so the
// first send would block forever. With no tasks, no workers are started.
//
// Run closes the pool's task channel, so a pool must not be run more than
// once.
func (p *workerPool) Run() ([]interface{}, []error) {
	taskCount := len(p.Tasks)

	workers := p.concurrency
	if workers > taskCount {
		workers = taskCount
	}
	if workers < 1 && taskCount > 0 {
		workers = 1
	}

	// Register every task before any worker has a chance to call Done.
	p.wg.Add(taskCount)
	for i := 0; i < workers; i++ {
		go p.work()
	}

	for _, task := range p.Tasks {
		p.tasksChan <- task
	}
	// Closing the channel ends each worker's receive loop.
	close(p.tasksChan)

	p.wg.Wait()
	return p.workResults()
}
// work is the loop executed by a single worker goroutine: it receives tasks
// from the pool's task channel and runs each one against the pool's WaitGroup,
// returning once the channel has been closed and drained.
func (p *workerPool) work() {
	for {
		task, open := <-p.tasksChan
		if !open {
			return
		}
		task.Run(&p.wg)
	}
}