Skip to content

Commit 3da1329

Browse files
committed
make JsInstance methods async
1 parent 81b862a commit 3da1329

File tree

2 files changed

+232
-232
lines changed

2 files changed

+232
-232
lines changed

crates/core/src/host/module_host.rs

Lines changed: 135 additions & 138 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ use crate::error::DBError;
1111
use crate::estimation::estimate_rows_scanned;
1212
use crate::hash::Hash;
1313
use crate::host::scheduler::{handle_queued_call_reducer_params, QueueItem};
14+
use crate::host::v8::JsInstance;
15+
use crate::host::wasmtime::ModuleInstance;
1416
use crate::host::InvalidFunctionArguments;
1517
use crate::identity::Identity;
1618
use crate::messages::control_db::{Database, HostType};
@@ -386,76 +388,6 @@ impl Instance {
386388
}
387389
}
388390

389-
/// Update the module instance's database to match the schema of the module instance.
390-
fn update_database(
391-
&mut self,
392-
program: Program,
393-
old_module_info: Arc<ModuleInfo>,
394-
policy: MigrationPolicy,
395-
) -> anyhow::Result<UpdateDatabaseResult> {
396-
match self {
397-
Instance::Wasm(inst) => inst.update_database(program, old_module_info, policy),
398-
Instance::Js(inst) => inst.update_database(program, old_module_info, policy),
399-
}
400-
}
401-
402-
fn call_reducer(&mut self, tx: Option<MutTxId>, params: CallReducerParams) -> ReducerCallResult {
403-
match self {
404-
Instance::Wasm(inst) => inst.call_reducer(tx, params),
405-
Instance::Js(inst) => inst.call_reducer(tx, params),
406-
}
407-
}
408-
409-
fn clear_all_clients(&self) -> anyhow::Result<()> {
410-
match self {
411-
Instance::Wasm(inst) => inst.clear_all_clients(),
412-
Instance::Js(inst) => inst.clear_all_clients(),
413-
}
414-
}
415-
416-
fn call_identity_connected(
417-
&mut self,
418-
caller_auth: ConnectionAuthCtx,
419-
caller_connection_id: ConnectionId,
420-
) -> Result<(), ClientConnectedError> {
421-
match self {
422-
Instance::Wasm(inst) => inst.call_identity_connected(caller_auth, caller_connection_id),
423-
Instance::Js(inst) => inst.call_identity_connected(caller_auth, caller_connection_id),
424-
}
425-
}
426-
427-
fn call_identity_disconnected(
428-
&mut self,
429-
caller_identity: Identity,
430-
caller_connection_id: ConnectionId,
431-
) -> Result<(), ReducerCallError> {
432-
match self {
433-
Instance::Wasm(inst) => inst.call_identity_disconnected(caller_identity, caller_connection_id),
434-
Instance::Js(inst) => inst.call_identity_disconnected(caller_identity, caller_connection_id),
435-
}
436-
}
437-
438-
fn disconnect_client(&mut self, client_id: ClientActorId) -> Result<(), ReducerCallError> {
439-
match self {
440-
Instance::Wasm(inst) => inst.disconnect_client(client_id),
441-
Instance::Js(inst) => inst.disconnect_client(client_id),
442-
}
443-
}
444-
445-
fn call_scheduled_reducer(&mut self, item: QueueItem) -> Result<ReducerCallResult, ReducerCallError> {
446-
match self {
447-
Instance::Wasm(inst) => inst.call_scheduled_reducer(item),
448-
Instance::Js(inst) => inst.call_scheduled_reducer(item),
449-
}
450-
}
451-
452-
fn init_database(&mut self, program: Program) -> anyhow::Result<Option<ReducerCallResult>> {
453-
match self {
454-
Instance::Wasm(inst) => inst.init_database(program),
455-
Instance::Js(inst) => inst.init_database(program),
456-
}
457-
}
458-
459391
async fn call_procedure(&mut self, params: CallProcedureParams) -> Result<ProcedureCallResult, ProcedureCallError> {
460392
match self {
461393
Instance::Wasm(inst) => inst.call_procedure(params).await,
@@ -1050,79 +982,119 @@ impl ModuleHost {
1050982
})
1051983
}
1052984

