Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ring options #39

Merged
merged 3 commits into from
Jan 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ path = "h1-server.rs"
name = "h1-server-multishot"
path = "h1-server-multishot.rs"

[[example]]
name = "proactor-config-fwrite"
path = "proactor-config-fwrite.rs"

[[example]]
name = "tcp-server"
path = "tcp-server.rs"
Expand Down
69 changes: 69 additions & 0 deletions examples/proactor-config-fwrite.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
use nuclei::*;
use std::fs::{File, OpenOptions};
use std::io;
use std::path::PathBuf;
use std::time::Duration;

Check warning on line 5 in examples/proactor-config-fwrite.rs

View workflow job for this annotation

GitHub Actions / stable - x86_64-apple-darwin

unused import: `std::time::Duration`

Check warning on line 5 in examples/proactor-config-fwrite.rs

View workflow job for this annotation

GitHub Actions / stable - x86_64-apple-darwin

unused import: `std::time::Duration`

use futures::io::SeekFrom;
use futures::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
use nuclei::config::{IoUringConfiguration, NucleiConfig};

const DARK_MATTER_TEXT: &'static str = "\
Dark matter is a form of matter thought to account for approximately \
85% of the matter in the universe and about a quarter of its total \
mass–energy density or about 2.241×10−27 kg/m3. Its presence is implied \
in a variety of astrophysical observations, including gravitational effects \
that cannot be explained by accepted theories of gravity unless more matter \
is present than can be seen. For this reason, most experts think that dark \
matter is abundant in the universe and that it has had a strong influence \
on its structure and evolution. Dark matter is called dark because it does \
not appear to interact with the electromagnetic field, which means it doesn't \
absorb, reflect or emit electromagnetic radiation, and is therefore difficult \
to detect.[1]\
\
";

