Skip to content

Commit

Permalink
poll + recv for socket io
Browse files Browse the repository at this point in the history
  • Loading branch information
jianghua committed Nov 23, 2021
1 parent 9ae84bf commit b9a9865
Show file tree
Hide file tree
Showing 10 changed files with 597 additions and 334 deletions.
16 changes: 8 additions & 8 deletions glommio/src/io/buffered_file_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ impl StreamWriter {
res.map(|_x| ())
}

fn flush_write_buffer(&mut self, waker: Waker) -> bool {
fn flush_write_buffer(&mut self, waker: &Waker) -> bool {
assert!(self.source.is_none());
let bytes = self.buffer.consumed_bytes();
if !bytes.is_empty() {
Expand Down Expand Up @@ -409,7 +409,7 @@ impl StreamWriter {
.upgrade()
.unwrap()
.fdatasync(self.file.as_ref().unwrap().as_raw_fd());
source.add_waiter_single(cx.waker().clone());
source.add_waiter_single(cx.waker());
self.source = Some(source);
Poll::Pending
}
Expand All @@ -429,7 +429,7 @@ impl StreamWriter {
.upgrade()
.unwrap()
.close(self.file.as_ref().unwrap().as_raw_fd());
source.add_waiter_single(cx.waker().clone());
source.add_waiter_single(cx.waker());
self.source = Some(source);
Poll::Pending
}
Expand All @@ -443,7 +443,7 @@ impl StreamWriter {

fn do_poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
match self.source.take() {
None => match self.flush_write_buffer(cx.waker().clone()) {
None => match self.flush_write_buffer(cx.waker()) {
true => Poll::Pending,
false => Poll::Ready(Ok(())),
},
Expand Down Expand Up @@ -497,7 +497,7 @@ macro_rules! do_seek {
.upgrade()
.unwrap()
.statx($fileobj.as_raw_fd(), &$fileobj.path().unwrap());
source.add_waiter_single($cx.waker().clone());
source.add_waiter_single($cx.waker());
$source = Some(source);
Poll::Pending
}
Expand Down Expand Up @@ -585,7 +585,7 @@ impl AsyncBufRead for StreamReader {
self.buffer.max_buffer_size,
self.file.file.scheduler.borrow().as_ref(),
);
source.add_waiter_single(cx.waker().clone());
source.add_waiter_single(cx.waker());
self.io_source = Some(source);
Poll::Pending
}
Expand All @@ -611,7 +611,7 @@ impl AsyncWrite for StreamWriter {
}

if !self.buffer.data.is_empty() {
let x = self.flush_write_buffer(cx.waker().clone());
let x = self.flush_write_buffer(cx.waker());
assert!(x);
Poll::Pending
} else {
Expand Down Expand Up @@ -670,7 +670,7 @@ impl AsyncBufRead for Stdin {
self.buffer.max_buffer_size,
None,
);
source.add_waiter_single(cx.waker().clone());
source.add_waiter_single(cx.waker());
self.source = Some(source);
Poll::Pending
}
Expand Down
1 change: 1 addition & 0 deletions glommio/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ macro_rules! poll_err {
/// Unwraps an Option to Poll<T>: if Some returns right away.
///
/// Usage is similar to `future_lite::ready!`
#[allow(unused)]
macro_rules! poll_some {
($e:expr $(,)?) => {
match $e {
Expand Down
16 changes: 16 additions & 0 deletions glommio/src/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,21 @@ fn yolo_send(fd: RawFd, buf: &[u8]) -> Option<io::Result<usize>> {
}
}

fn yolo_peek(fd: RawFd, buf: &mut [u8]) -> Option<io::Result<usize>> {
match sys::recv_syscall(
fd,
buf.as_mut_ptr(),
buf.len(),
(MsgFlags::MSG_DONTWAIT | MsgFlags::MSG_PEEK).bits(),
) {
Ok(x) => Some(Ok(x)),
Err(err) => match err.kind() {
io::ErrorKind::WouldBlock => None,
_ => Some(Err(err)),
},
}
}

fn yolo_recv(fd: RawFd, buf: &mut [u8]) -> Option<io::Result<usize>> {
match sys::recv_syscall(
fd,
Expand Down Expand Up @@ -97,6 +112,7 @@ mod tcp_socket;
mod udp_socket;
mod unix;
pub use self::{
stream::{Buffered, Preallocated},
tcp_socket::{AcceptedTcpStream, TcpListener, TcpStream},
udp_socket::UdpSocket,
unix::{AcceptedUnixStream, UnixDatagram, UnixListener, UnixStream},
Expand Down
Loading

0 comments on commit b9a9865

Please sign in to comment.