Skip to content

Fixed bugs related to protocols and encoding. #154

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 18 commits into from
Sep 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
dcb882d
feat: 支持curl直接访问
fatestrange Aug 3, 2023
dfa2607
fix(fix bug):
fatestrange Aug 7, 2023
cd7fdce
refactor: 将生成Codec的逻辑转移到了TripleServer
fatestrange Aug 9, 2023
fe963dc
Feat: Added support for JSON encoding types(#145)
fatestrange Aug 14, 2023
52dbcd6
fix: Fixed a bug related to compression(#145)
fatestrange Aug 14, 2023
17c41ee
perf: Improved code reuse-related logic(#145)
fatestrange Aug 16, 2023
5af48ff
style: Formatted the code according to the cargo fmt standard(#145)
fatestrange Aug 16, 2023
1f041b2
perf: Resolved the warnings from cargo check(#145)
fatestrange Aug 16, 2023
3f5c135
perf: Optimized the code structure and removed redundant code(#145)
fatestrange Aug 16, 2023
0791600
perf: Optimized the configuration format as well as the configuration…
fatestrange Aug 17, 2023
3c70c89
perf: Removed configuration using serialization methods.(#145)
fatestrange Aug 18, 2023
3cca8c4
Merge branch 'main' into curl
AdachiAndShimamura Aug 20, 2023
8133afc
fix(dubbo): Fixed bugs related to protocols and encoding.(#145)
AdachiAndShimamura Aug 20, 2023
3a04088
perf(dubbo): Rename some variables and function names(#145)
AdachiAndShimamura Aug 24, 2023
cd958fd
Merge branch 'main' into curl
AdachiAndShimamura Aug 27, 2023
8a34266
docs(dubbo): Added protocol-related comments
AdachiAndShimamura Aug 29, 2023
a2caca4
Merge remote-tracking branch 'origin/curl' into curl
AdachiAndShimamura Aug 29, 2023
306f55b
perf(dubbo): Improved the expression of meaning in code related to en…
AdachiAndShimamura Aug 31, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 19 additions & 26 deletions dubbo/src/triple/client/triple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,17 +142,16 @@ impl TripleClient {
M1: Message + Send + Sync + 'static + Serialize,
M2: Message + Send + Sync + 'static + for<'a> Deserialize<'a> + Default,
{
let is_json = false;
let (decoder, encoder): (
Box<dyn Decoder<Item = M2, Error = Status> + Send + 'static>,
Box<dyn Encoder<Error = Status, Item = M1> + Send + 'static>,
) = get_codec(is_json);
) = get_codec("application/grpc+proto");
let req = req.map(|m| stream::once(future::ready(m)));
let body_stream = encode(
encoder,
req.into_inner().map(Ok),
self.send_compression_encoding,
is_json,
true,
)
.into_stream();
let body = hyper::Body::wrap_stream(body_stream);
Expand All @@ -179,9 +178,8 @@ impl TripleClient {

match response {
Ok(v) => {
let resp = v.map(|body| {
Decoding::new(body, decoder, self.send_compression_encoding, is_json)
});
let resp = v
.map(|body| Decoding::new(body, decoder, self.send_compression_encoding, true));
let (mut parts, body) = Response::from_http(resp).into_parts();

futures_util::pin_mut!(body);
Expand Down Expand Up @@ -215,17 +213,16 @@ impl TripleClient {
M1: Message + Send + Sync + 'static + Serialize,
M2: Message + Send + Sync + 'static + for<'a> Deserialize<'a> + Default,
{
let is_json = false;
let (decoder, encoder): (
Box<dyn Decoder<Item = M2, Error = Status> + Send + 'static>,
Box<dyn Encoder<Error = Status, Item = M1> + Send + 'static>,
) = get_codec(is_json);
) = get_codec("application/grpc+proto");
let req = req.into_streaming_request();
let en = encode(
encoder,
req.into_inner().map(Ok),
self.send_compression_encoding,
is_json,
true,
)
.into_stream();
let body = hyper::Body::wrap_stream(en);
Expand All @@ -251,9 +248,8 @@ impl TripleClient {

match response {
Ok(v) => {
let resp = v.map(|body| {
Decoding::new(body, decoder, self.send_compression_encoding, is_json)
});
let resp = v
.map(|body| Decoding::new(body, decoder, self.send_compression_encoding, true));

Ok(Response::from_http(resp))
}
Expand All @@ -271,17 +267,16 @@ impl TripleClient {
M1: Message + Send + Sync + 'static + Serialize,
M2: Message + Send + Sync + 'static + for<'a> Deserialize<'a> + Default,
{
let is_json = false;
let (decoder, encoder): (
Box<dyn Decoder<Item = M2, Error = Status> + Send + 'static>,
Box<dyn Encoder<Error = Status, Item = M1> + Send + 'static>,
) = get_codec(is_json);
) = get_codec("application/grpc+proto");
let req = req.into_streaming_request();
let en = encode(
encoder,
req.into_inner().map(Ok),
self.send_compression_encoding,
is_json,
true,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

273和279行这两个值是不是应该一致?

)
.into_stream();
let body = hyper::Body::wrap_stream(en);
Expand All @@ -308,9 +303,8 @@ impl TripleClient {

match response {
Ok(v) => {
let resp = v.map(|body| {
Decoding::new(body, decoder, self.send_compression_encoding, is_json)
});
let resp = v
.map(|body| Decoding::new(body, decoder, self.send_compression_encoding, true));
let (mut parts, body) = Response::from_http(resp).into_parts();

futures_util::pin_mut!(body);
Expand Down Expand Up @@ -344,17 +338,16 @@ impl TripleClient {
M1: Message + Send + Sync + 'static + Serialize,
M2: Message + Send + Sync + 'static + for<'a> Deserialize<'a> + Default,
{
let is_json = false;
let (decoder, encoder): (
Box<dyn Decoder<Item = M2, Error = Status> + Send + 'static>,
Box<dyn Encoder<Error = Status, Item = M1> + Send + 'static>,
) = get_codec(is_json);
) = get_codec("application/grpc+proto");
let req = req.map(|m| stream::once(future::ready(m)));
let en = encode(
encoder,
req.into_inner().map(Ok),
self.send_compression_encoding,
is_json,
true,
)
.into_stream();
let body = hyper::Body::wrap_stream(en);
Expand All @@ -379,9 +372,8 @@ impl TripleClient {

match response {
Ok(v) => {
let resp = v.map(|body| {
Decoding::new(body, decoder, self.send_compression_encoding, is_json)
});
let resp = v
.map(|body| Decoding::new(body, decoder, self.send_compression_encoding, true));

Ok(Response::from_http(resp))
}
Expand All @@ -391,7 +383,7 @@ impl TripleClient {
}

pub fn get_codec<M1, M2>(
is_json: bool,
content_type: &str,
) -> (
Box<dyn Decoder<Item = M2, Error = Status> + Send + 'static>,
Box<dyn Encoder<Error = Status, Item = M1> + Send + 'static>,
Expand All @@ -400,7 +392,8 @@ where
M1: Message + Send + Sync + 'static + Serialize,
M2: Message + Send + Sync + 'static + for<'a> Deserialize<'a> + Default,
{
match is_json {
//Determine whether to use JSON as the serialization method.
match content_type.ends_with("json") {
true => {
let mut codec = SerdeCodec::<M1, M2>::default();
(Box::new(codec.decoder()), Box::new(codec.encoder()))
Expand Down
23 changes: 12 additions & 11 deletions dubbo/src/triple/decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,13 @@ pub struct Decoding<T> {
trailers: Option<Metadata>,
compress: Option<CompressionEncoding>,
decompress_buf: BytesMut,
is_json: bool,
decode_as_grpc: bool,
}

#[derive(PartialEq)]
enum State {
ReadHeader,
ReadJSON,
ReadHttpBody,
ReadBody { len: usize, is_compressed: bool },
Error,
}
Expand All @@ -54,12 +54,13 @@ impl<T> Decoding<T> {
body: B,
decoder: Box<dyn Decoder<Item = T, Error = crate::status::Status> + Send + 'static>,
compress: Option<CompressionEncoding>,
is_json: bool,
decode_as_grpc: bool,
) -> Self
where
B: Body + Send + 'static,
B::Error: Into<crate::Error>,
{
//Determine whether to use the gRPC mode to handle request data
Self {
state: State::ReadHeader,
body: body
Expand All @@ -76,7 +77,7 @@ impl<T> Decoding<T> {
trailers: None,
compress,
decompress_buf: BytesMut::new(),
is_json,
decode_as_grpc,
}
}

Expand All @@ -98,12 +99,12 @@ impl<T> Decoding<T> {
trailer.map(|data| data.map(Metadata::from_headers))
}

pub fn decode_json(&mut self) -> Result<Option<T>, crate::status::Status> {
pub fn decode_http(&mut self) -> Result<Option<T>, crate::status::Status> {
if self.state == State::ReadHeader {
self.state = State::ReadJSON;
self.state = State::ReadHttpBody;
return Ok(None);
}
if let State::ReadJSON = self.state {
if let State::ReadHttpBody = self.state {
if self.buf.is_empty() {
return Ok(None);
}
Expand Down Expand Up @@ -138,7 +139,7 @@ impl<T> Decoding<T> {
Ok(None)
}

pub fn decode_proto(&mut self) -> Result<Option<T>, crate::status::Status> {
pub fn decode_grpc(&mut self) -> Result<Option<T>, crate::status::Status> {
if self.state == State::ReadHeader {
// buffer is full
if self.buf.remaining() < super::consts::HEADER_SIZE {
Expand Down Expand Up @@ -215,10 +216,10 @@ impl<T> Decoding<T> {
}

pub fn decode_chunk(&mut self) -> Result<Option<T>, crate::status::Status> {
if self.is_json {
self.decode_json()
if self.decode_as_grpc {
self.decode_grpc()
} else {
self.decode_proto()
self.decode_http()
}
}
}
Expand Down
34 changes: 17 additions & 17 deletions dubbo/src/triple/encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub fn encode<E, B>(
mut encoder: Box<dyn Encoder<Error = Status, Item = E> + Send + 'static>,
resp_body: B,
compression_encoding: Option<CompressionEncoding>,
is_json: bool,
encode_as_grpc: bool,
) -> impl TryStream<Ok = Bytes, Error = Status>
where
B: Stream<Item = Result<E, Status>>,
Expand All @@ -48,10 +48,10 @@ where
loop {
match resp_body.next().await {
Some(Ok(item)) => {
if !is_json {
if encode_as_grpc {
buf.reserve(super::consts::HEADER_SIZE);
unsafe {
buf.advance_mut(super::consts::HEADER_SIZE);
unsafe {
buf.advance_mut(super::consts::HEADER_SIZE);
}
}
// 编码数据到缓冲中
Expand All @@ -67,18 +67,18 @@ where
} else {
encoder.encode(item, &mut EncodeBuf::new(&mut buf)).map_err(|_e| crate::status::Status::new(crate::status::Code::Internal, "encode error".to_string()));
}
let result=match is_json{
let result=match encode_as_grpc{
true=>{
buf.clone()
}
false=>{
let len = buf.len() - super::consts::HEADER_SIZE;
{
let mut buf = &mut buf[..super::consts::HEADER_SIZE];
buf.put_u8(enable_compress as u8);
buf.put_u32(len as u32);
{
let mut buf = &mut buf[..super::consts::HEADER_SIZE];
buf.put_u8(enable_compress as u8);
buf.put_u32(len as u32);
}
buf.split_to(len + super::consts::HEADER_SIZE)
}
buf.split_to(len + super::consts::HEADER_SIZE)
false=>{
buf.clone()
}
};
yield Ok(result.freeze());
Expand All @@ -94,25 +94,25 @@ pub fn encode_server<E, B>(
encoder: Box<dyn Encoder<Error = Status, Item = E> + Send + 'static>,
body: B,
compression_encoding: Option<CompressionEncoding>,
is_json: bool,
encode_as_grpc: bool,
) -> EncodeBody<impl Stream<Item = Result<Bytes, Status>>>
where
B: Stream<Item = Result<E, Status>>,
{
let s = encode(encoder, body, compression_encoding, is_json).into_stream();
let s = encode(encoder, body, compression_encoding, encode_as_grpc).into_stream();
EncodeBody::new_server(s)
}

pub fn encode_client<E, B>(
encoder: Box<dyn Encoder<Error = Status, Item = E> + Send + 'static>,
body: B,
compression_encoding: Option<CompressionEncoding>,
is_json: bool,
is_grpc: bool,
) -> EncodeBody<impl Stream<Item = Result<Bytes, Status>>>
where
B: Stream<Item = E>,
{
let s = encode(encoder, body.map(Ok), compression_encoding, is_json).into_stream();
let s = encode(encoder, body.map(Ok), compression_encoding, is_grpc).into_stream();
EncodeBody::new_client(s)
}

Expand Down
Loading