Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions cmd/server/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ type ProxyRunOptions struct {
LeaseLabel string
// Needs kubernetes client
NeedsKubernetesClient bool
// Graceful shutdown timeout duration
GracefulShutdownTimeout time.Duration
}

func (o *ProxyRunOptions) Flags() *pflag.FlagSet {
Expand Down Expand Up @@ -155,6 +157,7 @@ func (o *ProxyRunOptions) Flags() *pflag.FlagSet {
flags.BoolVar(&o.EnableLeaseController, "enable-lease-controller", o.EnableLeaseController, "Enable lease controller to publish and garbage collect proxy server leases.")
flags.StringVar(&o.LeaseNamespace, "lease-namespace", o.LeaseNamespace, "The namespace where lease objects are managed by the controller.")
flags.StringVar(&o.LeaseLabel, "lease-label", o.LeaseLabel, "The labels on which the lease objects are managed.")
flags.DurationVar(&o.GracefulShutdownTimeout, "graceful-shutdown-timeout", o.GracefulShutdownTimeout, "Timeout duration for graceful shutdown of the server. The server will wait for active connections to close before forcefully terminating.")
flags.Bool("warn-on-channel-limit", true, "This behavior is now thread safe and always on. This flag will be removed in a future release.")
flags.MarkDeprecated("warn-on-channel-limit", "This behavior is now thread safe and always on. This flag will be removed in a future release.")

Expand Down Expand Up @@ -198,6 +201,7 @@ func (o *ProxyRunOptions) Print() {
klog.V(1).Infof("LeaseLabel set to %s.\n", o.LeaseLabel)
klog.V(1).Infof("CipherSuites set to %q.\n", o.CipherSuites)
klog.V(1).Infof("XfrChannelSize set to %d.\n", o.XfrChannelSize)
klog.V(1).Infof("GracefulShutdownTimeout set to %v.\n", o.GracefulShutdownTimeout)
}

func (o *ProxyRunOptions) Validate() error {
Expand Down Expand Up @@ -382,6 +386,7 @@ func NewProxyRunOptions() *ProxyRunOptions {
EnableLeaseController: false,
LeaseNamespace: "kube-system",
LeaseLabel: "k8s-app=konnectivity-server",
GracefulShutdownTimeout: 15 * time.Second,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the feature, but I think we need to be backward compatible. We should default to 0, which means shut down immediately and is the current behavior.

}
return &o
}
Expand Down
116 changes: 97 additions & 19 deletions cmd/server/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ type Proxy struct {
server *server.ProxyServer
}

type StopFunc func()
type StopFunc func(context.Context) error

func (p *Proxy) Run(o *options.ProxyRunOptions, stopCh <-chan struct{}) error {
o.Print()
Expand Down Expand Up @@ -145,16 +145,12 @@ func (p *Proxy) Run(o *options.ProxyRunOptions, stopCh <-chan struct{}) error {
if err != nil {
return fmt.Errorf("failed to run the frontend server: %v", err)
}
if frontendStop != nil {
defer frontendStop()
}

klog.V(1).Infoln("Starting agent server for tunnel connections.")
err = p.runAgentServer(o, p.server)
if err != nil {
return fmt.Errorf("failed to run the agent server: %v", err)
}
defer p.agentServer.Stop()

labels, err := util.ParseLabels(o.LeaseLabel)
if err != nil {
Expand Down Expand Up @@ -182,17 +178,97 @@ func (p *Proxy) Run(o *options.ProxyRunOptions, stopCh <-chan struct{}) error {
if err != nil {
return fmt.Errorf("failed to run the admin server: %v", err)
}
defer p.adminServer.Close()

klog.V(1).Infoln("Starting health server for healthchecks.")
err = p.runHealthServer(o, p.server)
if err != nil {
return fmt.Errorf("failed to run the health server: %v", err)
}
defer p.healthServer.Close()

<-stopCh
klog.V(1).Infoln("Shutting down server.")
klog.V(1).Infoln("Received shutdown signal, initiating graceful shutdown.")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should wrap most of this new code in an `if o.GracefulShutdownTimeout != 0` block.


// Start graceful shutdown with timeout
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), o.GracefulShutdownTimeout)
defer shutdownCancel()

// Create a WaitGroup to track shutdown of all components
var wg sync.WaitGroup

// Shutdown frontend server gracefully (if available)
if frontendStop != nil {
wg.Add(1)
go func() {
defer wg.Done()
klog.V(1).Infoln("Gracefully stopping frontend server...")
if err := frontendStop(shutdownCtx); err != nil {
klog.ErrorS(err, "failed to shut down frontend server")
} else {
klog.V(1).Infoln("frontend server stopped.")
}
}()
}

// Shutdown agent server gracefully
wg.Add(1)
go func() {
defer wg.Done()
klog.V(1).Infoln("Gracefully stopping agent server...")
p.agentServer.GracefulStop()
klog.V(1).Infoln("agent server stopped.")
}()

// Shutdown admin server gracefully
wg.Add(1)
go func() {
defer wg.Done()
klog.V(1).Infoln("Gracefully stopping admin server...")
if err := p.adminServer.Shutdown(shutdownCtx); err != nil {
klog.ErrorS(err, "failed to shut down admin server")

} else {
klog.V(1).Infoln("admin server stopped.")
}
}()

// Shutdown health server gracefully
wg.Add(1)
go func() {
defer wg.Done()
klog.V(1).Infoln("Gracefully stopping health server...")
if err := p.healthServer.Shutdown(shutdownCtx); err != nil {
klog.ErrorS(err, "failed to shut down health server")
} else {
klog.V(1).Infoln("health server stopped.")
}
}()

// Wait for all servers to shutdown or timeout
shutdownComplete := make(chan struct{})
go func() {
wg.Wait()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we know what happens if the other goroutines have completed before this Wait is called? My guess is that it will never trigger. I think it makes more sense to do the non-optional adds first: kick off this goroutine, then kick off the workers.

close(shutdownComplete)
}()

select {
case <-shutdownComplete:
klog.V(1).Infoln("Graceful shutdown completed successfully.")
case <-shutdownCtx.Done():
klog.Warningf("Graceful shutdown timed out after %v, forcing termination.", o.GracefulShutdownTimeout)
// Force stop all servers that might still be running
if p.agentServer != nil {
p.agentServer.Stop()
}
if p.adminServer != nil {
p.adminServer.Close()
}
if p.healthServer != nil {
p.healthServer.Close()
}
// frontend server's force-stop is handled by its StopFunc
}

klog.V(1).Infoln("Server shutdown complete.")

return nil
}
Expand Down Expand Up @@ -260,7 +336,10 @@ func (p *Proxy) runUDSFrontendServer(ctx context.Context, o *options.ProxyRunOpt
"udsFile", o.UdsName,
)
go runpprof.Do(context.Background(), labels, func(context.Context) { grpcServer.Serve(lis) })
stop = grpcServer.GracefulStop
stop = func(_ context.Context) error {
grpcServer.GracefulStop()
return nil
}
} else {
// http-connect
server := &http.Server{
Expand All @@ -269,9 +348,8 @@ func (p *Proxy) runUDSFrontendServer(ctx context.Context, o *options.ProxyRunOpt
Server: s,
},
}
stop = func() {
err := server.Shutdown(ctx)
klog.ErrorS(err, "error shutting down server")
stop = func(shutdownCtx context.Context) error {
return server.Shutdown(shutdownCtx)
}
labels := runpprof.Labels(
"core", "udsHttpFrontend",
Expand Down Expand Up @@ -329,7 +407,7 @@ func (p *Proxy) getTLSConfig(caFile, certFile, keyFile string, cipherSuites []st
return tlsConfig, nil
}

func (p *Proxy) runMTLSFrontendServer(ctx context.Context, o *options.ProxyRunOptions, s *server.ProxyServer) (StopFunc, error) {
func (p *Proxy) runMTLSFrontendServer(_ context.Context, o *options.ProxyRunOptions, s *server.ProxyServer) (StopFunc, error) {
var stop StopFunc

var tlsConfig *tls.Config
Expand All @@ -356,7 +434,10 @@ func (p *Proxy) runMTLSFrontendServer(ctx context.Context, o *options.ProxyRunOp
"port", strconv.Itoa(o.ServerPort),
)
go runpprof.Do(context.Background(), labels, func(context.Context) { grpcServer.Serve(lis) })
stop = grpcServer.GracefulStop
stop = func(_ context.Context) error {
grpcServer.GracefulStop()
return nil
}
} else {
// http-connect
server := &http.Server{
Expand All @@ -368,11 +449,8 @@ func (p *Proxy) runMTLSFrontendServer(ctx context.Context, o *options.ProxyRunOp
},
TLSNextProto: make(map[string]func(*http.Server, *tls.Conn, http.Handler)),
}
stop = func() {
err := server.Shutdown(ctx)
if err != nil {
klog.ErrorS(err, "failed to shutdown server")
}
stop = func(shutdownCtx context.Context) error {
return server.Shutdown(shutdownCtx)
}
labels := runpprof.Labels(
"core", "mtlsHttpFrontend",
Expand Down