Fix data race in drain scheduler #86

Merged 2 commits on Aug 21, 2020
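
For context, here is a minimal sketch of the pattern this PR fixes, assuming the usual Go memory-model rules; the type and names below are simplified stand-ins, not the actual draino code. The drain callback registered with time.AfterFunc runs on its own goroutine and sets a failure flag, while HasSchedule reads that flag from another goroutine. Replacing the plain bool with an int32 accessed only through sync/atomic makes that concurrent read and write well defined.

package main

import (
    "fmt"
    "sync/atomic"
    "time"
)

// schedule mirrors the shape of the fix: failed is an int32 touched only via
// sync/atomic, so the timer goroutine can set it while other goroutines poll
// it without a data race.
type schedule struct {
    failed int32
}

func (s *schedule) setFailed()     { atomic.StoreInt32(&s.failed, 1) }
func (s *schedule) isFailed() bool { return atomic.LoadInt32(&s.failed) == 1 }

func main() {
    s := &schedule{}

    // Writer: runs on the timer's goroutine, like the drain callback.
    time.AfterFunc(10*time.Millisecond, s.setFailed)

    // Reader: polls the flag, like HasSchedule being called in a loop.
    for !s.isFailed() {
        time.Sleep(time.Millisecond)
    }
    fmt.Println("failure flag observed without a data race")
}
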
32 changes: 19 additions & 13 deletions internal/kubernetes/drainSchedule.go
@@ -4,6 +4,7 @@ import (
     "context"
     "fmt"
     "sync"
+    "sync/atomic"
     "time"
 
     "go.opencensus.io/stats"
@@ -21,7 +22,7 @@ const (
 )
 
 type DrainScheduler interface {
-    HasSchedule(name string) (has, running, failed bool)
+    HasSchedule(name string) (has, failed bool)
     Schedule(node *v1.Node) (time.Time, error)
     DeleteSchedule(name string)
 }
@@ -40,23 +41,22 @@ type DrainSchedules struct {
 
 func NewDrainSchedules(drainer Drainer, eventRecorder record.EventRecorder, period time.Duration, logger *zap.Logger) DrainScheduler {
     return &DrainSchedules{
-        schedules: map[string]*schedule{},
-        //lastDrainScheduledFor: time.Now(),
+        schedules:     map[string]*schedule{},
         period:        period,
         logger:        logger,
         drainer:       drainer,
         eventRecorder: eventRecorder,
     }
 }
 
-func (d *DrainSchedules) HasSchedule(name string) (has, running, failed bool) {
+func (d *DrainSchedules) HasSchedule(name string) (has, failed bool) {
     d.Lock()
     defer d.Unlock()
     sched, ok := d.schedules[name]
     if !ok {
-        return false, false, false
+        return false, false
     }
-    return true, sched.running, sched.failed
+    return true, sched.isFailed()
 }
 
 func (d *DrainSchedules) DeleteSchedule(name string) {
@@ -109,11 +109,18 @@ func (d *DrainSchedules) Schedule(node *v1.Node) (time.Time, error) {
 }
 
 type schedule struct {
-    when    time.Time
-    running bool
-    failed  bool
-    finish  time.Time
-    timer   *time.Timer
+    when   time.Time
+    failed int32
+    finish time.Time
+    timer  *time.Timer
+}
+
+func (s *schedule) setFailed() {
+    atomic.StoreInt32(&s.failed, 1)
+}
+
+func (s *schedule) isFailed() bool {
+    return atomic.LoadInt32(&s.failed) == 1
 }
 
 func (d *DrainSchedules) newSchedule(node *v1.Node, when time.Time) *schedule {
@@ -122,13 +129,12 @@ func (d *DrainSchedules) newSchedule(node *v1.Node, when time.Time) *schedule {
     }
     sched.timer = time.AfterFunc(time.Until(when), func() {
         log := d.logger.With(zap.String("node", node.GetName()))
-        sched.running = true
         nr := &core.ObjectReference{Kind: "Node", Name: node.GetName(), UID: types.UID(node.GetName())}
         tags, _ := tag.New(context.Background(), tag.Upsert(TagNodeName, node.GetName())) // nolint:gosec
         d.eventRecorder.Event(nr, core.EventTypeWarning, eventReasonDrainStarting, "Draining node")
         if err := d.drainer.Drain(node); err != nil {
             sched.finish = time.Now()
-            sched.failed = true
+            sched.setFailed()
             log.Info("Failed to drain", zap.Error(err))
             tags, _ = tag.New(tags, tag.Upsert(TagResult, tagResultFailed)) // nolint:gosec
             stats.Record(tags, MeasureNodesDrained.M(1))

49 changes: 46 additions & 3 deletions internal/kubernetes/drainSchedule_test.go
@@ -5,6 +5,7 @@ import (
     "testing"
     "time"
 
+    "github.com/pkg/errors"
     "go.uber.org/zap"
     v1 "k8s.io/api/core/v1"
     meta "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -55,7 +56,7 @@ func TestDrainSchedules_Schedule(t *testing.T) {
     for _, tt := range tests {
         t.Run(tt.name, func(t *testing.T) {
             // Check that node is not yet scheduled for drain
-            hasSchedule, _, _ := scheduler.HasSchedule(tt.node.Name)
+            hasSchedule, _ := scheduler.HasSchedule(tt.node.Name)
             if hasSchedule {
                 t.Errorf("Node %v should not have any schedule", tt.node.Name)
             }
@@ -66,7 +67,7 @@
                 return
             }
             // Check that node is scheduled for drain
-            hasSchedule, _, _ = scheduler.HasSchedule(tt.node.Name)
+            hasSchedule, _ = scheduler.HasSchedule(tt.node.Name)
             if !hasSchedule {
                 t.Errorf("Missing schedule record for node %v", tt.node.Name)
             }
@@ -77,10 +78,52 @@
             // Deleting schedule
             scheduler.DeleteSchedule(tt.node.Name)
             // Check that node is no more scheduled for drain
-            hasSchedule, _, _ = scheduler.HasSchedule(tt.node.Name)
+            hasSchedule, _ = scheduler.HasSchedule(tt.node.Name)
             if hasSchedule {
                 t.Errorf("Node %v should not been scheduled anymore", tt.node.Name)
             }
         })
     }
 }
+
+type failDrainer struct {
+    NoopCordonDrainer
+}
+
+func (d *failDrainer) Drain(n *v1.Node) error { return errors.New("myerr") }
+
+// Test to ensure there are no races when calling HasSchedule while the
+// scheduler is draining a node.
+func TestDrainSchedules_HasSchedule_Polling(t *testing.T) {
+    scheduler := NewDrainSchedules(&failDrainer{}, &record.FakeRecorder{}, 0, zap.NewNop())
+    node := &v1.Node{ObjectMeta: meta.ObjectMeta{Name: nodeName}}
+
+    when, err := scheduler.Schedule(node)
+    if err != nil {
+        t.Fatalf("DrainSchedules.Schedule() error = %v", err)
+    }
+
+    timeout := time.After(time.Until(when) + time.Minute)
+    for {
+        hasSchedule, failed := scheduler.HasSchedule(node.Name)
+        if !hasSchedule {
+            t.Fatalf("Missing schedule record for node %v", node.Name)
+        }
+        if failed {

Review thread:

Contributor: So - we want failed when the test passes (counter-intuitive on the first read) because we're inducing an error with the failDrainer. The reason we're doing this is that we specifically want to exercise the atomic setFailed.

Contributor: I'd suggest just adding a brief comment before the break statement.

Contributor (author): Yes, the main reason for adding a test case here is to prevent future races when changing HasSchedule. The goal of this test is to ensure we're calling HasSchedule in parallel with the function passed to time.AfterFunc being executed. Using failed as the condition was an easy way to ensure that the test has reached its goal of having the timer-triggered function running. I'll add a comment here to better explain this.

+            // Having `failed` as true is the expected result here since this
+            // test is using the `failDrainer{}` drainer. It means that
+            // HasSchedule was successfully called during or after the draining
+            // function was scheduled and the test can complete successfully.
+            break
+        }
+        select {
+        case <-time.After(time.Second):
+            // Small sleep to ensure we're not running the CPU hot while
+            // polling `HasSchedule`.
+        case <-timeout:

Review thread:

Contributor: When do we hit the timeout condition vs. falling through via the initial case statement on 114?

Contributor (author): Line 114, case <-time.After(time.Second), acts only as a sleep to ensure that the test is not continuously consuming CPU cycles. Line 115, case <-timeout, is a stop condition to prevent the test from hanging forever if for some reason/bug the scheduled function passed to time.AfterFunc was never called. I think some comments would also be nice here, working on it.

+            // This timeout prevents this test from running forever in case
+            // some bug caused the draining function never to be scheduled.
+            t.Fatalf("timeout waiting for HasSchedule to fail")
+        }
+    }
+}
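
The thread above explains the test's polling-with-timeout structure: sleep between checks so the loop does not spin the CPU, and bail out through a separate timeout channel if the timer-triggered drain never runs. Below is a standalone sketch of that idiom with a hypothetical pollUntil helper and condition, not part of draino.

package main

import (
    "fmt"
    "time"
)

// pollUntil evaluates cond once per interval and returns true as soon as it
// holds, or false once the overall timeout elapses.
func pollUntil(cond func() bool, interval, timeout time.Duration) bool {
    deadline := time.After(timeout)
    for {
        if cond() {
            return true
        }
        select {
        case <-time.After(interval):
            // Sleep between checks so the loop does not run the CPU hot.
        case <-deadline:
            // Stop condition in case cond never becomes true.
            return false
        }
    }
}

func main() {
    start := time.Now()
    ok := pollUntil(func() bool { return time.Since(start) > 50*time.Millisecond },
        10*time.Millisecond, time.Second)
    fmt.Println("condition reached:", ok)
}

Running the real test with the race detector enabled (go test -race) is what would be expected to catch the original unsynchronized access to the failed flag.
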
2 changes: 1 addition & 1 deletion internal/kubernetes/eventhandler.go
@@ -142,7 +142,7 @@ func (h *DrainingResourceEventHandler) HandleNode(n *core.Node) {
     }
 
     // Let's ensure that a drain is scheduled
-    hasSChedule, _, failedDrain := h.drainScheduler.HasSchedule(n.GetName())
+    hasSChedule, failedDrain := h.drainScheduler.HasSchedule(n.GetName())
     if !hasSChedule {
         h.scheduleDrain(n)
         return