Skip to content

Commit 240c5d9

Browse files
goffrieConvex, Inc.
authored and
Convex, Inc.
committed
Make SpawnHandle shutdown on drop by default (#36297)
This prevents a common class of bugs on startup where a spawn handle can be created, but then the initialization process hits an error and the task gets leaked. GitOrigin-RevId: 31a4b3f4dc37bce00bee1379d3e9e5bdfb866a25
1 parent 0d8e136 commit 240c5d9

File tree

10 files changed

+124
-47
lines changed

10 files changed

+124
-47
lines changed

crates/application/src/cron_jobs/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,8 @@ impl<RT: Runtime> CronJobExecutor<RT> {
231231
);
232232
let context = self.clone();
233233
let tx = job_finished_tx.clone();
234-
self.rt.spawn(
234+
// TODO: cancel this handle with the application
235+
self.rt.spawn_background(
235236
"spawn_cron_job",
236237
async move {
237238
select_biased! {

crates/application/src/lib.rs

Lines changed: 22 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1220,7 +1220,8 @@ impl<RT: Runtime> Application<RT> {
12201220
// Spawn running the action in a separate future. This way, even if we
12211221
// get cancelled, it will continue to run to completion.
12221222
let (tx, rx) = oneshot::channel();
1223-
self.runtime.spawn("run_action", async move {
1223+
// TODO: cancel this handle with the application
1224+
self.runtime.spawn_background("run_action", async move {
12241225
let result = run_action.await;
12251226
// Don't log errors if the caller has gone away.
12261227
_ = tx.send(result);
@@ -1278,24 +1279,26 @@ impl<RT: Runtime> Application<RT> {
12781279
.map(|ctx| Span::root(format!("{}::http_actions_future", func_path!()), ctx))
12791280
.unwrap_or(Span::noop());
12801281
let response_streamer_ = response_streamer.clone();
1281-
self.runtime.spawn("run_http_action", async move {
1282-
let result = runner
1283-
.run_http_action(
1284-
request_id,
1285-
http_request,
1286-
response_streamer_,
1287-
identity,
1288-
caller,
1289-
)
1290-
.in_span(span)
1291-
.await;
1292-
if let Err(Err(mut e)) = tx.send(result) {
1293-
// If the caller has gone away, and the result is a system error,
1294-
// log to sentry.
1295-
report_error(&mut e).await;
1296-
}
1297-
rt_.pause_client().wait("end_run_http_action").await;
1298-
});
1282+
// TODO: cancel this handle with the application
1283+
self.runtime
1284+
.spawn_background("run_http_action", async move {
1285+
let result = runner
1286+
.run_http_action(
1287+
request_id,
1288+
http_request,
1289+
response_streamer_,
1290+
identity,
1291+
caller,
1292+
)
1293+
.in_span(span)
1294+
.await;
1295+
if let Err(Err(mut e)) = tx.send(result) {
1296+
// If the caller has gone away, and the result is a system error,
1297+
// log to sentry.
1298+
report_error(&mut e).await;
1299+
}
1300+
rt_.pause_client().wait("end_run_http_action").await;
1301+
});
12991302
let result = rx
13001303
.await
13011304
.context("run_http_action one shot sender dropped prematurely?")?;

crates/application/src/scheduled_jobs/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -328,7 +328,8 @@ impl<RT: Runtime> ScheduledJobExecutor<RT> {
328328
&mut self.rt.rng(),
329329
BTreeMap::new(),
330330
);
331-
self.rt.spawn(
331+
// TODO: cancel this handle with the application
332+
self.rt.spawn_background(
332333
"spawn_scheduled_job",
333334
async move {
334335
context.execute_job(job, job_id).await;

crates/aws_s3/src/storage.rs

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -672,19 +672,21 @@ impl<RT: Runtime> Drop for S3Upload<RT> {
672672
fn drop(&mut self) {
673673
if self.needs_abort_on_drop {
674674
let fut = self._abort();
675-
self.runtime.spawn("abort_multipart_upload", async move {
676-
if let Err(e) = fut.await {
677-
// abort-multipart-upload is idempotent. It has the following properties.
678-
//
679-
// abort after a successful abort - succeeds
680-
// abort after a successful complete - succeeds
681-
// complete after a successful abort - fails with a descriptive error.
682-
report_error(
683-
&mut anyhow::anyhow!(e).context("Couldn't async abort multipart upload"),
684-
)
685-
.await;
686-
}
687-
});
675+
self.runtime
676+
.spawn_background("abort_multipart_upload", async move {
677+
if let Err(e) = fut.await {
678+
// abort-multipart-upload is idempotent. It has the following properties.
679+
//
680+
// abort after a successful abort - succeeds
681+
// abort after a successful complete - succeeds
682+
// complete after a successful abort - fails with a descriptive error.
683+
report_error(
684+
&mut anyhow::anyhow!(e)
685+
.context("Couldn't async abort multipart upload"),
686+
)
687+
.await;
688+
}
689+
});
688690
}
689691
}
690692
}

crates/common/src/runtime/mod.rs

Lines changed: 65 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -96,9 +96,18 @@ impl From<tokio::task::JoinError> for JoinError {
9696
}
9797
}
9898

99+
#[must_use = "Tasks are canceled when their `SpawnHandle` is dropped."]
99100
pub trait SpawnHandle: Send + Sync {
101+
/// Stops the spawned task "soon". This happens asynchronously.
100102
fn shutdown(&mut self);
103+
/// Wait for the spawned task to finish. Don't use this function directly,
104+
/// call `.join()` instead.
101105
fn poll_join(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), JoinError>>;
106+
/// Allows the spawned task to keep running indefinitely. By default, a task
107+
/// is shut down on drop.
108+
fn detach(self: Box<Self>);
109+
/// Wait for the spawned task to finish. Returns an error if the task was
110+
/// canceled (using `.shutdown()`) or panicked.
102111
fn join<'a>(mut self) -> impl Future<Output = Result<(), JoinError>> + 'a
103112
where
104113
Self: Sized + 'a,
@@ -115,6 +124,10 @@ impl<T: SpawnHandle + ?Sized> SpawnHandle for Box<T> {
115124
fn poll_join(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), JoinError>> {
116125
(**self).poll_join(cx)
117126
}
127+
128+
fn detach(self: Box<Self>) {
129+
(*self).detach()
130+
}
118131
}
119132

120133
impl dyn SpawnHandle {
@@ -123,27 +136,61 @@ impl dyn SpawnHandle {
123136
pub fn join(self: Box<Self>) -> impl Future<Output = Result<(), JoinError>> {
124137
SpawnHandle::join(self)
125138
}
139+
140+
/// Wait for the spawn task to finish, but if the returned future is
141+
/// canceled, the spawned task continues running as though it were
142+
/// `detach()`ed.
143+
pub fn join_or_detach(self: Box<Self>) -> impl Future<Output = Result<(), JoinError>> {
144+
struct DetachOnDrop(Option<Box<dyn SpawnHandle>>);
145+
impl Drop for DetachOnDrop {
146+
fn drop(&mut self) {
147+
self.0.take().expect("lost spawn handle?").detach();
148+
}
149+
}
150+
let mut handle = DetachOnDrop(Some(self));
151+
future::poll_fn(move |cx| handle.0.as_mut().expect("lost spawn handle?").poll_join(cx))
152+
}
153+
154+
pub fn shutdown_and_join(self: Box<Self>) -> impl Future<Output = anyhow::Result<()>> {
155+
shutdown_and_join(self)
156+
}
126157
}
127158

128159
pub struct TokioSpawnHandle {
129-
handle: tokio::task::JoinHandle<()>,
160+
handle: Option<tokio::task::JoinHandle<()>>,
130161
}
131162

132163
impl From<tokio::task::JoinHandle<()>> for TokioSpawnHandle {
133164
fn from(handle: tokio::task::JoinHandle<()>) -> Self {
134-
Self { handle }
165+
Self {
166+
handle: Some(handle),
167+
}
135168
}
136169
}
137170

138171
impl SpawnHandle for TokioSpawnHandle {
139172
fn shutdown(&mut self) {
140-
self.handle.abort();
173+
self.handle.as_ref().expect("shutdown after detach").abort();
141174
}
142175

143176
fn poll_join(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), JoinError>> {
144-
std::task::ready!(Pin::new(&mut self.handle).poll(cx))?;
177+
std::task::ready!(
178+
Pin::new(&mut self.handle.as_mut().expect("poll after detach")).poll(cx)
179+
)?;
145180
Poll::Ready(Ok(()))
146181
}
182+
183+
fn detach(mut self: Box<Self>) {
184+
self.handle.take();
185+
}
186+
}
187+
188+
impl Drop for TokioSpawnHandle {
189+
fn drop(&mut self) {
190+
if let Some(handle) = &self.handle {
191+
handle.abort();
192+
}
193+
}
147194
}
148195

149196
/// Shutdown the associated future, preempting it at its next yield point, and
@@ -230,16 +277,29 @@ pub trait Runtime: Clone + Sync + Send + 'static {
230277
fn wait(&self, duration: Duration) -> Pin<Box<dyn FusedFuture<Output = ()> + Send + 'static>>;
231278

232279
/// Spawn a future on the runtime's executor.
280+
///
281+
/// The spawned task will be canceled if the returned `SpawnHandle` is
282+
/// dropped, unless `detach()` is called on it.
233283
fn spawn(
234284
&self,
235285
name: &'static str,
236286
f: impl Future<Output = ()> + Send + 'static,
237287
) -> Box<dyn SpawnHandle>;
238288

289+
/// Shorthand for `spawn().detach()`
290+
///
291+
/// This should only be used for tasks that are best-effort (e.g. cleaning
292+
/// up partial progress) or that are truly process-global.
293+
fn spawn_background(&self, name: &'static str, f: impl Future<Output = ()> + Send + 'static) {
294+
self.spawn(name, f).detach()
295+
}
296+
239297
/// Spawn a future on a reserved OS thread. This is only really necessary
240298
/// for libraries like `V8` that care about being called from a
241299
/// particular thread.
242-
#[must_use = "Threads are canceled when their `SpawnHandle` is dropped."]
300+
///
301+
/// The spawned task will be canceled if the returned `SpawnHandle` is
302+
/// dropped, unless `detach()` is called on it.
243303
fn spawn_thread<Fut: Future<Output = ()>, F: FnOnce() -> Fut + Send + 'static>(
244304
&self,
245305
name: &str,

crates/isolate/src/tests/fetch.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,7 @@ async fn test_fetch_basic(rt: ProdRuntime) -> anyhow::Result<()> {
215215
.expect("invalid response")
216216
}),
217217
);
218-
rt.spawn("test_server", serve(router, 4545));
218+
let _server = rt.spawn("test_server", serve(router, 4545));
219219
let redirected_router = Router::new().route(
220220
"/print_auth",
221221
get(|req: Request<Body>| async move {
@@ -232,7 +232,7 @@ async fn test_fetch_basic(rt: ProdRuntime) -> anyhow::Result<()> {
232232
.expect("invalid response")
233233
}),
234234
);
235-
rt.spawn("test_router", serve(redirected_router, 4547));
235+
let _router = rt.spawn("test_router", serve(redirected_router, 4547));
236236

237237
let t = UdfTest::default(rt).await?;
238238
must_let!(let (ConvexValue::String(r), _outcome, log_lines) = t.action_outcome_and_log_lines(
@@ -301,7 +301,7 @@ async fn test_fetch_timing(rt: ProdRuntime) -> anyhow::Result<()> {
301301
.expect("invalid response")
302302
}),
303303
);
304-
rt.spawn("test_router", serve(router, 4546));
304+
let _router = rt.spawn("test_router", serve(router, 4546));
305305

306306
let t = UdfTest::default(rt.clone()).await?;
307307

@@ -354,7 +354,7 @@ async fn test_fetch_abort(rt: ProdRuntime) -> anyhow::Result<()> {
354354
Response::builder().body(Body::from("ok")).unwrap()
355355
}),
356356
);
357-
rt.spawn("test_router", serve(router, 4548));
357+
let _router = rt.spawn("test_router", serve(router, 4548));
358358

359359
// fetchAbort is an action that fetches from /pause, and in parallel it
360360
// waits for triggerAbort. When triggerAbort is called, it aborts the fetch

crates/load_generator/src/main.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -451,9 +451,9 @@ fn main() -> Result<(), MainError> {
451451
.map(|addr| register_prometheus_exporter(runtime.clone(), addr));
452452
let load_generator = async move {
453453
run(&config).await?;
454-
if let Some((mut handle, flush)) = maybe_metrics {
454+
if let Some((handle, flush)) = maybe_metrics {
455455
flush().await;
456-
handle.shutdown();
456+
handle.shutdown_and_join().await?;
457457
}
458458
Ok::<_, MainError>(())
459459
};

crates/local_backend/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,7 @@ pub async fn make_app(
243243
config.beacon_tag.clone(),
244244
config.beacon_fields.clone(),
245245
);
246-
runtime.spawn("beacon_worker", beacon_future);
246+
runtime.spawn_background("beacon_worker", beacon_future);
247247
}
248248

249249
let app_state = LocalAppState {

crates/mysql/src/connection.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -523,7 +523,7 @@ impl<RT: Runtime> Drop for ConvexMySqlPool<RT> {
523523
return;
524524
};
525525
let pool = self.pool.clone();
526-
runtime.spawn("mysql_pool_disconnect", async move {
526+
runtime.spawn_background("mysql_pool_disconnect", async move {
527527
let _ = pool.disconnect().await;
528528
tracing::info!("ConvexMySqlPool pool successfully closed");
529529
});

crates/runtime/src/prod.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,16 @@ impl SpawnHandle for ThreadHandle {
8888
Poll::Ready(Ok(()))
8989
}
9090
}
91+
92+
fn detach(mut self: Box<Self>) {
93+
self.cancel.take();
94+
}
95+
}
96+
97+
impl Drop for ThreadHandle {
98+
fn drop(&mut self) {
99+
self.shutdown();
100+
}
91101
}
92102

93103
impl ThreadHandle {

0 commit comments

Comments
 (0)