@@ -200,24 +200,28 @@ func (dm *DiscoveryManager) feedEvent(ev *discovery.Event) {
200
200
dm .watchersMutex .Lock ()
201
201
defer dm .watchersMutex .Unlock ()
202
202
203
+ sendToAllWatchers := func (ev * discovery.Event ) {
204
+ // Send the event to all watchers
205
+ for watcher := range dm .watchers {
206
+ select {
207
+ case watcher .feed <- ev :
208
+ // OK
209
+ case <- time .After (time .Millisecond * 500 ):
210
+ // If the watcher is not able to process event fast enough
211
+ // remove the watcher from the list of watchers
212
+ logrus .Info ("Watcher is not able to process events fast enough, removing it from the list of watchers" )
213
+ delete (dm .watchers , watcher )
214
+ }
215
+ }
216
+ }
217
+
203
218
if ev .Type == "stop" {
204
219
// Remove all the cached events for the terminating discovery
205
220
delete (dm .watchersCache , ev .DiscoveryID )
206
221
return
207
222
}
208
223
209
- // Send the event to all watchers
210
- for watcher := range dm .watchers {
211
- select {
212
- case watcher .feed <- ev :
213
- // OK
214
- case <- time .After (time .Millisecond * 500 ):
215
- // If the watcher is not able to process event fast enough
216
- // remove the watcher from the list of watchers
217
- logrus .Info ("Watcher is not able to process events fast enough, removing it from the list of watchers" )
218
- delete (dm .watchers , watcher )
219
- }
220
- }
224
+ sendToAllWatchers (ev )
221
225
222
226
// Cache the event for the discovery
223
227
cache := dm .watchersCache [ev .DiscoveryID ]
0 commit comments