Skip to content

Commit b6e1fd0

Browse files
committed
fix: add more graceful shutdown
Signed-off-by: Timur Tuktamyshev <timur.tuktamyshev@flant.com>
1 parent b752779 commit b6e1fd0

File tree

6 files changed

+104
-11
lines changed

6 files changed

+104
-11
lines changed

pkg/kube_events_manager/factory.go

Lines changed: 46 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -36,38 +36,44 @@ type Factory struct {
3636
handlerRegistrations map[string]cache.ResourceEventHandlerRegistration
3737
ctx context.Context
3838
cancel context.CancelFunc
39+
// done is closed when the underlying informer.Run returns
40+
done chan struct{}
3941
}
4042

4143
// FactoryStore keeps Factory values keyed by FactoryIndex.
type FactoryStore struct {
4244
// mu is locked around accesses to data in Reset, Stop and WaitStopped,
// and is the Locker that cond is built on.
mu sync.Mutex
43-
data map[FactoryIndex]Factory
45+
// cond is broadcast after a factory is deleted from data; WaitStopped sleeps on it.
cond *sync.Cond
46+
// data holds pointers, presumably so that fields set later (such as
// Factory.done in Start) are visible on the stored value — confirm.
data map[FactoryIndex]*Factory
4447
}
4548

