Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restructure API using a handle per stream #12

Merged
merged 19 commits into from
Aug 8, 2017
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
More code
  • Loading branch information
carllerche committed Aug 5, 2017
commit 1c55ad75ea7cbcdad8673c69a1c5ec807b95bbed
23 changes: 23 additions & 0 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub struct Client<T, B: IntoBuf> {
}

/// Client half of an active HTTP/2.0 stream.
#[derive(Debug)]
pub struct Stream<B: IntoBuf> {
inner: proto::StreamRef<Peer, B::Buf>,
}
Expand Down Expand Up @@ -89,6 +90,19 @@ impl<T, B> Client<T, B>
}
}

impl<T, B> Future for Client<T, B>
// TODO: Get rid of 'static
where T: AsyncRead + AsyncWrite + 'static,
B: IntoBuf + 'static,
{
type Item = ();
type Error = ConnectionError;

fn poll(&mut self) -> Poll<(), ConnectionError> {
self.connection.poll()
}
}

impl<T, B> fmt::Debug for Client<T, B>
where T: fmt::Debug,
B: fmt::Debug + IntoBuf,
Expand Down Expand Up @@ -145,6 +159,15 @@ impl<B: IntoBuf> Stream<B> {
}
}

impl<B: IntoBuf> Future for Stream<B> {
type Item = Response<()>;
type Error = ConnectionError;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.poll_response()
}
}

// ===== impl Peer =====

impl proto::Peer for Peer {
Expand Down
8 changes: 3 additions & 5 deletions src/proto/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ impl<T, P, B> Connection<T, P, B>
}

/// Advances the internal state of the connection.
pub fn poll(&mut self) -> Poll<Option<()>, ConnectionError> {
pub fn poll(&mut self) -> Poll<(), ConnectionError> {
use frame::Frame::*;

loop {
Expand Down Expand Up @@ -183,11 +183,9 @@ impl<T, P, B> Connection<T, P, B>
*/
}
None => {
unimplemented!();
/*
// TODO: Is this correct?
trace!("codec closed");
return Ok(Async::Ready(None));
*/
return Ok(Async::Ready(()));
}
}
}
Expand Down
41 changes: 36 additions & 5 deletions src/proto/streams/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub struct Deque<B> {
}

/// Tracks the head & tail for a sequence of frames in a `Buffer`.
#[derive(Debug, Default)]
#[derive(Debug, Default, Copy, Clone)]
struct Indices {
head: usize,
tail: usize,
Expand All @@ -27,7 +27,7 @@ struct Indices {
#[derive(Debug)]
struct Slot<B> {
frame: Frame<B>,
next: usize,
next: Option<usize>,
}

impl<B> Buffer<B> {
Expand All @@ -50,11 +50,42 @@ impl<B> Deque<B> {
self.indices.is_none()
}

pub fn push_back(&mut self, buf: &mut Buffer<B>, val: Frame<B>) {
unimplemented!();
pub fn push_back(&mut self, buf: &mut Buffer<B>, frame: Frame<B>) {
let key = buf.slab.insert(Slot {
frame,
next: None,
});

match self.indices {
Some(ref mut idxs) => {
buf.slab[idxs.tail].next = Some(key);
idxs.tail = key;
}
None => {
self.indices = Some(Indices {
head: key,
tail: key,
});
}
}
}

pub fn pop_front(&mut self, buf: &mut Buffer<B>) -> Option<Frame<B>> {
unimplemented!();
match self.indices {
Some(mut idxs) => {
let mut slot = buf.slab.remove(idxs.head);

if idxs.head == idxs.tail {
assert!(slot.next.is_none());
self.indices = None;
} else {
idxs.head = slot.next.take().unwrap();
self.indices = Some(idxs);
}

return Some(slot.frame);
}
None => None,
}
}
}
76 changes: 55 additions & 21 deletions src/proto/streams/prioritize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,27 +38,8 @@ impl<B> Prioritize<B>
return;
}

// The next pointer shouldn't be set
debug_assert!(stream.next_pending_send.is_none());

// Queue the stream
match self.pending_send {
Some(ref mut idxs) => {
// Update the current tail node to point to `stream`
stream.resolve(idxs.tail).next_pending_send = Some(stream.key());

// Update the tail pointer
idxs.tail = stream.key();
}
None => {
self.pending_send = Some(Indices {
head: stream.key(),
tail: stream.key(),
});
}
}

stream.is_pending_send = true;
self.push_sender(stream);
}

pub fn poll_complete<T>(&mut self,
Expand Down Expand Up @@ -88,6 +69,59 @@ impl<B> Prioritize<B>
}

fn pop_frame(&mut self, store: &mut Store<B>) -> Option<Frame<B>> {
unimplemented!();
match self.pop_sender(store) {
Some(mut stream) => {
let frame = stream.pending_send.pop_front(&mut self.buffer).unwrap();

if !stream.pending_send.is_empty() {
self.push_sender(&mut stream);
}

Some(frame)
}
None => None,
}
}

fn push_sender(&mut self, stream: &mut store::Ptr<B>) {
// The next pointer shouldn't be set
debug_assert!(stream.next_pending_send.is_none());

// Queue the stream
match self.pending_send {
Some(ref mut idxs) => {
// Update the current tail node to point to `stream`
stream.resolve(idxs.tail).next_pending_send = Some(stream.key());

// Update the tail pointer
idxs.tail = stream.key();
}
None => {
self.pending_send = Some(Indices {
head: stream.key(),
tail: stream.key(),
});
}
}

stream.is_pending_send = true;
}

fn pop_sender<'a>(&mut self, store: &'a mut Store<B>) -> Option<store::Ptr<'a, B>> {
if let Some(mut idxs) = self.pending_send {
let mut stream = store.resolve(idxs.head);

if idxs.head == idxs.tail {
assert!(stream.next_pending_send.is_none());
self.pending_send = None;
} else {
idxs.head = stream.next_pending_send.take().unwrap();
self.pending_send = Some(idxs);
}

return Some(stream);
}

