Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 23 additions & 3 deletions crates/core/src/host/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use core::time::Duration;
use futures::{FutureExt, StreamExt};
use rustc_hash::FxHashMap;
use spacetimedb_client_api_messages::energy::EnergyQuanta;
use spacetimedb_datastore::execution_context::Workload;
use spacetimedb_datastore::execution_context::{ExecutionContext, ReducerContext, Workload};
use spacetimedb_datastore::locking_tx_datastore::MutTxId;
use spacetimedb_datastore::system_tables::{StFields, StScheduledFields, ST_SCHEDULED_ID};
use spacetimedb_datastore::traits::IsolationLevel;
Expand Down Expand Up @@ -394,7 +394,7 @@ pub(super) async fn call_scheduled_function(
})
};

let tx = db.begin_mut_tx(IsolationLevel::Serializable, Workload::Internal);
let mut tx = db.begin_mut_tx(IsolationLevel::Serializable, Workload::Internal);

// Determine the call params.
// This also lets us know whether to call a reducer or procedure.
Expand All @@ -417,11 +417,31 @@ pub(super) async fn call_scheduled_function(
// as for scheduled procedures, it's incorrect to retry them if execution aborts midway,
// so we must remove the schedule row before executing.
match params {
CallParams::Reducer(params) => {
CallParams::Reducer(ref reducer_params) => {
// Patch the transaction context with the proper ReducerContext so that
// the reducer's timestamp and args are recorded to the commitlog.
// This is necessary for temporal queries to extract timestamps.
let reducer_def = module_info.module_def.reducer_by_id(reducer_params.reducer_id);
let reducer_name = &*reducer_def.name;
tx.ctx = ExecutionContext::with_workload(
tx.ctx.database_identity(),
Workload::Reducer(ReducerContext {
name: reducer_name.into(),
caller_identity: reducer_params.caller_identity,
caller_connection_id: reducer_params.caller_connection_id,
timestamp: reducer_params.timestamp,
arg_bsatn: reducer_params.args.get_bsatn().clone(),
}),
);

// We don't want a panic in the module host to affect the scheduler, as unlikely
// as it might be, so catch it so we can handle it "gracefully". Panics will
// print their message and backtrace when they occur, so we don't need to do
// anything with the error payload.
let params = match params {
CallParams::Reducer(p) => p,
_ => unreachable!(),
};
let result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
inst_common.call_reducer_with_tx(Some(tx), params, inst)
}));
Expand Down