Skip to content

Commit

Permalink
fix scheduler queue deadlock
Browse files Browse the repository at this point in the history
  • Loading branch information
marko-gacesa authored and dependabot-ci committed Jul 5, 2023
1 parent 15819f2 commit fff8a4e
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 13 deletions.
2 changes: 1 addition & 1 deletion .drone.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ steps:
- name: test
image: golang:1.14.15
commands:
- go test ./...
- go test -race ./...
- go build -o /dev/null github.com/drone/drone/cmd/drone-server
- go build -o /dev/null -tags "oss nolimit" github.com/drone/drone/cmd/drone-server

Expand Down
12 changes: 7 additions & 5 deletions scheduler/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ func (q *queue) Request(ctx context.Context, params core.Filter) (*core.Stage, e
variant: params.Variant,
labels: params.Labels,
channel: make(chan *core.Stage),
done: ctx.Done(),
}
q.Lock()
q.workers[w] = struct{}{}
Expand All @@ -108,9 +109,6 @@ func (q *queue) Request(ctx context.Context, params core.Filter) (*core.Stage, e

select {
case <-ctx.Done():
q.Lock()
delete(q.workers, w)
q.Unlock()
return nil, ctx.Err()
case b := <-w.channel:
return b, nil
Expand Down Expand Up @@ -211,9 +209,12 @@ func (q *queue) signal(ctx context.Context) error {
// }
select {
case w.channel <- item:
delete(q.workers, w)
break loop
case <-w.done:
case <-time.After(q.interval):
}

delete(q.workers, w)
break loop
}
}
return nil
Expand Down Expand Up @@ -241,6 +242,7 @@ type worker struct {
variant string
labels map[string]string
channel chan *core.Stage
done <-chan struct{}
}

type counter struct {
Expand Down
40 changes: 33 additions & 7 deletions scheduler/queue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,15 @@ func TestQueueCancel(t *testing.T) {
controller := gomock.NewController(t)
defer controller.Finish()

ctx, cancel := context.WithCancel(context.Background())
store := mock.NewMockStageStore(controller)
store.EXPECT().ListIncomplete(ctx).Return(nil, nil)
store.EXPECT().ListIncomplete(gomock.Any()).Return(nil, nil)

q := newQueue(store)
q.ctx = ctx

var wg sync.WaitGroup
wg.Add(1)

ctx, cancel := context.WithCancel(context.Background())
go func() {
build, err := q.Request(ctx, core.Filter{OS: "linux/amd64", Arch: "amd64"})
if err != context.Canceled {
Expand Down Expand Up @@ -102,10 +101,7 @@ func TestQueuePush(t *testing.T) {
ctx := context.Background()
store := mock.NewMockStageStore(controller)

q := &queue{
store: store,
ready: make(chan struct{}, 1),
}
q := newQueue(store)
q.Schedule(ctx, item1)
q.Schedule(ctx, item2)
select {
Expand Down Expand Up @@ -356,3 +352,33 @@ func TestWithinLimits_Old(t *testing.T) {
}
}
}

func TestQueueContextCanceling(t *testing.T) {
listIncompleteResponse := []*core.Stage{
{ID: 1, OS: "linux/amd64", Arch: "amd64", Status: drone.StatusPending},
}

controller := gomock.NewController(t)
defer controller.Finish()

globCtx := context.Background()

mockStageStore := mock.NewMockStageStore(controller)
mockStageStore.EXPECT().ListIncomplete(globCtx).Return(listIncompleteResponse, nil).AnyTimes()

q := newQueue(mockStageStore)

for k := 0; k < 1000; k++ {
reqCtx, reqCanc := context.WithCancel(context.Background())
go reqCanc() // asynchronously cancel the context

stage, err := q.Request(reqCtx, core.Filter{OS: "linux/amd64", Arch: "amd64"})
if stage == nil && err == context.Canceled {
continue // we got the ctx canceled error
}
if stage == listIncompleteResponse[0] && err == nil {
continue // we got a stage before the context got canceled
}
t.Errorf("got neither the context canceled error nor the data: stage=%v err=%v", stage, err)
}
}

0 comments on commit fff8a4e

Please sign in to comment.