From fff8a4e3d4f2f4eb5f68f6703b465399e28b57d2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marko=20Ga=C4=87e=C5=A1a?= Date: Tue, 4 Jul 2023 18:15:40 +0200 Subject: [PATCH] fix scheduler queue deadlock --- .drone.yml | 2 +- scheduler/queue/queue.go | 12 ++++++----- scheduler/queue/queue_test.go | 40 +++++++++++++++++++++++++++++------ 3 files changed, 41 insertions(+), 13 deletions(-) diff --git a/.drone.yml b/.drone.yml index ffb0fc8165..50a28859bc 100644 --- a/.drone.yml +++ b/.drone.yml @@ -11,7 +11,7 @@ steps: - name: test image: golang:1.14.15 commands: - - go test ./... + - go test -race ./... - go build -o /dev/null github.com/drone/drone/cmd/drone-server - go build -o /dev/null -tags "oss nolimit" github.com/drone/drone/cmd/drone-server diff --git a/scheduler/queue/queue.go b/scheduler/queue/queue.go index 1354d4f5e6..14704de589 100644 --- a/scheduler/queue/queue.go +++ b/scheduler/queue/queue.go @@ -96,6 +96,7 @@ func (q *queue) Request(ctx context.Context, params core.Filter) (*core.Stage, e variant: params.Variant, labels: params.Labels, channel: make(chan *core.Stage), + done: ctx.Done(), } q.Lock() q.workers[w] = struct{}{} @@ -108,9 +109,6 @@ func (q *queue) Request(ctx context.Context, params core.Filter) (*core.Stage, e select { case <-ctx.Done(): - q.Lock() - delete(q.workers, w) - q.Unlock() return nil, ctx.Err() case b := <-w.channel: return b, nil @@ -211,9 +209,12 @@ func (q *queue) signal(ctx context.Context) error { // } select { case w.channel <- item: - delete(q.workers, w) - break loop + case <-w.done: + case <-time.After(q.interval): } + + delete(q.workers, w) + break loop } } return nil @@ -241,6 +242,7 @@ type worker struct { variant string labels map[string]string channel chan *core.Stage + done <-chan struct{} } type counter struct { diff --git a/scheduler/queue/queue_test.go b/scheduler/queue/queue_test.go index 251e799415..496a6f9586 100644 --- a/scheduler/queue/queue_test.go +++ b/scheduler/queue/queue_test.go @@ -50,16 +50,15 @@ func TestQueueCancel(t *testing.T) { controller := gomock.NewController(t) defer controller.Finish() - ctx, cancel := context.WithCancel(context.Background()) store := mock.NewMockStageStore(controller) - store.EXPECT().ListIncomplete(ctx).Return(nil, nil) + store.EXPECT().ListIncomplete(gomock.Any()).Return(nil, nil) q := newQueue(store) - q.ctx = ctx var wg sync.WaitGroup wg.Add(1) + ctx, cancel := context.WithCancel(context.Background()) go func() { build, err := q.Request(ctx, core.Filter{OS: "linux/amd64", Arch: "amd64"}) if err != context.Canceled { @@ -102,10 +101,7 @@ func TestQueuePush(t *testing.T) { ctx := context.Background() store := mock.NewMockStageStore(controller) - q := &queue{ - store: store, - ready: make(chan struct{}, 1), - } + q := newQueue(store) q.Schedule(ctx, item1) q.Schedule(ctx, item2) select { @@ -356,3 +352,33 @@ func TestWithinLimits_Old(t *testing.T) { } } } + +func TestQueueContextCanceling(t *testing.T) { + listIncompleteResponse := []*core.Stage{ + {ID: 1, OS: "linux/amd64", Arch: "amd64", Status: drone.StatusPending}, + } + + controller := gomock.NewController(t) + defer controller.Finish() + + globCtx := context.Background() + + mockStageStore := mock.NewMockStageStore(controller) + mockStageStore.EXPECT().ListIncomplete(globCtx).Return(listIncompleteResponse, nil).AnyTimes() + + q := newQueue(mockStageStore) + + for k := 0; k < 1000; k++ { + reqCtx, reqCanc := context.WithCancel(context.Background()) + go reqCanc() // asynchronously cancel the context + + stage, err := q.Request(reqCtx, core.Filter{OS: "linux/amd64", Arch: "amd64"}) + if stage == nil && err == context.Canceled { + continue // we got the ctx canceled error + } + if stage == listIncompleteResponse[0] && err == nil { + continue // we got a stage before the context got canceled + } + t.Errorf("got neither the context canceled error nor the data: stage=%v err=%v", stage, err) + } +}