Skip to content

Commit

Permalink
Merge pull request #86 from cezarsa/fixrace
Browse files Browse the repository at this point in the history
Fix data race in drain scheduler
  • Loading branch information
jacobstr committed Aug 21, 2020
2 parents 14fd977 + 49918a3 commit ead5f1c
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 17 deletions.
32 changes: 19 additions & 13 deletions internal/kubernetes/drainSchedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"

"go.opencensus.io/stats"
Expand All @@ -21,7 +22,7 @@ const (
)

type DrainScheduler interface {
HasSchedule(name string) (has, running, failed bool)
HasSchedule(name string) (has, failed bool)
Schedule(node *v1.Node) (time.Time, error)
DeleteSchedule(name string)
}
Expand All @@ -40,23 +41,22 @@ type DrainSchedules struct {

func NewDrainSchedules(drainer Drainer, eventRecorder record.EventRecorder, period time.Duration, logger *zap.Logger) DrainScheduler {
return &DrainSchedules{
schedules: map[string]*schedule{},
//lastDrainScheduledFor: time.Now(),
schedules: map[string]*schedule{},
period: period,
logger: logger,
drainer: drainer,
eventRecorder: eventRecorder,
}
}

func (d *DrainSchedules) HasSchedule(name string) (has, running, failed bool) {
func (d *DrainSchedules) HasSchedule(name string) (has, failed bool) {
d.Lock()
defer d.Unlock()
sched, ok := d.schedules[name]
if !ok {
return false, false, false
return false, false
}
return true, sched.running, sched.failed
return true, sched.isFailed()
}

func (d *DrainSchedules) DeleteSchedule(name string) {
Expand Down Expand Up @@ -109,11 +109,18 @@ func (d *DrainSchedules) Schedule(node *v1.Node) (time.Time, error) {
}

type schedule struct {
when time.Time
running bool
failed bool
finish time.Time
timer *time.Timer
when time.Time
failed int32
finish time.Time
timer *time.Timer
}

// setFailed marks this schedule's drain attempt as failed.
//
// The flag is written with an atomic store because it is set from the
// drain timer's goroutine (the time.AfterFunc callback in newSchedule)
// while HasSchedule may concurrently read it via isFailed from another
// goroutine — a plain bool write here was the data race this change fixes.
func (s *schedule) setFailed() {
	atomic.StoreInt32(&s.failed, 1)
}

// isFailed reports whether the drain attempt for this schedule has failed.
//
// Reads the failed flag atomically so it is safe to call concurrently with
// setFailed (which runs on the drain timer's goroutine). 1 means failed;
// the zero value of the field means not (yet) failed.
func (s *schedule) isFailed() bool {
	return atomic.LoadInt32(&s.failed) == 1
}

func (d *DrainSchedules) newSchedule(node *v1.Node, when time.Time) *schedule {
Expand All @@ -122,13 +129,12 @@ func (d *DrainSchedules) newSchedule(node *v1.Node, when time.Time) *schedule {
}
sched.timer = time.AfterFunc(time.Until(when), func() {
log := d.logger.With(zap.String("node", node.GetName()))
sched.running = true
nr := &core.ObjectReference{Kind: "Node", Name: node.GetName(), UID: types.UID(node.GetName())}
tags, _ := tag.New(context.Background(), tag.Upsert(TagNodeName, node.GetName())) // nolint:gosec
d.eventRecorder.Event(nr, core.EventTypeWarning, eventReasonDrainStarting, "Draining node")
if err := d.drainer.Drain(node); err != nil {
sched.finish = time.Now()
sched.failed = true
sched.setFailed()
log.Info("Failed to drain", zap.Error(err))
tags, _ = tag.New(tags, tag.Upsert(TagResult, tagResultFailed)) // nolint:gosec
stats.Record(tags, MeasureNodesDrained.M(1))
Expand Down
49 changes: 46 additions & 3 deletions internal/kubernetes/drainSchedule_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"testing"
"time"

"github.com/pkg/errors"
"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -55,7 +56,7 @@ func TestDrainSchedules_Schedule(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Check that node is not yet scheduled for drain
hasSchedule, _, _ := scheduler.HasSchedule(tt.node.Name)
hasSchedule, _ := scheduler.HasSchedule(tt.node.Name)
if hasSchedule {
t.Errorf("Node %v should not have any schedule", tt.node.Name)
}
Expand All @@ -66,7 +67,7 @@ func TestDrainSchedules_Schedule(t *testing.T) {
return
}
// Check that node is scheduled for drain
hasSchedule, _, _ = scheduler.HasSchedule(tt.node.Name)
hasSchedule, _ = scheduler.HasSchedule(tt.node.Name)
if !hasSchedule {
t.Errorf("Missing schedule record for node %v", tt.node.Name)
}
Expand All @@ -77,10 +78,52 @@ func TestDrainSchedules_Schedule(t *testing.T) {
// Deleting schedule
scheduler.DeleteSchedule(tt.node.Name)
// Check that node is no more scheduled for drain
hasSchedule, _, _ = scheduler.HasSchedule(tt.node.Name)
hasSchedule, _ = scheduler.HasSchedule(tt.node.Name)
if hasSchedule {
t.Errorf("Node %v should not been scheduled anymore", tt.node.Name)
}
})
}
}

// failDrainer is a test double: it embeds NoopCordonDrainer for all other
// Drainer behavior but overrides Drain to always fail, so tests can drive
// the scheduler down its failure path.
type failDrainer struct {
	NoopCordonDrainer
}

// Drain always returns an error, regardless of the node passed in.
func (d *failDrainer) Drain(n *v1.Node) error { return errors.New("myerr") }

// TestDrainSchedules_HasSchedule_Polling ensures there are no races when
// calling HasSchedule while the scheduler is draining a node. Run with
// `go test -race` for this test to actually catch the original data race:
// it repeatedly reads the schedule's failed flag while the drain timer
// goroutine writes it.
func TestDrainSchedules_HasSchedule_Polling(t *testing.T) {
	// period of 0 means the drain fires (and fails, via failDrainer)
	// essentially immediately after scheduling.
	scheduler := NewDrainSchedules(&failDrainer{}, &record.FakeRecorder{}, 0, zap.NewNop())
	node := &v1.Node{ObjectMeta: meta.ObjectMeta{Name: nodeName}}

	when, err := scheduler.Schedule(node)
	if err != nil {
		t.Fatalf("DrainSchedules.Schedule() error = %v", err)
	}

	// Generous upper bound: the scheduled drain time plus one minute.
	timeout := time.After(time.Until(when) + time.Minute)
	for {
		hasSchedule, failed := scheduler.HasSchedule(node.Name)
		if !hasSchedule {
			t.Fatalf("Missing schedule record for node %v", node.Name)
		}
		if failed {
			// Having `failed` as true is the expected result here since this
			// test is using the `failDrainer{}` drainer. It means that
			// HasSchedule was successfully called during or after the draining
			// function was scheduled and the test can complete successfully.
			break
		}
		select {
		case <-time.After(time.Second):
			// Small sleep to ensure we're not running the CPU hot while
			// polling `HasSchedule`.
		case <-timeout:
			// This timeout prevents this test from running forever in case
			// some bug caused the draining function never to be scheduled.
			t.Fatalf("timeout waiting for HasSchedule to fail")
		}
	}
}
2 changes: 1 addition & 1 deletion internal/kubernetes/eventhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func (h *DrainingResourceEventHandler) HandleNode(n *core.Node) {
}

// Let's ensure that a drain is scheduled
hasSChedule, _, failedDrain := h.drainScheduler.HasSchedule(n.GetName())
hasSChedule, failedDrain := h.drainScheduler.HasSchedule(n.GetName())
if !hasSChedule {
h.scheduleDrain(n)
return
Expand Down

0 comments on commit ead5f1c

Please sign in to comment.