Skip to content

adjust shutdown process on the httpserver to remove fsm noise #45

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 61 additions & 62 deletions runnables/httpserver/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ type HttpServer interface {
Shutdown(ctx context.Context) error
}

// Runner implements a configurable HTTP server that supports graceful shutdown,
// dynamic reconfiguration, and state monitoring. It meets the Runnable, Reloadable,
// and Stateable interfaces from the supervisor package.
// Runner implements an HTTP server with graceful shutdown, dynamic reconfiguration,
// and state monitoring. It implements the Runnable, Reloadable, and Stateable
// interfaces from the supervisor package.
type Runner struct {
name string
config atomic.Pointer[Config]
Expand All @@ -52,7 +52,7 @@ type Runner struct {
logger *slog.Logger
}

// NewRunner initializes a new HTTPServer runner instance.
// NewRunner creates a new HTTP server runner instance with the provided options.
func NewRunner(opts ...Option) (*Runner, error) {
// Set default logger
logger := slog.Default().WithGroup("httpserver.Runner")
Expand Down Expand Up @@ -115,7 +115,8 @@ func (r *Runner) String() string {
return fmt.Sprintf("HTTPServer{%s}", strings.Join(args, ", "))
}

// Run starts the HTTP server and listens for incoming requests
// Run starts the HTTP server and handles its lifecycle. It transitions through
// FSM states and returns when the server is stopped or encounters an error.
func (r *Runner) Run(ctx context.Context) error {
runCtx, runCancel := context.WithCancel(ctx)
defer runCancel()
Expand Down Expand Up @@ -152,62 +153,28 @@ func (r *Runner) Run(ctx context.Context) error {
return fmt.Errorf("%w: %w", ErrHttpServer, err)
}

// Try to transition to Stopping state
if !r.fsm.TransitionBool(finitestate.StatusStopping) {
// If already in Stopping state, this is okay and we can continue
if r.fsm.GetState() == finitestate.StatusStopping {
r.logger.Debug("Already in Stopping state, continuing shutdown")
} else {
// Otherwise, this is a real failure
r.setStateError()
return fmt.Errorf("%w: transition to Stopping state", ErrStateTransition)
}
}

r.mutex.Lock()
err = r.stopServer(runCtx)
r.mutex.Unlock()

if err != nil {
r.setStateError()
// Return the error directly so it can be checked with errors.Is
return err
}

err = r.fsm.Transition(finitestate.StatusStopped)
if err != nil {
r.setStateError()
return err
}

r.logger.Debug("HTTP server shut down gracefully")
return nil
return r.shutdown(runCtx)
}

// Stop will cancel the parent context, which will close the HTTP server
// Stop signals the HTTP server to shut down by canceling its context.
func (r *Runner) Stop() {
// Only transition to Stopping if we're currently Running
err := r.fsm.TransitionIfCurrentState(finitestate.StatusRunning, finitestate.StatusStopping)
if err != nil {
// This error is expected if we're already stopping, so only log at debug level
r.logger.Debug("Note: Not transitioning to Stopping state", "error", err)
}
r.logger.Debug("Stopping HTTP server")
r.cancel()
}

// serverReadinessProbe checks if the HTTP server is listening and accepting connections
// by attempting to establish a TCP connection to the server's address
// serverReadinessProbe verifies the HTTP server is accepting connections by
// repeatedly attempting TCP connections until success or timeout.
func (r *Runner) serverReadinessProbe(ctx context.Context, addr string) error {
// Create a dialer with timeout
// Configure TCP dialer with connection timeout
dialer := &net.Dialer{
Timeout: 100 * time.Millisecond,
}

// Set up a timeout context for the entire probe operation
// Set timeout for the readiness probe operation
probeCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

// Retry loop - attempt to connect until success or timeout
// Retry connection attempts until success or timeout
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()

Expand All @@ -219,17 +186,17 @@ func (r *Runner) serverReadinessProbe(ctx context.Context, addr string) error {
// Attempt to establish a TCP connection
conn, err := dialer.DialContext(ctx, "tcp", addr)
if err == nil {
// Connection successful, server is accepting connections
// Server is ready and accepting connections
if err := conn.Close(); err != nil {
// Check if it's a "closed network connection" error
// Ignore expected connection close errors
if !errors.Is(err, net.ErrClosed) {
r.logger.Warn("Error closing connection", "error", err)
}
}
return nil
}

// Connection failed, log and retry
// Connection failed, continue retrying
r.logger.Debug("Server not ready yet, retrying", "error", err)
}
}
Expand All @@ -241,7 +208,7 @@ func (r *Runner) boot() error {
return ErrRetrieveConfig
}

// Create a new Config with the same settings but use the Runner's context
// Create server config using Runner's context for request handling
serverCfg, err := NewConfig(
originalCfg.ListenAddr,
originalCfg.Routes,
Expand All @@ -254,7 +221,7 @@ func (r *Runner) boot() error {

listenAddr := serverCfg.ListenAddr

// Create the server, and reset the serverCloseOnce with a mutex
// Initialize server instance and reset shutdown guard
r.serverMutex.Lock()
r.server = serverCfg.createServer()
r.serverCloseOnce = sync.Once{}
Expand All @@ -267,7 +234,7 @@ func (r *Runner) boot() error {
"idleTimeout", serverCfg.IdleTimeout,
"drainTimeout", serverCfg.DrainTimeout)

// Start the server in a goroutine
// Start HTTP server in background goroutine
go func() {
r.serverMutex.RLock()
server := r.server
Expand All @@ -284,16 +251,16 @@ func (r *Runner) boot() error {
r.logger.Debug("HTTP server stopped", "listenOn", listenAddr)
}()

// Wait for the server to be ready or fail
// Verify server is ready to accept connections
if err := r.serverReadinessProbe(r.ctx, listenAddr); err != nil {
// If probe fails, attempt to stop the server since it may be partially started
// Clean up partially started server on readiness failure
if err := r.stopServer(r.ctx); err != nil {
r.logger.Warn("Error stopping server", "error", err)
}
return fmt.Errorf("%w: %w", ErrServerBoot, err)
}

// Get the actual listening address for auto-assigned ports
// Retrieve actual listening address for port 0 assignments
actualAddr := listenAddr
r.serverMutex.RLock()
if tcpAddr, ok := r.server.(interface{ Addr() net.Addr }); ok && tcpAddr.Addr() != nil {
Expand All @@ -307,13 +274,13 @@ func (r *Runner) boot() error {
return nil
}

// setConfig atomically updates the current configuration
// setConfig atomically stores the new configuration.
func (r *Runner) setConfig(config *Config) {
r.config.Store(config)
r.logger.Debug("Config updated", "config", config)
}

// getConfig returns the current configuration, loading it via the callback if necessary
// getConfig returns the current configuration, loading it via callback if not set.
func (r *Runner) getConfig() *Config {
config := r.config.Load()
if config != nil {
Expand All @@ -336,6 +303,8 @@ func (r *Runner) getConfig() *Config {
return newConfig
}

// stopServer performs graceful HTTP server shutdown with timeout handling.
// It uses sync.Once to ensure shutdown occurs only once per server instance.
func (r *Runner) stopServer(ctx context.Context) error {
var shutdownErr error
r.serverCloseOnce.Do(func() {
Expand All @@ -361,7 +330,7 @@ func (r *Runner) stopServer(ctx context.Context) error {

localErr := r.server.Shutdown(shutdownCtx)

// Check if the context deadline was exceeded, regardless of the error from Shutdown
// Detect timeout regardless of Shutdown() return value
select {
case <-shutdownCtx.Done():
if errors.Is(shutdownCtx.Err(), context.DeadlineExceeded) {
Expand All @@ -370,20 +339,50 @@ func (r *Runner) stopServer(ctx context.Context) error {
return
}
default:
// Context not done, normal shutdown
// Shutdown completed within timeout
}

// Handle any other error from shutdown
// Handle other shutdown errors
if localErr != nil {
shutdownErr = fmt.Errorf("%w: %w", ErrGracefulShutdown, localErr)
return
}
})

// if stopServer is called, always reset the server reference
// Reset server reference after shutdown attempt
r.serverMutex.Lock()
r.server = nil
r.serverMutex.Unlock()

return shutdownErr
}

// shutdown coordinates HTTP server shutdown with FSM state management.
// It transitions to Stopping state, calls stopServer, then transitions to Stopped.
func (r *Runner) shutdown(ctx context.Context) error {
logger := r.logger.WithGroup("shutdown")
logger.Debug("Shutting down HTTP server")

// Begin shutdown by transitioning to Stopping state
if err := r.fsm.Transition(finitestate.StatusStopping); err != nil {
logger.Error("Failed to transition to stopping state", "error", err)
// Continue shutdown even if state transition fails
}

r.mutex.Lock()
err := r.stopServer(ctx)
r.mutex.Unlock()

if err != nil {
r.setStateError()
return err
}

if err := r.fsm.Transition(finitestate.StatusStopped); err != nil {
r.setStateError()
return err
}

logger.Debug("HTTP server shutdown complete")
return nil
}
Loading