Skip to content

Commit dedeefa

Browse files
committed
implements graceful termination for server
Signed-off-by: Imran Pochi <imranpochi@microsoft.com>
1 parent 7c8afba commit dedeefa

File tree

2 files changed: +101 -18 lines changed

2 files changed: +101 -18 lines changed

cmd/server/app/options/options.go

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -115,6 +115,8 @@ type ProxyRunOptions struct {
115115
LeaseLabel string
116116
// Needs kubernetes client
117117
NeedsKubernetesClient bool
118+
// Graceful shutdown timeout duration
119+
GracefulShutdownTimeout time.Duration
118120
}
119121

120122
func (o *ProxyRunOptions) Flags() *pflag.FlagSet {
@@ -155,6 +157,7 @@ func (o *ProxyRunOptions) Flags() *pflag.FlagSet {
155157
flags.BoolVar(&o.EnableLeaseController, "enable-lease-controller", o.EnableLeaseController, "Enable lease controller to publish and garbage collect proxy server leases.")
156158
flags.StringVar(&o.LeaseNamespace, "lease-namespace", o.LeaseNamespace, "The namespace where lease objects are managed by the controller.")
157159
flags.StringVar(&o.LeaseLabel, "lease-label", o.LeaseLabel, "The labels on which the lease objects are managed.")
160+
flags.DurationVar(&o.GracefulShutdownTimeout, "graceful-shutdown-timeout", o.GracefulShutdownTimeout, "Timeout duration for graceful shutdown of the server. The server will wait for active connections to close before forcefully terminating.")
158161
flags.Bool("warn-on-channel-limit", true, "This behavior is now thread safe and always on. This flag will be removed in a future release.")
159162
flags.MarkDeprecated("warn-on-channel-limit", "This behavior is now thread safe and always on. This flag will be removed in a future release.")
160163

@@ -198,6 +201,7 @@ func (o *ProxyRunOptions) Print() {
198201
klog.V(1).Infof("LeaseLabel set to %s.\n", o.LeaseLabel)
199202
klog.V(1).Infof("CipherSuites set to %q.\n", o.CipherSuites)
200203
klog.V(1).Infof("XfrChannelSize set to %d.\n", o.XfrChannelSize)
204+
klog.V(1).Infof("GracefulShutdownTimeout set to %v.\n", o.GracefulShutdownTimeout)
201205
}
202206

203207
func (o *ProxyRunOptions) Validate() error {
@@ -382,6 +386,7 @@ func NewProxyRunOptions() *ProxyRunOptions {
382386
EnableLeaseController: false,
383387
LeaseNamespace: "kube-system",
384388
LeaseLabel: "k8s-app=konnectivity-server",
389+
GracefulShutdownTimeout: 15 * time.Second,
385390
}
386391
return &o
387392
}

cmd/server/app/server.go

Lines changed: 96 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -95,7 +95,7 @@ type Proxy struct {
9595
server *server.ProxyServer
9696
}
9797

98-
type StopFunc func()
98+
type StopFunc func(context.Context) error
9999

100100
func (p *Proxy) Run(o *options.ProxyRunOptions, stopCh <-chan struct{}) error {
101101
o.Print()
@@ -145,16 +145,12 @@ func (p *Proxy) Run(o *options.ProxyRunOptions, stopCh <-chan struct{}) error {
145145
if err != nil {
146146
return fmt.Errorf("failed to run the frontend server: %v", err)
147147
}
148-
if frontendStop != nil {
149-
defer frontendStop()
150-
}
151148

152149
klog.V(1).Infoln("Starting agent server for tunnel connections.")
153150
err = p.runAgentServer(o, p.server)
154151
if err != nil {
155152
return fmt.Errorf("failed to run the agent server: %v", err)
156153
}
157-
defer p.agentServer.Stop()
158154

159155
labels, err := util.ParseLabels(o.LeaseLabel)
160156
if err != nil {
@@ -182,17 +178,97 @@ func (p *Proxy) Run(o *options.ProxyRunOptions, stopCh <-chan struct{}) error {
182178
if err != nil {
183179
return fmt.Errorf("failed to run the admin server: %v", err)
184180
}
185-
defer p.adminServer.Close()
186181

187182
klog.V(1).Infoln("Starting health server for healthchecks.")
188183
err = p.runHealthServer(o, p.server)
189184
if err != nil {
190185
return fmt.Errorf("failed to run the health server: %v", err)
191186
}
192-
defer p.healthServer.Close()
193187

194188
<-stopCh
195-
klog.V(1).Infoln("Shutting down server.")
189+
klog.V(1).Infoln("Received shutdown signal, initiating graceful shutdown.")
190+
191+
// Start graceful shutdown with timeout
192+
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), o.GracefulShutdownTimeout)
193+
defer shutdownCancel()
194+
195+
// Create a WaitGroup to track shutdown of all components
196+
var wg sync.WaitGroup
197+
198+
// Shutdown frontend server gracefully (if available)
199+
if frontendStop != nil {
200+
wg.Add(1)
201+
go func() {
202+
defer wg.Done()
203+
klog.V(1).Infoln("Gracefully stopping frontend server...")
204+
if err := frontendStop(shutdownCtx); err != nil {
205+
klog.ErrorS(err, "failed to shut down frontend server")
206+
} else {
207+
klog.V(1).Infoln("frontend server stopped.")
208+
}
209+
}()
210+
}
211+
212+
// Shutdown agent server gracefully
213+
wg.Add(1)
214+
go func() {
215+
defer wg.Done()
216+
klog.V(1).Infoln("Gracefully stopping agent server...")
217+
p.agentServer.GracefulStop()
218+
klog.V(1).Infoln("agent server stopped.")
219+
}()
220+
221+
// Shutdown admin server gracefully
222+
wg.Add(1)
223+
go func() {
224+
defer wg.Done()
225+
klog.V(1).Infoln("Gracefully stopping admin server...")
226+
if err := p.adminServer.Shutdown(shutdownCtx); err != nil {
227+
klog.ErrorS(err, "failed to shut down admin server")
228+
229+
} else {
230+
klog.V(1).Infoln("admin server stopped.")
231+
}
232+
}()
233+
234+
// Shutdown health server gracefully
235+
wg.Add(1)
236+
go func() {
237+
defer wg.Done()
238+
klog.V(1).Infoln("Gracefully stopping health server...")
239+
if err := p.healthServer.Shutdown(shutdownCtx); err != nil {
240+
klog.ErrorS(err, "failed to shut down health server")
241+
} else {
242+
klog.V(1).Infoln("health server stopped.")
243+
}
244+
}()
245+
246+
// Wait for all servers to shutdown or timeout
247+
shutdownComplete := make(chan struct{})
248+
go func() {
249+
wg.Wait()
250+
close(shutdownComplete)
251+
}()
252+
253+
select {
254+
case <-shutdownComplete:
255+
klog.V(1).Infoln("Graceful shutdown completed successfully.")
256+
case <-shutdownCtx.Done():
257+
klog.Warningf("Graceful shutdown timed out after %v, forcing termination.", o.GracefulShutdownTimeout)
258+
// Force stop all servers that might still be running
259+
if p.agentServer != nil {
260+
p.agentServer.Stop()
261+
}
262+
if p.adminServer != nil {
263+
p.adminServer.Close()
264+
}
265+
if p.healthServer != nil {
266+
p.healthServer.Close()
267+
}
268+
// frontend server's force-stop is handled by its StopFunc
269+
}
270+
271+
klog.V(1).Infoln("Server shutdown complete.")
196272

197273
return nil
198274
}
@@ -260,7 +336,10 @@ func (p *Proxy) runUDSFrontendServer(ctx context.Context, o *options.ProxyRunOpt
260336
"udsFile", o.UdsName,
261337
)
262338
go runpprof.Do(context.Background(), labels, func(context.Context) { grpcServer.Serve(lis) })
263-
stop = grpcServer.GracefulStop
339+
stop = func(ctx context.Context) error {
340+
grpcServer.GracefulStop()
341+
return nil
342+
}
264343
} else {
265344
// http-connect
266345
server := &http.Server{
@@ -269,9 +348,8 @@ func (p *Proxy) runUDSFrontendServer(ctx context.Context, o *options.ProxyRunOpt
269348
Server: s,
270349
},
271350
}
272-
stop = func() {
273-
err := server.Shutdown(ctx)
274-
klog.ErrorS(err, "error shutting down server")
351+
stop = func(shutdownCtx context.Context) error {
352+
return server.Shutdown(shutdownCtx)
275353
}
276354
labels := runpprof.Labels(
277355
"core", "udsHttpFrontend",
@@ -356,7 +434,10 @@ func (p *Proxy) runMTLSFrontendServer(ctx context.Context, o *options.ProxyRunOp
356434
"port", strconv.Itoa(o.ServerPort),
357435
)
358436
go runpprof.Do(context.Background(), labels, func(context.Context) { grpcServer.Serve(lis) })
359-
stop = grpcServer.GracefulStop
437+
stop = func(ctx context.Context) error {
438+
grpcServer.GracefulStop()
439+
return nil
440+
}
360441
} else {
361442
// http-connect
362443
server := &http.Server{
@@ -368,11 +449,8 @@ func (p *Proxy) runMTLSFrontendServer(ctx context.Context, o *options.ProxyRunOp
368449
},
369450
TLSNextProto: make(map[string]func(*http.Server, *tls.Conn, http.Handler)),
370451
}
371-
stop = func() {
372-
err := server.Shutdown(ctx)
373-
if err != nil {
374-
klog.ErrorS(err, "failed to shutdown server")
375-
}
452+
stop = func(shutdownCtx context.Context) error {
453+
return server.Shutdown(shutdownCtx)
376454
}
377455
labels := runpprof.Labels(
378456
"core", "mtlsHttpFrontend",

0 commit comments

Comments
 (0)