Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 20 additions & 2 deletions crates/core/src/host/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use core::time::Duration;
use futures::{FutureExt, StreamExt};
use rustc_hash::FxHashMap;
use spacetimedb_client_api_messages::energy::EnergyQuanta;
use spacetimedb_datastore::execution_context::Workload;
use spacetimedb_datastore::execution_context::{ExecutionContext, ReducerContext, Workload};
use spacetimedb_datastore::locking_tx_datastore::MutTxId;
use spacetimedb_datastore::system_tables::{StFields, StScheduledFields, ST_SCHEDULED_ID};
use spacetimedb_datastore::traits::IsolationLevel;
Expand Down Expand Up @@ -394,7 +394,7 @@ pub(super) async fn call_scheduled_function(
})
};

let tx = db.begin_mut_tx(IsolationLevel::Serializable, Workload::Internal);
let mut tx = db.begin_mut_tx(IsolationLevel::Serializable, Workload::Internal);

// Determine the call params.
// This also lets us know whether to call a reducer or procedure.
Expand All @@ -418,6 +418,24 @@ pub(super) async fn call_scheduled_function(
// so we must remove the schedule row before executing.
match params {
CallParams::Reducer(params) => {
// Patch the transaction context with ReducerContext so the commitlog
// records the reducer's name, caller, timestamp, and arguments.
//
// Background: Scheduled reducers start with Workload::Internal, but
// call_reducer_with_tx only sets ReducerContext when tx is None.
// Since we pass Some(tx), we must set it here.
let reducer_name = &*module_info.module_def.reducer_by_id(params.reducer_id).name;
tx.ctx = ExecutionContext::with_workload(
tx.ctx.database_identity(),
Workload::Reducer(ReducerContext {
name: reducer_name.into(),
caller_identity: params.caller_identity,
caller_connection_id: params.caller_connection_id,
timestamp: params.timestamp,
arg_bsatn: params.args.get_bsatn().clone(),
}),
);

// We don't want a panic in the module host to affect the scheduler, as unlikely
// as it might be, so catch it so we can handle it "gracefully". Panics will
// print their message and backtrace when they occur, so we don't need to do
Expand Down