Fix data race in drain scheduler
This commit fixes a data race when calling HasSchedule. The race happens
because the function passed to `time.AfterFunc` runs in a new goroutine and
updates fields that may be read in parallel by `HasSchedule`.

The `running` return value of `HasSchedule` was removed to simplify the code,
as it wasn't used anywhere.
cezarsa committed Aug 20, 2020
1 parent 8a24920 commit 49918a3
Showing 3 changed files with 24 additions and 18 deletions.
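
To make the race concrete, here is a stripped-down, self-contained sketch (not the project's code; the names are only illustrative) of the pre-fix pattern: `HasSchedule` reads a plain bool under the scheduler's mutex, while the callback started by `time.AfterFunc` writes it without taking any lock, so `go run -race` reports the conflicting accesses.

package main

import (
	"fmt"
	"sync"
	"time"
)

// schedule mimics the pre-fix struct: failed is a plain bool.
type schedule struct {
	failed bool
}

type drainSchedules struct {
	sync.Mutex
	schedules map[string]*schedule
}

// hasSchedule locks the scheduler, but that does not protect it from a
// writer that never takes the same lock.
func (d *drainSchedules) hasSchedule(name string) (has, failed bool) {
	d.Lock()
	defer d.Unlock()
	sched, ok := d.schedules[name]
	if !ok {
		return false, false
	}
	return true, sched.failed // racy read
}

func main() {
	d := &drainSchedules{schedules: map[string]*schedule{"node-1": {}}}
	sched := d.schedules["node-1"]

	// The timer callback runs in its own goroutine and writes the field
	// without holding the mutex: the racy write.
	time.AfterFunc(time.Millisecond, func() {
		sched.failed = true
	})

	deadline := time.Now().Add(5 * time.Millisecond)
	for time.Now().Before(deadline) {
		_, _ = d.hasSchedule("node-1")
	}
	fmt.Println("done; `go run -race` flags the unsynchronized access")
}
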
internal/kubernetes/drainSchedule.go (32 changes: 19 additions & 13 deletions)
@@ -4,6 +4,7 @@ import (
 	"context"
 	"fmt"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"go.opencensus.io/stats"
@@ -21,7 +22,7 @@ const (
 )
 
 type DrainScheduler interface {
-	HasSchedule(name string) (has, running, failed bool)
+	HasSchedule(name string) (has, failed bool)
 	Schedule(node *v1.Node) (time.Time, error)
 	DeleteSchedule(name string)
 }
@@ -40,23 +41,22 @@ type DrainSchedules struct {
 
 func NewDrainSchedules(drainer Drainer, eventRecorder record.EventRecorder, period time.Duration, logger *zap.Logger) DrainScheduler {
 	return &DrainSchedules{
-		schedules: map[string]*schedule{},
-		//lastDrainScheduledFor: time.Now(),
+		schedules:     map[string]*schedule{},
 		period:        period,
 		logger:        logger,
 		drainer:       drainer,
 		eventRecorder: eventRecorder,
 	}
 }
 
-func (d *DrainSchedules) HasSchedule(name string) (has, running, failed bool) {
+func (d *DrainSchedules) HasSchedule(name string) (has, failed bool) {
 	d.Lock()
 	defer d.Unlock()
 	sched, ok := d.schedules[name]
 	if !ok {
-		return false, false, false
+		return false, false
 	}
-	return true, sched.running, sched.failed
+	return true, sched.isFailed()
 }
 
 func (d *DrainSchedules) DeleteSchedule(name string) {
@@ -109,11 +109,18 @@ func (d *DrainSchedules) Schedule(node *v1.Node) (time.Time, error) {
 }
 
 type schedule struct {
-	when    time.Time
-	running bool
-	failed  bool
-	finish  time.Time
-	timer   *time.Timer
+	when   time.Time
+	failed int32
+	finish time.Time
+	timer  *time.Timer
+}
+
+func (s *schedule) setFailed() {
+	atomic.StoreInt32(&s.failed, 1)
+}
+
+func (s *schedule) isFailed() bool {
+	return atomic.LoadInt32(&s.failed) == 1
 }
 
 func (d *DrainSchedules) newSchedule(node *v1.Node, when time.Time) *schedule {
@@ -122,13 +129,12 @@ func (d *DrainSchedules) newSchedule(node *v1.Node, when time.Time) *schedule {
 	}
 	sched.timer = time.AfterFunc(time.Until(when), func() {
 		log := d.logger.With(zap.String("node", node.GetName()))
-		sched.running = true
 		nr := &core.ObjectReference{Kind: "Node", Name: node.GetName(), UID: types.UID(node.GetName())}
 		tags, _ := tag.New(context.Background(), tag.Upsert(TagNodeName, node.GetName())) // nolint:gosec
 		d.eventRecorder.Event(nr, core.EventTypeWarning, eventReasonDrainStarting, "Draining node")
 		if err := d.drainer.Drain(node); err != nil {
 			sched.finish = time.Now()
-			sched.failed = true
+			sched.setFailed()
 			log.Info("Failed to drain", zap.Error(err))
 			tags, _ = tag.New(tags, tag.Upsert(TagResult, tagResultFailed)) // nolint:gosec
 			stats.Record(tags, MeasureNodesDrained.M(1))
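
A matching sketch of the pattern the diff above adopts (the `setFailed`/`isFailed` helpers are lifted from the diff; the surrounding `main` is illustrative): the flag becomes an `int32` accessed only through `sync/atomic`, so the write from the timer goroutine and the concurrent reads no longer race and `go run -race` stays silent.

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// schedule mirrors the post-fix struct: failed is an int32 touched only
// through sync/atomic.
type schedule struct {
	failed int32
}

func (s *schedule) setFailed() {
	atomic.StoreInt32(&s.failed, 1)
}

func (s *schedule) isFailed() bool {
	return atomic.LoadInt32(&s.failed) == 1
}

func main() {
	sched := &schedule{}

	// Same shape as before: the timer goroutine records the failure...
	time.AfterFunc(time.Millisecond, func() {
		sched.setFailed()
	})

	// ...while the main goroutine polls it concurrently. Both sides use
	// atomic operations on the same word, so there is no race to report.
	deadline := time.Now().Add(5 * time.Millisecond)
	for time.Now().Before(deadline) {
		_ = sched.isFailed()
	}
	fmt.Println("failed:", sched.isFailed())
}
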
internal/kubernetes/drainSchedule_test.go (8 changes: 4 additions & 4 deletions)
@@ -56,7 +56,7 @@ func TestDrainSchedules_Schedule(t *testing.T) {
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			// Check that node is not yet scheduled for drain
-			hasSchedule, _, _ := scheduler.HasSchedule(tt.node.Name)
+			hasSchedule, _ := scheduler.HasSchedule(tt.node.Name)
 			if hasSchedule {
 				t.Errorf("Node %v should not have any schedule", tt.node.Name)
 			}
@@ -67,7 +67,7 @@ func TestDrainSchedules_Schedule(t *testing.T) {
 				return
 			}
 			// Check that node is scheduled for drain
-			hasSchedule, _, _ = scheduler.HasSchedule(tt.node.Name)
+			hasSchedule, _ = scheduler.HasSchedule(tt.node.Name)
 			if !hasSchedule {
 				t.Errorf("Missing schedule record for node %v", tt.node.Name)
 			}
@@ -78,7 +78,7 @@ func TestDrainSchedules_Schedule(t *testing.T) {
 			// Deleting schedule
 			scheduler.DeleteSchedule(tt.node.Name)
 			// Check that node is no more scheduled for drain
-			hasSchedule, _, _ = scheduler.HasSchedule(tt.node.Name)
+			hasSchedule, _ = scheduler.HasSchedule(tt.node.Name)
 			if hasSchedule {
 				t.Errorf("Node %v should not been scheduled anymore", tt.node.Name)
 			}
@@ -105,7 +105,7 @@ func TestDrainSchedules_HasSchedule_Polling(t *testing.T) {
 
 	timeout := time.After(time.Until(when) + time.Minute)
 	for {
-		hasSchedule, _, failed := scheduler.HasSchedule(node.Name)
+		hasSchedule, failed := scheduler.HasSchedule(node.Name)
 		if !hasSchedule {
 			t.Fatalf("Missing schedule record for node %v", node.Name)
 		}
internal/kubernetes/eventhandler.go (2 changes: 1 addition & 1 deletion)
@@ -142,7 +142,7 @@ func (h *DrainingResourceEventHandler) HandleNode(n *core.Node) {
 	}
 
 	// Let's ensure that a drain is scheduled
-	hasSChedule, _, failedDrain := h.drainScheduler.HasSchedule(n.GetName())
+	hasSChedule, failedDrain := h.drainScheduler.HasSchedule(n.GetName())
 	if !hasSChedule {
 		h.scheduleDrain(n)
 		return
