-
Notifications
You must be signed in to change notification settings - Fork 1.7k
/
Copy pathscheduler.go
232 lines (204 loc) · 5.49 KB
/
scheduler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
package services
import (
"errors"
"fmt"
"sync"
"time"
"github.com/mrwonko/cron"
"github.com/smartcontractkit/chainlink/logger"
"github.com/smartcontractkit/chainlink/store"
"github.com/smartcontractkit/chainlink/store/models"
)
// Scheduler contains fields for Recurring and OneTime for occurrences,
// a pointer to the store and a started field to indicate if the Scheduler
// has started or not.
type Scheduler struct {
Recurring *Recurring
OneTime *OneTime
store *store.Store
startedMutex sync.RWMutex
started bool
}
// NewScheduler initializes the Scheduler instances with both Recurring
// and OneTime fields since jobs can contain tasks which utilize both.
func NewScheduler(store *store.Store) *Scheduler {
return &Scheduler{
Recurring: NewRecurring(store),
OneTime: &OneTime{
Store: store,
Clock: store.Clock,
},
store: store,
}
}
// Start checks to ensure the Scheduler has not already started,
// calls the Start function for both Recurring and OneTime types,
// sets the started field to true, and adds jobs relevant to its
// initiator ("cron" and "runat").
func (s *Scheduler) Start() error {
s.startedMutex.Lock()
defer s.startedMutex.Unlock()
if s.started {
return errors.New("Scheduler already started")
}
if err := s.OneTime.Start(); err != nil {
return err
}
if err := s.Recurring.Start(); err != nil {
return err
}
s.started = true
jobs, err := s.store.Jobs()
if err != nil {
return fmt.Errorf("Scheduler: %v", err)
}
for _, job := range jobs {
s.addJob(job)
}
return nil
}
// Stop is the governing function for both Recurring and OneTime
// Stop function. Sets the started field to false.
func (s *Scheduler) Stop() {
s.startedMutex.Lock()
defer s.startedMutex.Unlock()
if s.started {
s.Recurring.Stop()
s.OneTime.Stop()
s.started = false
}
}
func (s *Scheduler) addJob(job models.JobSpec) {
s.Recurring.AddJob(job)
s.OneTime.AddJob(job)
}
// AddJob is the governing function for Recurring and OneTime,
// and will only execute if the Scheduler has not already started.
func (s *Scheduler) AddJob(job models.JobSpec) {
s.startedMutex.RLock()
defer s.startedMutex.RUnlock()
if !s.started {
return
}
s.addJob(job)
}
// Recurring is used for runs that need to execute on a schedule,
// and is configured with cron.
// Instances of Recurring must be initialized using NewRecurring().
type Recurring struct {
Cron Cron
Clock Nower
store *store.Store
}
// NewRecurring create a new instance of Recurring, ready to use.
func NewRecurring(store *store.Store) *Recurring {
return &Recurring{
store: store,
Clock: store.Clock,
}
}
// Start for Recurring types executes tasks with a "cron" initiator
// based on the configured schedule for the run.
func (r *Recurring) Start() error {
r.Cron = newChainlinkCron()
r.Cron.Start()
return nil
}
// Stop stops the cron scheduler and waits for running jobs to finish.
func (r *Recurring) Stop() {
r.Cron.Stop()
}
// AddJob looks for "cron" initiators, adds them to cron's schedule
// for execution when specified.
func (r *Recurring) AddJob(job models.JobSpec) {
for _, i := range job.InitiatorsFor(models.InitiatorCron) {
initr := i
if !job.Ended(r.Clock.Now()) {
r.Cron.AddFunc(string(initr.Schedule), func() {
_, err := BeginRun(job, initr, models.RunResult{}, r.store)
if err != nil && !expectedRecurringError(err) {
logger.Error(err.Error())
}
})
}
}
}
// OneTime represents runs that are to be executed only once.
type OneTime struct {
Store *store.Store
Clock Afterer
done chan struct{}
}
// Start allocates a channel for the "done" field with an empty struct.
func (ot *OneTime) Start() error {
ot.done = make(chan struct{})
return nil
}
// AddJob runs the job at the time specified for the "runat" initiator.
func (ot *OneTime) AddJob(job models.JobSpec) {
for _, initr := range job.InitiatorsFor(models.InitiatorRunAt) {
go ot.RunJobAt(initr, job)
}
}
// Stop closes the "done" field's channel.
func (ot *OneTime) Stop() {
close(ot.done)
}
// RunJobAt wait until the Stop() function has been called on the run
// or the specified time for the run is after the present time.
func (ot *OneTime) RunJobAt(initr models.Initiator, job models.JobSpec) {
select {
case <-ot.done:
case <-ot.Clock.After(initr.Time.DurationFromNow()):
if err := ot.Store.MarkRan(&initr); err != nil {
logger.Error(err.Error())
return
}
jr, err := BeginRun(job, initr, models.RunResult{}, ot.Store)
if err != nil {
logger.Error(err.Error())
}
if jr.Status == models.RunStatusUnstarted {
initr.Ran = false
if err := ot.Store.Save(&initr); err != nil {
logger.Error(err.Error())
}
}
}
}
func expectedRecurringError(err error) bool {
switch err.(type) {
case JobRunnerError:
return true
default:
return false
}
}
// Cron is an interface for scheduling recurring functions to run.
// Cron's schedule format is similar to the standard cron format
// but with an extra field at the beginning for seconds.
type Cron interface {
Start()
Stop()
AddFunc(string, func()) error
}
type chainlinkCron struct {
*cron.Cron
}
func newChainlinkCron() *chainlinkCron {
return &chainlinkCron{cron.New()}
}
func (cc *chainlinkCron) Stop() {
cc.Cron.Stop()
cc.Cron.Wait()
}
// Nower is an interface that fulfills the Now method,
// following the behavior of time.Now.
type Nower interface {
Now() time.Time
}
// Afterer is an interface that fulfills the After method,
// following the behavior of time.After.
type Afterer interface {
After(d time.Duration) <-chan time.Time
}