Skip to content

Commit c8ed4d4

Browse files
committed
move call_identity_connected to Instance
1 parent efb330d commit c8ed4d4

File tree

3 files changed

+179
-87
lines changed

3 files changed

+179
-87
lines changed

crates/core/src/host/module_host.rs

Lines changed: 130 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -413,10 +413,14 @@ impl Instance {
413413
}
414414
}
415415

416-
async fn call_procedure(&mut self, params: CallProcedureParams) -> Result<ProcedureCallResult, ProcedureCallError> {
416+
fn call_identity_connected(
417+
&mut self,
418+
caller_auth: ConnectionAuthCtx,
419+
caller_connection_id: ConnectionId,
420+
) -> Result<(), ClientConnectedError> {
417421
match self {
418-
Instance::Wasm(inst) => inst.call_procedure(params).await,
419-
Instance::Js(inst) => inst.call_procedure(params).await,
422+
Instance::Wasm(inst) => inst.call_identity_connected(caller_auth, caller_connection_id),
423+
Instance::Js(inst) => inst.call_identity_connected(caller_auth, caller_connection_id),
420424
}
421425
}
422426

@@ -433,6 +437,13 @@ impl Instance {
433437
Instance::Js(inst) => inst.init_database(program),
434438
}
435439
}
440+
441+
async fn call_procedure(&mut self, params: CallProcedureParams) -> Result<ProcedureCallResult, ProcedureCallError> {
442+
match self {
443+
Instance::Wasm(inst) => inst.call_procedure(params).await,
444+
Instance::Js(inst) => inst.call_procedure(params).await,
445+
}
446+
}
436447
}
437448

438449
/// Creates the table for `table_def` in `stdb`.
@@ -550,6 +561,95 @@ fn init_database_inner(
550561
Ok(rcr)
551562
}
552563