None
}
}
2 changes: 1 addition & 1 deletion src/proto/streams/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub(super) struct Ptr<'a, B: 'a> {
}

/// References an entry in the store.
#[derive(Debug, Clone, Copy)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) struct Key(usize);

pub(super) enum Entry<'a, B: 'a> {
Expand Down
5 changes: 5 additions & 0 deletions src/proto/streams/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,11 @@ impl<B> Streams<client::Peer, B>
pub fn send_request(&mut self, request: Request<()>, end_of_stream: bool)
-> Result<StreamRef<client::Peer, B>, ConnectionError>
{
// TODO: There is a hazard with assigning a stream ID before the
// prioritize layer. If prioritization reorders new streams, this
// implicitly closes the earlier stream IDs.
//
// See: carllerche/h2#11
let key = {
let mut me = self.inner.lock().unwrap();
let me = &mut *me;
Expand Down
16 changes: 12 additions & 4 deletions tests/stream_states.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,21 @@ fn send_recv_headers_only() {
.read(&[0, 0, 1, 1, 5, 0, 0, 0, 1, 0x89])
.build();

let h2 = client::handshake(mock)
let mut h2 = Client::handshake(mock)
.wait().unwrap();

// Send the request
let mut request = request::Head::default();
request.uri = "https://http2.akamai.com/".parse().unwrap();
let request = Request::builder()
.uri("https://http2.akamai.com/")
.body(()).unwrap();

info!("sending request");
let h2 = h2.send_request(1.into(), request, true).wait().unwrap();
let stream = h2.request(request, true).unwrap();

let resp = stream.select2(h2).wait().ok().unwrap();
println!("GOT: {:?}", resp);

/*
// Get the response

info!("getting response");
Expand All @@ -48,8 +53,10 @@ fn send_recv_headers_only() {
// No more frames
info!("ensure no more responses");
assert!(Stream::wait(h2).next().is_none());;
*/
}

/*
#[test]
fn send_recv_data() {
let _ = env_logger::init();
Expand Down Expand Up @@ -257,3 +264,4 @@ fn send_data_without_headers() {
#[ignore]
fn exceed_max_streams() {
}
*/
8 changes: 4 additions & 4 deletions tests/support/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ pub use self::http::{
response,
method,
status,
Request,
Response,
};

pub use self::h2::{
client,
server,
};
pub use self::h2::client::{self, Client};
// pub use self::h2::server;

pub use self::bytes::{
Buf,
Expand Down