Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ Export HTTP server manager runnable implementation #2473

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 9 additions & 10 deletions pkg/manager/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,9 +284,8 @@ func (cm *controllerManager) addHealthProbeServer() error {
mux.Handle(cm.livenessEndpointName+"/", http.StripPrefix(cm.livenessEndpointName, cm.healthzHandler))
}

return cm.add(&server{
Kind: "health probe",
Log: cm.logger,
return cm.add(&Server{
Name: "health probe",
Server: srv,
Listener: cm.healthProbeListener,
})
Expand All @@ -302,9 +301,8 @@ func (cm *controllerManager) addPprofServer() error {
mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
mux.HandleFunc("/debug/pprof/trace", pprof.Trace)

return cm.add(&server{
Kind: "pprof",
Log: cm.logger,
return cm.add(&Server{
Name: "pprof",
Server: srv,
Listener: cm.pprofListener,
})
Expand Down Expand Up @@ -384,11 +382,12 @@ func (cm *controllerManager) Start(ctx context.Context) (err error) {
}
}

// First start any internal HTTP servers, which includes health probes, metrics and profiling if enabled.
// First start any HTTP servers, which includes health probes and profiling, if enabled.
sbueringer marked this conversation as resolved.
Show resolved Hide resolved
//
// WARNING: Internal HTTP servers MUST start before any cache is populated, otherwise it would block
// conversion webhooks to be ready for serving which make the cache never get ready.
if err := cm.runnables.HTTPServers.Start(cm.internalCtx); err != nil {
// WARNING: HTTPServers includes the health probes, which MUST start before any cache is populated, otherwise
// it would block conversion webhooks to be ready for serving which make the cache never get ready.
logCtx := logr.NewContext(cm.internalCtx, cm.logger)
if err := cm.runnables.HTTPServers.Start(logCtx); err != nil {
return fmt.Errorf("failed to start HTTP servers: %w", err)
}

Expand Down
5 changes: 4 additions & 1 deletion pkg/manager/runnable_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,10 @@ func newRunnables(baseContext BaseContextFunc, errChan chan error) *runnables {
// The runnables added after Start are started directly.
func (r *runnables) Add(fn Runnable) error {
switch runnable := fn.(type) {
case *server:
case *Server:
if runnable.NeedLeaderElection() {
return r.LeaderElection.Add(fn, nil)
}
return r.HTTPServers.Add(fn, nil)
case hasCache:
return r.Caches.Add(fn, func(ctx context.Context) bool {
Expand Down
3 changes: 2 additions & 1 deletion pkg/manager/runnable_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"k8s.io/utils/ptr"

"sigs.k8s.io/controller-runtime/pkg/cache/informertest"
"sigs.k8s.io/controller-runtime/pkg/webhook"
)
Expand All @@ -22,7 +23,7 @@ var _ = Describe("runnables", func() {
})

It("should add HTTP servers to the appropriate group", func() {
server := &server{}
server := &Server{}
r := newRunnables(defaultBaseContext, errCh)
Expect(r.Add(server)).To(Succeed())
Expect(r.HTTPServers.startQueue).To(HaveLen(1))
Expand Down
74 changes: 61 additions & 13 deletions pkg/manager/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,41 +21,89 @@ import (
"errors"
"net"
"net/http"
"time"

"github.com/go-logr/logr"
crlog "sigs.k8s.io/controller-runtime/pkg/log"
)

// server is a general purpose HTTP server Runnable for a manager
// to serve some internal handlers such as health probes, metrics and profiling.
type server struct {
Kind string
Log logr.Logger
Server *http.Server
var (
_ Runnable = (*Server)(nil)
_ LeaderElectionRunnable = (*Server)(nil)
)

// Server is a general purpose HTTP server Runnable for a manager.
// It is used to serve some internal handlers for health probes and profiling,
// but it can also be used to run custom servers.
type Server struct {
// Name is an optional string that describes the purpose of the server. It is used in logs to distinguish
// among multiple servers.
Name string

// Server is the HTTP server to run. It is required.
Server *http.Server

// Listener is an optional listener to use. If not set, the server start a listener using the server.Addr.
// Using a listener is useful when the port reservation needs to happen in advance of this runnable starting.
Listener net.Listener

// OnlyServeWhenLeader is an optional bool that indicates that the server should only be started when the manager is the leader.
OnlyServeWhenLeader bool

// ShutdownTimeout is an optional duration that indicates how long to wait for the server to shutdown gracefully. If not set,
// the server will wait indefinitely for all connections to close.
ShutdownTimeout *time.Duration
}

func (s *server) Start(ctx context.Context) error {
log := s.Log.WithValues("kind", s.Kind, "addr", s.Listener.Addr())
// Start starts the server. It will block until the server is stopped or an error occurs.
func (s *Server) Start(ctx context.Context) error {
log := crlog.FromContext(ctx)
if s.Name != "" {
log = log.WithValues("name", s.Name)
}
log = log.WithValues("addr", s.addr())

serverShutdown := make(chan struct{})
go func() {
<-ctx.Done()
log.Info("shutting down server")
if err := s.Server.Shutdown(context.Background()); err != nil {

shutdownCtx := context.Background()
if s.ShutdownTimeout != nil {
var shutdownCancel context.CancelFunc
shutdownCtx, shutdownCancel = context.WithTimeout(context.Background(), *s.ShutdownTimeout)
defer shutdownCancel()
}

if err := s.Server.Shutdown(shutdownCtx); err != nil {
log.Error(err, "error shutting down server")
}
close(serverShutdown)
}()

log.Info("starting server")
if err := s.Server.Serve(s.Listener); err != nil && !errors.Is(err, http.ErrServerClosed) {
if err := s.serve(); err != nil && !errors.Is(err, http.ErrServerClosed) {
return err
}

<-serverShutdown
return nil
}

func (s *server) NeedLeaderElection() bool {
return false
// NeedLeaderElection returns true if the server should only be started when the manager is the leader.
func (s *Server) NeedLeaderElection() bool {
return s.OnlyServeWhenLeader
}
Comment on lines +92 to +95
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a use case for this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The use case I have in mind is when there is a strong coupling between what is being reconciled and what is being served. One example might be an API that represents OCI images, where creating an instance of that API makes the OCI image contents available as a tar archive via the HTTP server.


func (s *Server) addr() string {
if s.Listener != nil {
return s.Listener.Addr().String()
}
return s.Server.Addr
}

func (s *Server) serve() error {
if s.Listener != nil {
return s.Server.Serve(s.Listener)
}
return s.Server.ListenAndServe()
}