Skip to content

Commit

Permalink
Add Nonblock option for Sender and Receiver.
Browse files Browse the repository at this point in the history
And, Make Sender and Receiver can not be
constructed out of this crate.
  • Loading branch information
hicqu committed Apr 2, 2017
1 parent 2b23af6 commit d6400b8
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 6 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "fifo"
version = "0.1.3"
version = "0.1.4"
license = "MIT"
authors = ["qupeng <onlyqupeng@gmail.com>"]
description = "First-in-first-out lock-free ring-buffer like kfifo in Linux"
Expand Down
49 changes: 44 additions & 5 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,29 @@ impl Drop for Inner {

unsafe impl Sync for Inner {}

/// What can we do when operations on `Sender` or `Receiver` would block.
///
/// 1) Just return `Err(ErrorKind::WouldBlock)` immediately; or
/// 2) sleep for some milliseconds.
///
/// The default on `Sender` and `Receiver` is Sleep(10).
pub enum WouldBlock {
Nonblock,
Sleep(u64),
}

/// The fifo sender. It's `Send` but `!Send`.
pub struct Sender {
_private: (),
inner: Arc<Inner>,
would_block: WouldBlock,
}

/// The fifo receiver. It's `Send` but `!Send`.
pub struct Receiver {
_private: (),
inner: Arc<Inner>,
would_block: WouldBlock,
}

impl Drop for Sender {
Expand Down Expand Up @@ -107,12 +122,24 @@ pub fn align_up_for_fifo_size(size: usize) -> usize {
pub fn fifo(size: usize) -> (Sender, Receiver) {
let size = align_up_for_fifo_size(size);
let inner = Arc::new(Inner::new(size));
let sender = Sender { inner: inner.clone() };
let receiver = Receiver { inner: inner };
let sender = Sender {
_private: (),
inner: inner.clone(),
would_block: WouldBlock::Sleep(10),
};
let receiver = Receiver {
_private: (),
inner: inner,
would_block: WouldBlock::Sleep(10),
};
(sender, receiver)
}

impl Sender {
pub fn set_would_block(&mut self, would_block: WouldBlock) {
self.would_block = would_block;
}

fn do_write<T>(&mut self, bytes: usize, mut cp_data_to: T) -> io::Result<usize>
where T: FnMut(&mut [u8], usize, usize) -> io::Result<usize>
{
Expand All @@ -128,9 +155,13 @@ impl Sender {
break;
} else {
if inner.shutdown.shuted(SHUT_READ) {
return Err(io::Error::new(io::ErrorKind::BrokenPipe, "broken pipe"));
return Err(io::Error::new(io::ErrorKind::BrokenPipe, "closed on read end"));
}
if let WouldBlock::Sleep(sleep) = self.would_block {
thread::sleep(time::Duration::from_millis(sleep));
} else {
return Err(io::Error::new(io::ErrorKind::WouldBlock, "buffer is full"));
}
thread::sleep(time::Duration::from_millis(10));
};
}
let start_pos = pin & (inner.size - 1);
Expand Down Expand Up @@ -161,6 +192,10 @@ impl io::Write for Sender {
}

impl Receiver {
pub fn set_would_block(&mut self, would_block: WouldBlock) {
self.would_block = would_block;
}

fn do_write<T>(&mut self, bytes: usize, mut cp_data_from: T) -> io::Result<usize>
where T: FnMut(&[u8], usize, usize) -> io::Result<usize>
{
Expand All @@ -178,7 +213,11 @@ impl Receiver {
if inner.shutdown.shuted(SHUT_WRITE) {
return Ok(0);
}
thread::sleep(time::Duration::from_millis(10));
if let WouldBlock::Sleep(sleep) = self.would_block {
thread::sleep(time::Duration::from_millis(sleep));
} else {
return Err(io::Error::new(io::ErrorKind::WouldBlock, "buffer is empty"));
}
}
}
let start_pos = pout & (inner.size - 1);
Expand Down

0 comments on commit d6400b8

Please sign in to comment.