1053-
async fn call_async_with_instance<Fun, Fut, R>(&self, label: &str, f: Fun) -> Result<R, NoSuchModule>
985+
/// Run a function for this module which has access to the module instance.
986+
async fn with_instance<'a, Guard, R, F>(
987+
&'a self,
988+
kind: &str,
989+
label: &str,
990+
timer: impl FnOnce(&str) -> Guard,
991+
work: impl FnOnce(Guard, &'a SingleCoreExecutor, Instance) -> F,
992+
) -> Result<R, NoSuchModule>
1054993
where
1055-
Fun: (FnOnce(Instance) -> Fut) + Send + 'static,
1056-
Fut: Future<Output = (R, Instance)> + Send + 'static,
1057-
R: Send + 'static,
994+
F: Future<Output = (R, Instance)>,
1058995
{
1059996
self.guard_closed()?;
1060-
let timer_guard = self.start_call_timer(label);
997+
let timer_guard = timer(label);
1061998

999+
// Operations on module instances (e.g. calling reducers) is blocking,
1000+
// partially because the computation can potentially take a long time
1001+
// and partially because interacting with the database requires taking
1002+
// a blocking lock. So, we run `f` on a dedicated thread with `self.executor`.
1003+
// This will bubble up any panic that may occur.
1004+
1005+
// If a function call panics, we **must** ensure to call `self.on_panic`
1006+
// so that the module is discarded by the host controller.
10621007
scopeguard::defer_on_unwind!({
1063-
log::warn!("procedure {label} panicked");
1008+
log::warn!("{kind} {label} panicked");
10641009
(self.on_panic)();
10651010
});
10661011

10671012
// TODO: should we be calling and/or `await`-ing `get_instance` within the below `run_job`?
10681013
// Unclear how much overhead this call can have.
1069-
let instance = self.instance_manager.lock().await.get_instance().await;
1014+
let inst = self.instance_manager.lock().await.get_instance().await;
10701015

1071-
let (res, instance) = self
1072-
.executor
1073-
.run_job(async move {
1074-
drop(timer_guard);
1075-
f(instance).await
1076-
})
1077-
.await;
1016+
let (res, inst) = work(timer_guard, &self.executor, inst).await;
10781017

1079-
self.instance_manager.lock().await.return_instance(instance);
1018+
self.instance_manager.lock().await.return_instance(inst);
10801019

10811020
Ok(res)
10821021
}
10831022

1084-
/// Run a function on the JobThread for this module which has access to the module instance.
1085-
async fn call<F, R>(&self, label: &str, f: F) -> Result<R, NoSuchModule>
1023+
async fn call_async_with_instance<Fun, Fut, R>(&self, label: &str, work: Fun) -> Result<R, NoSuchModule>
10861024
where
1087-
F: FnOnce(&mut Instance) -> R + Send + 'static,
1025+
Fun: (FnOnce(Instance) -> Fut) + Send + 'static,
1026+
Fut: Future<Output = (R, Instance)> + Send + 'static,
10881027
R: Send + 'static,
10891028
{
1090-
self.guard_closed()?;
1091-
let timer_guard = self.start_call_timer(label);
1092-
1093-
// Operations on module instances (e.g. calling reducers) is blocking,
1094-
// partially because the computation can potentially take a long time
1095-
// and partially because interacting with the database requires taking
1096-
// a blocking lock. So, we run `f` on a dedicated thread with `self.executor`.
1097-
// This will bubble up any panic that may occur.
1098-
1099-
// If a reducer call panics, we **must** ensure to call `self.on_panic`
1100-
// so that the module is discarded by the host controller.
1101-
scopeguard::defer_on_unwind!({
1102-
log::warn!("reducer {label} panicked");
1103-
(self.on_panic)();
1104-
});
1105-
1106-
let mut instance = self.instance_manager.lock().await.get_instance().await;
1107-
1108-
let (res, instance) = self
1109-
.executor
1110-
.run_sync_job(move || {
1111-
drop(timer_guard);
1112-
let res = f(&mut instance);
1113-
(res, instance)
1114-
})
1115-
.await;
1116-
1117-
self.instance_manager.lock().await.return_instance(instance);
1029+
self.with_instance(
1030+
"procedure",
1031+
label,
1032+
|l| self.start_call_timer(l),
1033+
|timer_guard, executor, inst| {
1034+
executor.run_job(async move {
1035+
drop(timer_guard);
1036+
work(inst).await
1037+
})
1038+
},
1039+
)
1040+
.await
1041+
}
11181042

