Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP Igni/windowstests #3

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
WIP
  • Loading branch information
o0Ignition0o committed Jul 19, 2020
commit a58ddca5bd8970d2035e2c191e0d6c600722366d
62 changes: 31 additions & 31 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,32 +1,32 @@
[package]
name = "nuclei"
version = "0.1.0"
authors = ["Mahmut Bulut <vertexclique@gmail.com>"]
edition = "2018"
description = "Proactive IO & runtime system"
keywords = ["io", "async", "runtime", "uring", "iouring", "proactor"]
categories = ["concurrency", "asynchronous"]
homepage = "https://github.com/vertexclique/nuclei"
repository = "https://github.com/vertexclique/nuclei"
description = "Proactive IO & runtime system"
documentation = "https://docs.rs/nuclei"
license = "Apache-2.0/MIT"
readme = "README.md"
edition = "2018"
exclude = [
"data/*",
".github/*",
"examples/*",
"graphstore/*",
"tests/*",
"img/*",
"ci/*",
"benches/*",
"doc/*",
"*.png",
"*.dot",
"*.yml",
"*.toml",
"*.md"
"data/*",
".github/*",
"examples/*",
"graphstore/*",
"tests/*",
"img/*",
"ci/*",
"benches/*",
"doc/*",
"*.png",
"*.dot",
"*.yml",
"*.toml",
"*.md",
]
homepage = "https://github.com/vertexclique/nuclei"
keywords = ["io", "async", "runtime", "uring", "iouring", "proactor"]
license = "Apache-2.0/MIT"
name = "nuclei"
readme = "README.md"
repository = "https://github.com/vertexclique/nuclei"
version = "0.1.0"

[features]
default = ["bastion", "iouring"]
Expand All @@ -35,29 +35,29 @@ default = ["bastion", "iouring"]
epoll = []
iouring = ["iou", "uring-sys"]

bastion = ["agnostik/runtime_bastion"]
tokio = ["agnostik/runtime_tokio"]
asyncstd = ["agnostik/runtime_asyncstd"]
bastion = ["agnostik/runtime_bastion"]
smol = ["agnostik/runtime_smol"]
tokio = ["agnostik/runtime_tokio"]

[dependencies]
lever = "0.1.1-alpha.8"
futures = "0.3.5"
futures-util = "0.3.5"
socket2 = { version = "0.3.12", features = ["pair", "unix"] }
libc = "0.2"
pin-utils = "0.1.0"
once_cell = "1.4.0"
agnostik = "0.1.3"
pin-utils = "0.1.0"
socket2 = {version = "0.3.12", features = ["pair", "unix"]}

# Other backends
[target.'cfg(target_os = "linux")'.dependencies]
iou = { version = "0.0.0-ringbahn.1", optional = true }
uring-sys = { version = "0.6.1", optional = true }
iou = {version = "0.0.0-ringbahn.1", optional = true}
uring-sys = {version = "0.6.1", optional = true}

[dev-dependencies]
anyhow = "1.0.31"
async-h1 = "2.0.2"
async-dup = "1.1.0"
async-h1 = "2.0.2"
http-types = "2.2.1"
num_cpus = "1.13.0"
num_cpus = "1.13.0"
93 changes: 58 additions & 35 deletions src/async_io.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,36 @@
use std::{io, task};
use std::{task::Poll, fs::File, pin::Pin, task::Context};
use super::handle::Handle;
use futures::io::{AsyncRead, AsyncWrite, SeekFrom};
use super::submission_handler::SubmissionHandler;
use futures::io::SeekFrom;
use futures::io::{AsyncRead, AsyncWrite};
use std::io::Read;
use std::{fs::File, pin::Pin, task::Context, task::Poll};
use std::{io, task};

use std::net::{TcpStream};
use std::net::TcpStream;

#[cfg(unix)]
use std::{mem::ManuallyDrop, os::unix::io::{AsRawFd, RawFd, FromRawFd}, os::unix::prelude::*};
use std::{
mem::ManuallyDrop,
os::unix::io::{AsRawFd, FromRawFd},
};
#[cfg(unix)]
use std::os::unix::net::{UnixStream};

use std::future::Future;
use std::{
mem::ManuallyDrop,
os::unix::io::{AsRawFd, FromRawFd, RawFd},
os::unix::prelude::*,
};

#[cfg(unix)]
use crate::syscore::Processor;
use crate::syscore::*;
use std::sync::Arc;
use crate::Proactor;
use futures::{AsyncBufRead, AsyncSeek};
use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering};
use futures_util::pending_once;
use lever::prelude::TTas;
use crate::Proactor;
use std::future::Future;
use std::path::Path;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;

//
// Proxy operations for Future registration via AsyncRead, AsyncWrite and others.
Expand All @@ -39,7 +47,7 @@ macro_rules! impl_async_read {
Pin::new(&mut &*Pin::get_mut(self)).poll_read(cx, buf)
}
}
}
};
}

macro_rules! impl_async_write {
Expand All @@ -61,12 +69,12 @@ macro_rules! impl_async_write {
Pin::new(&mut &*Pin::get_mut(self)).poll_close(cx)
}
}
}
};
}

#[cfg(not(all(feature = "iouring", target_os = "linux")))]
#[cfg(all(not(feature = "iouring"), target_os = "linux"))]
impl_async_read!(File);
#[cfg(not(all(feature = "iouring", target_os = "linux")))]
#[cfg(all(not(feature = "iouring"), target_os = "linux"))]
impl_async_write!(File);

