-
Notifications
You must be signed in to change notification settings - Fork 369
/
Copy pathtasks.go
138 lines (118 loc) · 3.56 KB
/
tasks.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
// Copyright 2018 The Cluster Monitoring Operator Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tasks
import (
"context"
"fmt"
"strings"
"golang.org/x/sync/errgroup"
"k8s.io/klog/v2"
"github.com/openshift/cluster-monitoring-operator/pkg/client"
)
// TaskRunner holds an ordered list of task groups. RunAll executes the groups
// one after another; the tasks inside a single group are run concurrently.
type TaskRunner struct {
	// client is stored by NewTaskRunner; nothing in this file reads it.
	client *client.Client
	// taskGroups are the groups in the order RunAll processes them.
	taskGroups []*TaskGroup
}

// NewTaskRunner returns a task runner for the given client. taskGroups are the
// groups that RunAll will execute, in the order they are passed.
func NewTaskRunner(client *client.Client, taskGroups ...*TaskGroup) *TaskRunner {
	// Copy the variadic arguments so the runner owns its own slice.
	groups := make([]*TaskGroup, len(taskGroups))
	copy(groups, taskGroups)

	runner := new(TaskRunner)
	runner.client = client
	runner.taskGroups = groups
	return runner
}
// RunAll walks the registered task groups in order and calls RunConcurrently
// on each of them. It stops at the first group that reports at least one error
// and returns that group's errors; later groups are not run. It returns nil
// when every group completed without errors.
func (tr *TaskRunner) RunAll(ctx context.Context) TaskGroupErrors {
	groups := tr.taskGroups
	for idx := 0; idx < len(groups); idx++ {
		klog.V(2).Infof("processing task group %d of %d", idx+1, len(tr.taskGroups))
		if groupErrs := groups[idx].RunConcurrently(ctx); len(groupErrs) > 0 {
			return groupErrs
		}
	}
	return nil
}
// RunConcurrently starts every task of the group in its own goroutine, waits
// for all of them to finish and returns the failures that were encountered.
//
// A failing task does not stop the other tasks of the group. The returned
// errors are ordered by the position of the task in the group, so the same set
// of failing tasks always produces the same TaskGroupErrors (and therefore the
// same Error() string). The result is nil when no task failed.
func (tg *TaskGroup) RunConcurrently(ctx context.Context) TaskGroupErrors {
	var g errgroup.Group

	tgLength := len(tg.tasks)
	// One slot per task. Each goroutine writes only to its own index, so no
	// additional locking is needed, and g.Wait() orders those writes before
	// the reads below.
	results := make([]error, tgLength)

	for i, ts := range tg.tasks {
		// shadow vars due to concurrency
		ts := ts
		i := i

		g.Go(func() error {
			klog.V(2).Infof("running task %d of %d: %v", i+1, tgLength, ts.Name)
			if err := ts.Task.Run(ctx); err != nil {
				klog.Warningf("task %d of %d: %v failed: %v", i+1, tgLength, ts.Name, err)
				results[i] = err
				// The failure is reported through results rather than
				// through the errgroup, which would keep only the first one.
				return nil
			}
			klog.V(2).Infof("ran task %d of %d: %v", i+1, tgLength, ts.Name)
			return nil
		})
	}

	// The goroutines above never return a non-nil error, so the result of
	// Wait carries no information; it is only used to block until all tasks
	// are done.
	_ = g.Wait()

	var taskGroupErrors TaskGroupErrors
	for i, err := range results {
		if err != nil {
			taskGroupErrors = append(taskGroupErrors, TaskErr{Err: err, Name: tg.tasks[i].Name})
		}
	}
	return taskGroupErrors
}
// TaskGroup is a list of task specs that are run together by RunConcurrently.
type TaskGroup struct {
	tasks []*TaskSpec
}

// NewTaskGroup returns a task group made of the given task specs. The slice is
// stored as passed; it is not copied.
func NewTaskGroup(tasks []*TaskSpec) *TaskGroup {
	group := TaskGroup{tasks: tasks}
	return &group
}
// Task is a unit of work that can be executed with a context and reports
// failure through its returned error.
type Task interface {
	Run(ctx context.Context) error
}

// TaskSpec pairs a task with the name used to identify it in logs and errors.
type TaskSpec struct {
	Name string
	Task Task
}

// NewTaskSpec returns a task spec that associates name with task.
func NewTaskSpec(name string, task Task) *TaskSpec {
	spec := TaskSpec{Name: name, Task: task}
	return &spec
}
// TaskErr is the failure of a single task: the error it returned together with
// the name of the task.
type TaskErr struct {
	Err  error
	Name string
}

// TaskGroupErrors is the list of task failures collected from a task group.
type TaskGroupErrors []TaskErr

// Error renders the failures one per line as "<lower-cased task name>: <error>".
// It returns the empty string when the list is empty.
func (tge TaskGroupErrors) Error() string {
	var b strings.Builder
	for i, taskErr := range tge {
		if i > 0 {
			b.WriteByte('\n')
		}
		fmt.Fprintf(&b, "%v: %v", strings.ToLower(taskErr.Name), taskErr.Err)
	}
	return b.String()
}