-
Notifications
You must be signed in to change notification settings - Fork 93
/
worker.go
188 lines (165 loc) · 6.59 KB
/
worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
package river
import (
"context"
"fmt"
"time"
"github.com/riverqueue/river/internal/workunit"
"github.com/riverqueue/river/rivertype"
)
// Worker is an interface that can perform a job with args of type T. A typical
// implementation will be a JSON-serializable `JobArgs` struct that implements
// `Kind()`, along with a Worker that embeds WorkerDefaults and implements `Work()`.
// Workers may optionally override other methods to provide job-specific
// configuration for all jobs of that type:
//
// type SleepArgs struct {
// Duration time.Duration `json:"duration"`
// }
//
// func (SleepArgs) Kind() string { return "sleep" }
//
// type SleepWorker struct {
// WorkerDefaults[SleepArgs]
// }
//
// func (w *SleepWorker) Work(ctx context.Context, job *Job[SleepArgs]) error {
// select {
// case <-ctx.Done():
// return ctx.Err()
// case <-time.After(job.Args.Duration):
// return nil
// }
// }
//
// In addition to fulfilling the Worker interface, workers must be registered
// with the client using the AddWorker function.
type Worker[T JobArgs] interface {
// Middleware returns the type-specific middleware for this job.
Middleware(job *Job[T]) []rivertype.WorkerMiddleware
// NextRetry calculates when the next retry for a failed job should take
// place given when it was last attempted and its number of attempts, or any
// other of the job's properties a user-configured retry policy might want
// to consider.
//
// Note that this method on a worker overrides any client-level retry policy.
// To use the client-level retry policy, return an empty `time.Time{}` or
// include WorkerDefaults to do this for you.
NextRetry(job *Job[T]) time.Time
// Timeout is the maximum amount of time the job is allowed to run before
// its context is cancelled. A timeout of zero (the default) means the job
// will inherit the Client-level timeout. A timeout of -1 means the job's
// context will never time out.
Timeout(job *Job[T]) time.Duration
// Work performs the job and returns an error if the job failed. The context
// will be configured with a timeout according to the worker settings and may
// be cancelled for other reasons.
//
// If no error is returned, the job is assumed to have succeeded and will be
// marked completed.
//
// It is important for any worker to respect context cancellation to enable
// the client to respond to shutdown requests; there is no way to cancel a
// running job that does not respect context cancellation, other than
// terminating the process.
Work(ctx context.Context, job *Job[T]) error
}
// WorkerDefaults is an empty struct that can be embedded in your worker
// struct to make it fulfill the Worker interface with default values.
type WorkerDefaults[T JobArgs] struct{}
func (w WorkerDefaults[T]) Middleware(*Job[T]) []rivertype.WorkerMiddleware { return nil }
// NextRetry returns an empty time.Time{} to avoid setting any job or
// Worker-specific overrides on the next retry time. This means that the
// Client-level retry policy schedule will be used instead.
func (w WorkerDefaults[T]) NextRetry(*Job[T]) time.Time { return time.Time{} }
// Timeout returns the job-specific timeout. Override this method to set a
// job-specific timeout, otherwise the Client-level timeout will be applied.
func (w WorkerDefaults[T]) Timeout(*Job[T]) time.Duration { return 0 }
// AddWorker registers a Worker on the provided Workers bundle. Each Worker must
// be registered so that the Client knows it should handle a specific kind of
// job (as returned by its `Kind()` method).
//
// Use by explicitly specifying a JobArgs type and then passing an instance of a
// worker for the same type:
//
// river.AddWorker(workers, &SortWorker{})
//
// Note that AddWorker can panic in some situations, such as if the worker is
// already registered or if its configuration is otherwise invalid. This default
// probably makes sense for most applications because you wouldn't want to start
// an application with invalid hardcoded runtime configuration. If you want to
// avoid panics, use AddWorkerSafely instead.
func AddWorker[T JobArgs](workers *Workers, worker Worker[T]) {
if err := AddWorkerSafely[T](workers, worker); err != nil {
panic(err)
}
}
// AddWorkerSafely registers a worker on the provided Workers bundle. Unlike AddWorker,
// AddWorkerSafely does not panic and instead returns an error if the worker
// is already registered or if its configuration is invalid.
//
// Use by explicitly specifying a JobArgs type and then passing an instance of a
// worker for the same type:
//
// river.AddWorkerSafely[SortArgs](workers, &SortWorker{}).
func AddWorkerSafely[T JobArgs](workers *Workers, worker Worker[T]) error {
var jobArgs T
return workers.add(jobArgs, &workUnitFactoryWrapper[T]{worker: worker})
}
// Workers is a list of available job workers. A Worker must be registered for
// each type of Job to be handled.
//
// Use the top-level AddWorker function combined with a Workers to register a
// worker.
type Workers struct {
workersMap map[string]workerInfo // job kind -> worker info
}
// workerInfo bundles information about a registered worker for later lookup
// in a Workers bundle.
type workerInfo struct {
jobArgs JobArgs
workUnitFactory workunit.WorkUnitFactory
}
// NewWorkers initializes a new registry of available job workers.
//
// Use the top-level AddWorker function combined with a Workers registry to
// register each available worker.
func NewWorkers() *Workers {
return &Workers{
workersMap: make(map[string]workerInfo),
}
}
func (w Workers) add(jobArgs JobArgs, workUnitFactory workunit.WorkUnitFactory) error {
kind := jobArgs.Kind()
if _, ok := w.workersMap[kind]; ok {
return fmt.Errorf("worker for kind %q is already registered", kind)
}
w.workersMap[kind] = workerInfo{
jobArgs: jobArgs,
workUnitFactory: workUnitFactory,
}
return nil
}
// workFunc implements JobArgs and is used to wrap a function given to WorkFunc.
type workFunc[T JobArgs] struct {
WorkerDefaults[T]
kind string
f func(context.Context, *Job[T]) error
}
func (wf *workFunc[T]) Kind() string {
return wf.kind
}
func (wf *workFunc[T]) Work(ctx context.Context, job *Job[T]) error {
return wf.f(ctx, job)
}
// WorkFunc wraps a function to implement the Worker interface. A job args
// struct implementing JobArgs will still be required to specify a Kind.
//
// For example:
//
// river.AddWorker(workers, river.WorkFunc(func(ctx context.Context, job *river.Job[WorkFuncArgs]) error {
// fmt.Printf("Message: %s", job.Args.Message)
// return nil
// }))
func WorkFunc[T JobArgs](f func(context.Context, *Job[T]) error) Worker[T] {
return &workFunc[T]{f: f, kind: (*new(T)).Kind()}
}