Skip to content

Commit e050b04

Browse files
seanmonstargila
authored andcommitted
Add ability to adjust INITIAL_WINDOW_SIZE setting on an existing connection (hyperium#421)
1 parent 139053a commit e050b04

File tree

10 files changed

+435
-23
lines changed

10 files changed

+435
-23
lines changed

src/client.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1201,6 +1201,25 @@ where
12011201
self.inner.set_target_window_size(size);
12021202
}
12031203

1204+
/// Set a new `INITIAL_WINDOW_SIZE` setting (in octets) for stream-level
1205+
/// flow control for received data.
1206+
///
1207+
/// The `SETTINGS` will be sent to the remote, and only applied once the
1208+
/// remote acknowledges the change.
1209+
///
1210+
/// This can be used to increase or decrease the window size for existing
1211+
/// streams.
1212+
///
1213+
/// # Errors
1214+
///
1215+
/// Returns an error if a previous call is still pending acknowledgement
1216+
/// from the remote endpoint.
1217+
pub fn set_initial_window_size(&mut self, size: u32) -> Result<(), crate::Error> {
1218+
assert!(size <= proto::MAX_WINDOW_SIZE);
1219+
self.inner.set_initial_window_size(size)?;
1220+
Ok(())
1221+
}
1222+
12041223
/// Takes a `PingPong` instance from the connection.
12051224
///
12061225
/// # Note

src/codec/error.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,9 @@ pub enum UserError {
6060

6161
/// Calls `PingPong::send_ping` before receiving a pong.
6262
SendPingWhilePending,
63+
64+
/// Tries to update local SETTINGS while ACK has not been received.
65+
SendSettingsWhilePending,
6366
}
6467

6568
// ===== impl RecvError =====
@@ -140,6 +143,7 @@ impl error::Error for UserError {
140143
MissingUriSchemeAndAuthority => "request URI missing scheme and authority",
141144
PollResetAfterSendResponse => "poll_reset after send_response is illegal",
142145
SendPingWhilePending => "send_ping before received previous pong",
146+
SendSettingsWhilePending => "sending SETTINGS before received previous ACK",
143147
}
144148
}
145149
}

src/proto/connection.rs

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::codec::RecvError;
1+
use crate::codec::{RecvError, UserError};
22
use crate::frame::{Reason, StreamId};
33
use crate::{client, frame, proto, server};
44

@@ -99,16 +99,24 @@ where
9999
codec,
100100
go_away: GoAway::new(),
101101
ping_pong: PingPong::new(),
102-
settings: Settings::new(),
102+
settings: Settings::new(config.settings),
103103
streams,
104104
_phantom: PhantomData,
105105
}
106106
}
107107

108-
pub fn set_target_window_size(&mut self, size: WindowSize) {
108+
/// connection flow control
109+
pub(crate) fn set_target_window_size(&mut self, size: WindowSize) {
109110
self.streams.set_target_connection_window_size(size);
110111
}
111112

113+
/// Send a new SETTINGS frame with an updated initial window size.
114+
pub(crate) fn set_initial_window_size(&mut self, size: WindowSize) -> Result<(), UserError> {
115+
let mut settings = frame::Settings::default();
116+
settings.set_initial_window_size(Some(size));
117+
self.settings.send_settings(settings)
118+
}
119+
112120
/// Returns `Ready` when the connection is ready to receive a frame.
113121
///
114122
/// Returns `RecvError` as this may raise errors that are caused by delayed
@@ -119,7 +127,7 @@ where
119127
ready!(self.ping_pong.send_pending_ping(cx, &mut self.codec))?;
120128
ready!(self
121129
.settings
122-
.send_pending_ack(cx, &mut self.codec, &mut self.streams))?;
130+
.poll_send(cx, &mut self.codec, &mut self.streams))?;
123131
ready!(self.streams.send_pending_refusal(cx, &mut self.codec))?;
124132

