Skip to content

Commit 5a709e3

Browse files
authored
io_uring: change Completable to not return io::Result (#7702)
1 parent 5efb1c3 commit 5a709e3

File tree

4 files changed

+57
-30
lines changed

4 files changed

+57
-30
lines changed

tokio/src/fs/write.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,9 @@ async fn write_uring(path: &Path, mut buf: OwnedBuf) -> io::Result<()> {
7272
let mut buf_offset: usize = 0;
7373
let mut file_offset: u64 = 0;
7474
while buf_offset < total {
75-
let (n, _buf, _fd) = Op::write_at(fd, buf, buf_offset, file_offset)?.await?;
75+
let (n, _buf, _fd) = Op::write_at(fd, buf, buf_offset, file_offset)?.await;
76+
// TODO: handle EINT here
77+
let n = n?;
7678
if n == 0 {
7779
return Err(io::ErrorKind::WriteZero.into());
7880
}

tokio/src/io/uring/open.rs

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
use super::utils::cstr;
2-
use crate::{
3-
fs::UringOpenOptions,
4-
runtime::driver::op::{CancelData, Cancellable, Completable, CqeResult, Op},
5-
};
2+
3+
use crate::fs::UringOpenOptions;
4+
use crate::runtime::driver::op::{CancelData, Cancellable, Completable, CqeResult, Op};
5+
66
use io_uring::{opcode, types};
7-
use std::{ffi::CString, io, os::fd::FromRawFd, path::Path};
7+
use std::ffi::CString;
8+
use std::io::{self, Error};
9+
use std::os::fd::FromRawFd;
10+
use std::path::Path;
811

912
#[derive(Debug)]
1013
pub(crate) struct Open {
@@ -15,11 +18,14 @@ pub(crate) struct Open {
1518
}
1619

1720
impl Completable for Open {
18-
type Output = crate::fs::File;
19-
fn complete(self, cqe: CqeResult) -> io::Result<Self::Output> {
20-
let fd = cqe.result? as i32;
21-
let file = unsafe { crate::fs::File::from_raw_fd(fd) };
22-
Ok(file)
21+
type Output = io::Result<crate::fs::File>;
22+
fn complete(self, cqe: CqeResult) -> Self::Output {
23+
cqe.result
24+
.map(|fd| unsafe { crate::fs::File::from_raw_fd(fd as i32) })
25+
}
26+
27+
fn complete_with_error(self, err: Error) -> Self::Output {
28+
Err(err)
2329
}
2430
}
2531

tokio/src/io/uring/write.rs

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,9 @@
1-
use crate::{
2-
runtime::driver::op::{CancelData, Cancellable, Completable, CqeResult, Op},
3-
util::as_ref::OwnedBuf,
4-
};
1+
use crate::runtime::driver::op::{CancelData, Cancellable, Completable, CqeResult, Op};
2+
use crate::util::as_ref::OwnedBuf;
3+
54
use io_uring::{opcode, types};
6-
use std::{
7-
io,
8-
os::fd::{AsRawFd, OwnedFd},
9-
};
5+
use std::io::{self, Error};
6+
use std::os::fd::{AsRawFd, OwnedFd};
107

118
#[derive(Debug)]
129
pub(crate) struct Write {
@@ -15,9 +12,13 @@ pub(crate) struct Write {
1512
}
1613

1714
impl Completable for Write {
18-
type Output = (u32, OwnedBuf, OwnedFd);
19-
fn complete(self, cqe: CqeResult) -> io::Result<Self::Output> {
20-
Ok((cqe.result?, self.buf, self.fd))
15+
type Output = (io::Result<u32>, OwnedBuf, OwnedFd);
16+
fn complete(self, cqe: CqeResult) -> Self::Output {
17+
(cqe.result, self.buf, self.fd)
18+
}
19+
20+
fn complete_with_error(self, err: Error) -> Self::Output {
21+
(Err(err), self.buf, self.fd)
2122
}
2223
}
2324

tokio/src/runtime/driver/op.rs

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
use crate::io::uring::open::Open;
22
use crate::io::uring::write::Write;
33
use crate::runtime::Handle;
4+
45
use io_uring::cqueue;
56
use io_uring::squeue::Entry;
67
use std::future::Future;
8+
use std::io::{self, Error};
9+
use std::mem;
710
use std::pin::Pin;
8-
use std::task::Context;
9-
use std::task::Poll;
10-
use std::task::Waker;
11-
use std::{io, mem};
11+
use std::task::{Context, Poll, Waker};
1212

1313
// This field isn't accessed directly, but it holds cancellation data,
1414
// so `#[allow(dead_code)]` is needed.
@@ -110,7 +110,13 @@ impl From<cqueue::Entry> for CqeResult {
110110
/// A trait that converts a CQE result into a usable value for each operation.
111111
pub(crate) trait Completable {
112112
type Output;
113-
fn complete(self, cqe: CqeResult) -> io::Result<Self::Output>;
113+
fn complete(self, cqe: CqeResult) -> Self::Output;
114+
115+
// This is used when you want to terminate an operation with an error.
116+
//
117+
// The `Op` type that implements this trait can return the passed error
118+
// upstream by embedding it in the `Output`.
119+
fn complete_with_error(self, error: Error) -> Self::Output;
114120
}
115121

116122
/// Extracts the `CancelData` needed to safely cancel an in-flight io_uring operation.
@@ -121,7 +127,7 @@ pub(crate) trait Cancellable {
121127
impl<T: Cancellable> Unpin for Op<T> {}
122128

123129
impl<T: Cancellable + Completable + Send> Future for Op<T> {
124-
type Output = io::Result<T::Output>;
130+
type Output = T::Output;
125131

126132
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
127133
let this = self.get_mut();
@@ -132,9 +138,21 @@ impl<T: Cancellable + Completable + Send> Future for Op<T> {
132138
State::Initialize(entry_opt) => {
133139
let entry = entry_opt.take().expect("Entry must be present");
134140
let waker = cx.waker().clone();
141+
135142
// SAFETY: entry is valid for the entire duration of the operation
136-
let idx = unsafe { driver.register_op(entry, waker)? };
137-
this.state = State::Polled(idx);
143+
match unsafe { driver.register_op(entry, waker) } {
144+
Ok(idx) => this.state = State::Polled(idx),
145+
Err(err) => {
146+
let data = this
147+
.take_data()
148+
.expect("Data must be present on Initialization");
149+
150+
this.state = State::Complete;
151+
152+
return Poll::Ready(data.complete_with_error(err));
153+
}
154+
};
155+
138156
Poll::Pending
139157
}
140158

0 commit comments

Comments
 (0)