-
Notifications
You must be signed in to change notification settings - Fork 4.5k
transport: Propagate status code on receiving RST_STREAM during message read #8289
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
transport: Propagate status code on receiving RST_STREAM during message read #8289
Conversation
Codecov ReportAll modified and coverable lines are covered by tests ✅
Additional details and impacted files@@ Coverage Diff @@
## master #8289 +/- ##
==========================================
+ Coverage 82.14% 82.27% +0.13%
==========================================
Files 417 419 +2
Lines 41344 41992 +648
==========================================
+ Hits 33961 34550 +589
- Misses 5957 5982 +25
- Partials 1426 1460 +34
🚀 New features to boost your workflow:
|
b603bef
to
78f39c7
Compare
78f39c7
to
af8bb07
Compare
af8bb07
to
6a24978
Compare
@@ -1242,7 +1242,8 @@ func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) { | |||
statusCode = codes.DeadlineExceeded | |||
} | |||
} | |||
t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.Newf(statusCode, "stream terminated by RST_STREAM with error code: %v", f.ErrCode), nil, false) | |||
st := status.Newf(statusCode, "stream terminated by RST_STREAM with error code: %v", f.ErrCode) | |||
t.closeStream(s, st.Err(), false, http2.ErrCodeNo, st, nil, false) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@dfawley I've changed the code to use a status error for unblocking the reader. This bypasses the conversion to io.ErrUnexpectedEOF
and doesn't require reading the status in csAttempt.recvMsg()
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where is that conversion happening?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here:
Lines 674 to 680 in a43eba6
data, err := p.r.Read(int(length)) | |
if err != nil { | |
if err == io.EOF { | |
err = io.ErrUnexpectedEOF | |
} | |
return 0, nil, err | |
} |
@@ -1242,7 +1242,8 @@ func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) { | |||
statusCode = codes.DeadlineExceeded | |||
} | |||
} | |||
t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.Newf(statusCode, "stream terminated by RST_STREAM with error code: %v", f.ErrCode), nil, false) | |||
st := status.Newf(statusCode, "stream terminated by RST_STREAM with error code: %v", f.ErrCode) | |||
t.closeStream(s, st.Err(), false, http2.ErrCodeNo, st, nil, false) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where is that conversion happening?
test/transport_test.go
Outdated
case *http2.HeadersFrame: | ||
// When the client creates a stream, write a partial gRPC | ||
// message followed by an RST_STREAM. | ||
go func() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this in a goroutine, but the ping ack is written inline?
Also...can we just not reply to the ping and then remove the lock?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed the ping handling and the mutex. Removed the separate goroutine used for writing data frames.
The removed things were unnecessary. I copied the loop that reads frames from an existing test and didn't remove the extra stuff.
grpc-go/internal/transport/transport_test.go
Lines 1414 to 1450 in 950a7cf
for { | |
frame, err := sfr.ReadFrame() | |
if err != nil { | |
return | |
} | |
switch frame := frame.(type) { | |
case *http2.HeadersFrame: | |
// When the client creates a stream, violate the stream flow control. | |
go func() { | |
buf := make([]byte, http2MaxFrameLen) | |
for { | |
mu.Lock() | |
if err := sfr.WriteData(1, false, buf); err != nil { | |
mu.Unlock() | |
return | |
} | |
mu.Unlock() | |
// This for loop is capable of hogging the CPU and cause starvation | |
// in Go versions prior to 1.9, | |
// in single CPU environment. Explicitly relinquish processor. | |
runtime.Gosched() | |
} | |
}() | |
case *http2.RSTStreamFrame: | |
if frame.Header().StreamID != 1 || http2.ErrCode(frame.ErrCode) != http2.ErrCodeFlowControl { | |
t.Errorf("RST stream received with streamID: %d and code: %v, want streamID: 1 and code: http2.ErrCodeFlowControl", frame.Header().StreamID, http2.ErrCode(frame.ErrCode)) | |
} | |
close(success) | |
return | |
case *http2.PingFrame: | |
mu.Lock() | |
sfr.WritePing(true, frame.Data) | |
mu.Unlock() | |
default: | |
} | |
} | |
}() |
Co-authored-by: Doug Fawley <dfawley@google.com>
test/transport_test.go
Outdated
case *http2.RSTStreamFrame: | ||
if frame.Header().StreamID != 1 || http2.ErrCode(frame.ErrCode) != http2.ErrCodeFlowControl { | ||
t.Errorf("RST stream received with streamID: %d and code: %v, want streamID: 1 and code: http2.ErrCodeFlowControl", frame.Header().StreamID, http2.ErrCode(frame.ErrCode)) | ||
messageLen := 2048 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
const
?
Fixes: #8281
RCA: #8281 (comment)
This PR uses a status error instead of an
io.UnexpectedEOF
to fail RPCs that have partially read a gRPC message.RELEASE NOTES:
INTERNAL
instead ofCANCELLED
orDEADLINE_EXCEEDED
when the client receives an RST_STREAM frame while reading a gRPC message.