Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1201,6 +1201,25 @@ where
self.inner.set_target_window_size(size);
}

/// Set a new `INITIAL_WINDOW_SIZE` setting (in octets) for stream-level
/// flow control for received data.
///
/// The `SETTINGS` will be sent to the remote, and only applied once the
/// remote acknowledges the change.
///
/// This can be used to increase or decrease the window size for existing
/// streams.
///
/// # Errors
///
/// Returns an error if a previous call is still pending acknowledgement
/// from the remote endpoint.
pub fn set_initial_window_size(&mut self, size: u32) -> Result<(), crate::Error> {
assert!(size <= proto::MAX_WINDOW_SIZE);
self.inner.set_initial_window_size(size)?;
Ok(())
}

/// Takes a `PingPong` instance from the connection.
///
/// # Note
Expand Down
4 changes: 4 additions & 0 deletions src/codec/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ pub enum UserError {

/// Calls `PingPong::send_ping` before receiving a pong.
SendPingWhilePending,

/// Tries to update local SETTINGS while ACK has not been received.
SendSettingsWhilePending,
}

// ===== impl RecvError =====
Expand Down Expand Up @@ -140,6 +143,7 @@ impl error::Error for UserError {
MissingUriSchemeAndAuthority => "request URI missing scheme and authority",
PollResetAfterSendResponse => "poll_reset after send_response is illegal",
SendPingWhilePending => "send_ping before received previous pong",
SendSettingsWhilePending => "sending SETTINGS before received previous ACK",
}
}
}
Expand Down
19 changes: 14 additions & 5 deletions src/proto/connection.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::codec::RecvError;
use crate::codec::{RecvError, UserError};
use crate::frame::{Reason, StreamId};
use crate::{client, frame, proto, server};

Expand Down Expand Up @@ -99,16 +99,24 @@ where
codec,
go_away: GoAway::new(),
ping_pong: PingPong::new(),
settings: Settings::new(),
settings: Settings::new(config.settings),
streams,
_phantom: PhantomData,
}
}

pub fn set_target_window_size(&mut self, size: WindowSize) {
/// connection flow control
pub(crate) fn set_target_window_size(&mut self, size: WindowSize) {
self.streams.set_target_connection_window_size(size);
}

/// Send a new SETTINGS frame with an updated initial window size.
pub(crate) fn set_initial_window_size(&mut self, size: WindowSize) -> Result<(), UserError> {
let mut settings = frame::Settings::default();
settings.set_initial_window_size(Some(size));
self.settings.send_settings(settings)
}

/// Returns `Ready` when the connection is ready to receive a frame.
///
/// Returns `RecvError` as this may raise errors that are caused by delayed
Expand All @@ -119,7 +127,7 @@ where
ready!(self.ping_pong.send_pending_ping(cx, &mut self.codec))?;
ready!(self
.settings
.send_pending_ack(cx, &mut self.codec, &mut self.streams))?;
.poll_send(cx, &mut self.codec, &mut self.streams))?;
ready!(self.streams.send_pending_refusal(cx, &mut self.codec))?;

