Skip to content

Commit cbaeaf3

Browse files
committed
Add ability to adjust INITIAL_WINDOW_SIZE setting on an existing connection
1 parent 367206b commit cbaeaf3

File tree

10 files changed

+424
-35
lines changed

10 files changed

+424
-35
lines changed

src/client.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1201,6 +1201,25 @@ where
12011201
self.inner.set_target_window_size(size);
12021202
}
12031203

1204+
/// Set a new `INITIAL_WINDOW_SIZE` setting (in octets) for stream-level
1205+
/// flow control for received data.
1206+
///
1207+
/// The `SETTINGS` will be sent to the remote, and only applied once the
1208+
/// remote acknowledges the change.
1209+
///
1210+
/// This can be used to increase or decrease the window size for existing
1211+
/// streams.
1212+
///
1213+
/// # Errors
1214+
///
1215+
/// Returns an error if a previous call is still pending acknowledgement
1216+
/// from the remote endpoint.
1217+
pub fn set_initial_window_size(&mut self, size: u32) -> Result<(), crate::Error> {
1218+
assert!(size <= proto::MAX_WINDOW_SIZE);
1219+
self.inner.set_initial_window_size(size)?;
1220+
Ok(())
1221+
}
1222+
12041223
/// Takes a `PingPong` instance from the connection.
12051224
///
12061225
/// # Note

src/codec/error.rs

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,9 @@ pub enum UserError {
6060

6161
/// Calls `PingPong::send_ping` before receiving a pong.
6262
SendPingWhilePending,
63+
64+
/// Tries to update local SETTINGS while ACK has not been received.
65+
SendSettingsWhilePending,
6366
}
6467

6568
// ===== impl RecvError =====
@@ -70,24 +73,14 @@ impl From<io::Error> for RecvError {
7073
}
7174
}
7275

73-
impl error::Error for RecvError {
74-
fn description(&self) -> &str {
75-
use self::RecvError::*;
76-
77-
match *self {
78-
Connection(ref reason) => reason.description(),
79-
Stream { ref reason, .. } => reason.description(),
80-
Io(ref e) => e.description(),
81-
}
82-
}
83-
}
84-
76+
/*
8577
impl fmt::Display for RecvError {
8678
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
8779
use std::error::Error;
8880
write!(fmt, "{}", self.description())
8981
}
9082
}
83+
*/
9184

9285
// ===== impl SendError =====
9386

@@ -140,6 +133,7 @@ impl error::Error for UserError {
140133
MissingUriSchemeAndAuthority => "request URI missing scheme and authority",
141134
PollResetAfterSendResponse => "poll_reset after send_response is illegal",
142135
SendPingWhilePending => "send_ping before received previous pong",
136+
SendSettingsWhilePending => "sending SETTINGS before received previous ACK",
143137
}
144138
}
145139
}

src/proto/connection.rs

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::codec::RecvError;
1+
use crate::codec::{RecvError, UserError};
22
use crate::frame::{Reason, StreamId};
33
use crate::{client, frame, proto, server};
44

@@ -99,16 +99,25 @@ where
9999
codec,
100100
go_away: GoAway::new(),
101101
ping_pong: PingPong::new(),
102-
settings: Settings::new(),
102+
settings: Settings::new(config.settings),
103103
streams,
104104
_phantom: PhantomData,
105105
}
106106
}
107107

108-
pub fn set_target_window_size(&mut self, size: WindowSize) {
108+
109+
/// connection flow control
110+
pub(crate) fn set_target_window_size(&mut self, size: WindowSize) {
109111
self.streams.set_target_connection_window_size(size);
110112
}
111113

114+
/// Send a new SETTINGS frame with an updated initial window size.
115+
pub(crate) fn set_initial_window_size(&mut self, size: WindowSize) -> Result<(), UserError> {
116+
let mut settings = frame::Settings::default();
117+
settings.set_initial_window_size(Some(size));
118+
self.settings.send_settings(settings)
119+
}
120+
112121
/// Returns `Ready` when the connection is ready to receive a frame.
113122
///
114123
/// Returns `RecvError` as this may raise errors that are caused by delayed
@@ -119,7 +128,7 @@ where
119128
ready!(self.ping_pong.send_pending_ping(cx, &mut self.codec))?;
120129
ready!(self
121130
.settings
122-
.send_pending_ack(cx, &mut self.codec, &mut self.streams))?;
131+
.poll_send(cx, &mut self.codec, &mut self.streams))?;
123132
ready!(self.streams.send_pending_refusal(cx, &mut self.codec))?;
124133

