Skip to content

Commit ff6c8e7

Browse files
committed
fix(storage): deadlock in event loop while coordinating channels (#13652)
1 parent 5cd8007 commit ff6c8e7

File tree

1 file changed

+68
-16
lines changed

1 file changed

+68
-16
lines changed

storage/grpc_reader_multi_range.go

Lines changed: 68 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
package storage
1616

1717
import (
18+
"container/list"
1819
"context"
1920
"errors"
2021
"fmt"
@@ -34,6 +35,10 @@ import (
3435
// Tuning knobs for the multi-range downloader's internal channels and queue.
const (
	mrdCommandChannelSize  = 1
	mrdResponseChannelSize = 100
	// This should never be hit in practice, but is a safety valve to prevent
	// unbounded memory usage if the user is adding ranges faster than they
	// can be processed.
	mrdAddInternalQueueMaxSize = 50000
)
3843

3944
// --- internalMultiRangeDownloader Interface ---
@@ -83,18 +88,19 @@ func (c *grpcStorageClient) NewMultiRangeDownloader(ctx context.Context, params
8388

8489
// Create the manager
8590
manager := &multiRangeDownloaderManager{
86-
ctx: mCtx,
87-
cancel: cancel,
88-
client: c,
89-
settings: s,
90-
params: params,
91-
cmds: make(chan mrdCommand, mrdCommandChannelSize),
92-
sessionResps: make(chan mrdSessionResult, mrdResponseChannelSize),
93-
pendingRanges: make(map[int64]*rangeRequest),
94-
readIDCounter: 1,
95-
readSpec: readSpec,
96-
attrsReady: make(chan struct{}),
97-
spanCtx: ctx,
91+
ctx: mCtx,
92+
cancel: cancel,
93+
client: c,
94+
settings: s,
95+
params: params,
96+
cmds: make(chan mrdCommand, mrdCommandChannelSize),
97+
sessionResps: make(chan mrdSessionResult, mrdResponseChannelSize),
98+
pendingRanges: make(map[int64]*rangeRequest),
99+
readIDCounter: 1,
100+
readSpec: readSpec,
101+
attrsReady: make(chan struct{}),
102+
spanCtx: ctx,
103+
unsentRequests: newRequestQueue(),
98104
}
99105

100106
mrd := &MultiRangeDownloader{
@@ -227,6 +233,7 @@ type multiRangeDownloaderManager struct {
227233
attrsOnce sync.Once
228234
spanCtx context.Context
229235
callbackWg sync.WaitGroup
236+
unsentRequests *requestQueue
230237
}
231238

232239
type rangeRequest struct {
@@ -374,10 +381,29 @@ func (m *multiRangeDownloaderManager) eventLoop() {
374381
}
375382

376383
for {
384+
var nextReq *storagepb.BidiReadObjectRequest
385+
var targetChan chan<- *storagepb.BidiReadObjectRequest
386+
387+
// Only try to send if we have queued requests
388+
if m.unsentRequests.Len() > 0 && m.currentSession != nil {
389+
nextReq = m.unsentRequests.Front()
390+
if nextReq != nil {
391+
targetChan = m.currentSession.reqC
392+
}
393+
}
394+
// Only read from cmds if we have space in the unsentRequests queue.
395+
var cmdsChan chan mrdCommand
396+
if m.unsentRequests.Len() < mrdAddInternalQueueMaxSize {
397+
cmdsChan = m.cmds
398+
}
377399
select {
378400
case <-m.ctx.Done():
379401
return
380-
case cmd := <-m.cmds:
402+
// This path only triggers if space is available in the channel.
403+
// It never blocks the eventLoop.
404+
case targetChan <- nextReq:
405+
m.unsentRequests.RemoveFront()
406+
case cmd := <-cmdsChan:
381407
cmd.apply(m.ctx, m)
382408
if _, ok := cmd.(*mrdCloseCmd); ok {
383409
return
@@ -386,7 +412,7 @@ func (m *multiRangeDownloaderManager) eventLoop() {
386412
m.processSessionResult(result)
387413
}
388414

389-
if len(m.pendingRanges) == 0 {
415+
if len(m.pendingRanges) == 0 && m.unsentRequests.Len() == 0 {
390416
for _, waiter := range m.waiters {
391417
close(waiter)
392418
}
@@ -512,7 +538,7 @@ func (m *multiRangeDownloaderManager) handleAddCmd(ctx context.Context, cmd *mrd
512538
ReadId: req.readID,
513539
}},
514540
}
515-
m.currentSession.SendRequest(protoReq)
541+
m.unsentRequests.PushBack(protoReq)
516542
}
517543

518544
func (m *multiRangeDownloaderManager) convertToPositiveOffset(req *rangeRequest) error {
@@ -655,7 +681,8 @@ func (m *multiRangeDownloaderManager) ensureSession(ctx context.Context) error {
655681
}
656682
}
657683
if len(rangesToResend) > 0 {
658-
m.currentSession.SendRequest(&storagepb.BidiReadObjectRequest{ReadRanges: rangesToResend})
684+
retryReq := &storagepb.BidiReadObjectRequest{ReadRanges: rangesToResend}
685+
m.unsentRequests.PushFront(retryReq)
659686
}
660687
return nil
661688
}, m.settings.retry, true)
@@ -900,3 +927,28 @@ func readerAttrsFromObject(o *ObjectAttrs) ReaderObjectAttrs {
900927
CRC32C: o.CRC32C,
901928
}
902929
}
930+
931+
// requestQueue is an unbounded FIFO of outgoing BidiReadObjectRequests,
// buffering requests until the event loop can hand them to the session.
type requestQueue struct {
	// l holds *storagepb.BidiReadObjectRequest elements in send order.
	l *list.List
}
934+
935+
func newRequestQueue() *requestQueue {
936+
return &requestQueue{l: list.New()}
937+
}
938+
939+
func (q *requestQueue) PushBack(r *storagepb.BidiReadObjectRequest) { q.l.PushBack(r) }
940+
func (q *requestQueue) PushFront(r *storagepb.BidiReadObjectRequest) { q.l.PushFront(r) }
941+
func (q *requestQueue) Len() int { return q.l.Len() }
942+
943+
func (q *requestQueue) Front() *storagepb.BidiReadObjectRequest {
944+
if f := q.l.Front(); f != nil {
945+
return f.Value.(*storagepb.BidiReadObjectRequest)
946+
}
947+
return nil
948+
}
949+
950+
func (q *requestQueue) RemoveFront() {
951+
if f := q.l.Front(); f != nil {
952+
q.l.Remove(f)
953+
}
954+
}

0 commit comments

Comments
 (0)