Skip to content

Commit 48f2e1f

Browse files
committed
move call_identity_disconnected + disconnect_clien to Instance
1 parent c8ed4d4 commit 48f2e1f

File tree

3 files changed

+139
-64
lines changed

3 files changed

+139
-64
lines changed

crates/core/src/host/module_host.rs

Lines changed: 54 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -424,6 +424,24 @@ impl Instance {
424424
}
425425
}
426426

427+
fn call_identity_disconnected(
428+
&mut self,
429+
caller_identity: Identity,
430+
caller_connection_id: ConnectionId,
431+
) -> Result<(), ReducerCallError> {
432+
match self {
433+
Instance::Wasm(inst) => inst.call_identity_disconnected(caller_identity, caller_connection_id),
434+
Instance::Js(inst) => inst.call_identity_disconnected(caller_identity, caller_connection_id),
435+
}
436+
}
437+
438+
fn disconnect_client(&mut self, client_id: ClientActorId) -> Result<(), ReducerCallError> {
439+
match self {
440+
Instance::Wasm(inst) => inst.disconnect_client(client_id),
441+
Instance::Js(inst) => inst.disconnect_client(client_id),
442+
}
443+
}
444+
427445
fn call_scheduled_reducer(&mut self, item: QueueItem) -> Result<ReducerCallResult, ReducerCallError> {
428446
match self {
429447
Instance::Wasm(inst) => inst.call_scheduled_reducer(item),
@@ -1103,20 +1121,32 @@ impl ModuleHost {
11031121

11041122
pub async fn disconnect_client(&self, client_id: ClientActorId) {
11051123
log::trace!("disconnecting client {client_id}");
1106-
let this = self.clone();
11071124
if let Err(e) = self
1108-
.call("disconnect_client", move |inst| {
1109-
// Call the `client_disconnected` reducer, if it exists.
1110-
// This is a no-op if the module doesn't define such a reducer.
1111-
this.subscriptions().remove_subscriber(client_id);
1112-
this.call_identity_disconnected_inner(client_id.identity, client_id.connection_id, inst)
1113-
})
1125+
.call("disconnect_client", move |inst| inst.disconnect_client(client_id))
11141126
.await
11151127
{
11161128
log::error!("Error from client_disconnected transaction: {e}");
11171129
}
11181130
}
11191131

1132+
pub fn disconnect_client_inner(
1133+
client_id: ClientActorId,
1134+
info: &ModuleInfo,
1135+
call_reducer: impl FnOnce(Option<MutTxId>, CallReducerParams) -> (ReducerCallResult, bool),
1136+
trapped_slot: &mut bool,
1137+
) -> Result<(), ReducerCallError> {
1138+
// Call the `client_disconnected` reducer, if it exists.
1139+
// This is a no-op if the module doesn't define such a reducer.
1140+
info.subscriptions.remove_subscriber(client_id);
1141+
Self::call_identity_disconnected_inner(
1142+
client_id.identity,
1143+
client_id.connection_id,
1144+
info,
1145+
call_reducer,
1146+
trapped_slot,
1147+
)
1148+
}
1149+
11201150
/// Invoke the module's `client_connected` reducer, if it has one,
11211151
/// and insert a new row into `st_client` for `(caller_identity, caller_connection_id)`.
11221152
///
@@ -1147,12 +1177,15 @@ impl ModuleHost {
11471177
/// If the reducer fails, the rows are still deleted.
11481178
/// Calling this on an already-disconnected client is a no-op.
11491179
pub fn call_identity_disconnected_inner(
1150-
&self,
11511180
caller_identity: Identity,
11521181
caller_connection_id: ConnectionId,
1153-
inst: &mut Instance,
1182+
info: &ModuleInfo,
1183+
call_reducer: impl FnOnce(Option<MutTxId>, CallReducerParams) -> (ReducerCallResult, bool),
1184+
trapped_slot: &mut bool,
11541185
) -> Result<(), ReducerCallError> {
1155-
let reducer_lookup = self.info.module_def.lifecycle_reducer(Lifecycle::OnDisconnect);
1186+
let stdb = info.relational_db();
1187+
1188+
let reducer_lookup = info.module_def.lifecycle_reducer(Lifecycle::OnDisconnect);
11561189
let reducer_name = reducer_lookup
11571190
.as_ref()
11581191
.map(|(_, def)| &*def.name)
@@ -1162,12 +1195,9 @@ impl ModuleHost {
11621195

11631196
let workload = || Workload::reducer_no_args(reducer_name, caller_identity, caller_connection_id);
11641197

1165-
let me = self.clone();
1166-
let stdb = me.module.replica_ctx().relational_db.clone();
1167-
11681198
// A fallback transaction that deletes the client from `st_client`.
1199+
let database_identity = stdb.database_identity();
11691200
let fallback = || {
1170-
let database_identity = me.info.database_identity;
11711201
stdb.with_auto_commit(workload(), |mut_tx| {
11721202
if !is_client_exist(mut_tx) {
11731203
// The client is already gone. Nothing to do.
@@ -1207,8 +1237,9 @@ impl ModuleHost {
12071237
// The module defined a lifecycle reducer to handle disconnects. Call it.
12081238
// If it succeeds, `WasmModuleInstance::call_reducer_with_tx` has already ensured
12091239
// that `st_client` is updated appropriately.
1210-
let result = me.call_reducer_inner_with_inst(
1211-
Some(mut_tx),
1240+
let tx = Some(mut_tx);
1241+
let result = Self::call_reducer_params(
1242+
info,
12121243
caller_identity,
12131244
Some(caller_connection_id),
12141245
None,
@@ -1217,8 +1248,12 @@ impl ModuleHost {
12171248
reducer_id,
12181249
reducer_def,
12191250
FunctionArgs::Nullary,
1220-
inst,
1221-
);
1251+
)
1252+
.map(|params| {
1253+
let (res, trapped) = call_reducer(tx, params);
1254+
*trapped_slot = trapped;
1255+
res
1256+
});
12221257

12231258
// If it failed, we still need to update `st_client`: the client's not coming back.
12241259
// Commit a separate transaction that just updates `st_client`.
@@ -1270,9 +1305,8 @@ impl ModuleHost {
12701305
caller_identity: Identity,
12711306
caller_connection_id: ConnectionId,
12721307
) -> Result<(), ReducerCallError> {
1273-
let me = self.clone();
12741308
self.call("call_identity_disconnected", move |inst| {
1275-
me.call_identity_disconnected_inner(caller_identity, caller_connection_id, inst)
1309+
inst.call_identity_disconnected(caller_identity, caller_connection_id)
12761310
})
12771311
.await?
12781312
}
@@ -1340,37 +1374,6 @@ impl ModuleHost {
13401374
})
13411375
.await?)
13421376
}
1343-
fn call_reducer_inner_with_inst(
1344-
&self,
1345-
tx: Option<MutTxId>,
1346-
caller_identity: Identity,
1347-
caller_connection_id: Option<ConnectionId>,
1348-
client: Option<Arc<ClientConnectionSender>>,
1349-
request_id: Option<RequestId>,
1350-
timer: Option<Instant>,
1351-
reducer_id: ReducerId,
1352-
reducer_def: &ReducerDef,
1353-
args: FunctionArgs,
1354-
module_instance: &mut Instance,
1355-
) -> Result<ReducerCallResult, ReducerCallError> {
1356-
let reducer_seed = ArgsSeed(self.info.module_def.typespace().with_type(reducer_def));
1357-
let args = args.into_tuple(reducer_seed).map_err(InvalidReducerArguments)?;
1358-
let caller_connection_id = caller_connection_id.unwrap_or(ConnectionId::ZERO);
1359-
1360-
Ok(module_instance.call_reducer(
1361-
tx,
1362-
CallReducerParams {
1363-
timestamp: Timestamp::now(),
1364-
caller_identity,
1365-
caller_connection_id,
1366-
client,
1367-
request_id,
1368-
timer,
1369-
reducer_id,
1370-
args,
1371-
},
1372-
))
1373-
}
13741377

13751378
pub async fn call_reducer(
13761379
&self,

crates/core/src/host/v8/mod.rs

Lines changed: 56 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use self::syscall::{call_call_reducer, call_describe_module, call_reducer_fun, r
99
use super::module_common::{build_common_module_from_raw, run_describer, ModuleCommon};
1010
use super::module_host::{CallProcedureParams, CallReducerParams, Module, ModuleInfo, ModuleRuntime};
1111
use super::UpdateDatabaseResult;
12+
use crate::client::ClientActorId;
1213
use crate::host::instance_env::{ChunkPool, InstanceEnv};
1314
use crate::host::module_host::{
1415
call_identity_connected, call_scheduled_reducer, init_database, ClientConnectedError, Instance,
@@ -17,7 +18,7 @@ use crate::host::scheduler::QueueItem;
1718
use crate::host::wasm_common::instrumentation::CallTimes;
1819
use crate::host::wasm_common::module_host_actor::{DescribeError, ExecuteResult, ExecutionTimings, InstanceCommon};
1920
use crate::host::wasm_common::{RowIters, TimingSpanSet};
20-
use crate::host::{ReducerCallError, ReducerCallResult, Scheduler};
21+
use crate::host::{ModuleHost, ReducerCallError, ReducerCallResult, Scheduler};
2122
use crate::module_host_context::{ModuleCreationContext, ModuleCreationContextLimited};
2223
use crate::replica_context::ReplicaContext;
2324
use crate::util::asyncify;
@@ -28,7 +29,7 @@ use itertools::Either;
2829
use spacetimedb_auth::identity::ConnectionAuthCtx;
2930
use spacetimedb_datastore::locking_tx_datastore::MutTxId;
3031
use spacetimedb_datastore::traits::Program;
31-
use spacetimedb_lib::{ConnectionId, RawModuleDef, Timestamp};
32+
use spacetimedb_lib::{ConnectionId, Identity, RawModuleDef, Timestamp};
3233
use spacetimedb_schema::auto_migrate::MigrationPolicy;
3334
use std::sync::mpsc::{Receiver, SyncSender};
3435
use std::sync::{mpsc, Arc, LazyLock};
@@ -333,6 +334,28 @@ impl JsInstance {
333334
})
334335
}
335336

337+
pub fn call_identity_disconnected(
338+
&mut self,
339+
caller_identity: Identity,
340+
caller_connection_id: ConnectionId,
341+
) -> Result<(), ReducerCallError> {
342+
self.can_trap(move |this| {
343+
this.send_recv(
344+
JsWorkerReply::into_call_identity_disconnected,
345+
JsWorkerRequest::CallIdentityDisconnected(caller_identity, caller_connection_id),
346+
)
347+
})
348+
}
349+
350+
pub fn disconnect_client(&mut self, client_id: ClientActorId) -> Result<(), ReducerCallError> {
351+
self.can_trap(move |this| {
352+
this.send_recv(
353+
JsWorkerReply::into_disconnect_client,
354+
JsWorkerRequest::DisconnectClient(client_id),
355+
)
356+
})
357+
}
358+
336359
pub(crate) fn call_scheduled_reducer(&mut self, item: QueueItem) -> Result<ReducerCallResult, ReducerCallError> {
337360
self.can_trap(move |this| {
338361
this.send_recv(
@@ -366,6 +389,8 @@ enum JsWorkerReply {
366389
CallReducer((ReducerCallResult, bool)),
367390
ClearAllClients(anyhow::Result<()>),
368391
CallIdentityConnected((Result<(), ClientConnectedError>, bool)),
392+
CallIdentityDisconnected((Result<(), ReducerCallError>, bool)),
393+
DisconnectClient((Result<(), ReducerCallError>, bool)),
369394
CallScheduledReducer((Result<ReducerCallResult, ReducerCallError>, bool)),
370395
InitDatabase((anyhow::Result<Option<ReducerCallResult>>, bool)),
371396
}
@@ -390,6 +415,10 @@ enum JsWorkerRequest {
390415
ClearAllClients,
391416
/// See [`JsInstance::call_identity_connected`].
392417
CallIdentityConnected(ConnectionAuthCtx, ConnectionId),
418+
/// See [`JsInstance::call_identity_disconnected`].
419+
CallIdentityDisconnected(Identity, ConnectionId),
420+
/// See [`JsInstance::disconnect_client`].
421+
DisconnectClient(ClientActorId),
393422
/// See [`JsInstance::call_scheduled_reducer`].
394423
CallScheduledReducer(QueueItem),
395424
/// See [`JsInstance::init_database`].
@@ -503,6 +532,9 @@ fn spawn_instance_worker(
503532
}
504533
};
505534
for request in request_rx.iter() {
535+
let mut call_reducer =
536+
|tx, params| call_reducer(&mut instance_common, replica_ctx, scope, call_reducer_fun, tx, params);
537+
506538
match request {
507539
JsWorkerRequest::UpdateDatabase {
508540
program,
@@ -519,34 +551,46 @@ fn spawn_instance_worker(
519551
// but rather let this happen by `return_instance` using `JsInstance::trapped`
520552
// which will cause `JsInstance` to be dropped,
521553
// which in turn results in the loop being terminated.
522-
let res = call_reducer(&mut instance_common, replica_ctx, scope, call_reducer_fun, tx, params);
554+
let res = call_reducer(tx, params);
523555
reply("call_reducer", JsWorkerReply::CallReducer(res));
524556
}
525557
JsWorkerRequest::ClearAllClients => {
526558
let res = instance_common.clear_all_clients();
527559
reply("clear_all_clients", JsWorkerReply::ClearAllClients(res));
528560
}
529561
JsWorkerRequest::CallIdentityConnected(caller_auth, caller_connection_id) => {
530-
let call_reducer = |tx, params| {
531-
call_reducer(&mut instance_common, replica_ctx, scope, call_reducer_fun, tx, params)
532-
};
533562
let mut trapped = false;
534563
let res =
535564
call_identity_connected(caller_auth, caller_connection_id, info, call_reducer, &mut trapped);
536565
let res = (res, trapped);
537566
reply("call_identity_connected", JsWorkerReply::CallIdentityConnected(res));
538567
}
568+
JsWorkerRequest::CallIdentityDisconnected(caller_identity, caller_connection_id) => {
569+
let mut trapped = false;
570+
let res = ModuleHost::call_identity_disconnected_inner(
571+
caller_identity,
572+
caller_connection_id,
573+
info,
574+
call_reducer,
575+
&mut trapped,
576+
);
577+
let res = (res, trapped);
578+
reply(
579+
"call_identity_disconnected",
580+
JsWorkerReply::CallIdentityDisconnected(res),
581+
);
582+
}
583+
JsWorkerRequest::DisconnectClient(client_id) => {
584+
let mut trapped = false;
585+
let res = ModuleHost::disconnect_client_inner(client_id, info, call_reducer, &mut trapped);
586+
let res = (res, trapped);
587+
reply("disconnect_client", JsWorkerReply::DisconnectClient(res));
588+
}
539589
JsWorkerRequest::CallScheduledReducer(queue_item) => {
540-
let call_reducer = |tx, params| {
541-
call_reducer(&mut instance_common, replica_ctx, scope, call_reducer_fun, tx, params)
542-
};
543590
let res = call_scheduled_reducer(info, queue_item, call_reducer);
544591
reply("call_scheduled_reducer", JsWorkerReply::CallScheduledReducer(res));
545592
}
546593
JsWorkerRequest::InitDatabase(program) => {
547-
let call_reducer = |tx, params| {
548-
call_reducer(&mut instance_common, replica_ctx, scope, call_reducer_fun, tx, params)
549-
};
550594
let res = init_database(replica_ctx, &module_common.info().module_def, program, call_reducer);
551595
reply("init_database", JsWorkerReply::InitDatabase(res));
552596
}

crates/core/src/host/wasm_common/module_host_actor.rs

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use std::time::Duration;
1111
use tracing::span::EnteredSpan;
1212

1313
use super::instrumentation::CallTimes;
14-
use crate::client::ClientConnectionSender;
14+
use crate::client::{ClientActorId, ClientConnectionSender};
1515
use crate::database_logger;
1616
use crate::energy::{EnergyMonitor, ReducerBudget, ReducerFingerprint};
1717
use crate::host::instance_env::InstanceEnv;
@@ -295,6 +295,34 @@ impl<T: WasmInstance> WasmModuleInstance<T> {
295295
res
296296
}
297297

298+
pub fn call_identity_disconnected(
299+
&mut self,
300+
caller_identity: Identity,
301+
caller_connection_id: ConnectionId,
302+
) -> Result<(), ReducerCallError> {
303+
let module = &self.common.info.clone();
304+
let call_reducer = |tx, params| self.call_reducer_inner(tx, params);
305+
let mut trapped = false;
306+
let res = ModuleHost::call_identity_disconnected_inner(
307+
caller_identity,
308+
caller_connection_id,
309+
module,
310+
call_reducer,
311+
&mut trapped,
312+
);
313+
self.trapped = trapped;
314+
res
315+
}
316+
317+
pub fn disconnect_client(&mut self, client_id: ClientActorId) -> Result<(), ReducerCallError> {
318+
let module = &self.common.info.clone();
319+
let call_reducer = |tx, params| self.call_reducer_inner(tx, params);
320+
let mut trapped = false;
321+
let res = ModuleHost::disconnect_client_inner(client_id, module, call_reducer, &mut trapped);
322+
self.trapped = trapped;
323+
res
324+
}
325+
298326
pub(crate) fn call_scheduled_reducer(&mut self, item: QueueItem) -> Result<ReducerCallResult, ReducerCallError> {
299327
let module = &self.common.info.clone();
300328
let call_reducer = |tx, params| self.call_reducer_inner(tx, params);

0 commit comments

Comments
 (0)