Skip to content

Commit 07015ec

Browse files
authored
Add spawn_blocking to runtime (#4623)
We had hard-coded using the unblock crate, now we can use tokio's spawn_blocking in that context --------- Signed-off-by: Nicholas Gates <nick@nickgates.com>
1 parent 17a0f72 commit 07015ec

File tree

11 files changed

+115
-38
lines changed

11 files changed

+115
-38
lines changed

Cargo.lock

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,6 @@ async-trait = "0.1.88"
7575
bindgen = "0.72.0"
7676
bit-vec = "0.8.0"
7777
bitvec = "1.0.1"
78-
blocking = "1.6.2"
7978
bytes = "1.10"
8079
bzip2 = "0.6.0"
8180
cbindgen = "0.29.0"

vortex-io/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ all-features = true
1919
[dependencies]
2020
async-compat = { workspace = true }
2121
async-stream = { workspace = true }
22-
blocking = { workspace = true }
2322
bytes = { workspace = true }
2423
cfg-if = { workspace = true }
2524
futures = { workspace = true, features = ["std", "executor"] }

vortex-io/src/file/object_store.rs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ use std::os::unix::fs::FileExt;
66
use std::sync::Arc;
77

88
use async_compat::Compat;
9-
use blocking::unblock;
109
use futures::future::BoxFuture;
1110
use futures::stream::BoxStream;
1211
use futures::{FutureExt, StreamExt};
@@ -101,6 +100,7 @@ impl IoSource for ObjectStoreIoSource {
101100
let self2 = self.clone();
102101
requests
103102
.map(move |req| {
103+
let handle = self.handle.clone();
104104
let store = self.io.store.clone();
105105
let path = self.io.path.clone();
106106

@@ -130,12 +130,13 @@ impl IoSource for ObjectStoreIoSource {
130130
// The read_exact_at call will either fill the entire buffer or return an error,
131131
// ensuring no uninitialized memory is exposed.
132132
unsafe { buffer.set_len(len) };
133-
unblock(move || {
134-
file.read_exact_at(&mut buffer, range.start)?;
135-
Ok::<_, io::Error>(buffer)
136-
})
137-
.await
138-
.map_err(io::Error::other)?
133+
handle
134+
.spawn_blocking(move || {
135+
file.read_exact_at(&mut buffer, range.start)?;
136+
Ok::<_, io::Error>(buffer)
137+
})
138+
.await
139+
.map_err(io::Error::other)?
139140
}
140141
object_store::GetResultPayload::Stream(mut byte_stream) => {
141142
while let Some(bytes) = byte_stream.next().await {

vortex-io/src/file/std_file.rs

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ use std::os::unix::fs::FileExt;
66
use std::path::{Path, PathBuf};
77
use std::sync::Arc;
88

9-
use blocking::unblock;
109
use futures::future::BoxFuture;
1110
use futures::stream::BoxStream;
1211
use futures::{FutureExt, StreamExt};
@@ -29,16 +28,17 @@ impl IntoIoSource for PathBuf {
2928
}
3029

3130
impl IntoIoSource for &Path {
32-
fn into_io_source(self, _handle: Handle) -> VortexResult<IoSourceRef> {
31+
fn into_io_source(self, handle: Handle) -> VortexResult<IoSourceRef> {
3332
let uri = self.to_string_lossy().to_string().into();
3433
let file = Arc::new(File::open(self)?);
35-
Ok(Arc::new(FileIoSource { uri, file }))
34+
Ok(Arc::new(FileIoSource { uri, file, handle }))
3635
}
3736
}
3837

3938
pub(crate) struct FileIoSource {
4039
uri: Arc<str>,
4140
file: Arc<File>,
41+
handle: Handle,
4242
}
4343

4444
impl IoSource for FileIoSource {
@@ -66,20 +66,22 @@ impl IoSource for FileIoSource {
6666
requests
6767
.map(move |req| {
6868
let file = self.file.clone();
69+
let handle = self.handle.clone();
6970
async move {
7071
let offset = req.offset();
7172
let len = req.len();
7273
let alignment = req.alignment();
7374

74-
let result = unblock(move || {
75-
let mut buffer = ByteBufferMut::with_capacity_aligned(len, alignment);
76-
unsafe { buffer.set_len(len) };
77-
match file.read_exact_at(&mut buffer, offset) {
78-
Ok(()) => Ok(buffer.freeze()),
79-
Err(e) => Err(VortexError::from(e)),
80-
}
81-
})
82-
.await;
75+
let result = handle
76+
.spawn_blocking(move || {
77+
let mut buffer = ByteBufferMut::with_capacity_aligned(len, alignment);
78+
unsafe { buffer.set_len(len) };
79+
match file.read_exact_at(&mut buffer, offset) {
80+
Ok(()) => Ok(buffer.freeze()),
81+
Err(e) => Err(VortexError::from(e)),
82+
}
83+
})
84+
.await;
8385
req.resolve(result);
8486
}
8587
})

vortex-io/src/runtime/handle.rs

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -91,11 +91,33 @@ impl Handle {
9191
F: FnOnce() -> R + Send + 'static,
9292
R: Send + 'static,
9393
{
94-
// TODO(ngates): we want a droppable handle for this.
9594
let (send, recv) = oneshot::channel();
9695
let abort_handle = self.runtime.spawn_cpu(Box::new(move || {
97-
// Task::detach allows the receiver to be dropped, so we ignore send errors.
98-
let _ = send.send(f());
96+
// Optimistically avoid the work if the result won't be used.
97+
if !send.is_closed() {
98+
// Task::detach allows the receiver to be dropped, so we ignore send errors.
99+
let _ = send.send(f());
100+
}
101+
}));
102+
Task {
103+
recv,
104+
abort_handle: Some(abort_handle),
105+
}
106+
}
107+
108+
/// Spawn a blocking I/O task for execution on the runtime.
109+
pub fn spawn_blocking<F, R>(&self, f: F) -> Task<R>
110+
where
111+
F: FnOnce() -> R + Send + 'static,
112+
R: Send + 'static,
113+
{
114+
let (send, recv) = oneshot::channel();
115+
let abort_handle = self.runtime.spawn_blocking(Box::new(move || {
116+
// Optimistically avoid the work if the result won't be used.
117+
if !send.is_closed() {
118+
// Task::detach allows the receiver to be dropped, so we ignore send errors.
119+
let _ = send.send(f());
120+
}
99121
}));
100122
Task {
101123
recv,

vortex-io/src/runtime/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,12 @@ pub(crate) trait Runtime: Send + Sync {
5050
/// yet started executing.
5151
fn spawn_cpu(&self, task: Box<dyn FnOnce() + Send + 'static>) -> AbortHandleRef;
5252

53+
/// Spawns a blocking I/O task for execution on the runtime.
54+
///
55+
/// The returned `AbortHandle` may be used to optimistically cancel the task if it has not
56+
/// yet started executing.
57+
fn spawn_blocking(&self, task: Box<dyn FnOnce() + Send + 'static>) -> AbortHandleRef;
58+
5359
/// Spawns an I/O task for execution on the runtime.
5460
/// The runtime can choose to invoke the task's `Send` or `!Send` versions.
5561
///

vortex-io/src/runtime/single.rs

Lines changed: 45 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -32,15 +32,17 @@ impl Default for SingleThreadRuntime {
3232
}
3333

3434
struct Sender {
35-
scheduling: kanal::Sender<SpawnFuture<'static>>,
36-
cpu: kanal::Sender<SpawnCpu<'static>>,
35+
scheduling: kanal::Sender<SpawnAsync<'static>>,
36+
cpu: kanal::Sender<SpawnSync<'static>>,
37+
blocking: kanal::Sender<SpawnSync<'static>>,
3738
io: kanal::Sender<IoTask>,
3839
}
3940

4041
impl Sender {
4142
fn new(local: &Rc<LocalExecutor<'static>>) -> Self {
42-
let (scheduling_send, scheduling_recv) = kanal::unbounded::<SpawnFuture>();
43-
let (cpu_send, cpu_recv) = kanal::unbounded::<SpawnCpu>();
43+
let (scheduling_send, scheduling_recv) = kanal::unbounded::<SpawnAsync>();
44+
let (cpu_send, cpu_recv) = kanal::unbounded::<SpawnSync>();
45+
let (blocking_send, blocking_recv) = kanal::unbounded::<SpawnSync>();
4446
let (io_send, io_recv) = kanal::unbounded::<IoTask>();
4547

4648
// We pass weak references to the local executor into the async tasks such that the task's
@@ -68,10 +70,26 @@ impl Sender {
6870
.spawn(async move {
6971
while let Ok(spawn) = cpu_recv.as_async().recv().await {
7072
if let Some(local) = weak_local2.upgrade() {
71-
let cpu = spawn.cpu;
73+
let work = spawn.sync;
7274
// Ignore send errors since it means the caller immediately detached.
7375
let _ = spawn.task_callback.send(SmolAbortHandle::new_handle(
74-
local.spawn(async move { cpu() }),
76+
local.spawn(async move { work() }),
77+
));
78+
}
79+
}
80+
})
81+
.detach();
82+
83+
// Drive blocking tasks.
84+
let weak_local2 = weak_local.clone();
85+
local
86+
.spawn(async move {
87+
while let Ok(spawn) = blocking_recv.as_async().recv().await {
88+
if let Some(local) = weak_local2.upgrade() {
89+
let work = spawn.sync;
90+
// Ignore send errors since it means the caller immediately detached.
91+
let _ = spawn.task_callback.send(SmolAbortHandle::new_handle(
92+
local.spawn(async move { work() }),
7593
));
7694
}
7795
}
@@ -93,6 +111,7 @@ impl Sender {
93111
Self {
94112
scheduling: scheduling_send,
95113
cpu: cpu_send,
114+
blocking: blocking_send,
96115
io: io_send,
97116
}
98117
}
@@ -105,7 +124,7 @@ impl Sender {
105124
impl Runtime for Sender {
106125
fn spawn(&self, future: BoxFuture<'static, ()>) -> AbortHandleRef {
107126
let (send, recv) = oneshot::channel();
108-
if let Err(e) = self.scheduling.send(SpawnFuture {
127+
if let Err(e) = self.scheduling.send(SpawnAsync {
109128
future,
110129
task_callback: send,
111130
}) {
@@ -118,8 +137,21 @@ impl Runtime for Sender {
118137

119138
fn spawn_cpu(&self, cpu: Box<dyn FnOnce() + Send + 'static>) -> AbortHandleRef {
120139
let (send, recv) = oneshot::channel();
121-
if let Err(e) = self.cpu.send(SpawnCpu {
122-
cpu,
140+
if let Err(e) = self.cpu.send(SpawnSync {
141+
sync: cpu,
142+
task_callback: send,
143+
}) {
144+
vortex_panic!("Executor missing: {}", e);
145+
}
146+
Box::new(LazyAbortHandle {
147+
task: Mutex::new(recv),
148+
})
149+
}
150+
151+
fn spawn_blocking(&self, work: Box<dyn FnOnce() + Send + 'static>) -> AbortHandleRef {
152+
let (send, recv) = oneshot::channel();
153+
if let Err(e) = self.blocking.send(SpawnSync {
154+
sync: work,
123155
task_callback: send,
124156
}) {
125157
vortex_panic!("Executor missing: {}", e);
@@ -198,14 +230,14 @@ where
198230
/// we invert the behaviour of abort and drop. Dropping the abort handle results in the task being
199231
/// detached, whereas dropping the smol::Task results in the task being canceled. This helps avoid
200232
/// a race where the caller detaches the LazyAbortHandle before the smol::Task has been launched.
201-
struct SpawnFuture<'rt> {
233+
struct SpawnAsync<'rt> {
202234
future: BoxFuture<'rt, ()>,
203235
task_callback: oneshot::Sender<AbortHandleRef>,
204236
}
205237

206-
// A spawn request for a CPU job.
207-
struct SpawnCpu<'rt> {
208-
cpu: Box<dyn FnOnce() + Send + 'rt>,
238+
// A spawn request for a synchronous job.
239+
struct SpawnSync<'rt> {
240+
sync: Box<dyn FnOnce() + Send + 'rt>,
209241
task_callback: oneshot::Sender<AbortHandleRef>,
210242
}
211243

vortex-io/src/runtime/smol.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,10 @@ impl Runtime for Executor<'static> {
1616
SmolAbortHandle::new_handle(self.spawn(async move { task() }))
1717
}
1818

19+
fn spawn_blocking(&self, task: Box<dyn FnOnce() + Send + 'static>) -> AbortHandleRef {
20+
SmolAbortHandle::new_handle(smol::unblock(task))
21+
}
22+
1923
fn spawn_io(&self, task: IoTask) {
2024
self.spawn(task.source.drive_send(task.stream)).detach()
2125
}

vortex-io/src/runtime/tokio.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,10 @@ impl Runtime for CurrentTokioRuntime {
4141
)
4242
}
4343

44+
fn spawn_blocking(&self, task: Box<dyn FnOnce() + Send + 'static>) -> AbortHandleRef {
45+
Box::new(TokioHandle::current().spawn_blocking(task).abort_handle())
46+
}
47+
4448
fn spawn_io(&self, task: IoTask) {
4549
TokioHandle::current().spawn(task.source.drive_send(task.stream));
4650
}
@@ -55,6 +59,10 @@ impl Runtime for TokioHandle {
5559
Box::new(TokioHandle::spawn(self, async move { cpu() }).abort_handle())
5660
}
5761

62+
fn spawn_blocking(&self, task: Box<dyn FnOnce() + Send + 'static>) -> AbortHandleRef {
63+
Box::new(TokioHandle::spawn_blocking(self, task).abort_handle())
64+
}
65+
5866
fn spawn_io(&self, task: IoTask) {
5967
TokioHandle::spawn(self, task.source.drive_send(task.stream));
6068
}

0 commit comments

Comments
 (0)