Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Validate and modify agent commands #244

Merged
merged 5 commits into from
Jun 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ issues:
- lll
- bodyclose
- maintidx
- unparam
- linters:
- paralleltest # false positive: https://github.com/kunwardeep/paralleltest/issues/8.
text: "does not use range value in test Run"
Expand Down
21 changes: 11 additions & 10 deletions pkg/disruptors/cmd_builders.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,17 @@ import (
"github.com/grafana/xk6-disruptor/pkg/utils"
)

//nolint:dupl
func buildGrpcFaultCmd(fault GrpcFault, duration time.Duration, options GrpcDisruptionOptions) []string {
cmd := []string{
"xk6-disruptor-agent",
"grpc",
"-d", utils.DurationSeconds(duration),
"-t", fmt.Sprint(fault.Port),
}

// TODO: make port mandatory
if fault.Port != 0 {
cmd = append(cmd, "-t", fmt.Sprint(fault.Port))
}

if fault.AverageDelay > 0 {
Expand All @@ -38,10 +43,6 @@ func buildGrpcFaultCmd(fault GrpcFault, duration time.Duration, options GrpcDisr
}
}

if fault.Port != 0 {
cmd = append(cmd, "-t", fmt.Sprint(fault.Port))
}

if len(fault.Exclude) > 0 {
cmd = append(cmd, "-x", fault.Exclude)
}
Expand All @@ -57,14 +58,18 @@ func buildGrpcFaultCmd(fault GrpcFault, duration time.Duration, options GrpcDisr
return cmd
}

//nolint:dupl
func buildHTTPFaultCmd(fault HTTPFault, duration time.Duration, options HTTPDisruptionOptions) []string {
cmd := []string{
"xk6-disruptor-agent",
"http",
"-d", utils.DurationSeconds(duration),
}

// TODO: make port mandatory
if fault.Port != 0 {
cmd = append(cmd, "-t", fmt.Sprint(fault.Port))
}

if fault.AverageDelay > 0 {
cmd = append(
cmd,
Expand All @@ -88,10 +93,6 @@ func buildHTTPFaultCmd(fault HTTPFault, duration time.Duration, options HTTPDisr
}
}

if fault.Port != 0 {
cmd = append(cmd, "-t", fmt.Sprint(fault.Port))
}

if len(fault.Exclude) > 0 {
cmd = append(cmd, "-x", fault.Exclude)
}
Expand Down
36 changes: 22 additions & 14 deletions pkg/disruptors/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,17 @@ type AgentController interface {
InjectDisruptorAgent(ctx context.Context) error
// ExecCommand executes a command in the targets of the AgentController and reports any error
ExecCommand(ctx context.Context, cmd []string) error
// Targets returns the list of targets for the controller
// Targets retrieves the names of the target of the controller
Targets(ctx context.Context) ([]string, error)
// Visit allows executing a different command on each target returned by a visiting function
Visit(ctx context.Context, visitor func(target string) []string) error
Visit(ctx context.Context, visitor func(target corev1.Pod) ([]string, error)) error
}

// AgentController controls de agents in a set of target pods
type agentController struct {
helper helpers.PodHelper
namespace string
targets []string
targets []corev1.Pod
timeout time.Duration
}

Expand Down Expand Up @@ -80,7 +80,7 @@ func (c *agentController) InjectDisruptorAgent(ctx context.Context) error {
if err != nil {
errors <- err
}
}(pod)
}(pod.Name)
}

wg.Wait()
Expand All @@ -96,23 +96,27 @@ func (c *agentController) InjectDisruptorAgent(ctx context.Context) error {
// ExecCommand executes a command in the targets of the AgentController and reports any error
func (c *agentController) ExecCommand(ctx context.Context, cmd []string) error {
// visit each target with the same command
return c.Visit(ctx, func(string) []string {
return cmd
return c.Visit(ctx, func(corev1.Pod) ([]string, error) {
return cmd, nil
})
}

// Visit allows executing a different command on each target returned by a visiting function
func (c *agentController) Visit(ctx context.Context, visitor func(string) []string) error {
func (c *agentController) Visit(ctx context.Context, visitor func(corev1.Pod) ([]string, error)) error {
var wg sync.WaitGroup
// ensure errors channel has enough space to avoid blocking gorutines
errors := make(chan error, len(c.targets))
for _, pod := range c.targets {
// get the command to execute in the target
cmd := visitor(pod)
wg.Add(1)
// attach each container asynchronously
go func(pod string) {
_, stderr, err := c.helper.Exec(pod, "xk6-agent", cmd, []byte{})
go func(pod corev1.Pod) {
// get the command to execute in the target
cmd, err := visitor(pod)
if err != nil {
errors <- fmt.Errorf("error building command for pod %s: %w", pod.Name, err)
}

_, stderr, err := c.helper.Exec(pod.Name, "xk6-agent", cmd, []byte{})
if err != nil {
errors <- fmt.Errorf("error invoking agent: %w \n%s", err, string(stderr))
}
Expand All @@ -131,17 +135,21 @@ func (c *agentController) Visit(ctx context.Context, visitor func(string) []stri
}
}

// Targets retrieves the list of target pods for the given PodSelector
// Targets retrieves the list of names of the target pods
func (c *agentController) Targets(ctx context.Context) ([]string, error) {
return c.targets, nil
names := []string{}
for _, p := range c.targets {
names = append(names, p.Name)
}
return names, nil
}

// NewAgentController creates a new controller for a list of target pods
func NewAgentController(
ctx context.Context,
helper helpers.PodHelper,
namespace string,
targets []string,
targets []corev1.Pod,
timeout time.Duration,
) AgentController {
if timeout == 0 {
Expand Down
20 changes: 12 additions & 8 deletions pkg/disruptors/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,10 @@ func Test_InjectAgent(t *testing.T) {

objs := []runtime.Object{}

targets := []string{}
targets := []corev1.Pod{}
for _, pod := range tc.pods {
objs = append(objs, pod)
targets = append(targets, pod.Name)
targets = append(targets, *pod)
}

client := fake.NewSimpleClientset(objs...)
Expand Down Expand Up @@ -98,10 +98,10 @@ func Test_InjectAgent(t *testing.T) {
return
}

for _, podName := range targets {
for _, p := range targets {
pod, err := client.CoreV1().
Pods(tc.namespace).
Get(context.TODO(), podName, metav1.GetOptions{})
Get(context.TODO(), p.Name, metav1.GetOptions{})
if err != nil {
t.Errorf("failed: %v", err)
return
Expand Down Expand Up @@ -171,10 +171,10 @@ func Test_ExecCommand(t *testing.T) {

objs := []runtime.Object{}

targets := []string{}
targets := []corev1.Pod{}
for _, pod := range tc.pods {
objs = append(objs, pod)
targets = append(targets, pod.Name)
targets = append(targets, *pod)
}
client := fake.NewSimpleClientset(objs...)
executor := helpers.NewFakePodCommandExecutor()
Expand Down Expand Up @@ -208,7 +208,7 @@ func Test_ExecCommand(t *testing.T) {

pods := map[string]bool{}
for _, p := range targets {
pods[p] = true
pods[p.Name] = true
}

history := executor.GetHistory()
Expand All @@ -217,7 +217,11 @@ func Test_ExecCommand(t *testing.T) {
}
for _, c := range history {
if _, found := pods[c.Pod]; !found {
t.Errorf("invalid pod name. Expected to be in %s got %s", targets, c.Pod)
podNames := []string{}
for _, p := range targets {
podNames = append(podNames, p.Name)
}
t.Errorf("invalid pod name. Expected to be in %s got %s", podNames, c.Pod)
return
}
// TODO: don't use hard-coded agent name
Expand Down
2 changes: 1 addition & 1 deletion pkg/disruptors/disruptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ import "context"

// Disruptor defines the generic interface implemented by all disruptors
type Disruptor interface {
// Targets returns the list of targets for the disruptor
// Targets returns the names of the targets for the disruptor
Targets(ctx context.Context) ([]string, error)
}
42 changes: 23 additions & 19 deletions pkg/disruptors/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ import (

"github.com/grafana/xk6-disruptor/pkg/kubernetes"
"github.com/grafana/xk6-disruptor/pkg/kubernetes/helpers"
"github.com/grafana/xk6-disruptor/pkg/utils"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

Expand Down Expand Up @@ -101,7 +103,6 @@ func NewPodDisruptor(
}, nil
}

// Targets retrieves the list of target pods for the given PodSelector
func (d *podDisruptor) Targets(ctx context.Context) ([]string, error) {
return d.controller.Targets(ctx)
}
Expand All @@ -113,14 +114,22 @@ func (d *podDisruptor) InjectHTTPFaults(
duration time.Duration,
options HTTPDisruptionOptions,
) error {
// TODO: make port mandatory instead of using a default
if fault.Port == 0 {
fault.Port = DefaultTargetPort
}

// TODO: adapt command to each pod
cmd := buildHTTPFaultCmd(fault, duration, options)

err := d.validatePort(ctx, fault.Port)
if err != nil {
return fmt.Errorf("validate fault port: %w", err)
}
err := d.controller.Visit(ctx, func(pod corev1.Pod) ([]string, error) {
if !utils.HasPort(pod, fault.Port) {
return nil, fmt.Errorf("pod %q does not expose port %d", pod.Name, fault.Port)
}

return cmd, nil
})

err = d.controller.ExecCommand(ctx, cmd)
return err
}

Expand All @@ -131,21 +140,16 @@ func (d *podDisruptor) InjectGrpcFaults(
duration time.Duration,
options GrpcDisruptionOptions,
) error {
// TODO: adapt command to each pod
cmd := buildGrpcFaultCmd(fault, duration, options)

err := d.validatePort(ctx, fault.Port)
if err != nil {
return fmt.Errorf("validate fault port: %w", err)
}

err = d.controller.ExecCommand(ctx, cmd)
return err
}
err := d.controller.Visit(ctx, func(pod corev1.Pod) ([]string, error) {
if !utils.HasPort(pod, fault.Port) {
return nil, fmt.Errorf("pod %q does not expose port %d", pod.Name, fault.Port)
}

func (d *podDisruptor) validatePort(ctx context.Context, port uint) error {
if port == 0 {
port = DefaultTargetPort
}
return cmd, nil
})

return d.podHelper.ValidatePort(ctx, d.podFilter, port)
return err
}
Loading