Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cancel agent command #279

Merged
merged 4 commits into from
Aug 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions cmd/agent/commands/cleanup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package commands

import (
	"errors"
	"syscall"

	"github.com/grafana/xk6-disruptor/pkg/runtime"
	"github.com/spf13/cobra"
)

// BuiltCleanupCmd returns a cobra command with the specification of the kill command
func BuiltCleanupCmd(env runtime.Environment) *cobra.Command {
cmd := &cobra.Command{
Use: "cleanup",
Short: "stops any ongoing fault injection and cleans resources",
RunE: func(cmd *cobra.Command, args []string) error {
runningProcess := env.Lock().Owner()
// no instance is currently running
if runningProcess == -1 {
return nil
}

return syscall.Kill(runningProcess, syscall.SIGTERM)

// TODO: cleanup resources (e.g iptables)
nadiamoe marked this conversation as resolved.
Show resolved Hide resolved
},
}

return cmd
}
1 change: 1 addition & 0 deletions cmd/agent/commands/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func NewRootCommand(env runtime.Environment) *RootCommand {
rootCmd := buildRootCmd(config)
rootCmd.AddCommand(BuildHTTPCmd(env, config))
rootCmd.AddCommand(BuildGrpcCmd(env, config))
rootCmd.AddCommand(BuiltCleanupCmd(env))

return &RootCommand{
cmd: rootCmd,
Expand Down
1 change: 1 addition & 0 deletions e2e/agent/agent_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,7 @@ func Test_Agent(t *testing.T) {
return
}
_, stderr, err := k8s.PodHelper(namespace).Exec(
context.TODO(),
"httpbin",
"xk6-disruptor-agent",
injectHTTP500,
Expand Down
2 changes: 2 additions & 0 deletions e2e/kubernetes/kubernetes_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ func Test_Kubernetes(t *testing.T) {
}

stdout, _, err := k8s.PodHelper(namespace).Exec(
context.TODO(),
"busybox",
"busybox",
[]string{"echo", "-n", "hello", "world"},
Expand Down Expand Up @@ -177,6 +178,7 @@ func Test_Kubernetes(t *testing.T) {
}

stdout, _, err := k8s.PodHelper(namespace).Exec(
context.TODO(),
"paused",
"ephemeral",
[]string{"echo", "-n", "hello", "world"},
Expand Down
4 changes: 4 additions & 0 deletions pkg/disruptors/cmd_builders.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,3 +113,7 @@ func buildHTTPFaultCmd(

return cmd
}

// buildCleanupCmd returns the command line that invokes the cleanup
// subcommand of the disruptor agent binary.
func buildCleanupCmd() []string {
	const (
		agentBinary = "xk6-disruptor-agent"
		subcommand  = "cleanup"
	)

	return []string{agentBinary, subcommand}
}
99 changes: 71 additions & 28 deletions pkg/disruptors/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package disruptors

import (
"context"
"errors"
"fmt"
"sync"
"time"
Expand All @@ -13,6 +14,14 @@ import (
corev1 "k8s.io/api/core/v1"
)

// VisitCommands defines the commands used for visiting a Pod.
// A visitor function passed to Visit returns one VisitCommands value per target pod.
type VisitCommands struct {
// Exec defines the command to be executed in the target
Exec []string
// Cleanup defines the command to execute for cleaning up if the execution of Exec fails.
// When Cleanup is nil, no cleanup command is executed.
Cleanup []string
}

// AgentController defines the interface for controlling agents in a set of targets
type AgentController interface {
// InjectDisruptorAgent injects the Disruptor agent in the target pods
Expand All @@ -22,7 +31,7 @@ type AgentController interface {
// Targets retrieves the names of the target of the controller
Targets(ctx context.Context) ([]string, error)
// Visit allows executing a different command on each target returned by a visiting function
Visit(ctx context.Context, visitor func(target corev1.Pod) ([]string, error)) error
Visit(ctx context.Context, visitor func(target corev1.Pod) (VisitCommands, error)) error
}

// AgentController controls the agents in a set of target pods
Expand Down Expand Up @@ -96,42 +105,76 @@ func (c *agentController) InjectDisruptorAgent(ctx context.Context) error {
// ExecCommand executes a command in the targets of the AgentController and reports any error
func (c *agentController) ExecCommand(ctx context.Context, cmd []string) error {
// visit each target with the same command
return c.Visit(ctx, func(corev1.Pod) ([]string, error) {
return cmd, nil
return c.Visit(ctx, func(corev1.Pod) (VisitCommands, error) {
return VisitCommands{Exec: cmd}, nil
})
}

// Visit allows executing a different command on each target returned by a visiting function
func (c *agentController) Visit(_ context.Context, visitor func(corev1.Pod) ([]string, error)) error {
var wg sync.WaitGroup
// ensure errors channel has enough space to avoid blocking gorutines
errors := make(chan error, len(c.targets))
for _, pod := range c.targets {
wg.Add(1)
// attach each container asynchronously
go func(pod corev1.Pod) {
// get the command to execute in the target
cmd, err := visitor(pod)
if err != nil {
errors <- fmt.Errorf("error building command for pod %s: %w", pod.Name, err)
}
func (c *agentController) Visit(ctx context.Context, visitor func(corev1.Pod) (VisitCommands, error)) error {
// if there are no targets, nothing to do
if len(c.targets) == 0 {
return nil
}

_, stderr, err := c.helper.Exec(pod.Name, "xk6-agent", cmd, []byte{})
if err != nil {
errors <- fmt.Errorf("error invoking agent: %w \n%s", err, string(stderr))
}
execContext, cancel := context.WithCancel(context.Background())
defer cancel()

wg.Done()
}(pod)
// ensure errCh channel has enough space to avoid blocking gorutines
errCh := make(chan error, len(c.targets))
for _, pod := range c.targets {
pod := pod
// visit each target asynchronously
go func() {
errCh <- func(pod corev1.Pod) error {
// get the command to execute in the target
visitCommands, err := visitor(pod)
if err != nil {
return fmt.Errorf("unable to get command for pod %q: %w", pod.Name, err)
}

_, stderr, err := c.helper.Exec(execContext, pod.Name, "xk6-agent", visitCommands.Exec, []byte{})

// if command failed, ensure the agent execution is terminated
if err != nil && visitCommands.Cleanup != nil {
// we ignore errors because k6 was cancelled, so there's no point in reporting
// use a fresh context because the exec context may have been cancelled or expired
//nolint:contextcheck
_, _, _ = c.helper.Exec(context.TODO(), pod.Name, "xk6-agent", visitCommands.Cleanup, []byte{})
}

// if the context is cancelled, it is reported in the main loop
if err != nil && !errors.Is(err, context.Canceled) {
nadiamoe marked this conversation as resolved.
Show resolved Hide resolved
return fmt.Errorf("failed command execution for pod %q: %w \n%s", pod.Name, err, string(stderr))
}

return nil
}(pod)
}()
}

wg.Wait()
var err error
pending := len(c.targets)
for {
select {
case e := <-errCh:
pending--
if e != nil {
// cancel ongoing commands
cancel()
// Save first received error as reason for ending execution
err = e
nadiamoe marked this conversation as resolved.
Show resolved Hide resolved
}

select {
case err := <-errors:
return err
default:
return nil
if pending == 0 {
return err
}
case <-ctx.Done():
// cancel ongoing commands
cancel()
// save the reason for ending execution
err = ctx.Err()
}
}
}

Expand Down
51 changes: 29 additions & 22 deletions pkg/disruptors/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,19 +120,20 @@ func Test_InjectAgent(t *testing.T) {
}
}

func Test_ExecCommand(t *testing.T) {
func Test_VisitPod(t *testing.T) {
t.Parallel()

testCases := []struct {
title string
namespace string
pods []*corev1.Pod
command []string
visitCmds VisitCommands
err error
stdout []byte
stderr []byte
timeout time.Duration
expectError bool
expected []helpers.Command
}{
{
title: "successful execution",
Expand All @@ -145,9 +146,16 @@ func Test_ExecCommand(t *testing.T) {
WithNamespace("test-ns").
Build(),
},
command: []string{"echo", "-n", "hello", "world"},
visitCmds: VisitCommands{
Exec: []string{"command"},
Cleanup: []string{"cleanup"},
},
err: nil,
expectError: false,
expected: []helpers.Command{
{Pod: "pod1", Container: "xk6-agent", Namespace: "test-ns", Command: []string{"command"}, Stdin: []byte{}},
{Pod: "pod2", Container: "xk6-agent", Namespace: "test-ns", Command: []string{"command"}, Stdin: []byte{}},
},
},
{
title: "failed execution",
Expand All @@ -160,10 +168,18 @@ func Test_ExecCommand(t *testing.T) {
WithNamespace("test-ns").
Build(),
},
command: []string{"echo", "-n", "hello", "world"},
err: fmt.Errorf("fake error"),
visitCmds: VisitCommands{
Exec: []string{"echo", "-n", "hello", "world"},
Cleanup: []string{"cleanup"},
}, err: fmt.Errorf("fake error"),
stderr: []byte("error output"),
expectError: true,
expected: []helpers.Command{
{Pod: "pod1", Container: "xk6-agent", Namespace: "test-ns", Command: []string{"command"}, Stdin: []byte{}},
{Pod: "pod1", Container: "xk6-agent", Namespace: "test-ns", Command: []string{"cleanup"}, Stdin: []byte{}},
{Pod: "pod2", Container: "xk6-agent", Namespace: "test-ns", Command: []string{"command"}, Stdin: []byte{}},
{Pod: "pod2", Container: "xk6-agent", Namespace: "test-ns", Command: []string{"cleanup"}, Stdin: []byte{}},
},
},
}

Expand Down Expand Up @@ -192,7 +208,9 @@ func Test_ExecCommand(t *testing.T) {
)

executor.SetResult(tc.stdout, tc.stderr, tc.err)
err := controller.ExecCommand(context.TODO(), tc.command)
err := controller.Visit(context.TODO(), func(target corev1.Pod) (VisitCommands, error) {
return tc.visitCmds, nil
})
if tc.expectError && err == nil {
t.Errorf("should had failed")
return
Expand All @@ -210,29 +228,18 @@ func Test_ExecCommand(t *testing.T) {
return
}

// expect same command to be executed for each pod
expected := []helpers.Command{}
for _, p := range targets {
expected = append(expected, helpers.Command{
Pod: p.Name,
Namespace: p.Namespace,
Container: "xk6-agent",
Command: tc.command,
Stdin: []byte{},
})
}
sort.Slice(tc.expected, func(i, j int) bool {
return tc.expected[i].Pod < tc.expected[j].Pod
})

history := executor.GetHistory()

sort.Slice(expected, func(i, j int) bool {
return expected[i].Pod < expected[j].Pod
})
sort.Slice(history, func(i, j int) bool {
return history[i].Pod < history[j].Pod
})

if diff := cmp.Diff(expected, history); diff != "" {
t.Errorf("Expected headers did not match returned:\n%s", diff)
if diff := cmp.Diff(tc.expected, history); diff != "" {
t.Errorf("Expected command did not match returned:\n%s", diff)
}
})
}
Expand Down
29 changes: 18 additions & 11 deletions pkg/disruptors/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,23 +119,26 @@ func (d *podDisruptor) InjectHTTPFaults(
fault.Port = DefaultTargetPort
}

return d.controller.Visit(ctx, func(pod corev1.Pod) ([]string, error) {
return d.controller.Visit(ctx, func(pod corev1.Pod) (VisitCommands, error) {
if !utils.HasPort(pod, fault.Port) {
return nil, fmt.Errorf("pod %q does not expose port %d", pod.Name, fault.Port)
return VisitCommands{}, fmt.Errorf("pod %q does not expose port %d", pod.Name, fault.Port)
}

if utils.HasHostNetwork(pod) {
return nil, fmt.Errorf("pod %q cannot be safely injected as it has hostNetwork set to true", pod.Name)
return VisitCommands{}, fmt.Errorf("pod %q cannot be safely injected as it has hostNetwork set to true", pod.Name)
}

targetAddress, err := utils.PodIP(pod)
if err != nil {
return nil, err
return VisitCommands{}, err
}

cmd := buildHTTPFaultCmd(targetAddress, fault, duration, options)
visitCommands := VisitCommands{
Exec: buildHTTPFaultCmd(targetAddress, fault, duration, options),
Cleanup: buildCleanupCmd(),
}

return cmd, nil
return visitCommands, nil
})
}

Expand All @@ -146,17 +149,21 @@ func (d *podDisruptor) InjectGrpcFaults(
duration time.Duration,
options GrpcDisruptionOptions,
) error {
return d.controller.Visit(ctx, func(pod corev1.Pod) ([]string, error) {
return d.controller.Visit(ctx, func(pod corev1.Pod) (VisitCommands, error) {
if !utils.HasPort(pod, fault.Port) {
return nil, fmt.Errorf("pod %q does not expose port %d", pod.Name, fault.Port)
return VisitCommands{}, fmt.Errorf("pod %q does not expose port %d", pod.Name, fault.Port)
}

targetAddress, err := utils.PodIP(pod)
if err != nil {
return nil, err
return VisitCommands{}, err
}

visitCommands := VisitCommands{
Exec: buildGrpcFaultCmd(targetAddress, fault, duration, options),
Cleanup: buildCleanupCmd(),
}

cmd := buildGrpcFaultCmd(targetAddress, fault, duration, options)
return cmd, nil
return visitCommands, nil
})
}
Loading