Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wire in recv flow control #26

Merged
merged 3 commits into from
Aug 23, 2017
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/client.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use {frame, ConnectionError, StreamId};
use {Body, Chunk};
use Body;
use proto::{self, Connection, WindowSize};
use error::Reason::*;

Expand Down
20 changes: 2 additions & 18 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,30 +53,14 @@ pub struct Body<B: IntoBuf> {
inner: proto::StreamRef<B::Buf>,
}

#[derive(Debug)]
pub struct Chunk<B: IntoBuf> {
inner: proto::Chunk<B::Buf>,
}

// ===== impl Body =====

impl<B: IntoBuf> futures::Stream for Body<B> {
type Item = Chunk<B>;
type Item = Bytes;
type Error = ConnectionError;

fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
let chunk = try_ready!(self.inner.poll_data())
.map(|inner| Chunk { inner });

Ok(chunk.into())
}
}

// ===== impl Chunk =====

impl<B: IntoBuf> Chunk<B> {
pub fn pop_bytes(&mut self) -> Option<Bytes> {
self.inner.pop_bytes()
self.inner.poll_data()
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/proto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ mod settings;
mod streams;

pub(crate) use self::connection::Connection;
pub(crate) use self::streams::{Streams, StreamRef, Chunk};
pub(crate) use self::streams::{Streams, StreamRef};

use self::codec::Codec;
use self::framed_read::FramedRead;
Expand All @@ -21,6 +21,7 @@ use error::Reason;
use frame::{self, Frame};

use futures::{self, task, Poll, Async, AsyncSink, Sink, Stream as Stream2};
use futures::task::Task;
use bytes::{Buf, IntoBuf};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_io::codec::length_delimited;
Expand Down
49 changes: 0 additions & 49 deletions src/proto/streams/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,53 +108,4 @@ impl<B> Deque<B> {
None => None,
}
}

pub fn take_while<F>(&mut self, buf: &mut Buffer<B>, mut f: F) -> Self
where F: FnMut(&Frame<B>) -> bool
{
match self.indices {
Some(mut idxs) => {
if !f(&buf.slab[idxs.head].frame) {
return Deque::new();
}

let head = idxs.head;
let mut tail = idxs.head;

loop {
let next = match buf.slab[tail].next {
Some(next) => next,
None => {
self.indices = None;
return Deque {
indices: Some(idxs),
_p: PhantomData,
};
}
};

if !f(&buf.slab[next].frame) {
// Split the linked list
buf.slab[tail].next = None;

self.indices = Some(Indices {
head: next,
tail: idxs.tail,
});

return Deque {
indices: Some(Indices {
head: head,
tail: tail,
}),
_p: PhantomData,
}
}

tail = next;
}
}
None => Deque::new(),
}
}
}
16 changes: 13 additions & 3 deletions src/proto/streams/flow_control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,19 @@ impl FlowControl {
self.available += capacity;
}

/// Returns the number of bytes available but not assigned to the window.
///
/// This represents pending outbound WINDOW_UPDATE frames.
pub fn unclaimed_capacity(&self) -> WindowSize {
let available = self.available as i32;

if self.window_size >= available {
return 0;
}

(available - self.window_size) as WindowSize
}

/// Update the window size.
///
/// This is called after receiving a WINDOW_UPDATE frame
Expand All @@ -68,9 +81,6 @@ impl FlowControl {
trace!("send_data; sz={}; window={}; available={}",
sz, self.window_size, self.available);

// Available cannot be greater than the window
debug_assert!(self.available as i32 <= self.window_size || self.available == 0);

// Ensure that the argument is correct
assert!(sz <= self.window_size as WindowSize);

Expand Down
2 changes: 1 addition & 1 deletion src/proto/streams/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ mod store;
mod stream;
mod streams;

pub(crate) use self::streams::{Streams, StreamRef, Chunk};
pub(crate) use self::streams::{Streams, StreamRef};
pub(crate) use self::prioritize::Prioritized;

use self::buffer::Buffer;
Expand Down
18 changes: 6 additions & 12 deletions src/proto/streams/prioritize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@ pub(super) struct Prioritize<B> {

/// Holds frames that are waiting to be written to the socket
buffer: Buffer<B>,

/// Holds the connection task. This signals the connection that there is
/// data to flush.
conn_task: Option<task::Task>,
}

pub(crate) struct Prioritized<B> {
Expand Down Expand Up @@ -49,14 +45,14 @@ impl<B> Prioritize<B>
pending_capacity: store::Queue::new(),
flow: flow,
buffer: Buffer::new(),
conn_task: None,
}
}

/// Queue a frame to be sent to the remote
pub fn queue_frame(&mut self,
frame: Frame<B>,
stream: &mut store::Ptr<B>)
stream: &mut store::Ptr<B>,
task: &mut Option<Task>)
{
// Queue the frame in the buffer
stream.pending_send.push_back(&mut self.buffer, frame);
Expand All @@ -65,15 +61,16 @@ impl<B> Prioritize<B>
self.pending_send.push(stream);

// Notify the connection.
if let Some(task) = self.conn_task.take() {
if let Some(task) = task.take() {
task.notify();
}
}

/// Send a data frame
pub fn send_data(&mut self,
frame: frame::Data<B>,
stream: &mut store::Ptr<B>)
stream: &mut store::Ptr<B>,
task: &mut Option<Task>)
-> Result<(), ConnectionError>
{
let sz = frame.payload().remaining();
Expand Down Expand Up @@ -112,7 +109,7 @@ impl<B> Prioritize<B>
if stream.send_flow.available() > stream.buffered_send_data {
// The stream currently has capacity to send the data frame, so
// queue it up and notify the connection task.
self.queue_frame(frame.into(), stream);
self.queue_frame(frame.into(), stream, task);
} else {
// The stream has no capacity to send the frame now, save it but
// don't notify the conneciton task. Once additional capacity
Expand Down Expand Up @@ -294,9 +291,6 @@ impl<B> Prioritize<B>

// This might release a data frame...
if !self.reclaim_frame(store, dst) {
// Nothing else to do, track the task
self.conn_task = Some(task::current());

return Ok(().into());
}

Expand Down
Loading