Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement h2::server::Stream::send_reset(Reason) and Body::is_empty() #22

Merged
merged 17 commits into from
Aug 23, 2017
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,13 @@ pub struct Chunk<B: IntoBuf> {

// ===== impl Body =====

impl<B: IntoBuf> Body<B> {
pub fn is_empty(&self) -> bool {
// If the recv side is closed and the receive queue is empty, the body is empty.
self.inner.is_recv_eos()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As per the comment thread below, this will return false if the body is consumed and the next pending frame is a trailer.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think that's correct fsvo is_empty. I intend to expose "is there a body at all" and not "is there remaining body to be consumed". i should probably rename this

}
}

impl<B: IntoBuf> futures::Stream for Body<B> {
type Item = Chunk<B>;
type Error = ConnectionError;
Expand Down
17 changes: 17 additions & 0 deletions src/proto/streams/send.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use {frame, ConnectionError};
use error::User::InactiveStreamId;
use proto::*;
use super::*;

Expand Down Expand Up @@ -107,6 +108,22 @@ impl<B> Send<B> where B: Buf {
stream.state.send_close()
}

pub fn send_reset(&mut self, reason: Reason,
stream: &mut store::Ptr<B>)
-> Result<(), ConnectionError>
{
if stream.state.is_closed() {
return Err(InactiveStreamId.into())
}

stream.state.send_reset(reason)?;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like the reset will still be sent if the stream state is already closed (potentially reset already). Is this expected?


let frame = frame::Reset::new(stream.id, reason);
self.prioritize.queue_frame(frame.into(), stream);

Ok(())
}

pub fn send_data(&mut self,
frame: frame::Data<B>,
stream: &mut store::Ptr<B>)
Expand Down
13 changes: 13 additions & 0 deletions src/proto/streams/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,19 @@ impl State {
}
}

/// Indicates that the local side will not send more data to the local.
pub fn send_reset(&mut self, reason: Reason) -> Result<(), ConnectionError> {
match self.inner {
Idle => Err(ProtocolError.into()),
Closed(..) => Ok(()),
_ => {
trace!("send_reset: => Closed");
self.inner = Closed(Some(Cause::Proto(reason)));
Ok(())
}
}
}

/// Returns true if a stream with the current state counts against the
/// concurrency limit.
pub fn is_counted(&self) -> bool {
Expand Down
19 changes: 19 additions & 0 deletions src/proto/streams/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,16 @@ impl<B> StreamRef<B>
me.actions.recv.take_request(&mut stream)
}

pub fn send_reset<P: Peer>(&mut self, reason: Reason) -> Result<(), ConnectionError> {
let mut me = self.inner.lock().unwrap();
let me = &mut *me;

let stream = me.store.resolve(self.key);
me.actions.transition::<P, _, _>(stream, move |actions, stream| {
actions.send.send_reset(reason, stream)
})
}

pub fn send_response(&mut self, response: Response<()>, end_of_stream: bool)
-> Result<(), ConnectionError>
{
Expand All @@ -352,6 +362,15 @@ impl<B> StreamRef<B>
})
}

pub fn is_recv_eos(&self) -> bool {
let mut me = self.inner.lock().unwrap();
let me = &mut *me;

let stream = me.store.resolve(self.key);

stream.state.is_recv_closed() && stream.pending_recv.is_empty()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

stream.pending_recv could only contain a trailers frame, and is_recv_empty() would return false, yet there are no more data frames?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it worth just tracking a bool for this?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, i think this is okay if it's only meant to determine whether eos may be set...

}

pub fn poll_response(&mut self) -> Poll<Response<()>, ConnectionError> {
let mut me = self.inner.lock().unwrap();
let me = &mut *me;
Expand Down
8 changes: 6 additions & 2 deletions src/server.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use {frame, ConnectionError, StreamId};
use {frame, ConnectionError, Reason, StreamId};
use {Body, Chunk};
use proto::{self, Connection, WindowSize};
use error::Reason::*;

use http::{self, Request, Response};
use futures::{self, Future, Sink, Poll, Async, AsyncSink, IntoFuture};
use tokio_io::{AsyncRead, AsyncWrite};
use bytes::{Bytes, IntoBuf};
use bytes::{Bytes, IntoBuf, Buf};

use std::fmt;

Expand Down Expand Up @@ -191,6 +191,10 @@ impl<B: IntoBuf> Stream<B> {
{
unimplemented!();
}

pub fn send_reset(mut self, reason: Reason) -> Result<(), ConnectionError> {
self.inner.send_reset::<Peer>(reason)
}
}

impl Stream<Bytes> {
Expand Down