Skip to content

Commit

Permalink
Allow access to pods using port forwarder (#249)
Browse files Browse the repository at this point in the history
* Allow access to pods using port forwarder

Signed-off-by: Pablo Chacin <pablochacin@gmail.com>
  • Loading branch information
pablochacin authored Jul 6, 2023
1 parent 49d65e8 commit 796b721
Show file tree
Hide file tree
Showing 2 changed files with 215 additions and 0 deletions.
101 changes: 101 additions & 0 deletions e2e/kubectl/kubectl_e2e_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
//go:build e2e
// +build e2e

package e2e

import (
"bytes"
"context"
"fmt"
"net/http"
"testing"
"time"

"github.com/grafana/xk6-disruptor/pkg/kubernetes"
"github.com/grafana/xk6-disruptor/pkg/testutils/e2e/cluster"
"github.com/grafana/xk6-disruptor/pkg/testutils/e2e/deploy"
"github.com/grafana/xk6-disruptor/pkg/testutils/e2e/kubectl"
"github.com/grafana/xk6-disruptor/pkg/testutils/e2e/kubernetes/namespace"
"github.com/grafana/xk6-disruptor/pkg/testutils/kubernetes/builders"
)

func Test_Kubectl(t *testing.T) {
cluster, err := cluster.BuildE2eCluster(
t,
cluster.DefaultE2eClusterConfig(),
cluster.WithName("e2e-kubectl"),
cluster.WithIngressPort(30087),
)
if err != nil {
t.Errorf("failed to create cluster: %v", err)
return
}

k8s, err := kubernetes.NewFromKubeconfig(cluster.Kubeconfig())
if err != nil {
t.Errorf("error creating kubernetes client: %v", err)
return
}

// Test Wait Pod Running
t.Run("Test local random port", func(t *testing.T) {
namespace, err := namespace.CreateTestNamespace(context.TODO(), t, k8s.Client())
if err != nil {
t.Errorf("failed to create test namespace: %v", err)
return
}

// Deploy nginx
nginx := builders.NewPodBuilder("nginx").
WithContainer(
*builders.NewContainerBuilder("nginx").
WithImage("nginx").
WithPort("http", 80).
Build(),
).
Build()

err = deploy.RunPod(k8s, namespace, nginx, 20 * time.Second)
if err != nil {
t.Errorf("failed to create test pod: %v", err)
return
}

client, err := kubectl.NewFromKubeconfig(context.TODO(), cluster.Kubeconfig() )
if err != nil {
t.Errorf("failed to create kubectl client: %v", err)
return
}

ctx, stopper := context.WithCancel(context.TODO())
// ensure por forwarder is cancelled
defer stopper()

port, err := client.ForwardPodPort(ctx, namespace, nginx.GetName(), 80)
if err != nil {
t.Errorf("failed to forward local port: %v", err)
return
}

url := fmt.Sprintf("http://localhost:%d", port)
request, err := http.NewRequest("GET", url, bytes.NewReader([]byte{}))
if err != nil {
t.Errorf("failed to create request: %v", err)
return
}

resp, err := http.DefaultClient.Do(request)
if err != nil {
t.Errorf("failed make request: %v", err)
return
}
defer func() {
_ = resp.Body.Close()
}()

if resp.StatusCode != http.StatusOK {
t.Errorf("expected status code %d but %d received", http.StatusOK, resp.StatusCode)
return
}
})
}
114 changes: 114 additions & 0 deletions pkg/testutils/e2e/kubectl/kubectl.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ package kubectl
import (
"context"
"fmt"
"io"
"net/http"
"net/url"
"path/filepath"
"strings"

Expand All @@ -20,14 +23,18 @@ import (
"k8s.io/client-go/rest"
"k8s.io/client-go/restmapper"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/transport/spdy"
"k8s.io/client-go/util/homedir"

"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer/yaml"

"k8s.io/client-go/tools/portforward"
)

// Client holds the state to access kubernetes
type Client struct {
config *rest.Config
dynamic dynamic.Interface
mapper meta.RESTMapper
serializer runtime.Serializer
Expand Down Expand Up @@ -68,6 +75,7 @@ func NewForConfig(ctx context.Context, config *rest.Config) (*Client, error) {
}

return &Client{
config: config,
mapper: mapper,
dynamic: dynamic,
serializer: yaml.NewDecodingSerializer(unstructured.UnstructuredJSONScheme),
Expand Down Expand Up @@ -177,3 +185,109 @@ func (c *Client) Apply(ctx context.Context, yaml string) error {

return nil
}

type portForwardConfig struct {
localport uint
stderr io.Writer
stdout io.Writer
}

// PortForwardOption defines a configuration option for port forwarding
type PortForwardOption func(portForwardConfig) portForwardConfig

// WithLocalPort sets the local port to listen for request. Defaults to 0 (random local port)
func WithLocalPort(port uint) PortForwardOption {
return func(p portForwardConfig) portForwardConfig {
p.localport = port
return p
}
}

// WithOutputStreams sets the output streams for the port forwarder. Default to nil
func WithOutputStreams(stdout io.Writer, stderr io.Writer) PortForwardOption {
return func(p portForwardConfig) portForwardConfig {
p.stdout = stdout
p.stderr = stderr
return p
}
}

func newPortForwardConfig(opts ...PortForwardOption) portForwardConfig {
config := portForwardConfig{
localport: 0,
stdout: nil,
stderr: nil,
}

for _, option := range opts {
config = option(config)
}

return config
}

// ForwardPodPort opens a local port for forwards requests to a pod's port.
// Returns the local port used for listening
func (c *Client) ForwardPodPort(
ctx context.Context,
namespace string,
pod string,
port uint,
opts ...PortForwardOption,
) (uint, error) {
config := newPortForwardConfig(opts...)

path := fmt.Sprintf("/api/v1/namespaces/%s/pods/%s/portforward", namespace, pod)
host, err := url.Parse(c.config.Host)
if err != nil {
return 0, fmt.Errorf("invalid host URL in k8s client config: %w", err)
}
url := &url.URL{
Scheme: "https",
Path: path,
Host: host.Host,
}

transport, upgrader, err := spdy.RoundTripperFor(c.config)
if err != nil {
return 0, err
}
dialer := spdy.NewDialer(
upgrader,
&http.Client{Transport: transport},
http.MethodPost,
url,
)

ports := []string{fmt.Sprintf("%d:%d", config.localport, port)}
ready := make(chan struct{})
fw, err := portforward.New(
dialer,
ports,
ctx.Done(),
ready,
config.stdout,
config.stderr,
)
if err != nil {
return 0, err
}

errors := make(chan error)
go func() {
errors <- fw.ForwardPorts()
}()

// Wait for the port forwarder to be ready to return port
select {
case <-ready:
// return the local port (we are waiting for ready, so no error expected)
p, _ := fw.GetPorts()
// assumes only one port was forwarded
return uint(p[0].Local), nil
case <-ctx.Done():
return 0, ctx.Err()
case e := <-errors:
return 0, fmt.Errorf("failed to start port forwarding: %w", e)
}
}

0 comments on commit 796b721

Please sign in to comment.