From 8a2492000a547a9182754e5b53e62bbb3c110c76 Mon Sep 17 00:00:00 2001
From: Cezar Sa Espinola
Date: Thu, 20 Aug 2020 16:15:46 -0300
Subject: [PATCH 1/2] Add test case to expose race conditions in drain scheduler

The new test helps expose a current race condition and also helps
prevent similar races in future calls to `HasSchedule` that interact
with in-progress draining operations.

The race condition also happens during normal draino operation because
the event handler may call `HasSchedule` multiple times for an already
scheduled node.

Data race sample:

```
$ go test -race
Now: 2020-08-20T16:22:10-03:00
==================
WARNING: DATA RACE
Write at 0x00c0000c8098 by goroutine 16:
  github.com/planetlabs/draino/internal/kubernetes.(*DrainSchedules).newSchedule.func1()
      /Users/cezarsa/code/draino/internal/kubernetes/drainSchedule.go:125 +0x235

Previous read at 0x00c0000c8098 by goroutine 12:
  github.com/planetlabs/draino/internal/kubernetes.(*DrainSchedules).HasSchedule()
      /Users/cezarsa/code/draino/internal/kubernetes/drainSchedule.go:59 +0x142
  github.com/planetlabs/draino/internal/kubernetes.TestDrainSchedules_HasSchedule_Polling()
      /Users/cezarsa/code/draino/internal/kubernetes/drainSchedule_test.go:108 +0x54a
  testing.tRunner()
      /usr/local/Cellar/go/1.14.5/libexec/src/testing/testing.go:991 +0x1eb

Goroutine 16 (running) created at:
  time.goFunc()
      /usr/local/Cellar/go/1.14.5/libexec/src/time/sleep.go:168 +0x51

Goroutine 12 (running) created at:
  testing.(*T).Run()
      /usr/local/Cellar/go/1.14.5/libexec/src/testing/testing.go:1042 +0x660
  testing.runTests.func1()
      /usr/local/Cellar/go/1.14.5/libexec/src/testing/testing.go:1284 +0xa6
  testing.tRunner()
      /usr/local/Cellar/go/1.14.5/libexec/src/testing/testing.go:991 +0x1eb
  testing.runTests()
      /usr/local/Cellar/go/1.14.5/libexec/src/testing/testing.go:1282 +0x527
  testing.(*M).Run()
      /usr/local/Cellar/go/1.14.5/libexec/src/testing/testing.go:1199 +0x2ff
  main.main()
      _testmain.go:66 +0x223
==================
==================
WARNING: DATA RACE
Write at 0x00c0000c8099 by goroutine 16:
  github.com/planetlabs/draino/internal/kubernetes.(*DrainSchedules).newSchedule.func1()
      /Users/cezarsa/code/draino/internal/kubernetes/drainSchedule.go:131 +0x65b

Previous read at 0x00c0000c8099 by goroutine 12:
  github.com/planetlabs/draino/internal/kubernetes.(*DrainSchedules).HasSchedule()
      /Users/cezarsa/code/draino/internal/kubernetes/drainSchedule.go:59 +0x15f
  github.com/planetlabs/draino/internal/kubernetes.TestDrainSchedules_HasSchedule_Polling()
      /Users/cezarsa/code/draino/internal/kubernetes/drainSchedule_test.go:108 +0x54a
  testing.tRunner()
      /usr/local/Cellar/go/1.14.5/libexec/src/testing/testing.go:991 +0x1eb

Goroutine 16 (running) created at:
  time.goFunc()
      /usr/local/Cellar/go/1.14.5/libexec/src/time/sleep.go:168 +0x51

Goroutine 12 (running) created at:
  testing.(*T).Run()
      /usr/local/Cellar/go/1.14.5/libexec/src/testing/testing.go:1042 +0x660
  testing.runTests.func1()
      /usr/local/Cellar/go/1.14.5/libexec/src/testing/testing.go:1284 +0xa6
  testing.tRunner()
      /usr/local/Cellar/go/1.14.5/libexec/src/testing/testing.go:991 +0x1eb
  testing.runTests()
      /usr/local/Cellar/go/1.14.5/libexec/src/testing/testing.go:1282 +0x527
  testing.(*M).Run()
      /usr/local/Cellar/go/1.14.5/libexec/src/testing/testing.go:1199 +0x2ff
  main.main()
      _testmain.go:66 +0x223
==================
--- FAIL: TestDrainSchedules_HasSchedule_Polling (11.04s)
    testing.go:906: race detected during execution of test
FAIL
exit status 1
FAIL    github.com/planetlabs/draino/internal/kubernetes    13.829s
```
---
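For reference, a minimal standalone sketch (illustrative only, not draino's
actual code; names and timings are made up) of the access pattern the new test
exposes: a plain field written from a `time.AfterFunc` callback while another
goroutine reads it with no synchronization. Running it with `go run -race`
reports a data race of the same shape as the one quoted above.

```go
package main

import (
	"fmt"
	"time"
)

// schedule mirrors the shape of the real type: a plain bool flag that the
// timer goroutine writes and another goroutine reads.
type schedule struct {
	failed bool
}

func main() {
	s := &schedule{}

	// The callback runs on a goroutine started by the runtime timer.
	time.AfterFunc(10*time.Millisecond, func() {
		s.failed = true // unsynchronized write
	})

	// Poll the flag from the main goroutine, the way HasSchedule is polled.
	for i := 0; i < 50; i++ {
		_ = s.failed // unsynchronized read: `go run -race` flags this pair
		time.Sleep(time.Millisecond)
	}
	fmt.Println("failed:", s.failed)
}
```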
 internal/kubernetes/drainSchedule_test.go | 43 +++++++++++++++++++++++
 1 file changed, 43 insertions(+)

diff --git a/internal/kubernetes/drainSchedule_test.go b/internal/kubernetes/drainSchedule_test.go
index 2ff13548..0b675aee 100644
--- a/internal/kubernetes/drainSchedule_test.go
+++ b/internal/kubernetes/drainSchedule_test.go
@@ -5,6 +5,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/pkg/errors"
 	"go.uber.org/zap"
 	v1 "k8s.io/api/core/v1"
 	meta "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -84,3 +85,45 @@ func TestDrainSchedules_Schedule(t *testing.T) {
 		})
 	}
 }
+
+type failDrainer struct {
+	NoopCordonDrainer
+}
+
+func (d *failDrainer) Drain(n *v1.Node) error { return errors.New("myerr") }
+
+// Test to ensure there are no races when calling HasSchedule while the
+// scheduler is draining a node.
+func TestDrainSchedules_HasSchedule_Polling(t *testing.T) {
+	scheduler := NewDrainSchedules(&failDrainer{}, &record.FakeRecorder{}, 0, zap.NewNop())
+	node := &v1.Node{ObjectMeta: meta.ObjectMeta{Name: nodeName}}
+
+	when, err := scheduler.Schedule(node)
+	if err != nil {
+		t.Fatalf("DrainSchedules.Schedule() error = %v", err)
+	}
+
+	timeout := time.After(time.Until(when) + time.Minute)
+	for {
+		hasSchedule, _, failed := scheduler.HasSchedule(node.Name)
+		if !hasSchedule {
+			t.Fatalf("Missing schedule record for node %v", node.Name)
+		}
+		if failed {
+			// Having `failed` as true is the expected result here since this
+			// test is using the `failDrainer{}` drainer. It means that
+			// HasSchedule was successfully called during or after the draining
+			// function was scheduled and the test can complete successfully.
+			break
+		}
+		select {
+		case <-time.After(time.Second):
+			// Small sleep to ensure we're not running the CPU hot while
+			// polling `HasSchedule`.
+		case <-timeout:
+			// This timeout prevents this test from running forever in case
+			// some bug caused the draining function never to be scheduled.
+			t.Fatalf("timeout waiting for HasSchedule to fail")
+		}
+	}
+}

From 49918a3b24e4feb73107fbe2dd7ec615f3408f00 Mon Sep 17 00:00:00 2001
From: Cezar Sa Espinola
Date: Thu, 20 Aug 2020 16:17:33 -0300
Subject: [PATCH 2/2] Fix data race in drain scheduler

This commit fixes a data race when calling `HasSchedule`. The race
happens because the callback passed to `time.AfterFunc` runs in its own
goroutine and updates schedule fields that may be read in parallel by
`HasSchedule`.

The `running` return value of `HasSchedule` was removed to simplify the
code, as it wasn't used anywhere.
---
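For reference, a minimal standalone sketch (illustrative only, not the patched
draino code verbatim) of the atomic-flag approach this fix uses: the timer
callback records failure with `atomic.StoreInt32` and readers observe it with
`atomic.LoadInt32`, so the flag needs no mutex and the race detector stays
quiet.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// schedule keeps the failure flag as an int32 accessed only through
// sync/atomic, so concurrent reads and writes are well defined.
type schedule struct {
	failed int32 // 0 = not failed, 1 = failed
}

func (s *schedule) setFailed()     { atomic.StoreInt32(&s.failed, 1) }
func (s *schedule) isFailed() bool { return atomic.LoadInt32(&s.failed) == 1 }

func main() {
	s := &schedule{}

	// The timer callback marks the schedule as failed from its own goroutine.
	time.AfterFunc(10*time.Millisecond, func() {
		s.setFailed() // atomic write
	})

	// Poll from the main goroutine, the way HasSchedule is polled in the test.
	for !s.isFailed() { // atomic read: no data race reported
		time.Sleep(time.Millisecond)
	}
	fmt.Println("failed:", s.isFailed())
}
```

An `int32` flag is a natural choice here since `sync/atomic` in Go of that era
has no `atomic.Bool` (it arrived in Go 1.19); an alternative would be to take
the `DrainSchedules` mutex inside the callback, at the cost of holding a lock
in the drain path.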
 internal/kubernetes/drainSchedule.go      | 32 ++++++++++++++---------
 internal/kubernetes/drainSchedule_test.go |  8 +++---
 internal/kubernetes/eventhandler.go       |  2 +-
 3 files changed, 24 insertions(+), 18 deletions(-)

diff --git a/internal/kubernetes/drainSchedule.go b/internal/kubernetes/drainSchedule.go
index ea0e3710..0d0cc284 100644
--- a/internal/kubernetes/drainSchedule.go
+++ b/internal/kubernetes/drainSchedule.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"fmt"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"go.opencensus.io/stats"
@@ -21,7 +22,7 @@ const (
 )
 
 type DrainScheduler interface {
-	HasSchedule(name string) (has, running, failed bool)
+	HasSchedule(name string) (has, failed bool)
 	Schedule(node *v1.Node) (time.Time, error)
 	DeleteSchedule(name string)
 }
@@ -40,8 +41,7 @@ type DrainSchedules struct {
 
 func NewDrainSchedules(drainer Drainer, eventRecorder record.EventRecorder, period time.Duration, logger *zap.Logger) DrainScheduler {
 	return &DrainSchedules{
-		schedules: map[string]*schedule{},
-		//lastDrainScheduledFor: time.Now(),
+		schedules:     map[string]*schedule{},
 		period:        period,
 		logger:        logger,
 		drainer:       drainer,
@@ -49,14 +49,14 @@ func NewDrainSchedules(drainer Drainer, eventRecorder record.EventRecorder, peri
 	}
 }
 
-func (d *DrainSchedules) HasSchedule(name string) (has, running, failed bool) {
+func (d *DrainSchedules) HasSchedule(name string) (has, failed bool) {
 	d.Lock()
 	defer d.Unlock()
 	sched, ok := d.schedules[name]
 	if !ok {
-		return false, false, false
+		return false, false
 	}
-	return true, sched.running, sched.failed
+	return true, sched.isFailed()
 }
 
 func (d *DrainSchedules) DeleteSchedule(name string) {
@@ -109,11 +109,18 @@ func (d *DrainSchedules) Schedule(node *v1.Node) (time.Time, error) {
 }
 
 type schedule struct {
-	when    time.Time
-	running bool
-	failed  bool
-	finish  time.Time
-	timer   *time.Timer
+	when   time.Time
+	failed int32
+	finish time.Time
+	timer  *time.Timer
+}
+
+func (s *schedule) setFailed() {
+	atomic.StoreInt32(&s.failed, 1)
+}
+
+func (s *schedule) isFailed() bool {
+	return atomic.LoadInt32(&s.failed) == 1
 }
 
 func (d *DrainSchedules) newSchedule(node *v1.Node, when time.Time) *schedule {
@@ -122,13 +129,12 @@ func (d *DrainSchedules) newSchedule(node *v1.Node, when time.Time) *schedule {
 	}
 	sched.timer = time.AfterFunc(time.Until(when), func() {
 		log := d.logger.With(zap.String("node", node.GetName()))
-		sched.running = true
 		nr := &core.ObjectReference{Kind: "Node", Name: node.GetName(), UID: types.UID(node.GetName())}
 		tags, _ := tag.New(context.Background(), tag.Upsert(TagNodeName, node.GetName())) // nolint:gosec
 		d.eventRecorder.Event(nr, core.EventTypeWarning, eventReasonDrainStarting, "Draining node")
 		if err := d.drainer.Drain(node); err != nil {
 			sched.finish = time.Now()
-			sched.failed = true
+			sched.setFailed()
 			log.Info("Failed to drain", zap.Error(err))
 			tags, _ = tag.New(tags, tag.Upsert(TagResult, tagResultFailed)) // nolint:gosec
 			stats.Record(tags, MeasureNodesDrained.M(1))
diff --git a/internal/kubernetes/drainSchedule_test.go b/internal/kubernetes/drainSchedule_test.go
index 0b675aee..421a2459 100644
--- a/internal/kubernetes/drainSchedule_test.go
+++ b/internal/kubernetes/drainSchedule_test.go
@@ -56,7 +56,7 @@ func TestDrainSchedules_Schedule(t *testing.T) {
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			// Check that node is not yet scheduled for drain
-			hasSchedule, _, _ := scheduler.HasSchedule(tt.node.Name)
+			hasSchedule, _ := scheduler.HasSchedule(tt.node.Name)
 			if hasSchedule {
 				t.Errorf("Node %v should not have any schedule", tt.node.Name)
 			}
@@ -67,7 +67,7 @@ func TestDrainSchedules_Schedule(t *testing.T) {
 				return
 			}
 			// Check that node is scheduled for drain
-			hasSchedule, _, _ = scheduler.HasSchedule(tt.node.Name)
+			hasSchedule, _ = scheduler.HasSchedule(tt.node.Name)
 			if !hasSchedule {
 				t.Errorf("Missing schedule record for node %v", tt.node.Name)
 			}
@@ -78,7 +78,7 @@ func TestDrainSchedules_Schedule(t *testing.T) {
 			// Deleting schedule
 			scheduler.DeleteSchedule(tt.node.Name)
 			// Check that node is no more scheduled for drain
-			hasSchedule, _, _ = scheduler.HasSchedule(tt.node.Name)
+			hasSchedule, _ = scheduler.HasSchedule(tt.node.Name)
 			if hasSchedule {
 				t.Errorf("Node %v should not been scheduled anymore", tt.node.Name)
 			}
@@ -105,7 +105,7 @@ func TestDrainSchedules_HasSchedule_Polling(t *testing.T) {
 
 	timeout := time.After(time.Until(when) + time.Minute)
 	for {
-		hasSchedule, _, failed := scheduler.HasSchedule(node.Name)
+		hasSchedule, failed := scheduler.HasSchedule(node.Name)
 		if !hasSchedule {
 			t.Fatalf("Missing schedule record for node %v", node.Name)
 		}
diff --git a/internal/kubernetes/eventhandler.go b/internal/kubernetes/eventhandler.go
index fc865c6f..1cc44273 100644
--- a/internal/kubernetes/eventhandler.go
+++ b/internal/kubernetes/eventhandler.go
@@ -142,7 +142,7 @@ func (h *DrainingResourceEventHandler) HandleNode(n *core.Node) {
 	}
 
 	// Let's ensure that a drain is scheduled
-	hasSChedule, _, failedDrain := h.drainScheduler.HasSchedule(n.GetName())
+	hasSChedule, failedDrain := h.drainScheduler.HasSchedule(n.GetName())
 	if !hasSChedule {
 		h.scheduleDrain(n)
 		return