Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[VC-35738] Stop the API server when the context is cancelled #604

Merged
merged 2 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 77 additions & 12 deletions pkg/agent/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"io"
"net"
"net/http"
"os"
"strings"
Expand All @@ -25,6 +26,7 @@ import (
"k8s.io/client-go/kubernetes"
clientgocorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/manager"

"github.com/jetstack/preflight/api"
Expand All @@ -50,7 +52,12 @@ const schemaVersion string = "v2.0.0"
// Run starts the agent process
func Run(cmd *cobra.Command, args []string) {
logs.Log.Printf("Preflight agent version: %s (%s)", version.PreflightVersion, version.Commit)
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(
klog.NewContext(
context.Background(),
klog.Background(),
),
)
defer cancel()

file, err := os.Open(Flags.ConfigFilePath)
Expand Down Expand Up @@ -83,11 +90,13 @@ func Run(cmd *cobra.Command, args []string) {
}
}()

group.Go(func() error {
{
server := http.NewServeMux()
const serverAddress = ":8081"
log := klog.FromContext(ctx).WithName("APIServer").WithValues("addr", serverAddress)

if Flags.Profiling {
logs.Log.Printf("pprof profiling was enabled.")
log.Info("Profiling endpoints enabled", "path", "/debug/pprof")
server.HandleFunc("/debug/pprof/", pprof.Index)
server.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
server.HandleFunc("/debug/pprof/profile", pprof.Profile)
Expand All @@ -96,7 +105,7 @@ func Run(cmd *cobra.Command, args []string) {
}

if Flags.Prometheus {
logs.Log.Printf("Prometheus was enabled.\nRunning prometheus on port :8081")
log.Info("Metrics endpoints enabled", "path", "/metrics")
prometheus.MustRegister(metricPayloadSize)
server.Handle("/metrics", promhttp.Handler())
}
Expand All @@ -105,21 +114,32 @@ func Run(cmd *cobra.Command, args []string) {
// what "ready" means for the agent, we just return 200 OK inconditionally.
// The goal is to satisfy some Kubernetes distributions, like OpenShift,
// that require a liveness and health probe to be present for each pod.
log.Info("Healthz endpoints enabled", "path", "/healthz")
server.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
})
log.Info("Readyz endpoints enabled", "path", "/readyz")
Copy link
Member

@maelvls maelvls Nov 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit, unimportant: weird that we start a readiness and liveness server when running --one-shot, but that's all right IMO

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree it's weird, perhaps in a future PR we can make it so that the API server is not started in one-shot mode.

server.HandleFunc("/readyz", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
})

err := http.ListenAndServe(":8081", server)
if err != nil && !errors.Is(err, http.ErrServerClosed) {
return fmt.Errorf("failed to run the health check server: %s", err)
}
// The agent must stop if the management server stops
cancel()
return nil
})
group.Go(func() error {
err := listenAndServe(
klog.NewContext(gctx, log),
&http.Server{
Addr: serverAddress,
Handler: server,
BaseContext: func(_ net.Listener) context.Context {
return gctx
},
},
)
if err != nil {
return fmt.Errorf("APIServer: %s", err)
}
return nil
})
}

