@@ -73,8 +73,8 @@ type scheduler struct {
73
73
latestConfig configID
74
74
latestMutex sync.RWMutex
75
75
76
- done chan struct {}
77
- terminated chan struct {}
76
+ stop chan struct {}
77
+ done chan struct {}
78
78
}
79
79
80
80
// newScheduler makes a new scheduler.
@@ -86,15 +86,15 @@ func newScheduler(configsAPI configsAPI, evaluationInterval, pollInterval time.D
86
86
q : NewSchedulingQueue (clockwork .NewRealClock ()),
87
87
cfgs : map [string ]cortexConfig {},
88
88
89
- done : make (chan struct {}),
90
- terminated : make (chan struct {}),
89
+ stop : make (chan struct {}),
90
+ done : make (chan struct {}),
91
91
}
92
92
}
93
93
94
94
// Run polls the source of configurations for changes.
95
95
func (s * scheduler ) Run () {
96
96
log .Debugf ("Scheduler started" )
97
- defer close (s .terminated )
97
+ defer close (s .done )
98
98
// Load initial set of all configurations before polling for new ones.
99
99
s .addNewConfigs (time .Now (), s .loadAllConfigs ())
100
100
ticker := time .NewTicker (s .pollInterval )
@@ -105,16 +105,17 @@ func (s *scheduler) Run() {
105
105
if err != nil {
106
106
log .Warnf ("Scheduler: error updating configs: %v" , err )
107
107
}
108
- case <- s .done :
108
+ case <- s .stop :
109
109
ticker .Stop ()
110
+ return
110
111
}
111
112
}
112
113
}
113
114
114
115
func (s * scheduler ) Stop () {
115
- close (s .done )
116
+ close (s .stop )
116
117
s .q .Close ()
117
- <- s .terminated
118
+ <- s .done
118
119
log .Debugf ("Scheduler stopped" )
119
120
}
120
121
@@ -186,6 +187,7 @@ func (s *scheduler) addNewConfigs(now time.Time, cfgs map[string]cortexConfigVie
186
187
}
187
188
188
189
func (s * scheduler ) addWorkItem (i workItem ) {
190
+ // The queue is keyed by user ID, so items for existing user IDs will be replaced.
189
191
s .q .Enqueue (i )
190
192
log .Debugf ("Scheduler: work item added: %v" , i )
191
193
}
0 commit comments