Poll::Ready(Ok(()))
Expand Down Expand Up @@ -327,7 +335,8 @@ where
}
Some(Settings(frame)) => {
log::trace!("recv SETTINGS; frame={:?}", frame);
self.settings.recv_settings(frame);
self.settings
.recv_settings(frame, &mut self.codec, &mut self.streams)?;
}
Some(GoAway(frame)) => {
log::trace!("recv GOAWAY; frame={:?}", frame);
Expand Down
109 changes: 94 additions & 15 deletions src/proto/settings.rs
Original file line number Diff line number Diff line change
@@ -1,32 +1,98 @@
use crate::codec::RecvError;
use crate::codec::{RecvError, UserError};
use crate::error::Reason;
use crate::frame;
use crate::proto::*;
use std::task::{Context, Poll};

#[derive(Debug)]
pub(crate) struct Settings {
/// Our local SETTINGS sync state with the remote.
local: Local,
/// Received SETTINGS frame pending processing. The ACK must be written to
/// the socket first then the settings applied **before** receiving any
/// further frames.
pending: Option<frame::Settings>,
remote: Option<frame::Settings>,
}

#[derive(Debug)]
enum Local {
/// We want to send these SETTINGS to the remote when the socket is ready.
ToSend(frame::Settings),
/// We have sent these SETTINGS and are waiting for the remote to ACK
/// before we apply them.
WaitingAck(frame::Settings),
/// Our local settings are in sync with the remote.
Synced,
}

impl Settings {
pub fn new() -> Self {
Settings { pending: None }
pub(crate) fn new(local: frame::Settings) -> Self {
Settings {
// We assume the initial local SETTINGS were flushed during
// the handshake process.
local: Local::WaitingAck(local),
remote: None,
}
}

pub fn recv_settings(&mut self, frame: frame::Settings) {
pub(crate) fn recv_settings<T, B, C, P>(
&mut self,
frame: frame::Settings,
codec: &mut Codec<T, B>,
streams: &mut Streams<C, P>,
) -> Result<(), RecvError>
where
T: AsyncWrite + Unpin,
B: Buf + Unpin,
C: Buf + Unpin,
P: Peer,
{
if frame.is_ack() {
log::debug!("received remote settings ack");
// TODO: handle acks
match &self.local {
Local::WaitingAck(local) => {
log::debug!("received settings ACK; applying {:?}", local);

if let Some(max) = local.max_frame_size() {
codec.set_max_recv_frame_size(max as usize);
}

if let Some(max) = local.max_header_list_size() {
codec.set_max_recv_header_list_size(max as usize);
}

streams.apply_local_settings(local)?;
self.local = Local::Synced;
Ok(())
}
Local::ToSend(..) | Local::Synced => {
// We haven't sent any SETTINGS frames to be ACKed, so
// this is very bizarre! Remote is either buggy or malicious.
proto_err!(conn: "received unexpected settings ack");
Err(RecvError::Connection(Reason::PROTOCOL_ERROR))
}
}
} else {
assert!(self.pending.is_none());
self.pending = Some(frame);
// We always ACK before reading more frames, so `remote` should
// always be none!
assert!(self.remote.is_none());
self.remote = Some(frame);
Ok(())
}
}

pub(crate) fn send_settings(&mut self, frame: frame::Settings) -> Result<(), UserError> {
assert!(!frame.is_ack());
match &self.local {
Local::ToSend(..) | Local::WaitingAck(..) => Err(UserError::SendSettingsWhilePending),
Local::Synced => {
log::trace!("queue to send local settings: {:?}", frame);
self.local = Local::ToSend(frame);
Ok(())
}
}
}

pub fn send_pending_ack<T, B, C, P>(
pub(crate) fn poll_send<T, B, C, P>(
&mut self,
cx: &mut Context,
dst: &mut Codec<T, B>,
Expand All @@ -38,11 +104,8 @@ impl Settings {
C: Buf + Unpin,
P: Peer,
{
log::trace!("send_pending_ack; pending={:?}", self.pending);

if let Some(settings) = &self.pending {
if let Some(settings) = &self.remote {
if !dst.poll_ready(cx)?.is_ready() {
log::trace!("failed to send ACK");
return Poll::Pending;
}

Expand All @@ -61,7 +124,23 @@ impl Settings {
streams.apply_remote_settings(settings)?;
}

self.pending = None;
self.remote = None;

match &self.local {
Local::ToSend(settings) => {
if !dst.poll_ready(cx)?.is_ready() {
return Poll::Pending;
}

// Buffer the settings frame
dst.buffer(settings.clone().into())
.expect("invalid settings frame");
log::trace!("local settings sent; waiting for ack: {:?}", settings);

self.local = Local::WaitingAck(settings.clone());
}
Local::WaitingAck(..) | Local::Synced => {}
}

Poll::Ready(Ok(()))
}
Expand Down
20 changes: 18 additions & 2 deletions src/proto/streams/flow_control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,11 @@ impl FlowControl {
Ok(())
}

/// Decrement the window size.
/// Decrement the send-side window size.
///
/// This is called after receiving a SETTINGS frame with a lower
/// INITIAL_WINDOW_SIZE value.
pub fn dec_window(&mut self, sz: WindowSize) {
pub fn dec_send_window(&mut self, sz: WindowSize) {
log::trace!(
"dec_window; sz={}; window={}, available={}",
sz,
Expand All @@ -146,6 +146,22 @@ impl FlowControl {
self.window_size -= sz;
}

/// Decrement the recv-side window size.
///
/// This is called after receiving a SETTINGS ACK frame with a lower
/// INITIAL_WINDOW_SIZE value.
pub fn dec_recv_window(&mut self, sz: WindowSize) {
log::trace!(
"dec_recv_window; sz={}; window={}, available={}",
sz,
self.window_size,
self.available
);
// This should not be able to overflow `window_size` from the bottom.
self.window_size -= sz;
self.available -= sz;
}

/// Decrements the window reflecting data has actually been sent. The caller
/// must ensure that the window has capacity.
pub fn send_data(&mut self, sz: WindowSize) {
Expand Down
61 changes: 61 additions & 0 deletions src/proto/streams/recv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,67 @@ impl Recv {
}
}

pub(crate) fn apply_local_settings(
&mut self,
settings: &frame::Settings,
store: &mut Store,
) -> Result<(), RecvError> {
let target = if let Some(val) = settings.initial_window_size() {
val
} else {
return Ok(());
};

let old_sz = self.init_window_sz;
self.init_window_sz = target;

log::trace!("update_initial_window_size; new={}; old={}", target, old_sz,);

// Per RFC 7540 §6.9.2:
//
// In addition to changing the flow-control window for streams that are
// not yet active, a SETTINGS frame can alter the initial flow-control
// window size for streams with active flow-control windows (that is,
// streams in the "open" or "half-closed (remote)" state). When the
// value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust
// the size of all stream flow-control windows that it maintains by the
// difference between the new value and the old value.
//
// A change to `SETTINGS_INITIAL_WINDOW_SIZE` can cause the available
// space in a flow-control window to become negative. A sender MUST
// track the negative flow-control window and MUST NOT send new
// flow-controlled frames until it receives WINDOW_UPDATE frames that
// cause the flow-control window to become positive.

if target < old_sz {
// We must decrease the (local) window on every open stream.
let dec = old_sz - target;
log::trace!("decrementing all windows; dec={}", dec);

store.for_each(|mut stream| {
stream.recv_flow.dec_recv_window(dec);
Ok(())
})
} else if target > old_sz {
// We must increase the (local) window on every open stream.
let inc = target - old_sz;
log::trace!("incrementing all windows; inc={}", inc);
store.for_each(|mut stream| {
// XXX: Shouldn't the peer have already noticed our
// overflow and sent us a GOAWAY?
stream
.recv_flow
.inc_window(inc)
.map_err(RecvError::Connection)?;
stream.recv_flow.assign_capacity(inc);
Ok(())
})
} else {
// size is the same... so do nothing
Ok(())
}
}

pub fn is_end_stream(&self, stream: &store::Ptr) -> bool {
if !stream.state.is_recv_closed() {
return false;
Expand Down
2 changes: 1 addition & 1 deletion src/proto/streams/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ impl Send {
store.for_each(|mut stream| {
let stream = &mut *stream;

stream.send_flow.dec_window(dec);
stream.send_flow.dec_send_window(dec);

// It's possible that decreasing the window causes
// `window_size` (the stream-specific window) to fall below
Expand Down
7 changes: 7 additions & 0 deletions src/proto/streams/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,13 @@ where
)
}

pub fn apply_local_settings(&mut self, frame: &frame::Settings) -> Result<(), RecvError> {
let mut me = self.inner.lock().unwrap();
let me = &mut *me;

me.actions.recv.apply_local_settings(frame, &mut me.store)
}

pub fn send_request(
&mut self,
request: Request<()>,
Expand Down
19 changes: 19 additions & 0 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,25 @@ where
self.connection.set_target_window_size(size);
}

/// Set a new `INITIAL_WINDOW_SIZE` setting (in octets) for stream-level
/// flow control for received data.
///
/// The `SETTINGS` will be sent to the remote, and only applied once the
/// remote acknowledges the change.
///
/// This can be used to increase or decrease the window size for existing
/// streams.
///
/// # Errors
///
/// Returns an error if a previous call is still pending acknowledgement
/// from the remote endpoint.
pub fn set_initial_window_size(&mut self, size: u32) -> Result<(), crate::Error> {
assert!(size <= proto::MAX_WINDOW_SIZE);
self.connection.set_initial_window_size(size)?;
Ok(())
}

/// Returns `Ready` when the underlying connection has closed.
///
/// If any new inbound streams are received during a call to `poll_closed`,
Expand Down
Loading