
Commit 2e7250b

rrichardson authored and seanmonstar committed
feat(client): add http1_read_buf_exact_size Builder option
When the option is set, this changes the read buffer strategy from adaptive sizing to always using an exact size for the buffer.
1 parent 92a8aba commit 2e7250b
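
For reference, a minimal usage sketch of the new public Builder option (added in src/client/mod.rs below). The 16 KB size and the build_http::<Body>() call with the default connector are illustrative choices, not part of the commit:

// Hedged sketch: configuring a hyper Client (0.12-era API) with the new option.
extern crate hyper;

use hyper::{Body, Client};

fn main() {
    let client: Client<_, Body> = Client::builder()
        // Always allocate exactly 16 KB for the HTTP/1 read buffer instead of
        // letting hyper grow it adaptively up to the max buffer size.
        .http1_read_buf_exact_size(16 * 1024)
        .build_http();
    let _ = client;
}

Without this call, the Builder keeps its default adaptive read buffer.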

File tree

4 files changed: +91 -27 lines

  src/client/conn.rs
  src/client/mod.rs
  src/proto/h1/conn.rs
  src/proto/h1/io.rs

src/client/conn.rs (+9 -0)
@@ -74,6 +74,7 @@ pub struct Builder {
     exec: Exec,
     h1_writev: bool,
     h1_title_case_headers: bool,
+    h1_read_buf_exact_size: Option<usize>,
     http2: bool,
 }
 
@@ -432,6 +433,7 @@
         Builder {
             exec: Exec::Default,
             h1_writev: true,
+            h1_read_buf_exact_size: None,
             h1_title_case_headers: false,
             http2: false,
         }
@@ -461,6 +463,10 @@
         self
     }
 
+    pub(super) fn h1_read_buf_exact_size(&mut self, sz: Option<usize>) -> &mut Builder {
+        self.h1_read_buf_exact_size = sz;
+        self
+    }
     /// Sets whether HTTP2 is required.
     ///
     /// Default is false.
@@ -506,6 +512,9 @@ where
             if self.builder.h1_title_case_headers {
                 conn.set_title_case_headers();
             }
+            if let Some(sz) = self.builder.h1_read_buf_exact_size {
+                conn.set_read_buf_exact_size(sz);
+            }
             let cd = proto::h1::dispatch::Client::new(rx);
             let dispatch = proto::h1::Dispatcher::new(cd, conn);
             Either::A(dispatch)

src/client/mod.rs (+17 -0)
@@ -110,6 +110,7 @@ pub struct Client<C, B = Body> {
     h1_writev: bool,
     h1_title_case_headers: bool,
     pool: Pool<PoolClient<B>>,
+    h1_read_buf_exact_size: Option<usize>,
     retry_canceled_requests: bool,
     set_host: bool,
     ver: Ver,
@@ -460,6 +461,7 @@ where C: Connect + Sync + 'static,
         let pool = self.pool.clone();
         let h1_writev = self.h1_writev;
         let h1_title_case_headers = self.h1_title_case_headers;
+        let h1_read_buf_exact_size = self.h1_read_buf_exact_size;
         let ver = self.ver;
         let is_ver_h2 = self.ver == Ver::Http2;
         let connector = self.connector.clone();
@@ -506,6 +508,7 @@ where C: Connect + Sync + 'static,
                     .exec(executor.clone())
                     .h1_writev(h1_writev)
                     .h1_title_case_headers(h1_title_case_headers)
+                    .h1_read_buf_exact_size(h1_read_buf_exact_size)
                     .http2_only(is_h2)
                     .handshake(io)
                     .and_then(move |(tx, conn)| {
@@ -545,6 +548,7 @@ impl<C, B> Clone for Client<C, B> {
             connector: self.connector.clone(),
             executor: self.executor.clone(),
             h1_writev: self.h1_writev,
+            h1_read_buf_exact_size: self.h1_read_buf_exact_size,
             h1_title_case_headers: self.h1_title_case_headers,
             pool: self.pool.clone(),
             retry_canceled_requests: self.retry_canceled_requests,
@@ -791,6 +795,7 @@ pub struct Builder {
     keep_alive_timeout: Option<Duration>,
     h1_writev: bool,
     h1_title_case_headers: bool,
+    h1_read_buf_exact_size: Option<usize>,
     max_idle_per_host: usize,
     retry_canceled_requests: bool,
     set_host: bool,
@@ -805,6 +810,7 @@ impl Default for Builder {
             keep_alive_timeout: Some(Duration::from_secs(90)),
             h1_writev: true,
             h1_title_case_headers: false,
+            h1_read_buf_exact_size: None,
             max_idle_per_host: ::std::usize::MAX,
             retry_canceled_requests: true,
             set_host: true,
@@ -851,6 +857,15 @@ impl Builder {
         self
     }
 
+    /// Sets the exact size of the read buffer to *always* use.
+    ///
+    /// Default is an adaptive read buffer.
+    #[inline]
+    pub fn http1_read_buf_exact_size(&mut self, sz: usize) -> &mut Self {
+        self.h1_read_buf_exact_size = Some(sz);
+        self
+    }
+
     /// Set whether HTTP/1 connections will write header names as title case at
     /// the socket level.
     ///
@@ -950,6 +965,7 @@ impl Builder {
             executor: self.exec.clone(),
             h1_writev: self.h1_writev,
             h1_title_case_headers: self.h1_title_case_headers,
+            h1_read_buf_exact_size: self.h1_read_buf_exact_size,
             pool: Pool::new(
                 pool::Enabled(self.keep_alive),
                 pool::IdleTimeout(self.keep_alive_timeout),
@@ -968,6 +984,7 @@ impl fmt::Debug for Builder {
         f.debug_struct("Builder")
             .field("keep_alive", &self.keep_alive)
             .field("keep_alive_timeout", &self.keep_alive_timeout)
+            .field("http1_read_buf_exact_size", &self.h1_read_buf_exact_size)
            .field("http1_writev", &self.h1_writev)
            .field("max_idle_per_host", &self.max_idle_per_host)
            .field("set_host", &self.set_host)

src/proto/h1/conn.rs (+4 -0)
@@ -63,6 +63,10 @@ where I: AsyncRead + AsyncWrite,
         self.io.set_max_buf_size(max);
     }
 
+    pub fn set_read_buf_exact_size(&mut self, sz: usize) {
+        self.io.set_read_buf_exact_size(sz);
+    }
+
     pub fn set_write_strategy_flatten(&mut self) {
         self.io.set_write_strategy_flatten();
     }

src/proto/h1/io.rs (+61 -27)
@@ -31,9 +31,9 @@ const MAX_BUF_LIST_BUFFERS: usize = 16;
 pub struct Buffered<T, B> {
     flush_pipeline: bool,
     io: T,
-    max_buf_size: usize,
     read_blocked: bool,
     read_buf: BytesMut,
+    read_buf_strategy: ReadStrategy,
     write_buf: WriteBuf<B>,
 }
 
@@ -58,10 +58,12 @@
         Buffered {
             flush_pipeline: false,
             io: io,
-            max_buf_size: DEFAULT_MAX_BUFFER_SIZE,
+            read_blocked: false,
             read_buf: BytesMut::with_capacity(0),
+            read_buf_strategy: ReadStrategy::Adaptive {
+                max: DEFAULT_MAX_BUFFER_SIZE,
+            },
             write_buf: WriteBuf::new(),
-            read_blocked: false,
         }
     }
 
@@ -76,17 +78,24 @@
     pub fn set_max_buf_size(&mut self, max: usize) {
         assert!(
             max >= MINIMUM_MAX_BUFFER_SIZE,
-            "The max_buf_size cannot be smaller than the initial buffer size."
+            "The max_buf_size cannot be smaller than {}.",
+            MINIMUM_MAX_BUFFER_SIZE,
         );
-        self.max_buf_size = max;
+        self.read_buf_strategy = ReadStrategy::Adaptive {
+            max,
+        };
         self.write_buf.max_buf_size = max;
     }
 
+    pub fn set_read_buf_exact_size(&mut self, sz: usize) {
+        self.read_buf_strategy = ReadStrategy::Exact(sz);
+    }
+
     pub fn set_write_strategy_flatten(&mut self) {
         // this should always be called only at construction time,
         // so this assert is here to catch myself
         debug_assert!(self.write_buf.queue.bufs.is_empty());
-        self.write_buf.set_strategy(Strategy::Flatten);
+        self.write_buf.set_strategy(WriteStrategy::Flatten);
     }
 
     pub fn read_buf(&self) -> &[u8] {
@@ -140,10 +149,18 @@
                     debug!("parsed {} headers", msg.head.headers.len());
                     return Ok(Async::Ready(msg))
                 },
-                None => {
-                    if self.read_buf.capacity() >= self.max_buf_size {
-                        debug!("max_buf_size ({}) reached, closing", self.max_buf_size);
-                        return Err(::Error::new_too_large());
+                None => match self.read_buf_strategy {
+                    ReadStrategy::Adaptive { max } => {
+                        if self.read_buf.len() >= max {
+                            debug!("max_buf_size ({}) reached, closing", max);
+                            return Err(::Error::new_too_large());
+                        }
+                    },
+                    ReadStrategy::Exact(exact) => {
+                        if self.read_buf.len() >= exact {
+                            debug!("exact buf size ({}) filled, closing", exact);
+                            return Err(::Error::new_too_large());
+                        }
                     }
                 },
             }
@@ -160,8 +177,17 @@
     pub fn read_from_io(&mut self) -> Poll<usize, io::Error> {
         use bytes::BufMut;
         self.read_blocked = false;
-        if self.read_buf.remaining_mut() < INIT_BUFFER_SIZE {
-            self.read_buf.reserve(INIT_BUFFER_SIZE);
+        match self.read_buf_strategy {
+            ReadStrategy::Adaptive { .. } => {
+                if self.read_buf.remaining_mut() < INIT_BUFFER_SIZE {
+                    self.read_buf.reserve(INIT_BUFFER_SIZE);
+                }
+            },
+            ReadStrategy::Exact(exact) => {
+                if self.read_buf.capacity() < exact {
+                    self.read_buf.reserve(exact);
+                }
+            },
         }
         self.io.read_buf(&mut self.read_buf).map(|ok| {
             match ok {
@@ -196,7 +222,7 @@
             try_nb!(self.io.flush());
         } else {
             match self.write_buf.strategy {
-                Strategy::Flatten => return self.flush_flattened(),
+                WriteStrategy::Flatten => return self.flush_flattened(),
                 _ => (),
             }
             loop {
@@ -256,6 +282,14 @@
     }
 }
 
+#[derive(Clone, Copy, Debug)]
+enum ReadStrategy {
+    Adaptive {
+        max: usize
+    },
+    Exact(usize),
+}
+
 #[derive(Clone)]
 pub struct Cursor<T> {
     bytes: T,
@@ -313,7 +347,7 @@ pub(super) struct WriteBuf<B> {
     max_buf_size: usize,
     /// Deque of user buffers if strategy is Queue
     queue: BufDeque<B>,
-    strategy: Strategy,
+    strategy: WriteStrategy,
 }
 
 impl<B> WriteBuf<B> {
@@ -322,7 +356,7 @@ impl<B> WriteBuf<B> {
             headers: Cursor::new(Vec::with_capacity(INIT_BUFFER_SIZE)),
             max_buf_size: DEFAULT_MAX_BUFFER_SIZE,
             queue: BufDeque::new(),
-            strategy: Strategy::Auto,
+            strategy: WriteStrategy::Auto,
         }
     }
 }
@@ -332,7 +366,7 @@
 where
     B: Buf,
 {
-    fn set_strategy(&mut self, strategy: Strategy) {
+    fn set_strategy(&mut self, strategy: WriteStrategy) {
         self.strategy = strategy;
     }
 
@@ -344,7 +378,7 @@
     pub(super) fn buffer<BB: Buf + Into<B>>(&mut self, mut buf: BB) {
         debug_assert!(buf.has_remaining());
         match self.strategy {
-            Strategy::Flatten => {
+            WriteStrategy::Flatten => {
                 let head = self.headers_mut();
                 //perf: This is a little faster than <Vec as BufMut>>::put,
                 //but accomplishes the same result.
@@ -360,18 +394,18 @@
                     buf.advance(adv);
                 }
             },
-            Strategy::Auto | Strategy::Queue => {
+            WriteStrategy::Auto | WriteStrategy::Queue => {
                 self.queue.bufs.push_back(buf.into());
             },
         }
     }
 
     fn can_buffer(&self) -> bool {
         match self.strategy {
-            Strategy::Flatten => {
+            WriteStrategy::Flatten => {
                 self.remaining() < self.max_buf_size
             },
-            Strategy::Auto | Strategy::Queue => {
+            WriteStrategy::Auto | WriteStrategy::Queue => {
                 self.queue.bufs.len() < MAX_BUF_LIST_BUFFERS
                     && self.remaining() < self.max_buf_size
             },
@@ -474,12 +508,12 @@ impl<'a, B: Buf> Buf for WriteBufAuto<'a, B> {
 
 impl<'a, B: Buf + 'a> Drop for WriteBufAuto<'a, B> {
     fn drop(&mut self) {
-        if let Strategy::Auto = self.inner.strategy {
+        if let WriteStrategy::Auto = self.inner.strategy {
             if self.bytes_vec_called.get() {
-                self.inner.strategy = Strategy::Queue;
+                self.inner.strategy = WriteStrategy::Queue;
             } else if self.bytes_called.get() {
                 trace!("detected no usage of vectored write, flattening");
-                self.inner.strategy = Strategy::Flatten;
+                self.inner.strategy = WriteStrategy::Flatten;
                 self.inner.headers.bytes.put(&mut self.inner.queue);
             }
         }
@@ -488,7 +522,7 @@ impl<'a, B: Buf + 'a> Drop for WriteBufAuto<'a, B> {
 
 
 #[derive(Debug)]
-enum Strategy {
+enum WriteStrategy {
     Auto,
     Flatten,
     Queue,
@@ -640,7 +674,7 @@ mod tests {
 
         let mock = AsyncIo::new_buf(vec![], 1024);
         let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(mock);
-        buffered.write_buf.set_strategy(Strategy::Flatten);
+        buffered.write_buf.set_strategy(WriteStrategy::Flatten);
 
         buffered.headers_buf().extend(b"hello ");
         buffered.buffer(Cursor::new(b"world, ".to_vec()));
@@ -686,7 +720,7 @@
         let mut mock = AsyncIo::new_buf(vec![], 1024);
         mock.max_read_vecs(0); // disable vectored IO
         let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(mock);
-        buffered.write_buf.set_strategy(Strategy::Queue);
+        buffered.write_buf.set_strategy(WriteStrategy::Queue);
 
         // we have 4 buffers, and vec IO disabled, but explicitly said
         // don't try to auto detect (via setting strategy above)
@@ -710,7 +744,7 @@
         b.bytes = s.len() as u64;
 
         let mut write_buf = WriteBuf::<::Chunk>::new();
-        write_buf.set_strategy(Strategy::Flatten);
+        write_buf.set_strategy(WriteStrategy::Flatten);
         b.iter(|| {
             let chunk = ::Chunk::from(s);
             write_buf.buffer(chunk);
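
For readers following the src/proto/h1/io.rs changes, here is a hedged, self-contained sketch of what the two strategies do: how much to reserve before each read (mirroring read_from_io) and when to bail out with a too-large error (mirroring parse). The reserve_for_read and is_too_large helpers, the 8192-byte INIT_BUFFER_SIZE, and the example limits are illustrative stand-ins rather than hyper's internals, and capacity() - len() stands in for remaining_mut() to stay independent of the bytes version:

extern crate bytes;

use bytes::BytesMut;

const INIT_BUFFER_SIZE: usize = 8192;

#[derive(Clone, Copy, Debug)]
enum ReadStrategy {
    Adaptive { max: usize },
    Exact(usize),
}

/// Decide how much to reserve before each read, mirroring `read_from_io`.
fn reserve_for_read(buf: &mut BytesMut, strategy: ReadStrategy) {
    match strategy {
        // Adaptive: top the buffer back up to the initial chunk size and let
        // it keep growing on demand (bounded elsewhere by `max`).
        ReadStrategy::Adaptive { .. } => {
            if buf.capacity() - buf.len() < INIT_BUFFER_SIZE {
                buf.reserve(INIT_BUFFER_SIZE);
            }
        }
        // Exact: make sure the buffer can hold the requested size, and never
        // reserve beyond that.
        ReadStrategy::Exact(sz) => {
            if buf.capacity() < sz {
                buf.reserve(sz);
            }
        }
    }
}

/// Mirror of the overflow check in `parse`: give up once the buffered bytes
/// reach the limit the strategy allows without a complete message.
fn is_too_large(buf: &BytesMut, strategy: ReadStrategy) -> bool {
    match strategy {
        ReadStrategy::Adaptive { max } => buf.len() >= max,
        ReadStrategy::Exact(sz) => buf.len() >= sz,
    }
}

fn main() {
    let exact = ReadStrategy::Exact(16 * 1024);
    let mut buf = BytesMut::with_capacity(0);
    reserve_for_read(&mut buf, exact);
    assert!(buf.capacity() >= 16 * 1024);
    assert!(!is_too_large(&buf, exact));

    let adaptive = ReadStrategy::Adaptive { max: 400 * 1024 };
    let mut buf = BytesMut::with_capacity(0);
    reserve_for_read(&mut buf, adaptive);
    assert!(buf.capacity() >= INIT_BUFFER_SIZE);
    assert!(!is_too_large(&buf, adaptive));
}

The practical difference: Exact reserves only the requested size and fails parsing once that much data has accumulated without a complete message, while Adaptive keeps topping the buffer up in INIT_BUFFER_SIZE steps and is bounded only by the max_buf_size check in parse.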