564+
pub fn call_identity_connected(
565+
caller_auth: ConnectionAuthCtx,
566+
caller_connection_id: ConnectionId,
567+
module: &ModuleInfo,
568+
call_reducer: impl FnOnce(Option<MutTxId>, CallReducerParams) -> (ReducerCallResult, bool),
569+
trapped_slot: &mut bool,
570+
) -> Result<(), ClientConnectedError> {
571+
let reducer_lookup = module.module_def.lifecycle_reducer(Lifecycle::OnConnect);
572+
let stdb = module.relational_db();
573+
let workload = Workload::reducer_no_args(
574+
"call_identity_connected",
575+
caller_auth.claims.identity,
576+
caller_connection_id,
577+
);
578+
let mut_tx = stdb.begin_mut_tx(IsolationLevel::Serializable, workload);
579+
let mut mut_tx = scopeguard::guard(mut_tx, |mut_tx| {
580+
// If we crash before committing, we need to ensure that the transaction is rolled back.
581+
// This is necessary to avoid leaving the database in an inconsistent state.
582+
log::debug!("call_identity_connected: rolling back transaction");
583+
let (_, metrics, reducer_name) = mut_tx.rollback();
584+
stdb.report_mut_tx_metrics(reducer_name, metrics, None);
585+
});
586+
587+
mut_tx
588+
.insert_st_client(
589+
caller_auth.claims.identity,
590+
caller_connection_id,
591+
&caller_auth.jwt_payload,
592+
)
593+
.map_err(DBError::from)?;
594+
595+
if let Some((reducer_id, reducer_def)) = reducer_lookup {
596+
// The module defined a lifecycle reducer to handle new connections.
597+
// Call this reducer.
598+
// If the call fails (as in, something unexpectedly goes wrong with guest execution),
599+
// abort the connection: we can't really recover.
600+
let tx = Some(ScopeGuard::into_inner(mut_tx));
601+
let params = ModuleHost::call_reducer_params(
602+
module,
603+
caller_auth.claims.identity,
604+
Some(caller_connection_id),
605+
None,
606+
None,
607+
None,
608+
reducer_id,
609+
reducer_def,
610+
FunctionArgs::Nullary,
611+
)
612+
.map_err(ReducerCallError::from)?;
613+
let (reducer_outcome, trapped) = call_reducer(tx, params);
614+
*trapped_slot = trapped;
615+
616+
match reducer_outcome.outcome {
617+
// If the reducer committed successfully, we're done.
618+
// `WasmModuleInstance::call_reducer_with_tx` has already ensured
619+
// that `st_client` is updated appropriately.
620+
//
621+
// It's necessary to spread out the responsibility for updating `st_client` in this way
622+
// because it's important that `call_identity_connected` commit at most one transaction.
623+
// A naive implementation of this method would just run the reducer first,
624+
// then insert into `st_client`,
625+
// but if we crashed in between, we'd be left in an inconsistent state
626+
// where the reducer had run but `st_client` was not yet updated.
627+
ReducerOutcome::Committed => Ok(()),
628+
629+
// If the reducer returned an error or couldn't run due to insufficient energy,
630+
// abort the connection: the module code has decided it doesn't want this client.
631+
ReducerOutcome::Failed(message) => Err(ClientConnectedError::Rejected(message)),
632+
ReducerOutcome::BudgetExceeded => Err(ClientConnectedError::OutOfEnergy),
633+
}
634+
} else {
635+
// The module doesn't define a client_connected reducer.
636+
// We need to commit the transaction to update st_clients and st_connection_credentials.
637+
//
638+
// This is necessary to be able to disconnect clients after a server crash.
639+
640+
// TODO: Is this being broadcast? Does it need to be, or are st_client table subscriptions
641+
// not allowed?
642+
// I (jsdt) don't think it was being broadcast previously. See:
643+
// https://github.com/clockworklabs/SpacetimeDB/issues/3130
644+
stdb.finish_tx(ScopeGuard::into_inner(mut_tx), Ok(()))
645+
.map_err(|e: DBError| {
646+
log::error!("`call_identity_connected`: finish transaction failed: {e:#?}");
647+
ClientConnectedError::DBError(e)
648+
})?;
649+
Ok(())
650+
}
651+
}
652+
553653
// Only for logging purposes.
554654
const SCHEDULED_REDUCER: &str = "scheduled_reducer";
555655