4649
// NewFactoryStore returns a FactoryStore with an empty data map and a cond
// bound to the store's own mutex.
func NewFactoryStore() *FactoryStore {
47-
return &FactoryStore{
48-
data: make(map[FactoryIndex]Factory),
50+
fs := &FactoryStore{
51+
data: make(map[FactoryIndex]*Factory),
4952
}
53+
// cond must use fs.mu: WaitStopped calls cond.Wait() while holding that mutex.
fs.cond = sync.NewCond(&fs.mu)
54+
return fs
5055
}
5156

5257
// Reset replaces c.data with a new empty map while holding c.mu.
// NOTE(review): entries are dropped without calling their cancel func and
// without c.cond.Broadcast(), so a goroutine blocked in WaitStopped is not
// woken by Reset — confirm Reset is only used where nothing is waiting.
func (c *FactoryStore) Reset() {
5358
c.mu.Lock()
5459
defer c.mu.Unlock()
55-
c.data = make(map[FactoryIndex]Factory)
60+
c.data = make(map[FactoryIndex]*Factory)
5661
}
5762

5863
// add creates a Factory for index around the shared informer factory f and
// stores it in c.data. The Factory gets its own cancelable context derived
// from context.Background().
// NOTE(review): add does not lock c.mu itself — presumably callers hold it; confirm.
func (c *FactoryStore) add(index FactoryIndex, f dynamicinformer.DynamicSharedInformerFactory) {
5964
ctx, cancel := context.WithCancel(context.Background())
60-
c.data[index] = Factory{
65+
c.data[index] = &Factory{
6166
shared: f,
6267
handlerRegistrations: make(map[string]cache.ResourceEventHandlerRegistration),
6368
ctx: ctx,
6469
cancel: cancel,
70+
// done stays nil until Start launches the informer goroutine and creates the channel.
done: nil,
6571
}
6672
log.Debug("Factory store: added a new factory for index",
6773
slog.String("namespace", index.Namespace), slog.String("gvr", index.GVR.String()))
6874
}
6975

70-
func (c *FactoryStore) get(client dynamic.Interface, index FactoryIndex) Factory {
76+
func (c *FactoryStore) get(client dynamic.Interface, index FactoryIndex) *Factory {
7177
f, ok := c.data[index]
7278
if ok {
7379
log.Debug("Factory store: the factory with index found",
@@ -115,9 +121,18 @@ func (c *FactoryStore) Start(ctx context.Context, informerId string, client dyna
115121
slog.Int("value", len(factory.handlerRegistrations)),
116122
slog.String("namespace", index.Namespace), slog.String("gvr", index.GVR.String()))
117123

118-
if !informer.HasSynced() {
119-
go informer.Run(factory.ctx.Done())
124+
// Ensure informer.Run is started once and tracked
125+
if factory.done == nil {
126+
factory.done = make(chan struct{})
127+
go func() {
128+
informer.Run(factory.ctx.Done())
129+
close(factory.done)
130+
log.Debug("Factory store: informer goroutine exited",
131+
slog.String("namespace", index.Namespace), slog.String("gvr", index.GVR.String()))
132+
}()
133+
}
120134

135+
if !informer.HasSynced() {
121136
if err := wait.PollUntilContextCancel(ctx, DefaultSyncTime, true, func(_ context.Context) (bool, error) {
122137
return informer.HasSynced(), nil
123138
}); err != nil {
@@ -131,11 +146,10 @@ func (c *FactoryStore) Start(ctx context.Context, informerId string, client dyna
131146

132147
func (c *FactoryStore) Stop(informerId string, index FactoryIndex) {
133148
c.mu.Lock()
134-
defer c.mu.Unlock()
135-
136149
f, ok := c.data[index]
137150
if !ok {
138151
// already deleted
152+
c.mu.Unlock()
139153
return
140154
}
141155

@@ -152,10 +166,32 @@ func (c *FactoryStore) Stop(informerId string, index FactoryIndex) {
152166
slog.Int("value", len(f.handlerRegistrations)),
153167
slog.String("namespace", index.Namespace), slog.String("gvr", index.GVR.String()))
154168
if len(f.handlerRegistrations) == 0 {
169+
log.Debug("Factory store: last handler removed, canceling shared informer",
170+
slog.String("namespace", index.Namespace), slog.String("gvr", index.GVR.String()))
171+
done := f.done
155172
f.cancel()
173+
c.mu.Unlock()
174+
if done != nil {
175+
<-done
176+
}
177+
c.mu.Lock()
156178
delete(c.data, index)
157179
log.Debug("Factory store: deleted factory",
158180
slog.String("namespace", index.Namespace), slog.String("gvr", index.GVR.String()))
181+
c.cond.Broadcast()
182+
}
183+
}
184+
c.mu.Unlock()
185+
}
186+
187+
// WaitStopped blocks until there is no factory stored for the index.
// It returns immediately when the index is absent from c.data; otherwise it
// sleeps on c.cond and re-checks the map every time it is woken.
// NOTE(review): there is no timeout or context here, so the call returns only
// after a Broadcast that follows removal of the entry — confirm every removal
// path broadcasts.
188+
func (c *FactoryStore) WaitStopped(index FactoryIndex) {
189+
c.mu.Lock()
190+
for {
191+
if _, ok := c.data[index]; !ok {
192+
c.mu.Unlock()
193+
return
159194
}
195+
// Wait releases c.mu while blocked and re-acquires it before returning,
// so the map check above always runs under the lock.
c.cond.Wait()
160196
}
161197
}

pkg/kube_events_manager/kube_events_manager.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ type KubeEventsManager interface {
2525

2626
Ch() chan kemtypes.KubeEvent
2727
Stop()
28+
Wait()
2829
}
2930

3031
// kubeEventsManager is a main implementation of KubeEventsManager.
@@ -143,6 +144,18 @@ func (mgr *kubeEventsManager) Stop() {
143144
mgr.cancel()
144145
}
145146

147+
// Wait calls Wait on every monitor that is in mgr.Monitors at the time of the
// call, one after another, and returns when all of those calls have returned.
func (mgr *kubeEventsManager) Wait() {
148+
// Copy the monitors under the read lock so that mon.Wait() below runs
// without holding mgr.m.
mgr.m.RLock()
149+
monitors := make([]Monitor, 0, len(mgr.Monitors))
150+
for _, mon := range mgr.Monitors {
151+
monitors = append(monitors, mon)
152+
}
153+
mgr.m.RUnlock()
154+
for _, mon := range monitors {
155+
mon.Wait()
156+
}
157+
}
158+
146159
func (mgr *kubeEventsManager) MetricStorage() metric.Storage {
147160
return mgr.metricStorage
148161
}

pkg/kube_events_manager/monitor.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ type Monitor interface {
1919
CreateInformers() error
2020
Start(context.Context)
2121
Stop()
22+
Wait()
2223
Snapshot() []kemtypes.ObjectAndFilterResult
2324
EnableKubeEventCb()
2425
GetConfig() *MonitorConfig
@@ -372,6 +373,21 @@ func (m *monitor) Stop() {
372373
}
373374
}
374375

376+
// Wait blocks until every informer of the monitor reports stopped. It calls
// wait() on each entry of m.ResourceInformers, then on each informer held in
// m.VaryingInformers, then on m.NamespaceInformer when it is set.
// It only waits; nothing in this method stops an informer.
// NOTE(review): m.ResourceInformers is ranged without a lock visible here —
// confirm there are no concurrent writers during shutdown.
377+
func (m *monitor) Wait() {
378+
for _, informer := range m.ResourceInformers {
379+
informer.wait()
380+
}
381+
m.VaryingInformers.RangeValue(func(value []*resourceInformer) {
382+
for _, informer := range value {
383+
informer.wait()
384+
}
385+
})
386+
if m.NamespaceInformer != nil {
387+
m.NamespaceInformer.wait()
388+
}
389+
}
390+
375391
func (m *monitor) SnapshotOperations() (*CachedObjectsInfo /*total*/, *CachedObjectsInfo /*last*/) {
376392
total := &CachedObjectsInfo{}
377393
last := &CachedObjectsInfo{}

pkg/kube_events_manager/namespace_informer.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ type namespaceInformer struct {
2121
ctx context.Context
2222
cancel context.CancelFunc
2323
stopped bool
24+
done chan struct{}
2425

2526
KubeClient *klient.Client
2627
Monitor *MonitorConfig
@@ -128,13 +129,18 @@ func (ni *namespaceInformer) start() {
128129
return
129130
}
130131
cctx, cancel := context.WithCancel(ni.ctx)
132+
ni.done = make(chan struct{})
131133
go func() {
132134
<-ni.ctx.Done()
133135
ni.stopped = true
134136
cancel()
135137
}()
136138

137-
go ni.SharedInformer.Run(cctx.Done())
139+
go func() {
140+
ni.SharedInformer.Run(cctx.Done())
141+
close(ni.done)
142+
log.Debug("Namespace informer goroutine exited", slog.String("name", ni.Monitor.Metadata.DebugName))
143+
}()
138144

139145
if err := wait.PollUntilContextCancel(cctx, DefaultSyncTime, true, func(_ context.Context) (bool, error) {
140146
return ni.SharedInformer.HasSynced(), nil
@@ -145,3 +151,9 @@ func (ni *namespaceInformer) start() {
145151

146152
log.Debug("Informer is ready", slog.String("debugName", ni.Monitor.Metadata.DebugName))
147153
}
154+
155+
// wait blocks until ni.done is closed. start creates the channel and closes it
// after SharedInformer.Run returns. When done is nil (start has not created
// it), wait returns immediately.
func (ni *namespaceInformer) wait() {
156+
if ni.done != nil {
157+
<-ni.done
158+
}
159+
}

pkg/kube_events_manager/resource_informer.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -469,6 +469,11 @@ func (ei *resourceInformer) start() {
469469
log.Debug("informer is ready", slog.String("debugName", ei.Monitor.Metadata.DebugName))
470470
}
471471

472+
// wait blocks until DefaultFactoryStore no longer holds a factory for
// ei.FactoryIndex, i.e. the shared informer for that index has been stopped
// and its entry removed.
// NOTE(review): the factory is shared per index and is removed only when its
// last handler registration is gone, so this may wait on other users of the
// same index, not just this informer — confirm that is intended.
473+
func (ei *resourceInformer) wait() {
474+
DefaultFactoryStore.WaitStopped(ei.FactoryIndex)
475+
}
476+
472477
// CachedObjectsInfo returns info accumulated from start.
473478
func (ei *resourceInformer) getCachedObjectsInfo() CachedObjectsInfo {
474479
ei.cacheLock.RLock()

pkg/shell-operator/operator.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -960,9 +960,20 @@ func (op *ShellOperator) runMetrics() {
960960

961961
// Shutdown stops the schedule manager, the kube events manager and the task
// queues, in that order, logging each step. It waits for the kube events
// manager (when it exposes Wait) and then for the queues via
// WaitStopWithTimeout(WaitQueuesTimeout).
962962
func (op *ShellOperator) Shutdown() {
963+
log.Info("shutdown: begin")
963964
op.ScheduleManager.Stop()
965+
log.Info("shutdown: schedule manager stopped")
966+
964967
op.KubeEventsManager.Stop()
968+
log.Info("shutdown: kube events manager canceled, waiting for informers")
969+
// Wait is reached through an optional-interface assertion, so a value
// without a Wait method is silently skipped.
// NOTE(review): the KubeEventsManager interface gains Wait() in this same
// commit; if op.KubeEventsManager has that type the assertion always
// succeeds and a direct call would do — confirm.
// NOTE(review): no timeout is visible on kem.Wait(), unlike the queue wait
// below — confirm shutdown cannot hang here.
if kem, ok := op.KubeEventsManager.(interface{ Wait() }); ok {
970+
kem.Wait()
971+
}
972+
log.Info("shutdown: kube events manager done")
973+
965974
op.TaskQueues.Stop()
975+
log.Info("shutdown: task queues stop signaled, waiting")
966976
// Wait for queues to stop, but no more than 10 seconds
967977
op.TaskQueues.WaitStopWithTimeout(WaitQueuesTimeout)
978+
log.Info("shutdown: task queues stopped")
968979
}

0 commit comments

Comments
 (0)