-
Notifications
You must be signed in to change notification settings - Fork 4.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
grpc: fix receiving empty messages when compression is enabled and maxReceiveMessageSize is MaxInt #7753 #7918
base: master
Are you sure you want to change the base?
Conversation
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #7918 +/- ##
==========================================
- Coverage 82.08% 82.04% -0.04%
==========================================
Files 379 381 +2
Lines 38261 38547 +286
==========================================
+ Hits 31406 31626 +220
- Misses 5551 5600 +49
- Partials 1304 1321 +17
|
rpc_util.go
Outdated
if err != nil { | ||
out.Free() | ||
return nil, 0, err | ||
} | ||
if err = checkReceiveMessageOverflow(int64(out.Len()), int64(maxReceiveMessageSize), dcReader); err != nil { | ||
return nil, out.Len() + 1, err |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Adding 1 to out.Len()
can result in an overflow on 32 bit systems. The following part of the function seems strange to me:
Optionally, if data will be over maxReceiveMessageSize, just return the size
I suggest making the following change to avoid this:
- Declare a global error for indicating that the max receive size is exceeded:
var errMaxMessageSizeExceeded = errors.New("max message size exceeded")
- When the check here fails,
nil, 0, errMaxMessageSizeExceeded
. Update the godoc to mention the same. - In the caller, i.e. recvAndDecompress, instead of checking
if size > maxReceiveMessageSize
, checkif err == errMaxMessageSizeExceeded
. Also update the error message to not mention the actual size, because we didn't read the entire message anyways:grpc: received message after decompression larger than max %d
.
This ensures we're using the returned error to indicate the failure instead of relying on special values of the bytes read count.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please let the reviewer resolve comments.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should not attempt to add out.Len() + 1
, instead return errMaxMessageSizeExceeded
to signal the same.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wait, in case of overflow i think we are still reading the partial data so we need to return the length upto which we have read?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so, the logic should be read till maxMessageSize -1 first and then check if we still have bytes to read by reading one byte and if we overflow, throw the error along with lenght of how much we have read
rpc_util.go
Outdated
if err != nil { | ||
out.Free() | ||
return nil, 0, err | ||
} | ||
if err = checkReceiveMessageOverflow(int64(out.Len()), int64(maxReceiveMessageSize), dcReader); err != nil { | ||
return nil, out.Len() + 1, err |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please let the reviewer resolve comments.
rpc_util.go
Outdated
if err != nil { | ||
out.Free() | ||
return nil, 0, err | ||
} | ||
if err = checkReceiveMessageOverflow(int64(out.Len()), int64(maxReceiveMessageSize), dcReader); err != nil { | ||
return nil, out.Len() + 1, err |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should not attempt to add out.Len() + 1
, instead return errMaxMessageSizeExceeded
to signal the same.
9d80990
to
3cf9054
Compare
rpc_util_test.go
Outdated
wantErr: nil, | ||
}, | ||
{ | ||
name: "Handles large buffer size (MaxInt)", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please don't use special characters (like (
, )
) in the subtest names. Subtest names are used in test filters and go special characters will be escaped. It is not immediately clear what the escaped name would be in such cases.
rpc_util_test.go
Outdated
// premature data end. It ensures that the function behaves correctly with different | ||
// inputs, buffer sizes, and error conditions, using the "gzip" compressor for testing. | ||
func (s) TestDecompress(t *testing.T) { | ||
c := encoding.GetCompressor("gzip") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please use a longer name, e.g: compressor
. Then length of a variable name should be proportional to it's scope. If a variable is accessed after 20 lines or so, it should have a longer name to avoid confusions.
rpc_util_test.go
Outdated
} | ||
|
||
if !cmp.Equal(tc.want, output.Materialize()) { | ||
t.Fatalf("decompress() output mismatch: Got = %v, Want = %v", output.Materialize(), tc.want) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Small G
in got, small
Win
want`.
rpc_util.go
Outdated
} | ||
|
||
// doesReceiveMessageOverflow checks if the number of bytes read from the stream | ||
// exceeds the maximum receive message size allowed by the client. If the `readBytes` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: exceed
instead of exceeds
.
allowed by the client
should be allowed by the receiver
, the receiver could be the client or the server.
rpc_util_test.go
Outdated
for _, tc := range testCases { | ||
t.Run(tc.name, func(t *testing.T) { | ||
output, err := decompress(compressor, tc.input, tc.maxReceiveMessageSize, mem.DefaultBufferPool()) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nix new line
rpc_util_test.go
Outdated
if !cmp.Equal(err, tc.wantErr, cmpopts.EquateErrors()) { | ||
t.Fatalf("decompress() error = %v, wantErr = %v", err, tc.wantErr) | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nix new line
rpc_util_test.go
Outdated
output, err := decompress(compressor, tc.input, tc.maxReceiveMessageSize, mem.DefaultBufferPool()) | ||
|
||
if !cmp.Equal(err, tc.wantErr, cmpopts.EquateErrors()) { | ||
t.Fatalf("decompress() error = %v, wantErr = %v", err, tc.wantErr) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
t.Fatalf("decompress() err = %v, wantErr = %v", err, tc.wantErr)
or .Fatalf("decompress() error = %v, want error = %v", err, tc.wantErr)
rpc_util_test.go
Outdated
}, | ||
{ | ||
name: "Handles large buffer size MaxInt", | ||
input: mustCompress(t, []byte("large message")), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
large message is confusing. The bug which we are solving returns nil irrespective of what message size is if maxReceiveMessageSize is math.MaxInt. The test can be just Decompresses successfully with maxReceiveMessageSize MaxInt
. Basically it can be similar to first test case except the maxReceiveMessageSize is MaxInt.
The other test case where you are actually going to overflow by providing a message larger than MaxInt is hard to write as unit test
rpc_util_test.go
Outdated
return mem.BufferSlice{mem.NewBuffer(&compressedData, nil)} | ||
} | ||
|
||
// TestDecompress tests the decompress function with various scenarios, including |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TestDecompress tests the decompress function behaves correctly for following scenarios
- decompress successfully when message is <= maxReceiveMessageSize
- errors when message > maxReceiveMessageSize
- decompress successfully when maxReceiveMessageSize is MaxInt
rpc_util_test.go
Outdated
@@ -294,3 +298,75 @@ func BenchmarkGZIPCompressor512KiB(b *testing.B) { | |||
func BenchmarkGZIPCompressor1MiB(b *testing.B) { | |||
bmCompressor(b, 1024*1024, NewGZIPCompressor()) | |||
} | |||
|
|||
// mustCompress compresses the input data and returns a BufferSlice. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i think this function is more like compressWithDeterministicError so that we can do comparison in our test cases?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! Adding a second reviewer.
rpc_util.go
Outdated
@@ -808,6 +809,9 @@ func (p *payloadInfo) free() { | |||
} | |||
} | |||
|
|||
// errMaxMessageSizeExceeded represents an error due to exceeding the maximum message size limit. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This comment is unnecessary.
rpc_util.go
Outdated
if size := len(uncompressedBuf); size > maxReceiveMessageSize { | ||
out.Free() | ||
// TODO: Revisit the error code. Currently keep it consistent with java | ||
// implementation. | ||
return nil, status.Errorf(codes.ResourceExhausted, "grpc: received message after decompression larger than max (%d vs. %d)", size, maxReceiveMessageSize) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This feels like a good amount of duplication now.
Maybe we can move more logic into decompress
(e.g. pass in dc
) in order to avoid that?
rpc_util.go
Outdated
|
||
if doesReceiveMessageOverflow(out.Len(), maxReceiveMessageSize, dcReader) { | ||
out.Free() | ||
return nil, errMaxMessageSizeExceeded |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this just return the status.Errorf(...)
instead of returning a magic error that gets converted into that?
(We'd want to make sure this method always returns an error that is appropriate to just return directly from recvAndDecompress
.)
@purnesh42H I mistakenly clicked on 'Unassign me' from the issue. Can you please reassign the issue to me? |
Fixes #4552
RELEASE NOTES: