Skip to content

Commit

Permalink
Stop the server when the context is cancelled
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Wall <richard.wall@venafi.com>
  • Loading branch information
wallrj committed Nov 3, 2024
1 parent c87f7c3 commit fddc921
Showing 1 changed file with 77 additions and 12 deletions.
89 changes: 77 additions & 12 deletions pkg/agent/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"io"
"net"
"net/http"
"os"
"strings"
Expand All @@ -25,6 +26,7 @@ import (
"k8s.io/client-go/kubernetes"
clientgocorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/manager"

"github.com/jetstack/preflight/api"
Expand All @@ -50,7 +52,12 @@ const schemaVersion string = "v2.0.0"
// Run starts the agent process
func Run(cmd *cobra.Command, args []string) {
logs.Log.Printf("Preflight agent version: %s (%s)", version.PreflightVersion, version.Commit)
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(
klog.NewContext(
context.Background(),
klog.Background(),
),
)
defer cancel()

file, err := os.Open(Flags.ConfigFilePath)
Expand Down Expand Up @@ -83,11 +90,13 @@ func Run(cmd *cobra.Command, args []string) {
}
}()

group.Go(func() error {
{
server := http.NewServeMux()
const serverAddress = ":8081"
log := klog.FromContext(ctx).WithName("APIServer").WithValues("addr", serverAddress)

if Flags.Profiling {
logs.Log.Printf("pprof profiling was enabled.")
log.Info("Profiling endpoints enabled", "path", "/debug/pprof")
server.HandleFunc("/debug/pprof/", pprof.Index)
server.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
server.HandleFunc("/debug/pprof/profile", pprof.Profile)
Expand All @@ -96,7 +105,7 @@ func Run(cmd *cobra.Command, args []string) {
}

if Flags.Prometheus {
logs.Log.Printf("Prometheus was enabled.\nRunning prometheus on port :8081")
log.Info("Metrics endpoints enabled", "path", "/metrics")
prometheus.MustRegister(metricPayloadSize)
server.Handle("/metrics", promhttp.Handler())
}
Expand All @@ -105,21 +114,32 @@ func Run(cmd *cobra.Command, args []string) {
// what "ready" means for the agent, we just return 200 OK inconditionally.
// The goal is to satisfy some Kubernetes distributions, like OpenShift,
// that require a liveness and health probe to be present for each pod.
log.Info("Healthz endpoints enabled", "path", "/healthz")
server.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
})
log.Info("Readyz endpoints enabled", "path", "/readyz")
server.HandleFunc("/readyz", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
})

err := http.ListenAndServe(":8081", server)
if err != nil && !errors.Is(err, http.ErrServerClosed) {
return fmt.Errorf("failed to run the health check server: %s", err)
}
// The agent must stop if the management server stops
cancel()
return nil
})
group.Go(func() error {
err := listenAndServe(
klog.NewContext(gctx, log),
&http.Server{
Addr: serverAddress,
Handler: server,
BaseContext: func(_ net.Listener) context.Context {
return gctx
},
},
)
if err != nil {
return fmt.Errorf("APIServer: %s", err)
}
return nil
})
}

_, isVenConn := preflightClient.(*client.VenConnClient)
if isVenConn {
Expand Down Expand Up @@ -412,3 +432,48 @@ func postData(config CombinedConfig, preflightClient client.Client, readings []*

return nil
}

// listenAndServe starts the supplied HTTP server and stops it gracefully when
// the supplied context is cancelled.
// It returns when the graceful server shutdown is complete or when the server
// exits with an error.
// If the server fails to start, it returns the server error.
// If the server fails to shutdown gracefully, it returns the shutdown error.
// The server is given 3 seconds to shutdown gracefully before it is stopped
// forcefully.
func listenAndServe(ctx context.Context, server *http.Server) error {
log := klog.FromContext(ctx).WithName("ListenAndServe")

log.V(1).Info("Starting")

listenCTX, listenCancelCause := context.WithCancelCause(context.WithoutCancel(ctx))
go func() {
err := server.ListenAndServe()
listenCancelCause(fmt.Errorf("ListenAndServe: %s", err))
}()

select {
case <-listenCTX.Done():
log.V(1).Info("Shutdown skipped", "reason", "Server already stopped")
return context.Cause(listenCTX)

case <-ctx.Done():
log.V(1).Info("Shutting down")
}

shutdownCTX, shutdownCancel := context.WithTimeout(context.WithoutCancel(ctx), time.Second*3)
shutdownErr := server.Shutdown(shutdownCTX)
shutdownCancel()
if shutdownErr != nil {
shutdownErr = fmt.Errorf("Shutdown: %s", shutdownErr)
}

closeErr := server.Close()
if closeErr != nil {
closeErr = fmt.Errorf("Close: %s", closeErr)
}

log.V(1).Info("Shutdown complete")

return errors.Join(shutdownErr, closeErr)
}

0 comments on commit fddc921

Please sign in to comment.