From 796b7212941d4727eb19266aae4f8a96425ff9bd Mon Sep 17 00:00:00 2001 From: pablochacin Date: Thu, 6 Jul 2023 19:21:18 +0200 Subject: [PATCH] Allow access to pods using port forwarder (#249) * Allow access to pods using port forwarder Signed-off-by: Pablo Chacin --- e2e/kubectl/kubectl_e2e_test.go | 101 ++++++++++++++++++++++++ pkg/testutils/e2e/kubectl/kubectl.go | 114 +++++++++++++++++++++++++++ 2 files changed, 215 insertions(+) create mode 100644 e2e/kubectl/kubectl_e2e_test.go diff --git a/e2e/kubectl/kubectl_e2e_test.go b/e2e/kubectl/kubectl_e2e_test.go new file mode 100644 index 00000000..2f0da5f8 --- /dev/null +++ b/e2e/kubectl/kubectl_e2e_test.go @@ -0,0 +1,101 @@ +//go:build e2e +// +build e2e + +package e2e + +import ( + "bytes" + "context" + "fmt" + "net/http" + "testing" + "time" + + "github.com/grafana/xk6-disruptor/pkg/kubernetes" + "github.com/grafana/xk6-disruptor/pkg/testutils/e2e/cluster" + "github.com/grafana/xk6-disruptor/pkg/testutils/e2e/deploy" + "github.com/grafana/xk6-disruptor/pkg/testutils/e2e/kubectl" + "github.com/grafana/xk6-disruptor/pkg/testutils/e2e/kubernetes/namespace" + "github.com/grafana/xk6-disruptor/pkg/testutils/kubernetes/builders" +) + +func Test_Kubectl(t *testing.T) { + cluster, err := cluster.BuildE2eCluster( + t, + cluster.DefaultE2eClusterConfig(), + cluster.WithName("e2e-kubectl"), + cluster.WithIngressPort(30087), + ) + if err != nil { + t.Errorf("failed to create cluster: %v", err) + return + } + + k8s, err := kubernetes.NewFromKubeconfig(cluster.Kubeconfig()) + if err != nil { + t.Errorf("error creating kubernetes client: %v", err) + return + } + + // Test Wait Pod Running + t.Run("Test local random port", func(t *testing.T) { + namespace, err := namespace.CreateTestNamespace(context.TODO(), t, k8s.Client()) + if err != nil { + t.Errorf("failed to create test namespace: %v", err) + return + } + + // Deploy nginx + nginx := builders.NewPodBuilder("nginx"). + WithContainer( + *builders.NewContainerBuilder("nginx"). + WithImage("nginx"). + WithPort("http", 80). + Build(), + ). + Build() + + err = deploy.RunPod(k8s, namespace, nginx, 20 * time.Second) + if err != nil { + t.Errorf("failed to create test pod: %v", err) + return + } + + client, err := kubectl.NewFromKubeconfig(context.TODO(), cluster.Kubeconfig() ) + if err != nil { + t.Errorf("failed to create kubectl client: %v", err) + return + } + + ctx, stopper := context.WithCancel(context.TODO()) + // ensure por forwarder is cancelled + defer stopper() + + port, err := client.ForwardPodPort(ctx, namespace, nginx.GetName(), 80) + if err != nil { + t.Errorf("failed to forward local port: %v", err) + return + } + + url := fmt.Sprintf("http://localhost:%d", port) + request, err := http.NewRequest("GET", url, bytes.NewReader([]byte{})) + if err != nil { + t.Errorf("failed to create request: %v", err) + return + } + + resp, err := http.DefaultClient.Do(request) + if err != nil { + t.Errorf("failed make request: %v", err) + return + } + defer func() { + _ = resp.Body.Close() + }() + + if resp.StatusCode != http.StatusOK { + t.Errorf("expected status code %d but %d received", http.StatusOK, resp.StatusCode) + return + } + }) +} \ No newline at end of file diff --git a/pkg/testutils/e2e/kubectl/kubectl.go b/pkg/testutils/e2e/kubectl/kubectl.go index 9e9b2b65..7483047e 100644 --- a/pkg/testutils/e2e/kubectl/kubectl.go +++ b/pkg/testutils/e2e/kubectl/kubectl.go @@ -7,6 +7,9 @@ package kubectl import ( "context" "fmt" + "io" + "net/http" + "net/url" "path/filepath" "strings" @@ -20,14 +23,18 @@ import ( "k8s.io/client-go/rest" "k8s.io/client-go/restmapper" "k8s.io/client-go/tools/clientcmd" + "k8s.io/client-go/transport/spdy" "k8s.io/client-go/util/homedir" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/serializer/yaml" + + "k8s.io/client-go/tools/portforward" ) // Client holds the state to access kubernetes type Client struct { + config *rest.Config dynamic dynamic.Interface mapper meta.RESTMapper serializer runtime.Serializer @@ -68,6 +75,7 @@ func NewForConfig(ctx context.Context, config *rest.Config) (*Client, error) { } return &Client{ + config: config, mapper: mapper, dynamic: dynamic, serializer: yaml.NewDecodingSerializer(unstructured.UnstructuredJSONScheme), @@ -177,3 +185,109 @@ func (c *Client) Apply(ctx context.Context, yaml string) error { return nil } + +type portForwardConfig struct { + localport uint + stderr io.Writer + stdout io.Writer +} + +// PortForwardOption defines a configuration option for port forwarding +type PortForwardOption func(portForwardConfig) portForwardConfig + +// WithLocalPort sets the local port to listen for request. Defaults to 0 (random local port) +func WithLocalPort(port uint) PortForwardOption { + return func(p portForwardConfig) portForwardConfig { + p.localport = port + return p + } +} + +// WithOutputStreams sets the output streams for the port forwarder. Default to nil +func WithOutputStreams(stdout io.Writer, stderr io.Writer) PortForwardOption { + return func(p portForwardConfig) portForwardConfig { + p.stdout = stdout + p.stderr = stderr + return p + } +} + +func newPortForwardConfig(opts ...PortForwardOption) portForwardConfig { + config := portForwardConfig{ + localport: 0, + stdout: nil, + stderr: nil, + } + + for _, option := range opts { + config = option(config) + } + + return config +} + +// ForwardPodPort opens a local port for forwards requests to a pod's port. +// Returns the local port used for listening +func (c *Client) ForwardPodPort( + ctx context.Context, + namespace string, + pod string, + port uint, + opts ...PortForwardOption, +) (uint, error) { + config := newPortForwardConfig(opts...) + + path := fmt.Sprintf("/api/v1/namespaces/%s/pods/%s/portforward", namespace, pod) + host, err := url.Parse(c.config.Host) + if err != nil { + return 0, fmt.Errorf("invalid host URL in k8s client config: %w", err) + } + url := &url.URL{ + Scheme: "https", + Path: path, + Host: host.Host, + } + + transport, upgrader, err := spdy.RoundTripperFor(c.config) + if err != nil { + return 0, err + } + dialer := spdy.NewDialer( + upgrader, + &http.Client{Transport: transport}, + http.MethodPost, + url, + ) + + ports := []string{fmt.Sprintf("%d:%d", config.localport, port)} + ready := make(chan struct{}) + fw, err := portforward.New( + dialer, + ports, + ctx.Done(), + ready, + config.stdout, + config.stderr, + ) + if err != nil { + return 0, err + } + + errors := make(chan error) + go func() { + errors <- fw.ForwardPorts() + }() + + // Wait for the port forwarder to be ready to return port + select { + case <-ready: + // return the local port (we are waiting for ready, so no error expected) + p, _ := fw.GetPorts() + // assumes only one port was forwarded + return uint(p[0].Local), nil + case <-ctx.Done(): + return 0, ctx.Err() + case e := <-errors: + return 0, fmt.Errorf("failed to start port forwarding: %w", e) + } +}