@@ -1035,85 +1135,8 @@ impl ModuleHost {
10351135
caller_auth: ConnectionAuthCtx,
10361136
caller_connection_id: ConnectionId,
10371137
) -> Result<(), ClientConnectedError> {
1038-
let me = self.clone();
10391138
self.call("call_identity_connected", move |inst| {
1040-
let reducer_lookup = me.info.module_def.lifecycle_reducer(Lifecycle::OnConnect);
1041-
let stdb = &me.module.replica_ctx().relational_db;
1042-
let workload = Workload::reducer_no_args(
1043-
"call_identity_connected",
1044-
caller_auth.claims.identity,
1045-
caller_connection_id,
1046-
);
1047-
let mut_tx = stdb.begin_mut_tx(IsolationLevel::Serializable, workload);
1048-
let mut mut_tx = scopeguard::guard(mut_tx, |mut_tx| {
1049-
// If we crash before committing, we need to ensure that the transaction is rolled back.
1050-
// This is necessary to avoid leaving the database in an inconsistent state.
1051-
log::debug!("call_identity_connected: rolling back transaction");
1052-
let (_, metrics, reducer_name) = mut_tx.rollback();
1053-
stdb.report_mut_tx_metrics(reducer_name, metrics, None);
1054-
});
1055-
1056-
mut_tx
1057-
.insert_st_client(
1058-
caller_auth.claims.identity,
1059-
caller_connection_id,
1060-
&caller_auth.jwt_payload,
1061-
)
1062-
.map_err(DBError::from)?;
1063-
1064-
if let Some((reducer_id, reducer_def)) = reducer_lookup {
1065-
// The module defined a lifecycle reducer to handle new connections.
1066-
// Call this reducer.
1067-
// If the call fails (as in, something unexpectedly goes wrong with guest execution),
1068-
// abort the connection: we can't really recover.
1069-
let reducer_outcome = me.call_reducer_inner_with_inst(
1070-
Some(ScopeGuard::into_inner(mut_tx)),
1071-
caller_auth.claims.identity,
1072-
Some(caller_connection_id),
1073-
None,
1074-
None,
1075-
None,
1076-
reducer_id,
1077-
reducer_def,
1078-
FunctionArgs::Nullary,
1079-
inst,
1080-
)?;
1081-
1082-
match reducer_outcome.outcome {
1083-
// If the reducer committed successfully, we're done.
1084-
// `WasmModuleInstance::call_reducer_with_tx` has already ensured
1085-
// that `st_client` is updated appropriately.
1086-
//
1087-
// It's necessary to spread out the responsibility for updating `st_client` in this way
1088-
// because it's important that `call_identity_connected` commit at most one transaction.
1089-
// A naive implementation of this method would just run the reducer first,
1090-
// then insert into `st_client`,
1091-
// but if we crashed in between, we'd be left in an inconsistent state
1092-
// where the reducer had run but `st_client` was not yet updated.
1093-
ReducerOutcome::Committed => Ok(()),
1094-
1095-
// If the reducer returned an error or couldn't run due to insufficient energy,
1096-
// abort the connection: the module code has decided it doesn't want this client.
1097-
ReducerOutcome::Failed(message) => Err(ClientConnectedError::Rejected(message)),
1098-
ReducerOutcome::BudgetExceeded => Err(ClientConnectedError::OutOfEnergy),
1099-
}
1100-
} else {
1101-
// The module doesn't define a client_connected reducer.
1102-
// We need to commit the transaction to update st_clients and st_connection_credentials.
1103-
//
1104-
// This is necessary to be able to disconnect clients after a server crash.
1105-
1106-
// TODO: Is this being broadcast? Does it need to be, or are st_client table subscriptions
1107-
// not allowed?
1108-
// I (jsdt) don't think it was being broadcast previously. See:
1109-
// https://github.com/clockworklabs/SpacetimeDB/issues/3130
1110-
stdb.finish_tx(ScopeGuard::into_inner(mut_tx), Ok(()))
1111-
.map_err(|e: DBError| {
1112-
log::error!("`call_identity_connected`: finish transaction failed: {e:#?}");
1113-
ClientConnectedError::DBError(e)
1114-
})?;
1115-
Ok(())
1116-
}
1139+
inst.call_identity_connected(caller_auth, caller_connection_id)
11171140
})
11181141
.await
11191142
.map_err(ReducerCallError::from)?
@@ -1260,6 +1283,32 @@ impl ModuleHost {
12601283
.await?
12611284
}
12621285

1286+
fn call_reducer_params(
1287+
module: &ModuleInfo,
1288+
caller_identity: Identity,
1289+
caller_connection_id: Option<ConnectionId>,
1290+
client: Option<Arc<ClientConnectionSender>>,
1291+
request_id: Option<RequestId>,
1292+
timer: Option<Instant>,
1293+
reducer_id: ReducerId,
1294+
reducer_def: &ReducerDef,
1295+
args: FunctionArgs,
1296+
) -> Result<CallReducerParams, InvalidReducerArguments> {
1297+
let reducer_seed = ArgsSeed(module.module_def.typespace().with_type(reducer_def));
1298+
let args = args.into_tuple(reducer_seed).map_err(InvalidReducerArguments)?;
1299+
let caller_connection_id = caller_connection_id.unwrap_or(ConnectionId::ZERO);
1300+
Ok(CallReducerParams {
1301+
timestamp: Timestamp::now(),
1302+
caller_identity,
1303+
caller_connection_id,
1304+
client,
1305+
request_id,
1306+
timer,
1307+
reducer_id,
1308+
args,
1309+
})
1310+
}
1311+
12631312
async fn call_reducer_inner(
12641313
&self,
12651314
caller_identity: Identity,

crates/core/src/host/v8/mod.rs

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,9 @@ use super::module_common::{build_common_module_from_raw, run_describer, ModuleCo
1010
use super::module_host::{CallProcedureParams, CallReducerParams, Module, ModuleInfo, ModuleRuntime};
1111
use super::UpdateDatabaseResult;
1212
use crate::host::instance_env::{ChunkPool, InstanceEnv};
13-
use crate::host::module_host::{call_scheduled_reducer, init_database, Instance};
13+
use crate::host::module_host::{
14+
call_identity_connected, call_scheduled_reducer, init_database, ClientConnectedError, Instance,
15+
};
1416
use crate::host::scheduler::QueueItem;
1517
use crate::host::wasm_common::instrumentation::CallTimes;
1618
use crate::host::wasm_common::module_host_actor::{DescribeError, ExecuteResult, ExecutionTimings, InstanceCommon};
@@ -23,9 +25,10 @@ use core::any::type_name;
2325
use core::str;
2426
use enum_as_inner::EnumAsInner;
2527
use itertools::Either;
28+
use spacetimedb_auth::identity::ConnectionAuthCtx;
2629
use spacetimedb_datastore::locking_tx_datastore::MutTxId;
2730
use spacetimedb_datastore::traits::Program;
28-
use spacetimedb_lib::{RawModuleDef, Timestamp};
31+
use spacetimedb_lib::{ConnectionId, RawModuleDef, Timestamp};
2932
use spacetimedb_schema::auto_migrate::MigrationPolicy;
3033
use std::sync::mpsc::{Receiver, SyncSender};
3134
use std::sync::{mpsc, Arc, LazyLock};
@@ -317,6 +320,19 @@ impl JsInstance {
317320
self.send_recv(JsWorkerReply::into_clear_all_clients, JsWorkerRequest::ClearAllClients)
318321
}
319322

323+
pub fn call_identity_connected(
324+
&mut self,
325+
caller_auth: ConnectionAuthCtx,
326+
caller_connection_id: ConnectionId,
327+
) -> Result<(), ClientConnectedError> {
328+
self.can_trap(move |this| {
329+
this.send_recv(
330+
JsWorkerReply::into_call_identity_connected,
331+
JsWorkerRequest::CallIdentityConnected(caller_auth, caller_connection_id),
332+
)
333+
})
334+
}
335+
320336
pub(crate) fn call_scheduled_reducer(&mut self, item: QueueItem) -> Result<ReducerCallResult, ReducerCallError> {
321337
self.can_trap(move |this| {
322338
this.send_recv(
@@ -349,6 +365,7 @@ enum JsWorkerReply {
349365
UpdateDatabase(anyhow::Result<UpdateDatabaseResult>),
350366
CallReducer((ReducerCallResult, bool)),
351367
ClearAllClients(anyhow::Result<()>),
368+
CallIdentityConnected((Result<(), ClientConnectedError>, bool)),
352369
CallScheduledReducer((Result<ReducerCallResult, ReducerCallError>, bool)),
353370
InitDatabase((anyhow::Result<Option<ReducerCallResult>>, bool)),
354371
}
@@ -371,6 +388,8 @@ enum JsWorkerRequest {
371388
},
372389
/// See [`JsInstance::clear_all_clients`].
373390
ClearAllClients,
391+
/// See [`JsInstance::call_identity_connected`].
392+
CallIdentityConnected(ConnectionAuthCtx, ConnectionId),
374393
/// See [`JsInstance::call_scheduled_reducer`].
375394
CallScheduledReducer(QueueItem),
376395
/// See [`JsInstance::init_database`].
@@ -465,6 +484,7 @@ fn spawn_instance_worker(
465484
};
466485

467486
// Setup the instance common and environment.
487+
let info = &module_common.info();
468488
let mut instance_common = InstanceCommon::new(&module_common);
469489
let replica_ctx: &Arc<ReplicaContext> = module_common.replica_ctx();
470490
let scheduler = module_common.scheduler().clone();
@@ -506,12 +526,21 @@ fn spawn_instance_worker(
506526
let res = instance_common.clear_all_clients();
507527
reply("clear_all_clients", JsWorkerReply::ClearAllClients(res));
508528
}
529+
JsWorkerRequest::CallIdentityConnected(caller_auth, caller_connection_id) => {
530+
let call_reducer = |tx, params| {
531+
call_reducer(&mut instance_common, replica_ctx, scope, call_reducer_fun, tx, params)
532+
};
533+
let mut trapped = false;
534+
let res =
535+
call_identity_connected(caller_auth, caller_connection_id, info, call_reducer, &mut trapped);
536+
let res = (res, trapped);
537+
reply("call_identity_connected", JsWorkerReply::CallIdentityConnected(res));
538+
}
509539
JsWorkerRequest::CallScheduledReducer(queue_item) => {
510540
let call_reducer = |tx, params| {
511541
call_reducer(&mut instance_common, replica_ctx, scope, call_reducer_fun, tx, params)
512542
};
513-
let module = module_common.info();
514-
let res = call_scheduled_reducer(&module, queue_item, call_reducer);
543+
let res = call_scheduled_reducer(info, queue_item, call_reducer);
515544
reply("call_scheduled_reducer", JsWorkerReply::CallScheduledReducer(res));
516545
}
517546
JsWorkerRequest::InitDatabase(program) => {

crates/core/src/host/wasm_common/module_host_actor.rs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use bytes::Bytes;
22
use prometheus::{Histogram, IntCounter, IntGauge};
3+
use spacetimedb_auth::identity::ConnectionAuthCtx;
34
use spacetimedb_lib::db::raw_def::v9::Lifecycle;
45
use spacetimedb_lib::de::DeserializeSeed;
56
use spacetimedb_primitives::ProcedureId;
@@ -20,8 +21,8 @@ use crate::host::module_host::{
2021
};
2122
use crate::host::scheduler::QueueItem;
2223
use crate::host::{
23-
call_scheduled_reducer, init_database, ArgsTuple, ProcedureCallError, ProcedureCallResult, ReducerCallResult,
24-
ReducerId, ReducerOutcome, Scheduler,
24+
call_identity_connected, call_scheduled_reducer, init_database, ArgsTuple, ProcedureCallError, ProcedureCallResult,
25+
ReducerCallResult, ReducerId, ReducerOutcome, Scheduler,
2526
};
2627
use crate::identity::Identity;
2728
use crate::messages::control_db::HostType;
@@ -281,6 +282,19 @@ impl<T: WasmInstance> WasmModuleInstance<T> {
281282
self.common.clear_all_clients()
282283
}
283284

285+
pub fn call_identity_connected(
286+
&mut self,
287+
caller_auth: ConnectionAuthCtx,
288+
caller_connection_id: ConnectionId,
289+
) -> Result<(), ClientConnectedError> {
290+
let module = &self.common.info.clone();
291+
let call_reducer = |tx, params| self.call_reducer_inner(tx, params);
292+
let mut trapped = false;
293+
let res = call_identity_connected(caller_auth, caller_connection_id, module, call_reducer, &mut trapped);
294+
self.trapped = trapped;
295+
res
296+
}
297+
284298
pub(crate) fn call_scheduled_reducer(&mut self, item: QueueItem) -> Result<ReducerCallResult, ReducerCallError> {
285299
let module = &self.common.info.clone();
286300
let call_reducer = |tx, params| self.call_reducer_inner(tx, params);

0 commit comments

Comments
 (0)