Skip to content

Commit

Permalink
Merge pull request #368 from grafana/fix-pod-controller-wait-logic
Browse files Browse the repository at this point in the history
Fix pod controller wait logic
  • Loading branch information
pablochacin authored Oct 30, 2023
2 parents c747e45 + 46bee4a commit 9ddd019
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 33 deletions.
43 changes: 25 additions & 18 deletions pkg/disruptors/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"sync"
"time"

"github.com/grafana/xk6-disruptor/pkg/internal/version"
Expand Down Expand Up @@ -40,29 +39,28 @@ func (c *PodController) Visit(ctx context.Context, visitor PodVisitor) error {
defer cancelVisit()

// make space to prevent blocking go routines
errCh := make(chan error, len(c.targets))
doneCh := make(chan error, len(c.targets))

wg := sync.WaitGroup{}
for _, pod := range c.targets {
wg.Add(1)
go func(pod corev1.Pod) {
if err := visitor.Visit(visitCtx, pod); err != nil {
errCh <- err
}

wg.Done()
doneCh <- visitor.Visit(visitCtx, pod)
}(pod)
}

wg.Wait()

select {
case e := <-errCh:
return e
case <-ctx.Done():
return ctx.Err()
default:
return nil
pending := len(c.targets)
for {
select {
case e := <-doneCh:
if e != nil {
return e
}
pending--
if pending == 0 {
return nil
}
case <-ctx.Done():
return ctx.Err()
}
}
}

Expand All @@ -77,6 +75,15 @@ type PodVisitor interface {
Visit(context.Context, corev1.Pod) error
}

// PodVisitorFunc defines a function that implements the PodVisitor interface
// This allows using anonymous functions as pod visitor:
type PodVisitorFunc func(context.Context, corev1.Pod) error

// Visit implements PodVisitor interface's Visit function
func (f PodVisitorFunc) Visit(ctx context.Context, pod corev1.Pod) error {
return f(ctx, pod)
}

// PodAgentVisitor implements PodVisitor, performing actions in a Pod by means of running a PodVisitCommand on the pod.
type PodAgentVisitor struct {
helper helpers.PodHelper
Expand Down
49 changes: 34 additions & 15 deletions pkg/disruptors/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,16 +147,6 @@ func Test_PodAgentVisitor(t *testing.T) {
}
}

type fakePodVisitor struct {
delay time.Duration
err error
}

func (f fakePodVisitor) Visit(_ context.Context, _ corev1.Pod) error {
time.Sleep(f.delay)
return f.err
}

var errFailed = errors.New("failed")

func Test_PodController(t *testing.T) {
Expand All @@ -169,7 +159,7 @@ func Test_PodController(t *testing.T) {
expectError error
}{
{
title: "visit pods",
title: "successful visits",
targets: []corev1.Pod{
builders.NewPodBuilder("pod1").
WithNamespace("test-ns").
Expand All @@ -180,11 +170,13 @@ func Test_PodController(t *testing.T) {
WithIP("192.0.2.7").
Build(),
},
visitor: fakePodVisitor{},
visitor: PodVisitorFunc(func(ctx context.Context, pod corev1.Pod) error {
return nil
}),
expectError: nil,
},
{
title: "failed visit command",
title: "failed visit",
targets: []corev1.Pod{
builders.NewPodBuilder("pod1").
WithNamespace("test-ns").
Expand All @@ -195,9 +187,33 @@ func Test_PodController(t *testing.T) {
WithIP("192.0.2.7").
Build(),
},
visitor: fakePodVisitor{err: errFailed},
visitor: PodVisitorFunc(func(ctx context.Context, pod corev1.Pod) error {
return errFailed
}),
expectError: errFailed,
},
{
title: "one failed visit",
targets: []corev1.Pod{
builders.NewPodBuilder("pod1").
WithNamespace("test-ns").
WithIP("192.0.2.6").
Build(),
builders.NewPodBuilder("pod2").
WithNamespace("test-ns").
WithIP("192.0.2.7").
Build(),
},
visitor: PodVisitorFunc(func(ctx context.Context, pod corev1.Pod) error {
// for pod-1 return error, for pod-2 wait until context expires
if pod.Name == "pod1" {
return errFailed
}
time.Sleep(2 * time.Second)
return nil
}),
expectError: errFailed, // if error is not handled immediately, DeadlineExceeded would be returned
},
{
title: "context expired",
targets: []corev1.Pod{
Expand All @@ -210,7 +226,10 @@ func Test_PodController(t *testing.T) {
WithIP("192.0.2.7").
Build(),
},
visitor: fakePodVisitor{delay: 2 * time.Second},
visitor: PodVisitorFunc(func(ctx context.Context, pod corev1.Pod) error {
time.Sleep(2 * time.Second)
return nil
}),
expectError: context.DeadlineExceeded,
},
}
Expand Down

0 comments on commit 9ddd019

Please sign in to comment.