Skip to content

Commit

Permalink
Add max send buffer per stream option (#580)
Browse files Browse the repository at this point in the history
  • Loading branch information
seanmonstar authored Dec 8, 2021
1 parent e9e0f27 commit efa113b
Show file tree
Hide file tree
Showing 7 changed files with 115 additions and 6 deletions.
23 changes: 23 additions & 0 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,9 @@ pub struct Builder {
/// Initial target window size for new connections.
initial_target_connection_window_size: Option<u32>,

/// Maximum amount of bytes to "buffer" for writing per stream.
max_send_buffer_size: usize,

/// Maximum number of locally reset streams to keep at a time.
reset_stream_max: usize,

Expand Down Expand Up @@ -628,6 +631,7 @@ impl Builder {
/// ```
pub fn new() -> Builder {
Builder {
max_send_buffer_size: proto::DEFAULT_MAX_SEND_BUFFER_SIZE,
reset_stream_duration: Duration::from_secs(proto::DEFAULT_RESET_STREAM_SECS),
reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX,
initial_target_connection_window_size: None,
Expand Down Expand Up @@ -962,6 +966,24 @@ impl Builder {
self
}

/// Sets the maximum send buffer size per stream.
///
/// Once a stream has buffered up to (or over) the maximum, the stream's
/// flow control will not "poll" additional capacity. Once bytes for the
/// stream have been written to the connection, the send buffer capacity
/// will be freed up again.
///
/// The default is currently ~400MB, but may change.
///
/// # Panics
///
/// This function panics if `max` is larger than `u32::MAX`.
pub fn max_send_buffer_size(&mut self, max: usize) -> &mut Self {
assert!(max <= std::u32::MAX as usize);
self.max_send_buffer_size = max;
self
}

/// Enables or disables server push promises.
///
/// This value is included in the initial SETTINGS handshake. When set, the
Expand Down Expand Up @@ -1184,6 +1206,7 @@ where
proto::Config {
next_stream_id: builder.stream_id,
initial_max_send_streams: builder.initial_max_send_streams,
max_send_buffer_size: builder.max_send_buffer_size,
reset_stream_duration: builder.reset_stream_duration,
reset_stream_max: builder.reset_stream_max,
settings: builder.settings.clone(),
Expand Down
2 changes: 2 additions & 0 deletions src/proto/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ struct DynConnection<'a, B: Buf = Bytes> {
pub(crate) struct Config {
pub next_stream_id: StreamId,
pub initial_max_send_streams: usize,
pub max_send_buffer_size: usize,
pub reset_stream_duration: Duration,
pub reset_stream_max: usize,
pub settings: frame::Settings,
Expand Down Expand Up @@ -108,6 +109,7 @@ where
.initial_window_size()
.unwrap_or(DEFAULT_INITIAL_WINDOW_SIZE),
initial_max_send_streams: config.initial_max_send_streams,
local_max_buffer_size: config.max_send_buffer_size,
local_next_stream_id: config.next_stream_id,
local_push_enabled: config.settings.is_push_enabled().unwrap_or(true),
extended_connect_protocol_enabled: config
Expand Down
1 change: 1 addition & 0 deletions src/proto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,4 @@ pub type WindowSize = u32;
pub const MAX_WINDOW_SIZE: WindowSize = (1 << 31) - 1;
pub const DEFAULT_RESET_STREAM_MAX: usize = 10;
pub const DEFAULT_RESET_STREAM_SECS: u64 = 30;
pub const DEFAULT_MAX_SEND_BUFFER_SIZE: usize = 1024 * 400;
3 changes: 3 additions & 0 deletions src/proto/streams/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ pub struct Config {
/// MAX_CONCURRENT_STREAMS specified in the frame.
pub initial_max_send_streams: usize,

/// Max amount of DATA bytes to buffer per stream.
pub local_max_buffer_size: usize,

/// The stream ID to start the next local stream with
pub local_next_stream_id: StreamId,

Expand Down
12 changes: 6 additions & 6 deletions src/proto/streams/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ pub(super) struct Send {
/// > the identified last stream.
max_stream_id: StreamId,

/// The maximum amount of bytes a stream should buffer.
max_buffer_size: usize,

/// Initial window size of locally initiated streams
init_window_sz: WindowSize,

Expand All @@ -52,6 +55,7 @@ impl Send {
pub fn new(config: &Config) -> Self {
Send {
init_window_sz: config.remote_init_window_sz,
max_buffer_size: config.local_max_buffer_size,
max_stream_id: StreamId::MAX,
next_stream_id: Ok(config.local_next_stream_id),
prioritize: Prioritize::new(config),
Expand Down Expand Up @@ -333,14 +337,10 @@ impl Send {

/// Current available stream send capacity
pub fn capacity(&self, stream: &mut store::Ptr) -> WindowSize {
let available = stream.send_flow.available().as_size();
let available = stream.send_flow.available().as_size() as usize;
let buffered = stream.buffered_send_data;

if available as usize <= buffered {
0
} else {
available - buffered as WindowSize
}
available.min(self.max_buffer_size).saturating_sub(buffered) as WindowSize
}

pub fn poll_reset(
Expand Down
23 changes: 23 additions & 0 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,9 @@ pub struct Builder {

/// Initial target window size for new connections.
initial_target_connection_window_size: Option<u32>,

/// Maximum amount of bytes to "buffer" for writing per stream.
max_send_buffer_size: usize,
}

/// Send a response back to the client
Expand Down Expand Up @@ -633,6 +636,7 @@ impl Builder {
reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX,
settings: Settings::default(),
initial_target_connection_window_size: None,
max_send_buffer_size: proto::DEFAULT_MAX_SEND_BUFFER_SIZE,
}
}

Expand Down Expand Up @@ -870,6 +874,24 @@ impl Builder {
self
}

/// Sets the maximum send buffer size per stream.
///
/// Once a stream has buffered up to (or over) the maximum, the stream's
/// flow control will not "poll" additional capacity. Once bytes for the
/// stream have been written to the connection, the send buffer capacity
/// will be freed up again.
///
/// The default is currently ~400MB, but may change.
///
/// # Panics
///
/// This function panics if `max` is larger than `u32::MAX`.
pub fn max_send_buffer_size(&mut self, max: usize) -> &mut Self {
assert!(max <= std::u32::MAX as usize);
self.max_send_buffer_size = max;
self
}

/// Sets the maximum number of concurrent locally reset streams.
///
/// When a stream is explicitly reset by either calling
Expand Down Expand Up @@ -1290,6 +1312,7 @@ where
next_stream_id: 2.into(),
// Server does not need to locally initiate any streams
initial_max_send_streams: 0,
max_send_buffer_size: self.builder.max_send_buffer_size,
reset_stream_duration: self.builder.reset_stream_duration,
reset_stream_max: self.builder.reset_stream_max,
settings: self.builder.settings.clone(),
Expand Down
57 changes: 57 additions & 0 deletions tests/h2-tests/tests/flow_control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1611,3 +1611,60 @@ async fn poll_capacity_after_send_data_and_reserve() {

join(srv, h2).await;
}

#[tokio::test]
async fn max_send_buffer_size_overflow() {
h2_support::trace_init!();
let (io, mut srv) = mock::new();

let srv = async move {
let settings = srv.assert_client_handshake().await;
assert_default_settings!(settings);
srv.recv_frame(frames::headers(1).request("POST", "https://www.example.com/"))
.await;
srv.send_frame(frames::headers(1).response(200).eos()).await;
srv.recv_frame(frames::data(1, &[0; 10][..])).await;
srv.recv_frame(frames::data(1, &[][..]).eos()).await;
};

let client = async move {
let (mut client, mut conn) = client::Builder::new()
.max_send_buffer_size(5)
.handshake::<_, Bytes>(io)
.await
.unwrap();
let request = Request::builder()
.method(Method::POST)
.uri("https://www.example.com/")
.body(())
.unwrap();

let (response, mut stream) = client.send_request(request, false).unwrap();

let response = conn.drive(response).await.unwrap();
assert_eq!(response.status(), StatusCode::OK);

assert_eq!(stream.capacity(), 0);
stream.reserve_capacity(10);
assert_eq!(
stream.capacity(),
5,
"polled capacity not over max buffer size"
);

stream.send_data([0; 10][..].into(), false).unwrap();

stream.reserve_capacity(15);
assert_eq!(
stream.capacity(),
0,
"now with buffered over the max, don't overflow"
);
stream.send_data([0; 0][..].into(), true).unwrap();

// Wait for the connection to close
conn.await.unwrap();
};

join(srv, client).await;
}

0 comments on commit efa113b

Please sign in to comment.