1119-
Ok(res)
1043+
/// Run a function for this module which has access to the module instance.
1044+
///
1045+
/// For WASM, the function is run on the module's JobThread.
1046+
/// For V8/JS, the function is run in the current task.
1047+
async fn call<A, R, JF>(
1048+
&self,
1049+
label: &str,
1050+
arg: A,
1051+
wasm: impl FnOnce(A, &mut ModuleInstance) -> R + Send + 'static,
1052+
js: impl FnOnce(A, Box<JsInstance>) -> JF,
1053+
) -> Result<R, NoSuchModule>
1054+
where
1055+
JF: Future<Output = (R, Box<JsInstance>)>,
1056+
R: Send + 'static,
1057+
A: Send + 'static,
1058+
{
1059+
self.with_instance(
1060+
"reducer",
1061+
label,
1062+
|l| self.start_call_timer(l),
1063+
// Operations on module instances (e.g. calling reducers) is blocking,
1064+
// partially because the computation can potentially take a long time
1065+
// and partially because interacting with the database requires taking a blocking lock.
1066+
// So, we run `work` on a dedicated thread with `self.executor`.
1067+
// This will bubble up any panic that may occur.
1068+
|timer_guard, executor, inst| async move {
1069+
match inst {
1070+
Instance::Wasm(mut inst) => {
1071+
executor
1072+
.run_sync_job(move || {
1073+
drop(timer_guard);
1074+
(wasm(arg, &mut inst), Instance::Wasm(inst))
1075+
})
1076+
.await
1077+
}
1078+
Instance::Js(inst) => {
1079+
drop(timer_guard);
1080+
let (res, inst) = js(arg, inst).await;
1081+
(res, Instance::Js(inst))
1082+
}
1083+
}
1084+
},
1085+
)
1086+
.await
11201087
}
11211088

