Skip to content

Commit

Permalink
Merge pull request #7 from HaveFunTrading/feat/batch_iter
Browse files Browse the repository at this point in the history
Feat/batch iter
  • Loading branch information
HaveFunTrading authored Jan 4, 2025
2 parents df20961 + 3bfd4e7 commit 6611047
Show file tree
Hide file tree
Showing 17 changed files with 225 additions and 126 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "boomnet"
version = "0.0.29"
version = "0.0.30"
edition = "2021"
license = "MIT"
description = "Framework for building low latency websocket client based applications."
Expand All @@ -9,7 +9,7 @@ documentation = "https://docs.rs/boomnet"
repository = "https://github.com/HaveFunTrading/boomnet"
keywords = ["tungstenite", "async", "client", "websocket", "mio"]
categories = ["network-programming", "web-programming::websocket"]
rust-version = "1.74.1"
rust-version = "1.83.0"

[package.metadata.docs.rs]
features = ["full"]
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ particularly focusing on TCP stream-oriented clients that utilise various protoc
Simply declare dependency on `boomnet` in your `Cargo.toml` and select desired [features](#features).
```toml
[dependencies]
boomnet = { version = "0.0.29", features = ["full"]}
boomnet = { version = "0.0.30", features = ["full"]}
```

## Design Principles
Expand Down
4 changes: 2 additions & 2 deletions examples/endpoint_with_timer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ impl TlsWebsocketEndpointWithContext<FeedContext> for TradeEndpoint {

#[inline]
fn poll(&mut self, ws: &mut TlsWebsocket<Self::Stream>, ctx: &mut FeedContext) -> io::Result<()> {
while let Some(WebsocketFrame::Text(ts, fin, data)) = ws.receive_next()? {
info!("{ts}: ({fin}) {}", String::from_utf8_lossy(data));
while let Some(WebsocketFrame::Text(fin, data)) = ws.receive_next()? {
info!("({fin}) {}", String::from_utf8_lossy(data));
}
let now_ns = ctx.current_time_ns();
if now_ns > self.next_disconnect_time_ns {
Expand Down
4 changes: 2 additions & 2 deletions examples/io_service_with_auto_disconnect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ impl TlsWebsocketEndpoint for TradeEndpoint {

#[inline]
fn poll(&mut self, ws: &mut TlsWebsocket<Self::Stream>) -> io::Result<()> {
while let Some(WebsocketFrame::Text(ts, fin, data)) = ws.receive_next()? {
println!("[{}] {ts}: ({fin}) {}", self.id, String::from_utf8_lossy(data));
while let Some(WebsocketFrame::Text(fin, data)) = ws.receive_next()? {
println!("[{}] ({fin}) {}", self.id, String::from_utf8_lossy(data));
}
Ok(())
}
Expand Down
10 changes: 5 additions & 5 deletions examples/io_service_with_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,12 @@ impl TlsWebsocketEndpointWithContext<FeedContext> for TradeEndpoint {

#[inline]
fn poll(&mut self, ws: &mut TlsWebsocket<Self::Stream>, _ctx: &mut FeedContext) -> io::Result<()> {
while let Some(WebsocketFrame::Text(ts, fin, data)) = ws.receive_next()? {
while let Some(WebsocketFrame::Text(fin, data)) = ws.receive_next()? {
match self.id % 4 {
0 => info!("{ts}: ({fin}) {}", Red.paint(String::from_utf8_lossy(data))),
1 => info!("{ts}: ({fin}) {}", Green.paint(String::from_utf8_lossy(data))),
2 => info!("{ts}: ({fin}) {}", Purple.paint(String::from_utf8_lossy(data))),
3 => info!("{ts}: ({fin}) {}", Yellow.paint(String::from_utf8_lossy(data))),
0 => info!("({fin}) {}", Red.paint(String::from_utf8_lossy(data))),
1 => info!("({fin}) {}", Green.paint(String::from_utf8_lossy(data))),
2 => info!("({fin}) {}", Purple.paint(String::from_utf8_lossy(data))),
3 => info!("({fin}) {}", Yellow.paint(String::from_utf8_lossy(data))),
_ => {}
}
}
Expand Down
4 changes: 2 additions & 2 deletions examples/io_service_with_direct_selector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ impl TlsWebsocketEndpoint for TradeEndpoint {

#[inline]
fn poll(&mut self, ws: &mut TlsWebsocket<Self::Stream>) -> io::Result<()> {
while let Some(WebsocketFrame::Text(ts, fin, data)) = ws.receive_next()? {
println!("[{}] {ts}: ({fin}) {}", self.id, String::from_utf8_lossy(data));
while let Some(WebsocketFrame::Text(fin, data)) = ws.receive_next()? {
println!("[{}] ({fin}) {}", self.id, String::from_utf8_lossy(data));
}
Ok(())
}
Expand Down
4 changes: 2 additions & 2 deletions examples/io_service_without_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ impl TlsWebsocketEndpoint for TradeEndpoint {

#[inline]
fn poll(&mut self, ws: &mut TlsWebsocket<Self::Stream>) -> io::Result<()> {
while let Some(WebsocketFrame::Text(ts, fin, data)) = ws.receive_next()? {
println!("[{}] {ts}: ({fin}) {}", self.id, String::from_utf8_lossy(data));
while let Some(WebsocketFrame::Text(fin, data)) = ws.receive_next()? {
println!("[{}] ({fin}) {}", self.id, String::from_utf8_lossy(data));
}
Ok(())
}
Expand Down
8 changes: 4 additions & 4 deletions examples/polymorphic_endpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ impl TlsWebsocketEndpointWithContext<FeedContext> for TradeEndpoint {

#[inline]
fn poll(&mut self, ws: &mut TlsWebsocket<Self::Stream>, _ctx: &mut FeedContext) -> io::Result<()> {
while let Some(WebsocketFrame::Text(ts, fin, data)) = ws.receive_next()? {
info!("{ts}: ({fin}) {}", String::from_utf8_lossy(data));
while let Some(WebsocketFrame::Text(fin, data)) = ws.receive_next()? {
info!("({fin}) {}", String::from_utf8_lossy(data));
}
Ok(())
}
Expand Down Expand Up @@ -153,8 +153,8 @@ impl TlsWebsocketEndpointWithContext<FeedContext> for TickerEndpoint {

#[inline]
fn poll(&mut self, ws: &mut TlsWebsocket<Self::Stream>, _ctx: &mut FeedContext) -> io::Result<()> {
while let Some(WebsocketFrame::Text(ts, fin, data)) = ws.receive_next()? {
info!("{ts}: ({fin}) {}", String::from_utf8_lossy(data));
while let Some(WebsocketFrame::Text(fin, data)) = ws.receive_next()? {
info!("({fin}) {}", String::from_utf8_lossy(data));
}
Ok(())
}
Expand Down
4 changes: 2 additions & 2 deletions examples/recorded_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ fn main() -> anyhow::Result<()> {
'outer: loop {
'inner: loop {
match ws.receive_next() {
Ok(Some(WebsocketFrame::Text(ts, fin, data))) => {
println!("{ts}: ({fin}) {}", String::from_utf8_lossy(data));
Ok(Some(WebsocketFrame::Text(fin, data))) => {
println!("({fin}) {}", String::from_utf8_lossy(data));
}
Ok(None) => break 'inner,
Err(err) => {
Expand Down
4 changes: 2 additions & 2 deletions examples/replay_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ fn main() -> anyhow::Result<()> {
'outer: loop {
'inner: loop {
match ws.receive_next() {
Ok(Some(WebsocketFrame::Text(ts, fin, data))) => {
println!("{ts}: ({fin}) {}", String::from_utf8_lossy(data));
Ok(Some(WebsocketFrame::Text(fin, data))) => {
println!("({fin}) {}", String::from_utf8_lossy(data));
}
Ok(None) => break 'inner,
Err(err) => {
Expand Down
20 changes: 5 additions & 15 deletions examples/ws_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,12 @@ fn main() -> anyhow::Result<()> {

let idle = IdleStrategy::Sleep(Duration::from_millis(1));

'outer: loop {
'inner: loop {
match ws.receive_next() {
Ok(Some(WebsocketFrame::Text(ts, fin, data))) => {
println!("{ts}: ({fin}) {}", String::from_utf8_lossy(data));
}
Ok(None) => break 'inner,
Err(err) => {
println!("{}", err);
break 'outer;
}
_ => {}
loop {
for frame in ws.batch_iter()? {
if let WebsocketFrame::Text(fin, body) = frame? {
println!("({fin}) {}", String::from_utf8_lossy(body));
}
idle.idle(0);
}
idle.idle(0);
}

Ok(())
}
101 changes: 74 additions & 27 deletions src/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const DEFAULT_INITIAL_CAPACITY: usize = 32768;
#[derive(Debug)]
pub struct ReadBuffer<const CHUNK_SIZE: usize, const INITIAL_CAPACITY: usize = DEFAULT_INITIAL_CAPACITY> {
inner: Vec<u8>,
ptr: *const u8,
head: usize,
tail: usize,
}
Expand All @@ -29,8 +30,11 @@ impl<const CHUNK_SIZE: usize, const INITIAL_CAPACITY: usize> ReadBuffer<CHUNK_SI
CHUNK_SIZE <= INITIAL_CAPACITY,
"CHUNK_SIZE ({CHUNK_SIZE}) must be less or equal than {INITIAL_CAPACITY}"
);
let inner = vec![0u8; INITIAL_CAPACITY];
let ptr = inner.as_ptr();
Self {
inner: vec![0u8; INITIAL_CAPACITY],
inner,
ptr,
head: 0,
tail: 0,
}
Expand Down Expand Up @@ -81,25 +85,53 @@ impl<const CHUNK_SIZE: usize, const INITIAL_CAPACITY: usize> ReadBuffer<CHUNK_SI
}

#[inline]
pub fn consume_next(&mut self, len: usize) -> &'static [u8] {
#[inline(never)]
#[cold]
fn bounds_violation(head: usize, tail: usize) -> ! {
panic!("bounds violation: head[{}] > tail[{}]", head, tail)
pub const fn consume_next(&mut self, len: usize) -> Option<&'static [u8]> {
match self.available() >= len {
true => Some(unsafe { self.consume_next_unchecked(len) }),
false => None,
}
}

// view to return
let consumed_view = unsafe { &*ptr::slice_from_raw_parts(self.inner.as_ptr().add(self.head), len) };

// update head to the new value
/// # Safety
/// This function should only be called after `available` bytes are known.
/// ```no_run
/// use boomnet::buffer::ReadBuffer;
/// let mut buffer = ReadBuffer::<4096>::new();
/// if buffer.available() > 10 {
/// unsafe {
/// let view = buffer.consume_next_unchecked(10);
/// }
/// }
#[inline]
pub const unsafe fn consume_next_unchecked(&mut self, len: usize) -> &'static [u8] {
let consumed_view = &*ptr::slice_from_raw_parts(self.ptr.add(self.head), len);
self.head += len;
consumed_view
}

// bounds check
if self.head > self.tail {
bounds_violation(self.head, self.tail);
#[inline]
pub const fn consume_next_byte(&mut self) -> Option<u8> {
match self.available() >= 1 {
true => Some(unsafe { self.consume_next_byte_unchecked() }),
false => None,
}
}

consumed_view
/// # Safety
/// This function should only be called after `available` bytes are known.
/// ```no_run
/// use boomnet::buffer::ReadBuffer;
/// let mut buffer = ReadBuffer::<4096>::new();
/// if buffer.available() > 0 {
/// unsafe {
/// let byte = buffer.consume_next_byte_unchecked();
/// }
/// }
#[inline]
pub const unsafe fn consume_next_byte_unchecked(&mut self) -> u8 {
let byte = *self.ptr.add(self.head);
self.head += 1;
byte
}

#[inline]
Expand Down Expand Up @@ -133,11 +165,11 @@ mod tests {
assert_eq!(12, buf.available());
assert_eq!(b"hello world!", buf.view());

assert_eq!(b"hello ", buf.consume_next(6));
assert_eq!(b"hello ", buf.consume_next(6).unwrap());
assert_eq!(6, buf.available());
assert_eq!(b"world!", buf.view());

assert_eq!(b"world!", buf.consume_next(6));
assert_eq!(b"world!", buf.consume_next(6).unwrap());
assert_eq!(0, buf.available());
assert_eq!(b"", buf.view());

Expand Down Expand Up @@ -174,7 +206,7 @@ mod tests {
buf.read_from(&mut stream).expect("unable to read from the stream");
assert_eq!(b"hello ", buf.view());

assert_eq!(b"hello ", buf.consume_next(6));
assert_eq!(b"hello ", buf.consume_next(6).unwrap());
assert_eq!(0, buf.available());
assert_eq!(b"", buf.view());

Expand All @@ -196,7 +228,7 @@ mod tests {
buf.read_from(&mut stream).expect("unable to read from the stream");
assert_eq!(b"hello ", buf.view());

assert_eq!(b"he", buf.consume_next(2));
assert_eq!(b"he", buf.consume_next(2).unwrap());
assert_eq!(4, buf.available());
assert_eq!(b"llo ", buf.view());

Expand All @@ -210,15 +242,13 @@ mod tests {
}

#[test]
#[should_panic(expected = "bounds violation: head[32] > tail[6]")]
fn should_panic_if_bounds_violated() {
fn should_return_none_if_too_many_bytes_requested_to_view() {
let mut buf = ReadBuffer::<6>::new();
let mut stream = Cursor::new(b"hello world!");

buf.read_from(&mut stream).expect("unable to read from the stream");
assert_eq!(b"hello ", buf.view());

let _ = buf.consume_next(32); // will panic
assert_eq!(b"hello ", buf.view());
assert_eq!(None, buf.consume_next(7));
}

#[test]
Expand Down Expand Up @@ -279,21 +309,38 @@ mod tests {
fn should_consume_next() {
let mut buf = ReadBuffer::<64>::new();
let mut stream = Cursor::new(b"hello world!");
buf.read_from(&mut stream).expect("unable to read from the stream");

assert_eq!(b"hello world!", buf.view());
assert_eq!(b"hello", buf.consume_next(5).unwrap());
assert_eq!(b" ", buf.consume_next(1).unwrap());
assert_eq!(b"world!", buf.consume_next(6).unwrap());
assert_eq!(0, buf.available())
}

#[test]
fn should_consume_next_byte() {
let mut buf = ReadBuffer::<64>::new();
let mut stream = Cursor::new(b"hello world!");
buf.read_from(&mut stream).expect("unable to read from the stream");

assert_eq!(b"hello world!", buf.view());
assert_eq!(b"hello", buf.consume_next(5));
assert_eq!(b" ", buf.consume_next(1));
assert_eq!(b"world!", buf.consume_next(6));
assert_eq!(b'h', buf.consume_next_byte().unwrap());
assert_eq!(b'e', buf.consume_next_byte().unwrap());
assert_eq!(b'l', buf.consume_next_byte().unwrap());
assert_eq!(b'l', buf.consume_next_byte().unwrap());
assert_eq!(b'o', buf.consume_next_byte().unwrap());
assert_eq!(b' ', buf.consume_next_byte().unwrap());
assert_eq!(b"world!", buf.consume_next(6).unwrap());
assert_eq!(0, buf.available())
}

#[test]
fn should_view_last() {
let mut buf = ReadBuffer::<64>::new();
let mut stream = Cursor::new(b"hello world!");

buf.read_from(&mut stream).expect("unable to read from the stream");

assert_eq!(b"hello world!", buf.view());
assert_eq!(b"world!", buf.view_last(6));
assert_eq!(12, buf.available())
Expand Down
Loading

0 comments on commit 6611047

Please sign in to comment.