@@ -11,53 +11,7 @@ use std::{
1111 pin:: Pin ,
1212 task:: { ready, Context , Poll } ,
1313} ;
14- use tokio_stream:: { Stream , StreamExt } ;
15-
16- /// Turns a stream of grpc results (message or error status) into [EncodeBody] which is used by grpc
17- /// servers for turning the messages into http frames for sending over the network.
18- pub fn encode_server < T , U > (
19- encoder : T ,
20- source : U ,
21- compression_encoding : Option < CompressionEncoding > ,
22- compression_override : SingleMessageCompressionOverride ,
23- max_message_size : Option < usize > ,
24- ) -> EncodeBody < impl Stream < Item = Result < Bytes , Status > > >
25- where
26- T : Encoder < Error = Status > ,
27- U : Stream < Item = Result < T :: Item , Status > > ,
28- {
29- let stream = EncodedBytes :: new (
30- encoder,
31- source. fuse ( ) ,
32- compression_encoding,
33- compression_override,
34- max_message_size,
35- ) ;
36-
37- EncodeBody :: new_server ( stream)
38- }
39-
40- /// Turns a stream of grpc messages into [EncodeBody] which is used by grpc clients for
41- /// turning the messages into http frames for sending over the network.
42- pub fn encode_client < T , U > (
43- encoder : T ,
44- source : U ,
45- compression_encoding : Option < CompressionEncoding > ,
46- max_message_size : Option < usize > ,
47- ) -> EncodeBody < impl Stream < Item = Result < Bytes , Status > > >
48- where
49- T : Encoder < Error = Status > ,
50- U : Stream < Item = T :: Item > ,
51- {
52- let stream = EncodedBytes :: new (
53- encoder,
54- source. fuse ( ) . map ( Ok ) ,
55- compression_encoding,
56- SingleMessageCompressionOverride :: default ( ) ,
57- max_message_size,
58- ) ;
59- EncodeBody :: new_client ( stream)
60- }
14+ use tokio_stream:: { adapters:: Fuse , Stream , StreamExt } ;
6115
6216/// Combinator for efficient encoding of messages into reasonably sized buffers.
6317/// EncodedBytes encodes ready messages from its delegate stream into a BytesMut,
6620/// * The encoded buffer surpasses YIELD_THRESHOLD.
6721#[ pin_project( project = EncodedBytesProj ) ]
6822#[ derive( Debug ) ]
69- pub ( crate ) struct EncodedBytes < T , U >
70- where
71- T : Encoder < Error = Status > ,
72- U : Stream < Item = Result < T :: Item , Status > > ,
73- {
23+ struct EncodedBytes < T , U > {
7424 #[ pin]
75- source : U ,
25+ source : Fuse < U > ,
7626 encoder : T ,
7727 compression_encoding : Option < CompressionEncoding > ,
7828 max_message_size : Option < usize > ,
8131 error : Option < Status > ,
8232}
8333
84- impl < T , U > EncodedBytes < T , U >
85- where
86- T : Encoder < Error = Status > ,
87- U : Stream < Item = Result < T :: Item , Status > > ,
88- {
89- // `source` should be fused stream.
34+ impl < T : Encoder , U : Stream > EncodedBytes < T , U > {
9035 fn new (
9136 encoder : T ,
9237 source : U ,
11156 } ;
11257
11358 Self {
114- source,
59+ source : source . fuse ( ) ,
11560 encoder,
11661 compression_encoding,
11762 max_message_size,
@@ -270,9 +215,9 @@ enum Role {
270215/// A specialized implementation of [Body] for encoding [Result<Bytes, Status>].
271216#[ pin_project]
272217#[ derive( Debug ) ]
273- pub struct EncodeBody < S > {
218+ pub struct EncodeBody < T , U > {
274219 #[ pin]
275- inner : S ,
220+ inner : EncodedBytes < T , U > ,
276221 state : EncodeState ,
277222}
278223
@@ -283,10 +228,23 @@ struct EncodeState {
283228 is_end_stream : bool ,
284229}
285230
286- impl < S > EncodeBody < S > {
287- fn new_client ( inner : S ) -> Self {
231+ impl < T : Encoder , U : Stream > EncodeBody < T , U > {
232+ /// Turns a stream of grpc messages into [EncodeBody] which is used by grpc clients for
233+ /// turning the messages into http frames for sending over the network.
234+ pub fn new_client (
235+ encoder : T ,
236+ source : U ,
237+ compression_encoding : Option < CompressionEncoding > ,
238+ max_message_size : Option < usize > ,
239+ ) -> Self {
288240 Self {
289- inner,
241+ inner : EncodedBytes :: new (
242+ encoder,
243+ source,
244+ compression_encoding,
245+ SingleMessageCompressionOverride :: default ( ) ,
246+ max_message_size,
247+ ) ,
290248 state : EncodeState {
291249 error : None ,
292250 role : Role :: Client ,
@@ -295,9 +253,23 @@ impl<S> EncodeBody<S> {
295253 }
296254 }
297255
298- fn new_server ( inner : S ) -> Self {
256+ /// Turns a stream of grpc results (message or error status) into [EncodeBody] which is used by grpc
257+ /// servers for turning the messages into http frames for sending over the network.
258+ pub fn new_server (
259+ encoder : T ,
260+ source : U ,
261+ compression_encoding : Option < CompressionEncoding > ,
262+ compression_override : SingleMessageCompressionOverride ,
263+ max_message_size : Option < usize > ,
264+ ) -> Self {
299265 Self {
300- inner,
266+ inner : EncodedBytes :: new (
267+ encoder,
268+ source,
269+ compression_encoding,
270+ compression_override,
271+ max_message_size,
272+ ) ,
301273 state : EncodeState {
302274 error : None ,
303275 role : Role :: Server ,
@@ -328,9 +300,10 @@ impl EncodeState {
328300 }
329301}
330302
331- impl < S > Body for EncodeBody < S >
303+ impl < T , U > Body for EncodeBody < T , U >
332304where
333- S : Stream < Item = Result < Bytes , Status > > ,
305+ T : Encoder < Error = Status > ,
306+ U : Stream < Item = Result < T :: Item , Status > > ,
334307{
335308 type Data = Bytes ;
336309 type Error = Status ;
0 commit comments