-
Notifications
You must be signed in to change notification settings - Fork 122
/
runtime.rs
91 lines (79 loc) · 2.47 KB
/
runtime.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
use crate::driver::Driver;
use std::future::Future;
use std::io;
use tokio::io::unix::AsyncFd;
use tokio::task::LocalSet;
/// Crate-internal runtime that bundles a Tokio runtime, a `LocalSet`, and the
/// io-uring driver.
///
/// NOTE(review): Rust drops fields in declaration order (`driver`, `local`,
/// `rt`); presumably `driver` has to be dropped while `rt` is still alive.
/// Confirm before reordering the fields.
pub(crate) struct Runtime {
/// io-uring driver, wrapped in `AsyncFd` so that read-readiness of the
/// driver can be awaited
driver: AsyncFd<Driver>,
/// LocalSet for !Send tasks
local: LocalSet,
/// Tokio runtime, always current-thread
rt: tokio::runtime::Runtime,
}
/// Spawns a new asynchronous task, returning a [`JoinHandle`] for it.
///
/// Spawning a task enables the task to execute concurrently to other tasks.
/// There is no guarantee that a spawned task will execute to completion. When a
/// runtime is shutdown, all outstanding tasks are dropped, regardless of the
/// lifecycle of that task.
///
/// This function must be called from the context of a `tokio-uring` runtime.
///
/// [`JoinHandle`]: tokio::task::JoinHandle
///
/// # Examples
///
/// In this example, a server is started and `spawn` is used to start a new task
/// that processes each received connection.
///
/// ```no_run
/// fn main() {
/// tokio_uring::start(async {
/// let handle = tokio_uring::spawn(async {
/// println!("hello from a background task");
/// });
///
/// // Let the task complete
/// handle.await.unwrap();
/// });
/// }
/// ```
pub fn spawn<T: std::future::Future + 'static>(task: T) -> tokio::task::JoinHandle<T::Output> {
tokio::task::spawn_local(task)
}
impl Runtime {
    /// Creates the runtime: a current-thread Tokio runtime with all of its
    /// drivers enabled, the io-uring driver registered as an `AsyncFd`, and
    /// an empty `LocalSet`.
    ///
    /// # Errors
    ///
    /// Returns the `io::Error` produced while building the Tokio runtime,
    /// creating the `Driver`, or wrapping the driver in `AsyncFd`.
    pub(crate) fn new() -> io::Result<Runtime> {
        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()?;

        // The driver is created and registered while the runtime is entered;
        // `AsyncFd::new` needs a Tokio runtime context to register with.
        let driver = {
            let _enter = rt.enter();
            let uring = Driver::new()?;
            AsyncFd::new(uring)?
        };

        Ok(Runtime {
            driver,
            local: LocalSet::new(),
            rt,
        })
    }

    /// Runs `future` to completion on this runtime and returns its output.
    ///
    /// The whole call happens inside the scope established by `Driver::with`.
    /// `future` is polled within the `LocalSet`; each poll first polls an
    /// internal loop that waits for the driver to become readable and then
    /// calls `Driver::tick`.
    ///
    /// # Panics
    ///
    /// Panics if waiting for read-readiness on the driver returns an error.
    pub(crate) fn block_on<F: Future>(&mut self, future: F) -> F::Output {
        self.driver.get_ref().with(|| {
            // Endless loop: each time the driver reports read-readiness, tick
            // it and reset the readiness flag so the next wait blocks again.
            let drive = async {
                loop {
                    let mut ready = self.driver.readable().await.unwrap();
                    self.driver.get_ref().tick();
                    ready.clear_ready();
                }
            };

            tokio::pin!(drive);
            tokio::pin!(future);

            // The drive loop is polled before the caller's future on every
            // wake-up. It never finishes, hence the `is_pending` assertion.
            let drive_then_poll = crate::future::poll_fn(|cx| {
                assert!(drive.as_mut().poll(cx).is_pending());
                future.as_mut().poll(cx)
            });

            self.rt.block_on(self.local.run_until(drive_then_poll))
        })
    }
}