// #[nuclei::main]
fn main() -> io::Result<()> {
let nuclei_config = NucleiConfig {
// Other options for IO_URING are:
// * low_latency_driven,
// * kernel_poll_only
// * io_poll
iouring: IoUringConfiguration::interrupt_driven(1 << 11),
};
let _ = Proactor::with_config(nuclei_config);

// Approximately ~75,9 MB
let dark_matter = vec![DARK_MATTER_TEXT; 100_000].join("\n");

Check warning on line 38 in examples/proactor-config-fwrite.rs

View workflow job for this annotation

GitHub Actions / stable - x86_64-apple-darwin

unused variable: `dark_matter`

Check warning on line 38 in examples/proactor-config-fwrite.rs

View workflow job for this annotation

GitHub Actions / stable - x86_64-apple-darwin

unused variable: `dark_matter`

let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
path.push("data");
path.push("dark-matter");

drive(async {
// Approximately ~75,9 MB
let dark_matter = vec![DARK_MATTER_TEXT; 100_000].join("\n");

let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
path.push("data");
path.push("dark-matter");

let fo = OpenOptions::new()
.read(true)
.write(true)
.open(&path)
.unwrap();
let mut file = Handle::<File>::new(fo).unwrap();
file.write_all(dark_matter.as_bytes()).await.unwrap();

let mut buf = vec![];
assert!(file.seek(SeekFrom::Start(0)).await.is_ok());
assert_eq!(file.read_to_end(&mut buf).await.unwrap(), dark_matter.len());
assert_eq!(&buf[0..dark_matter.len()], dark_matter.as_bytes());

println!("Length of file is {}", buf.len());
});

Ok(())
}
28 changes: 14 additions & 14 deletions src/async_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,9 +209,9 @@ impl AsyncRead for Handle<File> {
cx: &mut Context<'_>,
bufs: &mut [IoSliceMut<'_>],
) -> Poll<io::Result<usize>> {
let mut store = &mut self.get_mut().store_file;
let store = &mut self.get_mut().store_file;

if let Some(mut store_file) = store.as_mut() {
if let Some(store_file) = store.as_mut() {
let fd: RawFd = store_file.receive_fd();
let op_state = store_file.op_state();
let (_, pos) = store_file.bufpair();
Expand Down Expand Up @@ -239,10 +239,10 @@ const NON_READ: &[u8] = &[];

#[cfg(all(feature = "iouring", target_os = "linux"))]
impl AsyncBufRead for Handle<File> {
fn poll_fill_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
let mut store = &mut self.get_mut().store_file;
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
let store = &mut self.get_mut().store_file;

if let Some(mut store_file) = store.as_mut() {
if let Some(store_file) = store.as_mut() {
let fd: RawFd = store_file.receive_fd();
let op_state = store_file.op_state();
let (bufp, pos) = store_file.bufpair();
Expand All @@ -267,7 +267,7 @@ impl AsyncBufRead for Handle<File> {
}

fn consume(self: Pin<&mut Self>, amt: usize) {
let mut store = self.get_mut().store_file.as_mut().unwrap();
let store = self.get_mut().store_file.as_mut().unwrap();
store.buf().consume(amt);
}
}
Expand All @@ -279,9 +279,9 @@ impl AsyncWrite for Handle<File> {
cx: &mut Context<'_>,
bufslice: &[u8],
) -> Poll<io::Result<usize>> {
let mut store = &mut self.get_mut().store_file;
let store = &mut self.get_mut().store_file;

if let Some(mut store_file) = store.as_mut() {
if let Some(store_file) = store.as_mut() {
let fd: RawFd = store_file.receive_fd();
let op_state = store_file.op_state();
let (bufp, pos) = store_file.bufpair();
Expand Down Expand Up @@ -319,9 +319,9 @@ impl AsyncWrite for Handle<File> {
cx: &mut Context<'_>,
bufs: &[IoSlice<'_>],
) -> Poll<io::Result<usize>> {
let mut store = &mut self.get_mut().store_file;
let store = &mut self.get_mut().store_file;

if let Some(mut store_file) = store.as_mut() {
if let Some(store_file) = store.as_mut() {
let fd: RawFd = store_file.receive_fd();
let op_state = store_file.op_state();
let (_, pos) = store_file.bufpair();
Expand Down Expand Up @@ -349,9 +349,9 @@ impl AsyncWrite for Handle<File> {
}

fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
let mut store = &mut self.get_mut().store_file;
let store = &mut self.get_mut().store_file;

if let Some(mut store_file) = store.as_mut() {
if let Some(store_file) = store.as_mut() {
let fd: RawFd = store_file.receive_fd();
let op_state = store_file.op_state();

Expand All @@ -377,7 +377,7 @@ impl AsyncSeek for Handle<File> {
cx: &mut Context<'_>,
pos: SeekFrom,
) -> Poll<io::Result<u64>> {
let mut store = &mut self.get_mut().store_file.as_mut().unwrap();
let store = &mut self.get_mut().store_file.as_mut().unwrap();

let (cursor, offset) = match pos {
io::SeekFrom::Start(n) => {
Expand All @@ -392,7 +392,7 @@ impl AsyncSeek for Handle<File> {
}
};
let valid_seek = if offset.is_negative() {
match cursor.checked_sub(offset.abs() as usize) {
match cursor.checked_sub(offset.unsigned_abs() as usize) {
Some(valid_seek) => valid_seek,
None => {
let invalid = io::Error::from(io::ErrorKind::InvalidInput);
Expand Down
73 changes: 73 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,89 @@ pub struct IoUringConfiguration {
/// If [None] passed unbounded workers will be limited by the process task limit,
/// as indicated by the rlimit [RLIMIT_NPROC](https://man7.org/linux/man-pages/man2/getrlimit.2.html) limit.
pub per_numa_unbounded_worker_count: Option<u32>,

/// This argument allows aggressively waiting on CQ(completion queue) to have low latency of IO completions.
/// Basically, this argument polls CQEs(completion queue events) directly on cq at userspace.
/// Mind that, using this increase pressure on CPUs from userspace side. By default, nuclei reaps the CQ with
/// aggressive wait. This is double polling approach for nuclei where Kernel gets submissions by itself (SQPOLL),
/// processes it and puts completions to completion queue and we immediately pick up without latency
/// (aggressive_poll).
///
/// **[default]**: `true`.
pub aggressive_poll: bool,

/// Perform busy-waiting for I/O completion events, as opposed to getting notifications via an
/// asynchronous IRQ (Interrupt Request). This will reduce latency, but increases CPU usage.
/// This is only usable on file systems that support polling and files opened with `O_DIRECT`.
///
/// **[default]**: `false`.
pub iopoll_enabled: bool,
// XXX: `redrive_kthread_wake` = bool, syncs queue changes so kernel threads got awakened. increased cpu usage.
}

impl IoUringConfiguration {
///
/// Standard way to use IO_URING. No polling, purely IRQ awaken IO completion.
/// This is a normal way to process IO, mind that with this approach
/// `actual completion time != userland reception of completion`.
/// Throughput is low compared to all the other config alternatives.
///
/// **NOTE:** If you don't know what to select as configuration, please select this.
pub fn interrupt_driven(queue_len: u32) -> Self {
Self {
queue_len,
sqpoll_wake_interval: None,
per_numa_bounded_worker_count: None,
per_numa_unbounded_worker_count: None,
aggressive_poll: false,
iopoll_enabled: false,
}
}

///
/// Low Latency Driven version of IO_URING, where it is suitable for high traffic environments.
/// High throughput low latency solution where it consumes a lot of resources.
pub fn low_latency_driven(queue_len: u32) -> Self {
Self {
queue_len,
sqpoll_wake_interval: Some(2),
aggressive_poll: true,
..Self::default()
}
}

///
/// Kernel poll only version of IO_URING, where it is suitable for high traffic environments.
/// This version won't allow aggressive polling on completion queue(CQ).
pub fn kernel_poll_only(queue_len: u32) -> Self {
Self {
queue_len,
sqpoll_wake_interval: Some(2),
aggressive_poll: true,
..Self::default()
}
}

///
/// IOPOLL enabled ring configuration for operating on files with low-latency.
pub fn io_poll(queue_len: u32) -> Self {
Self {
queue_len,
iopoll_enabled: true,
..Self::default()
}
}
}

impl Default for IoUringConfiguration {
fn default() -> Self {
Self {
queue_len: 1 << 11,
sqpoll_wake_interval: Some(2),
per_numa_bounded_worker_count: Some(1 << 8),
per_numa_unbounded_worker_count: Some(1 << 9),
aggressive_poll: true,
iopoll_enabled: false,
}
}
}
13 changes: 6 additions & 7 deletions src/proactor.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
use std::ops::DerefMut;
use std::task::{Context, Poll};
use std::time::Duration;
use std::{future::Future, io};

use crate::config::NucleiConfig;
use once_cell::sync::{Lazy, OnceCell};
use once_cell::sync::OnceCell;

use super::syscore::*;
use super::waker::*;
Expand All @@ -25,7 +24,7 @@ impl Proactor {
/// Returns a reference to the proactor.
pub fn get() -> &'static Proactor {
unsafe {
&PROACTOR.get_or_init(|| {
PROACTOR.get_or_init(|| {
Proactor(
SysProactor::new(NucleiConfig::default())
.expect("cannot initialize IO backend"),
Expand All @@ -37,15 +36,14 @@ impl Proactor {
/// Builds a proactor instance with given config and returns a reference to it.
pub fn with_config(config: NucleiConfig) -> &'static Proactor {
unsafe {
let mut proactor =
let proactor =
Proactor(SysProactor::new(config.clone()).expect("cannot initialize IO backend"));
PROACTOR
.set(proactor)
.map_err(|e| "Proactor instance not being able to set.")
.unwrap();
let proactor =
Proactor(SysProactor::new(config).expect("cannot initialize IO backend"));
&PROACTOR.get_or_init(|| proactor)

PROACTOR.wait()
}
}

Expand Down Expand Up @@ -133,6 +131,7 @@ mod proactor_tests {
sqpoll_wake_interval: Some(11),
per_numa_bounded_worker_count: Some(12),
per_numa_unbounded_worker_count: Some(13),
..IoUringConfiguration::default()
},
};
let new = Proactor::with_config(config);
Expand Down
4 changes: 2 additions & 2 deletions src/syscore/linux/iouring/fs/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ impl Buffer {
unsafe {
let data: *mut u8 = self.data.cast().as_ptr();
let cap = self.cap - self.pos;
slice::from_raw_parts(data.offset(self.pos as isize), cap as usize)
slice::from_raw_parts(data.add(self.pos), cap)
}
} else {
&[]
Expand Down Expand Up @@ -81,7 +81,7 @@ impl Buffer {

#[inline(always)]
pub fn consume(&mut self, amt: usize) {
self.pos = cmp::min(self.pos + amt as usize, self.cap);
self.pos = cmp::min(self.pos + amt, self.cap);
}

#[inline(always)]
Expand Down
2 changes: 0 additions & 2 deletions src/syscore/linux/iouring/fs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,4 @@ pub(crate) mod buffer;
pub(crate) mod cancellation;
pub(crate) mod store_file;

pub(crate) use buffer::*;
pub(crate) use cancellation::*;
pub(crate) use store_file::*;
8 changes: 2 additions & 6 deletions src/syscore/linux/iouring/fs/store_file.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,12 @@
use crate::Handle;
use lever::sync::prelude::TTas;
use std::fs::File;
use std::io;
use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
use std::os::unix::io::{FromRawFd, RawFd};
use std::pin::Pin;
use std::sync::Arc;

use super::buffer::Buffer;
use crate::syscore::Processor;
use lever::sync::atomics::AtomicBox;
use pin_utils::unsafe_pinned;
use std::task::{Context, Poll};

pub struct StoreFile {
fd: RawFd,
Expand Down Expand Up @@ -87,7 +83,7 @@ impl StoreFile {
&mut self.pos
}

pub(crate) fn guard_op(self: &mut Self, op: Op) {
pub(crate) fn guard_op(&mut self, op: Op) {
// let this = unsafe { Pin::get_unchecked_mut(self) };
// if *self.op_state.get() != Op::Pending && *self.op_state.get() != op {
// self.cancel();
Expand Down
Loading
Loading