
Commit 43544a1
client: Fix deadlock when writing to pipe blocks
Swap to calling send, which handles taking sendLock for us, rather than taking the lock directly in createStream. This means that streamLock is now released before sendLock is taken.

Taking sendLock before releasing streamLock means that if a goroutine blocks writing to the pipe, another goroutine can get stuck trying to take sendLock, and streamLock stays held as well. The receiver goroutine is then no longer able to read responses from the pipe, since it needs to take streamLock when processing a response. This ultimately leads to a complete deadlock of the client.

It is reasonable for a server to block writes to the pipe if the client is not reading responses fast enough, so we can't expect writes to never block.

I have repro'd the hang with a simple ttrpc client and server. The client spins up 100 goroutines that spam the server with requests constantly; after a few seconds of running I can see it hang. I set the pipe's buffer size to 0 to repro more easily, but it would still be possible to hit with a larger buffer size (it may just take a higher volume of requests or larger payloads).

I also validated that I no longer see the hang with this fix, by leaving the test client/server running for a few minutes. Obviously not 100% conclusive, but before the fix I could get a hang within several seconds of running.

Signed-off-by: Kevin Parsons <kevpar@microsoft.com>
1 parent 4a2816b commit 43544a1

File tree

1 file changed (+20, -9)

client.go

Lines changed: 20 additions & 9 deletions
@@ -386,25 +386,36 @@ func (c *Client) receiveLoop() error {
 // createStream creates a new stream and registers it with the client
 // Introduce stream types for multiple or single response
 func (c *Client) createStream(flags uint8, b []byte) (*stream, error) {
-	c.streamLock.Lock()
+	c.sendLock.Lock()
+	defer c.sendLock.Unlock()
 
 	// Check if closed since lock acquired to prevent adding
 	// anything after cleanup completes
 	select {
 	case <-c.ctx.Done():
-		c.streamLock.Unlock()
 		return nil, ErrClosed
 	default:
 	}
 
-	// Stream ID should be allocated at same time
-	s := newStream(c.nextStreamID, c)
-	c.streams[s.id] = s
-	c.nextStreamID = c.nextStreamID + 2
+	var s *stream
+	if err := func() error {
+		c.streamLock.Lock()
+		defer c.streamLock.Unlock()
 
-	c.sendLock.Lock()
-	defer c.sendLock.Unlock()
-	c.streamLock.Unlock()
+		select {
+		case <-c.ctx.Done():
+			return ErrClosed
+		default:
+		}
+
+		s = newStream(c.nextStreamID, c)
+		c.streams[s.id] = s
+		c.nextStreamID = c.nextStreamID + 2
+
+		return nil
+	}(); err != nil {
+		return nil, err
+	}
 
 	if err := c.channel.send(uint32(s.id), messageTypeRequest, flags, b); err != nil {
 		return s, filterCloseErr(err)