125134
Poll::Ready(Ok(()))
@@ -327,7 +336,7 @@ where
327336
}
328337
Some(Settings(frame)) => {
329338
log::trace!("recv SETTINGS; frame={:?}", frame);
330-
self.settings.recv_settings(frame);
339+
self.settings.recv_settings(frame, &mut self.codec, &mut self.streams)?;
331340
}
332341
Some(GoAway(frame)) => {
333342
log::trace!("recv GOAWAY; frame={:?}", frame);

src/proto/settings.rs

Lines changed: 95 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,100 @@
1-
use crate::codec::RecvError;
1+
use crate::codec::{RecvError, UserError};
2+
use crate::error::Reason;
23
use crate::frame;
34
use crate::proto::*;
45
use std::task::{Context, Poll};
56

67
#[derive(Debug)]
78
pub(crate) struct Settings {
9+
/// Our local SETTINGS sync state with the remote.
10+
local: Local,
811
/// Received SETTINGS frame pending processing. The ACK must be written to
912
/// the socket first then the settings applied **before** receiving any
1013
/// further frames.
11-
pending: Option<frame::Settings>,
14+
remote: Option<frame::Settings>,
15+
}
16+
17+
#[derive(Debug)]
18+
enum Local {
19+
/// We want to send these SETTINGS to the remote when the socket is ready.
20+
ToSend(frame::Settings),
21+
/// We have sent these SETTINGS and are waiting for the remote to ACK
22+
/// before we apply them.
23+
WaitingAck(frame::Settings),
24+
/// Our local settings are in sync with the remote.
25+
Synced,
1226
}
1327

1428
impl Settings {
15-
pub fn new() -> Self {
16-
Settings { pending: None }
29+
pub(crate) fn new(local: frame::Settings) -> Self {
30+
Settings {
31+
// We assume the initial local SETTINGS were flushed during
32+
// the handshake process.
33+
local: Local::WaitingAck(local),
34+
remote: None,
35+
}
1736
}
1837

19-
pub fn recv_settings(&mut self, frame: frame::Settings) {
38+
pub(crate) fn recv_settings<T, B, C, P>(
39+
&mut self,
40+
frame: frame::Settings,
41+
codec: &mut Codec<T, B>,
42+
streams: &mut Streams<C, P>,
43+
) -> Result<(), RecvError>
44+
where
45+
T: AsyncWrite + Unpin,
46+
B: Buf + Unpin,
47+
C: Buf + Unpin,
48+
P: Peer,
49+
{
2050
if frame.is_ack() {
21-
log::debug!("received remote settings ack");
22-
// TODO: handle acks
51+
match &self.local {
52+
Local::WaitingAck(local) => {
53+
log::debug!("received settings ACK; applying {:?}", local);
54+
55+
if let Some(max) = local.max_frame_size() {
56+
codec.set_max_recv_frame_size(max as usize);
57+
}
58+
59+
if let Some(max) = local.max_header_list_size() {
60+
codec.set_max_recv_header_list_size(max as usize);
61+
}
62+
63+
streams.apply_local_settings(local);
64+
self.local = Local::Synced;
65+
Ok(())
66+
},
67+
Local::ToSend(..) | Local::Synced => {
68+
// We haven't sent any SETTINGS frames to be ACKed, so
69+
// this is very bizarre! Remote is either buggy or malicious.
70+
proto_err!(conn: "received unexpected settings ack");
71+
Err(RecvError::Connection(Reason::PROTOCOL_ERROR))
72+
}
73+
}
2374
} else {
24-
assert!(self.pending.is_none());
25-
self.pending = Some(frame);
75+
// We always ACK before reading more frames, so `remote` should
76+
// always be none!
77+
assert!(self.remote.is_none());
78+
self.remote = Some(frame);
79+
Ok(())
80+
}
81+
}
82+
83+
pub(crate) fn send_settings(&mut self, frame: frame::Settings) -> Result<(), UserError> {
84+
assert!(!frame.is_ack());
85+
match &self.local {
86+
Local::ToSend(..) | Local::WaitingAck(..) => {
87+
Err(UserError::SendSettingsWhilePending)
88+
},
89+
Local::Synced => {
90+
log::trace!("queue to send local settings: {:?}", frame);
91+
self.local = Local::ToSend(frame);
92+
Ok(())
93+
},
2694
}
2795
}
2896

29-
pub fn send_pending_ack<T, B, C, P>(
97+
pub(crate) fn poll_send<T, B, C, P>(
3098
&mut self,
3199
cx: &mut Context,
32100
dst: &mut Codec<T, B>,
@@ -38,11 +106,8 @@ impl Settings {
38106
C: Buf + Unpin,
39107
P: Peer,
40108
{
41-
log::trace!("send_pending_ack; pending={:?}", self.pending);
42-
43-
if let Some(settings) = &self.pending {
109+
if let Some(settings) = &self.remote {
44110
if !dst.poll_ready(cx)?.is_ready() {
45-
log::trace!("failed to send ACK");
46111
return Poll::Pending;
47112
}
48113

@@ -61,7 +126,22 @@ impl Settings {
61126
streams.apply_remote_settings(settings)?;
62127
}
63128

64-
self.pending = None;
129+
self.remote = None;
130+
131+
match &self.local {
132+
Local::ToSend(settings) => {
133+
if !dst.poll_ready(cx)?.is_ready() {
134+
return Poll::Pending;
135+
}
136+
137+
// Buffer the settings frame
138+
dst.buffer(settings.clone().into()).expect("invalid settings frame");
139+
log::trace!("local settings sent; waiting for ack: {:?}", settings);
140+
141+
self.local = Local::WaitingAck(settings.clone());
142+
},
143+
Local::WaitingAck(..) | Local::Synced => {},
144+
}
65145

66146
Poll::Ready(Ok(()))
67147
}

src/proto/streams/flow_control.rs

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -131,11 +131,11 @@ impl FlowControl {
131131
Ok(())
132132
}
133133

134-
/// Decrement the window size.
134+
/// Decrement the send-side window size.
135135
///
136136
/// This is called after receiving a SETTINGS frame with a lower
137137
/// INITIAL_WINDOW_SIZE value.
138-
pub fn dec_window(&mut self, sz: WindowSize) {
138+
pub fn dec_send_window(&mut self, sz: WindowSize) {
139139
log::trace!(
140140
"dec_window; sz={}; window={}, available={}",
141141
sz,
@@ -146,6 +146,22 @@ impl FlowControl {
146146
self.window_size -= sz;
147147
}
148148

149+
/// Decrement the recv-side window size.
150+
///
151+
/// This is called after receiving a SETTINGS ACK frame with a lower
152+
/// INITIAL_WINDOW_SIZE value.
153+
pub fn dec_recv_window(&mut self, sz: WindowSize) {
154+
log::trace!(
155+
"dec_recv_window; sz={}; window={}, available={}",
156+
sz,
157+
self.window_size,
158+
self.available
159+
);
160+
// This should not be able to overflow `window_size` from the bottom.
161+
self.window_size -= sz;
162+
self.available -= sz;
163+
}
164+
149165
/// Decrements the window reflecting data has actually been sent. The caller
150166
/// must ensure that the window has capacity.
151167
pub fn send_data(&mut self, sz: WindowSize) {

src/proto/streams/recv.rs

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -455,6 +455,70 @@ impl Recv {
455455
}
456456
}
457457

458+
pub(crate) fn apply_local_settings(
459+
&mut self,
460+
settings: &frame::Settings,
461+
store: &mut Store,
462+
) -> Result<(), RecvError> {
463+
464+
let target = if let Some(val) = settings.initial_window_size() {
465+
val
466+
} else {
467+
return Ok(());
468+
};
469+
470+
let old_sz = self.init_window_sz;
471+
self.init_window_sz = target;
472+
473+
log::trace!(
474+
"update_initial_window_size; new={}; old={}",
475+
target,
476+
old_sz,
477+
);
478+
479+
480+
// Per RFC 7540 §6.9.2:
481+
//
482+
// In addition to changing the flow-control window for streams that are
483+
// not yet active, a SETTINGS frame can alter the initial flow-control
484+
// window size for streams with active flow-control windows (that is,
485+
// streams in the "open" or "half-closed (remote)" state). When the
486+
// value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust
487+
// the size of all stream flow-control windows that it maintains by the
488+
// difference between the new value and the old value.
489+
//
490+
// A change to `SETTINGS_INITIAL_WINDOW_SIZE` can cause the available
491+
// space in a flow-control window to become negative. A sender MUST
492+
// track the negative flow-control window and MUST NOT send new
493+
// flow-controlled frames until it receives WINDOW_UPDATE frames that
494+
// cause the flow-control window to become positive.
495+
496+
if target < old_sz {
497+
// We must decrease the (local) window on every open stream.
498+
let dec = old_sz - target;
499+
log::trace!("decrementing all windows; dec={}", dec);
500+
501+
store.for_each(|mut stream| {
502+
stream.recv_flow.dec_recv_window(dec);
503+
Ok(())
504+
})
505+
} else if target > old_sz {
506+
// We must increase the (local) window on every open stream.
507+
let inc = target - old_sz;
508+
log::trace!("incrementing all windows; inc={}", inc);
509+
store.for_each(|mut stream| {
510+
// XXX: Shouldn't the peer have already noticed our
511+
// overflow and sent us a GOAWAY?
512+
stream.recv_flow.inc_window(inc).map_err(RecvError::Connection)?;
513+
stream.recv_flow.assign_capacity(inc);
514+
Ok(())
515+
})
516+
} else {
517+
// size is the same... so do nothing
518+
Ok(())
519+
}
520+
}
521+
458522
pub fn is_end_stream(&self, stream: &store::Ptr) -> bool {
459523
if !stream.state.is_recv_closed() {
460524
return false;

src/proto/streams/send.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -419,7 +419,7 @@ impl Send {
419419
store.for_each(|mut stream| {
420420
let stream = &mut *stream;
421421

422-
stream.send_flow.dec_window(dec);
422+
stream.send_flow.dec_send_window(dec);
423423

424424
// It's possible that decreasing the window causes
425425
// `window_size` (the stream-specific window) to fall below

0 commit comments

Comments
 (0)