11221089
pub async fn disconnect_client(&self, client_id: ClientActorId) {
11231090
log::trace!("disconnecting client {client_id}");
11241091
if let Err(e) = self
1125-
.call("disconnect_client", move |inst| inst.disconnect_client(client_id))
1092+
.call(
1093+
"disconnect_client",
1094+
client_id,
1095+
|client_id, inst| inst.disconnect_client(client_id),
1096+
|client_id, inst| inst.disconnect_client(client_id),
1097+
)
11261098
.await
11271099
{
11281100
log::error!("Error from client_disconnected transaction: {e}");
@@ -1165,9 +1137,12 @@ impl ModuleHost {
11651137
caller_auth: ConnectionAuthCtx,
11661138
caller_connection_id: ConnectionId,
11671139
) -> Result<(), ClientConnectedError> {
1168-
self.call("call_identity_connected", move |inst| {
1169-
inst.call_identity_connected(caller_auth, caller_connection_id)
1170-
})
1140+
self.call(
1141+
"call_identity_connected",
1142+
(caller_auth, caller_connection_id),
1143+
|(a, b), inst| inst.call_identity_connected(a, b),
1144+
|(a, b), inst| inst.call_identity_connected(a, b),
1145+
)
11711146
.await
11721147
.map_err(ReducerCallError::from)?
11731148
}
@@ -1305,16 +1280,24 @@ impl ModuleHost {
13051280
caller_identity: Identity,
13061281
caller_connection_id: ConnectionId,
13071282
) -> Result<(), ReducerCallError> {
1308-
self.call("call_identity_disconnected", move |inst| {
1309-
inst.call_identity_disconnected(caller_identity, caller_connection_id)
1310-
})
1283+
self.call(
1284+
"call_identity_disconnected",
1285+
(caller_identity, caller_connection_id),
1286+
|(a, b), inst| inst.call_identity_disconnected(a, b),
1287+
|(a, b), inst| inst.call_identity_disconnected(a, b),
1288+
)
13111289
.await?
13121290
}
13131291

13141292
/// Empty the system tables tracking clients without running any lifecycle reducers.
13151293
pub async fn clear_all_clients(&self) -> anyhow::Result<()> {
1316-
self.call("clear_all_clients", move |inst| inst.clear_all_clients())
1317-
.await?
1294+
self.call(
1295+
"clear_all_clients",
1296+
(),
1297+
|_, inst| inst.clear_all_clients(),
1298+
|_, inst| inst.clear_all_clients(),
1299+
)
1300+
.await?
13181301
}
13191302

13201303
fn call_reducer_params(
@@ -1369,9 +1352,12 @@ impl ModuleHost {
13691352
};
13701353

13711354
Ok(self
1372-
.call(&reducer_def.name, move |inst| {
1373-
inst.call_reducer(None, call_reducer_params)
1374-
})
1355+
.call(
1356+
&reducer_def.name,
1357+
call_reducer_params,
1358+
|p, inst| inst.call_reducer(None, p),
1359+
|p, inst| inst.call_reducer(None, p),
1360+
)
13751361
.await?)
13761362
}
13771363

@@ -1492,9 +1478,12 @@ impl ModuleHost {
14921478
// because their reducer arguments are stored in the database and need to be fetched
14931479
// within the same transaction as the reducer call.
14941480
pub(crate) async fn call_scheduled_reducer(&self, item: QueueItem) -> Result<ReducerCallResult, ReducerCallError> {
1495-
self.call(SCHEDULED_REDUCER, move |inst: &mut Instance| {
1496-
inst.call_scheduled_reducer(item)
1497-
})
1481+
self.call(
1482+
SCHEDULED_REDUCER,
1483+
item,
1484+
|item, inst| inst.call_scheduled_reducer(item),
1485+
|item, inst| inst.call_scheduled_reducer(item),
1486+
)
14981487
.await?
14991488
}
15001489

@@ -1503,9 +1492,14 @@ impl ModuleHost {
15031492
}
15041493

15051494
pub async fn init_database(&self, program: Program) -> Result<Option<ReducerCallResult>, InitDatabaseError> {
1506-
self.call("<init_database>", move |inst| inst.init_database(program))
1507-
.await?
1508-
.map_err(InitDatabaseError::Other)
1495+
self.call(
1496+
"<init_database>",
1497+
program,
1498+
|p, inst| inst.init_database(p),
1499+
|p, inst| inst.init_database(p),
1500+
)
1501+
.await?
1502+
.map_err(InitDatabaseError::Other)
15091503
}
15101504

15111505
pub async fn update_database(
@@ -1514,9 +1508,12 @@ impl ModuleHost {
15141508
old_module_info: Arc<ModuleInfo>,
15151509
policy: MigrationPolicy,
15161510
) -> Result<UpdateDatabaseResult, anyhow::Error> {
1517-
self.call("<update_database>", move |inst| {
1518-
inst.update_database(program, old_module_info, policy)
1519-
})
1511+
self.call(
1512+
"<update_database>",
1513+
(program, old_module_info, policy),
1514+
|(a, b, c), inst| inst.update_database(a, b, c),
1515+
|(a, b, c), inst| inst.update_database(a, b, c),
1516+
)
15201517
.await?
15211518
}
15221519

0 commit comments

Comments
 (0)