impl_async_read!(TcpStream);
Expand All @@ -77,14 +85,17 @@ impl_async_read!(UnixStream);
#[cfg(unix)]
impl_async_write!(UnixStream);


///////////////////////////////////
///// Non proactive File
///////////////////////////////////

#[cfg(not(all(feature = "iouring", target_os = "linux")))]
#[cfg(all(not(feature = "iouring"), target_os = "linux"))]
impl AsyncRead for &Handle<File> {
fn poll_read(self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
let raw_fd = self.as_raw_fd();
let buf_len = buf.len();
let buf = buf.as_mut_ptr();
Expand All @@ -103,9 +114,13 @@ impl AsyncRead for &Handle<File> {
}
}

#[cfg(not(all(feature = "iouring", target_os = "linux")))]
#[cfg(all(not(feature = "iouring"), target_os = "linux"))]
impl AsyncWrite for &Handle<File> {
fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
let raw_fd = self.as_raw_fd();
let buf_len = buf.len();
let buf = buf.as_ptr();
Expand All @@ -132,7 +147,6 @@ impl AsyncWrite for &Handle<File> {
}
}


///////////////////////////////////
///// IO URING / Proactive / Linux
///////////////////////////////////
Expand All @@ -153,10 +167,13 @@ impl Handle<File> {
}
}


#[cfg(all(feature = "iouring", target_os = "linux"))]
impl AsyncRead for Handle<File> {
fn poll_read(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
let mut inner = futures::ready!(self.as_mut().poll_fill_buf(cx))?;
let len = io::Read::read(&mut inner, buf)?;
self.consume(len);
Expand Down Expand Up @@ -185,7 +202,7 @@ impl AsyncBufRead for Handle<File> {
match fut.as_mut().poll(cx)? {
Poll::Ready(n) => {
*pos += n;
break Poll::Ready(Ok(n))
break Poll::Ready(Ok(n));
}
_ => {}
}
Expand All @@ -202,10 +219,13 @@ impl AsyncBufRead for Handle<File> {
}
}


#[cfg(all(feature = "iouring", target_os = "linux"))]
impl AsyncWrite for Handle<File> {
fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
todo!()
}

Expand All @@ -220,7 +240,11 @@ impl AsyncWrite for Handle<File> {

#[cfg(all(feature = "iouring", target_os = "linux"))]
impl AsyncSeek for Handle<File> {
fn poll_seek(self: Pin<&mut Self>, cx: &mut Context<'_>, pos: SeekFrom) -> Poll<io::Result<u64>> {
fn poll_seek(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
pos: SeekFrom,
) -> Poll<io::Result<u64>> {
let mut store = &mut self.get_mut().store_file.as_mut().unwrap();

let (whence, offset) = match pos {
Expand All @@ -229,7 +253,7 @@ impl AsyncSeek for Handle<File> {
return Poll::Ready(Ok(*store.pos() as u64));
}
io::SeekFrom::Current(n) => (*store.pos(), n),
io::SeekFrom::End(n) => {
io::SeekFrom::End(n) => {
let fut = store.poll_file_size();
futures::pin_mut!(fut);
(futures::ready!(fut.as_mut().poll(cx))?, n)
Expand Down Expand Up @@ -263,7 +287,11 @@ impl AsyncSeek for Handle<File> {

#[cfg(unix)]
impl AsyncRead for &Handle<TcpStream> {
fn poll_read(self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
let raw_fd = self.as_raw_fd();
let buf_len = buf.len();
let buf = buf.as_mut_ptr();
Expand Down Expand Up @@ -311,7 +339,6 @@ impl AsyncWrite for &Handle<TcpStream> {
}
}


#[cfg(unix)]
impl AsyncRead for &Handle<UnixStream> {
fn poll_read(
Expand Down Expand Up @@ -339,11 +366,7 @@ impl AsyncRead for &Handle<UnixStream> {

#[cfg(unix)]
impl AsyncWrite for &Handle<UnixStream> {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context,
buf: &[u8],
) -> Poll<io::Result<usize>> {
fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
let raw_fd = self.as_raw_fd();
let buf_len = buf.len();
let buf = buf.as_ptr();
Expand Down
15 changes: 11 additions & 4 deletions src/handle.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
use std::fmt;
use std::{pin::Pin, future::Future, io, ops::{DerefMut, Deref}, sync::Arc};
use lever::prelude::*;

use pin_utils::unsafe_unpinned;
#[cfg(unix)]
use crate::syscore::{CompletionChan, StoreFile};
use pin_utils::unsafe_unpinned;
use std::fmt;
use std::{
future::Future,
io,
ops::{Deref, DerefMut},
pin::Pin,
sync::Arc,
};

pub type AsyncOp<T> = Pin<Box<dyn Future<Output = io::Result<T>>>>;

Expand All @@ -16,6 +23,7 @@ pub struct Handle<T> {
/// IO task element
pub(crate) io_task: Option<T>,
/// Notification channel
#[cfg(unix)]
pub(crate) chan: Option<CompletionChan>,
/// File operation storage
pub(crate) store_file: Option<StoreFile>,
Expand Down Expand Up @@ -69,7 +77,6 @@ impl<T> HandleOpRegisterer for &Handle<T> {
}
}


impl<T> Deref for Handle<T> {
type Target = T;

Expand Down