125133
Poll::Ready(Ok(()))
@@ -327,7 +335,8 @@ where
327335
}
328336
Some(Settings(frame)) => {
329337
log::trace!("recv SETTINGS; frame={:?}", frame);
330-
self.settings.recv_settings(frame);
338+
self.settings
339+
.recv_settings(frame, &mut self.codec, &mut self.streams)?;
331340
}
332341
Some(GoAway(frame)) => {
333342
log::trace!("recv GOAWAY; frame={:?}", frame);

src/proto/settings.rs

Lines changed: 94 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,98 @@
1-
use crate::codec::RecvError;
1+
use crate::codec::{RecvError, UserError};
2+
use crate::error::Reason;
23
use crate::frame;
34
use crate::proto::*;
45
use std::task::{Context, Poll};
56

67
#[derive(Debug)]
78
pub(crate) struct Settings {
9+
/// Our local SETTINGS sync state with the remote.
10+
local: Local,
811
/// Received SETTINGS frame pending processing. The ACK must be written to
912
/// the socket first then the settings applied **before** receiving any
1013
/// further frames.
11-
pending: Option<frame::Settings>,
14+
remote: Option<frame::Settings>,
15+
}
16+
17+
#[derive(Debug)]
18+
enum Local {
19+
/// We want to send these SETTINGS to the remote when the socket is ready.
20+
ToSend(frame::Settings),
21+
/// We have sent these SETTINGS and are waiting for the remote to ACK
22+
/// before we apply them.
23+
WaitingAck(frame::Settings),
24+
/// Our local settings are in sync with the remote.
25+
Synced,
1226
}
1327

1428
impl Settings {
15-
pub fn new() -> Self {
16-
Settings { pending: None }
29+
pub(crate) fn new(local: frame::Settings) -> Self {
30+
Settings {
31+
// We assume the initial local SETTINGS were flushed during
32+
// the handshake process.
33+
local: Local::WaitingAck(local),
34+
remote: None,
35+
}
1736
}
1837

19-
pub fn recv_settings(&mut self, frame: frame::Settings) {
38+
pub(crate) fn recv_settings<T, B, C, P>(
39+
&mut self,
40+
frame: frame::Settings,
41+
codec: &mut Codec<T, B>,
42+
streams: &mut Streams<C, P>,
43+
) -> Result<(), RecvError>
44+
where
45+
T: AsyncWrite + Unpin,
46+
B: Buf + Unpin,
47+
C: Buf + Unpin,
48+
P: Peer,
49+
{
2050
if frame.is_ack() {
21-
log::debug!("received remote settings ack");
22-
// TODO: handle acks
51+
match &self.local {
52+
Local::WaitingAck(local) => {
53+
log::debug!("received settings ACK; applying {:?}", local);
54+
55+
if let Some(max) = local.max_frame_size() {
56+
codec.set_max_recv_frame_size(max as usize);
57+
}
58+
59+
if let Some(max) = local.max_header_list_size() {
60+
codec.set_max_recv_header_list_size(max as usize);
61+
}
62+
63+
streams.apply_local_settings(local)?;
64+
self.local = Local::Synced;
65+
Ok(())
66+
}
67+
Local::ToSend(..) | Local::Synced => {
68+
// We haven't sent any SETTINGS frames to be ACKed, so
69+
// this is very bizarre! Remote is either buggy or malicious.
70+
proto_err!(conn: "received unexpected settings ack");
71+
Err(RecvError::Connection(Reason::PROTOCOL_ERROR))
72+
}
73+
}
2374
} else {
24-
assert!(self.pending.is_none());
25-
self.pending = Some(frame);
75+
// We always ACK before reading more frames, so `remote` should
76+
// always be none!
77+
assert!(self.remote.is_none());
78+
self.remote = Some(frame);
79+
Ok(())
80+
}
81+
}
82+
83+
pub(crate) fn send_settings(&mut self, frame: frame::Settings) -> Result<(), UserError> {
84+
assert!(!frame.is_ack());
85+
match &self.local {
86+
Local::ToSend(..) | Local::WaitingAck(..) => Err(UserError::SendSettingsWhilePending),
87+
Local::Synced => {
88+
log::trace!("queue to send local settings: {:?}", frame);
89+
self.local = Local::ToSend(frame);
90+
Ok(())
91+
}
2692
}
2793
}
2894

29-
pub fn send_pending_ack<T, B, C, P>(
95+
pub(crate) fn poll_send<T, B, C, P>(
3096
&mut self,
3197
cx: &mut Context,
3298
dst: &mut Codec<T, B>,
@@ -38,11 +104,8 @@ impl Settings {
38104
C: Buf + Unpin,
39105
P: Peer,
40106
{
41-
log::trace!("send_pending_ack; pending={:?}", self.pending);
42-
43-
if let Some(settings) = &self.pending {
107+
if let Some(settings) = &self.remote {
44108
if !dst.poll_ready(cx)?.is_ready() {
45-
log::trace!("failed to send ACK");
46109
return Poll::Pending;
47110
}
48111

@@ -61,7 +124,23 @@ impl Settings {
61124
streams.apply_remote_settings(settings)?;
62125
}
63126

64-
self.pending = None;
127+
self.remote = None;
128+
129+
match &self.local {
130+
Local::ToSend(settings) => {
131+
if !dst.poll_ready(cx)?.is_ready() {
132+
return Poll::Pending;
133+
}
134+
135+
// Buffer the settings frame
136+
dst.buffer(settings.clone().into())
137+
.expect("invalid settings frame");
138+
log::trace!("local settings sent; waiting for ack: {:?}", settings);
139+
140+
self.local = Local::WaitingAck(settings.clone());
141+
}
142+
Local::WaitingAck(..) | Local::Synced => {}
143+
}
65144

66145
Poll::Ready(Ok(()))
67146
}

src/proto/streams/flow_control.rs

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -131,11 +131,11 @@ impl FlowControl {
131131
Ok(())
132132
}
133133

134-
/// Decrement the window size.
134+
/// Decrement the send-side window size.
135135
///
136136
/// This is called after receiving a SETTINGS frame with a lower
137137
/// INITIAL_WINDOW_SIZE value.
138-
pub fn dec_window(&mut self, sz: WindowSize) {
138+
pub fn dec_send_window(&mut self, sz: WindowSize) {
139139
log::trace!(
140140
"dec_window; sz={}; window={}, available={}",
141141
sz,
@@ -146,6 +146,22 @@ impl FlowControl {
146146
self.window_size -= sz;
147147
}
148148

149+
/// Decrement the recv-side window size.
150+
///
151+
/// This is called after receiving a SETTINGS ACK frame with a lower
152+
/// INITIAL_WINDOW_SIZE value.
153+
pub fn dec_recv_window(&mut self, sz: WindowSize) {
154+
log::trace!(
155+
"dec_recv_window; sz={}; window={}, available={}",
156+
sz,
157+
self.window_size,
158+
self.available
159+
);
160+
// This should not be able to overflow `window_size` from the bottom.
161+
self.window_size -= sz;
162+
self.available -= sz;
163+
}
164+
149165
/// Decrements the window reflecting data has actually been sent. The caller
150166
/// must ensure that the window has capacity.
151167
pub fn send_data(&mut self, sz: WindowSize) {

src/proto/streams/recv.rs

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -455,6 +455,67 @@ impl Recv {
455455
}
456456
}
457457

458+
pub(crate) fn apply_local_settings(
459+
&mut self,
460+
settings: &frame::Settings,
461+
store: &mut Store,
462+
) -> Result<(), RecvError> {
463+
let target = if let Some(val) = settings.initial_window_size() {
464+
val
465+
} else {
466+
return Ok(());
467+
};
468+
469+
let old_sz = self.init_window_sz;
470+
self.init_window_sz = target;
471+
472+
log::trace!("update_initial_window_size; new={}; old={}", target, old_sz,);
473+
474+
// Per RFC 7540 §6.9.2:
475+
//
476+
// In addition to changing the flow-control window for streams that are
477+
// not yet active, a SETTINGS frame can alter the initial flow-control
478+
// window size for streams with active flow-control windows (that is,
479+
// streams in the "open" or "half-closed (remote)" state). When the
480+
// value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust
481+
// the size of all stream flow-control windows that it maintains by the
482+
// difference between the new value and the old value.
483+
//
484+
// A change to `SETTINGS_INITIAL_WINDOW_SIZE` can cause the available
485+
// space in a flow-control window to become negative. A sender MUST
486+
// track the negative flow-control window and MUST NOT send new
487+
// flow-controlled frames until it receives WINDOW_UPDATE frames that
488+
// cause the flow-control window to become positive.
489+
490+
if target < old_sz {
491+
// We must decrease the (local) window on every open stream.
492+
let dec = old_sz - target;
493+
log::trace!("decrementing all windows; dec={}", dec);
494+
495+
store.for_each(|mut stream| {
496+
stream.recv_flow.dec_recv_window(dec);
497+
Ok(())
498+
})
499+
} else if target > old_sz {
500+
// We must increase the (local) window on every open stream.
501+
let inc = target - old_sz;
502+
log::trace!("incrementing all windows; inc={}", inc);
503+
store.for_each(|mut stream| {
504+
// XXX: Shouldn't the peer have already noticed our
505+
// overflow and sent us a GOAWAY?
506+
stream
507+
.recv_flow
508+
.inc_window(inc)
509+
.map_err(RecvError::Connection)?;
510+
stream.recv_flow.assign_capacity(inc);
511+
Ok(())
512+
})
513+
} else {
514+
// size is the same... so do nothing
515+
Ok(())
516+
}
517+
}
518+
458519
pub fn is_end_stream(&self, stream: &store::Ptr) -> bool {
459520
if !stream.state.is_recv_closed() {
460521
return false;

src/proto/streams/send.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -419,7 +419,7 @@ impl Send {
419419
store.for_each(|mut stream| {
420420
let stream = &mut *stream;
421421

422-
stream.send_flow.dec_window(dec);
422+
stream.send_flow.dec_send_window(dec);
423423

424424
// It's possible that decreasing the window causes
425425
// `window_size` (the stream-specific window) to fall below

src/proto/streams/streams.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -664,6 +664,13 @@ where
664664
)
665665
}
666666

667+
pub fn apply_local_settings(&mut self, frame: &frame::Settings) -> Result<(), RecvError> {
668+
let mut me = self.inner.lock().unwrap();
669+
let me = &mut *me;
670+
671+
me.actions.recv.apply_local_settings(frame, &mut me.store)
672+
}
673+
667674
pub fn send_request(
668675
&mut self,
669676
request: Request<()>,

src/server.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -441,6 +441,25 @@ where
441441
self.connection.set_target_window_size(size);
442442
}
443443

444+
/// Set a new `INITIAL_WINDOW_SIZE` setting (in octets) for stream-level
445+
/// flow control for received data.
446+
///
447+
/// The `SETTINGS` will be sent to the remote, and only applied once the
448+
/// remote acknowledges the change.
449+
///
450+
/// This can be used to increase or decrease the window size for existing
451+
/// streams.
452+
///
453+
/// # Errors
454+
///
455+
/// Returns an error if a previous call is still pending acknowledgement
456+
/// from the remote endpoint.
457+
pub fn set_initial_window_size(&mut self, size: u32) -> Result<(), crate::Error> {
458+
assert!(size <= proto::MAX_WINDOW_SIZE);
459+
self.connection.set_initial_window_size(size)?;
460+
Ok(())
461+
}
462+
444463
/// Returns `Ready` when the underlying connection has closed.
445464
///
446465
/// If any new inbound streams are received during a call to `poll_closed`,

0 commit comments

Comments
 (0)