_, isVenConn := preflightClient.(*client.VenConnClient)
if isVenConn {
Expand Down Expand Up @@ -412,3 +432,48 @@ func postData(config CombinedConfig, preflightClient client.Client, readings []*

return nil
}

// listenAndServe starts the supplied HTTP server and stops it gracefully when
// the supplied context is cancelled.
// It returns when the graceful server shutdown is complete or when the server
// exits with an error.
// If the server fails to start, it returns the server error.
// If the server fails to shutdown gracefully, it returns the shutdown error.
// The server is given 3 seconds to shutdown gracefully before it is stopped
// forcefully.
func listenAndServe(ctx context.Context, server *http.Server) error {
log := klog.FromContext(ctx).WithName("ListenAndServe")

log.V(1).Info("Starting")

listenCTX, listenCancelCause := context.WithCancelCause(context.WithoutCancel(ctx))
go func() {
err := server.ListenAndServe()
listenCancelCause(fmt.Errorf("ListenAndServe: %s", err))
}()

select {
case <-listenCTX.Done():
log.V(1).Info("Shutdown skipped", "reason", "Server already stopped")
return context.Cause(listenCTX)

case <-ctx.Done():
log.V(1).Info("Shutting down")
}

shutdownCTX, shutdownCancel := context.WithTimeout(context.WithoutCancel(ctx), time.Second*3)
shutdownErr := server.Shutdown(shutdownCTX)
shutdownCancel()
if shutdownErr != nil {
shutdownErr = fmt.Errorf("Shutdown: %s", shutdownErr)
}

closeErr := server.Close()
if closeErr != nil {
closeErr = fmt.Errorf("Close: %s", closeErr)
}

log.V(1).Info("Shutdown complete")

return errors.Join(shutdownErr, closeErr)
}
72 changes: 72 additions & 0 deletions pkg/agent/run_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package agent

import (
"bytes"
"context"
"os"
"os/exec"
"testing"
"time"

"github.com/spf13/cobra"
"github.com/stretchr/testify/require"
"k8s.io/klog/v2"

"github.com/jetstack/preflight/pkg/logs"
)

// TestRunOneShot runs the agent in `--one-shot` mode and verifies that it exits
// after the first data gathering iteration.
func TestRunOneShot(t *testing.T) {
if _, found := os.LookupEnv("GO_CHILD"); found {
// Silence the warning about missing pod name for event generation
// TODO(wallrj): This should not be required when an `--input-file` has been supplied.
t.Setenv("POD_NAME", "venafi-kubernetes-e2e")
// Silence the error about missing kubeconfig.
// TODO(wallrj): This should not be required when an `--input-file` has been supplied.
t.Setenv("KUBECONFIG", "testdata/one-shot/success/kubeconfig.yaml")

c := &cobra.Command{}
InitAgentCmdFlags(c, &Flags)
logs.AddFlags(c.Flags())

err := c.ParseFlags([]string{
"--one-shot",
// TODO(wallrj): This should not be required when an `--input-file` has been supplied.
"--api-token=should-not-be-required",
// TODO(wallrj): This should not be required when an `--input-file` has been supplied.
"--install-namespace=default",
"--agent-config-file=testdata/one-shot/success/config.yaml",
"--input-path=testdata/one-shot/success/input.json",
"--output-path=/dev/null",
"-v=1",
})
require.NoError(t, err)

logs.Initialize()
Run(c, nil)
klog.Flush()
return
}
t.Log("Running child process")
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()
cmd := exec.CommandContext(ctx, os.Args[0], "-test.run=^TestRunOneShot$")
var (
stdout bytes.Buffer
stderr bytes.Buffer
)
cmd.Stdout = &stdout
cmd.Stderr = &stderr
cmd.Env = append(
os.Environ(),
"GO_CHILD=true",
)
err := cmd.Run()

stdoutStr := stdout.String()
stderrStr := stderr.String()
t.Logf("STDOUT\n%s\n", stdoutStr)
t.Logf("STDERR\n%s\n", stderrStr)
require.NoError(t, err, context.Cause(ctx))
}
4 changes: 4 additions & 0 deletions pkg/agent/testdata/one-shot/success/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Just enough venafi-kubernetes-agent config to allow it to run with an input
# file in one-shot mode.
cluster_id: "venafi-kubernetes-agent-e2e"
organization_id: "venafi-kubernetes-agent-e2e"
1 change: 1 addition & 0 deletions pkg/agent/testdata/one-shot/success/input.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
[]
15 changes: 15 additions & 0 deletions pkg/agent/testdata/one-shot/success/kubeconfig.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Just enough kubeconfig to satisfy client-go
apiVersion: v1
kind: Config
current-context: cluster-1
contexts:
- name: cluster-1
context:
cluster: cluster-1
user: user-1
clusters:
- name: cluster-1
cluster:
server: https://192.0.2.1:8443
preferences: {}
users: []