Skip to content

Commit fba1933

Browse files
committed
fix DurabilityWorker drop order bug
1 parent fe7d98b commit fba1933

File tree

1 file changed

+47
-17
lines changed

1 file changed

+47
-17
lines changed

crates/core/src/db/durability.rs

Lines changed: 47 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
use crate::db::persistence::Durability;
2-
use futures::{channel::mpsc, StreamExt};
32
use spacetimedb_commitlog::payload::{
43
txdata::{Mutations, Ops},
54
Txdata,
@@ -9,11 +8,15 @@ use spacetimedb_datastore::{execution_context::ReducerContext, traits::TxData};
98
use spacetimedb_durability::DurableOffset;
109
use spacetimedb_primitives::TableId;
1110
use std::sync::Arc;
12-
13-
/// A request to persist a transaction.
14-
pub struct DurabilityRequest {
15-
reducer_context: Option<ReducerContext>,
16-
tx_data: Arc<TxData>,
11+
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
12+
13+
/// A request to persist a transaction or to terminate the actor.
14+
pub enum DurabilityRequest {
15+
Work {
16+
reducer_context: Option<ReducerContext>,
17+
tx_data: Arc<TxData>,
18+
},
19+
Close,
1720
}
1821

1922
/// Represents a handle to a background task that persists transactions
@@ -23,14 +26,27 @@ pub struct DurabilityRequest {
2326
/// before sending over to the `Durability` layer.
2427
#[derive(Clone)]
2528
pub struct DurabilityWorker {
26-
request_tx: mpsc::UnboundedSender<DurabilityRequest>,
29+
request_tx: UnboundedSender<DurabilityRequest>,
2730
durability: Arc<Durability>,
2831
}
2932

33+
/// Those who run seem to have all the fun... 🎶
34+
const HUNG_UP: &str = "durability actor hung up / panicked";
35+
36+
impl Drop for DurabilityWorker {
37+
fn drop(&mut self) {
38+
self.request_tx.send(DurabilityRequest::Close).expect(HUNG_UP);
39+
40+
// Wait until the actor's `Arc<Durability>` has been dropped.
41+
// After that, we drop `self.durability` as normal.
42+
futures::executor::block_on(self.request_tx.closed());
43+
}
44+
}
45+
3046
impl DurabilityWorker {
3147
/// Create a new [`DurabilityWorker`] using the given `durability` policy.
3248
pub fn new(durability: Arc<Durability>) -> Self {
33-
let (request_tx, request_rx) = mpsc::unbounded();
49+
let (request_tx, request_rx) = unbounded_channel();
3450

3551
let actor = DurabilityWorkerActor {
3652
request_rx,
@@ -57,11 +73,11 @@ impl DurabilityWorker {
5773
/// which is likely due to it having panicked.
5874
pub fn request_durability(&self, reducer_context: Option<ReducerContext>, tx_data: &Arc<TxData>) {
5975
self.request_tx
60-
.unbounded_send(DurabilityRequest {
76+
.send(DurabilityRequest::Work {
6177
reducer_context,
6278
tx_data: tx_data.clone(),
6379
})
64-
.expect("durability worker panicked");
80+
.expect(HUNG_UP);
6581
}
6682

6783
/// Get the [`DurableOffset`] of this database.
@@ -71,19 +87,33 @@ impl DurabilityWorker {
7187
}
7288

7389
pub struct DurabilityWorkerActor {
74-
request_rx: mpsc::UnboundedReceiver<DurabilityRequest>,
90+
request_rx: UnboundedReceiver<DurabilityRequest>,
7591
durability: Arc<Durability>,
7692
}
7793

7894
impl DurabilityWorkerActor {
7995
/// Processes requests to do durability.
8096
async fn run(mut self) {
81-
while let Some(DurabilityRequest {
82-
reducer_context,
83-
tx_data,
84-
}) = self.request_rx.next().await
85-
{
86-
Self::do_durability(&*self.durability, reducer_context, &tx_data);
97+
while let Some(req) = self.request_rx.recv().await {
98+
match req {
99+
DurabilityRequest::Work {
100+
reducer_context,
101+
tx_data,
102+
} => Self::do_durability(&*self.durability, reducer_context, &tx_data),
103+
104+
// Terminate the actor
105+
// and make sure we drop `self.durability`
106+
// before we drop `self.request_tx`.
107+
//
108+
// After a `Close`,
109+
// there should be no more `Work` incoming or buffered,
110+
// as `Close` is sent in `Drop`.
111+
DurabilityRequest::Close => {
112+
drop(self.durability);
113+
drop(self.request_rx);
114+
return;
115+
}
116+
}
87117
}
88118
}
89119

0 commit comments

Comments
 (0)