-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathscheduler.go
146 lines (129 loc) · 2.8 KB
/
scheduler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package scheduler
import (
"time"
)
type TickInfo struct {
TickCount int64
TickNow int64
TickDelta int64
}
// scheduler is a tick based scheduler.
// A tick is a unit of time that is used to schedule jobs.
// The tick rate is the time between two ticks.
// The tick rate is set when creating a new scheduler.
// The scheduler is started by calling the Run method.
// The scheduler will run until the context is canceled.
// A Job is a function that is executed at a specific time.
// A Job execution will block the scheduler. So make sure that the job is fast.
// The schedule is designed to run in millisecond resolution.
type scheduler struct {
done chan bool
jobs JobArray
tickRate time.Duration
isRunning bool
onTick func(t, dt int64)
onJobAdd func(j *Job)
onJobRm func(j *Job)
}
// New creates a new scheduler.
func New(tickRate time.Duration) *scheduler {
return &scheduler{
tickRate: tickRate,
done: make(chan bool),
}
}
// RunAsync starts the scheduler in a new goroutine.
func (s *scheduler) RunAsync() {
s.isRunning = true
go s.loop()
}
// Run starts the scheduler.
func (s *scheduler) Run() {
s.isRunning = true
s.loop()
}
// IsRunning returns true if the scheduler is running.
func (s *scheduler) IsRunning() bool {
return s.isRunning
}
func (s *scheduler) loop() {
ticker := time.NewTicker(s.tickRate)
defer func() {
s.isRunning = false
ticker.Stop()
}()
var timeNow int64
var tickDelta int64
timeStart := time.Now().UnixMilli()
timeLast := timeStart
var tickNow int64
for {
select {
case <-ticker.C:
timeNow = time.Now().UnixMilli()
tickDelta = timeNow - timeLast
tickNow = timeNow - timeStart
s.doTick(tickNow, tickDelta)
timeLast = timeNow
case <-s.done:
return
}
}
}
// OnTick is called on every tick.
func (s *scheduler) OnTick(f func(t, dt int64)) {
s.onTick = f
}
func (s *scheduler) doTick(t, dt int64) {
if s.onTick != nil {
s.onTick(t, dt)
}
for _, j := range s.jobs {
if !j.Valid() {
if s.onJobRm != nil {
s.onJobRm(j)
}
s.jobs.Remove(j)
continue
}
if j.Tick(t) {
j.Run(t)
}
if j.Finished() {
if s.onJobRm != nil {
s.onJobRm(j)
}
s.jobs.Remove(j)
}
}
}
// CreateJob creates a new job.
func (s *scheduler) CreateJob() *Job {
j := NewJob()
s.jobs.Append(j)
if s.onJobAdd != nil {
s.onJobAdd(j)
}
return j
}
// Jobs returns the job collection.
func (s *scheduler) Jobs() *JobArray {
return &s.jobs
}
// OnJobAdd is called when a job is added.
func (s *scheduler) OnJobAdd(f func(j *Job)) {
s.onJobAdd = f
}
// OnJobRemove is called when a job is removed.
func (s *scheduler) OnJobRemove(f func(j *Job)) {
s.onJobRm = f
}
// Clear removes all jobs.
func (s *scheduler) Clear() {
s.Stop()
s.jobs.Clear()
}
// Stop stops the scheduler.
func (s *scheduler) Stop() {
s.done <- true
}