-
Notifications
You must be signed in to change notification settings - Fork 1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
fix(codec): Fix buffer decode panic on full (#43)
* fix(codec): Fix buffer decode panic on full This is a naive fix for the buffer growing beyond capacity and producing a panic. Ideally we should do a better job of not having to allocate for new messages by using a link list. * fmt
- Loading branch information
1 parent
bd2b4e0
commit ed3e7e9
Showing
5 changed files
with
134 additions
and
5 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,112 @@ | ||
use super::{ | ||
encode_server, | ||
prost::{ProstDecoder, ProstEncoder}, | ||
Streaming, | ||
}; | ||
use crate::Status; | ||
use bytes::{Buf, BufMut, Bytes, BytesMut, IntoBuf}; | ||
use http_body::Body; | ||
use prost::Message; | ||
use std::{ | ||
io::Cursor, | ||
pin::Pin, | ||
task::{Context, Poll}, | ||
}; | ||
|
||
#[derive(Clone, PartialEq, prost::Message)] | ||
struct Msg { | ||
#[prost(bytes, tag = "1")] | ||
data: Vec<u8>, | ||
} | ||
|
||
#[tokio::test] | ||
async fn decode() { | ||
let decoder = ProstDecoder::<Msg>::default(); | ||
|
||
let data = Vec::from(&[0u8; 1024][..]); | ||
let msg = Msg { data }; | ||
|
||
let mut buf = BytesMut::new(); | ||
let len = msg.encoded_len(); | ||
|
||
buf.reserve(len + 5); | ||
buf.put_u8(0); | ||
buf.put_u32_be(len as u32); | ||
msg.encode(&mut buf).unwrap(); | ||
|
||
let body = MockBody(buf.freeze(), 0, 100); | ||
|
||
let mut stream = Streaming::new_request(decoder, body); | ||
|
||
while let Some(_) = stream.message().await.unwrap() {} | ||
} | ||
|
||
#[tokio::test] | ||
async fn encode() { | ||
let encoder = ProstEncoder::<Msg>::default(); | ||
|
||
let data = Vec::from(&[0u8; 1024][..]); | ||
let msg = Msg { data }; | ||
|
||
let messages = std::iter::repeat(Ok::<_, Status>(msg)).take(10000); | ||
let source = futures_util::stream::iter(messages); | ||
|
||
let body = encode_server(encoder, source); | ||
|
||
futures_util::pin_mut!(body); | ||
|
||
while let Some(r) = body.next().await { | ||
r.unwrap(); | ||
} | ||
} | ||
|
||
#[derive(Debug)] | ||
struct MockBody(Bytes, usize, usize); | ||
|
||
impl Body for MockBody { | ||
type Data = Data; | ||
type Error = Status; | ||
|
||
fn poll_data( | ||
mut self: Pin<&mut Self>, | ||
_cx: &mut Context<'_>, | ||
) -> Poll<Option<Result<Self::Data, Self::Error>>> { | ||
if self.1 > self.2 { | ||
self.1 += 1; | ||
let data = Data(self.0.clone().into_buf()); | ||
Poll::Ready(Some(Ok(data))) | ||
} else { | ||
Poll::Ready(None) | ||
} | ||
} | ||
|
||
fn poll_trailers( | ||
self: Pin<&mut Self>, | ||
cx: &mut Context<'_>, | ||
) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> { | ||
drop(cx); | ||
Poll::Ready(Ok(None)) | ||
} | ||
} | ||
|
||
struct Data(Cursor<Bytes>); | ||
|
||
impl Into<Bytes> for Data { | ||
fn into(self) -> Bytes { | ||
self.0.into_inner() | ||
} | ||
} | ||
|
||
impl Buf for Data { | ||
fn remaining(&self) -> usize { | ||
self.0.remaining() | ||
} | ||
|
||
fn bytes(&self) -> &[u8] { | ||
self.0.bytes() | ||
} | ||
|
||
fn advance(&mut self, cnt: usize) { | ||
self.0.advance(cnt) | ||
} | ||
} |