Skip to content

Commit 8af2b9b

Browse files
update views on sql dml
1 parent 9171ef1 commit 8af2b9b

File tree

4 files changed

+90
-35
lines changed

4 files changed

+90
-35
lines changed

crates/core/src/host/host_controller.rs

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -171,22 +171,6 @@ impl From<&EventStatus> for ReducerOutcome {
171171
}
172172
}
173173

174-
pub enum ViewOutcome {
175-
Success,
176-
Failed(String),
177-
BudgetExceeded,
178-
}
179-
180-
impl From<EventStatus> for ViewOutcome {
181-
fn from(status: EventStatus) -> Self {
182-
match status {
183-
EventStatus::Committed(_) => ViewOutcome::Success,
184-
EventStatus::Failed(e) => ViewOutcome::Failed(e),
185-
EventStatus::OutOfEnergy => ViewOutcome::BudgetExceeded,
186-
}
187-
}
188-
}
189-
190174
#[derive(Clone, Debug)]
191175
pub struct ProcedureCallResult {
192176
pub return_val: AlgebraicValue,

crates/core/src/host/module_host.rs

Lines changed: 70 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ use crate::energy::EnergyQuanta;
1010
use crate::error::DBError;
1111
use crate::estimation::estimate_rows_scanned;
1212
use crate::hash::Hash;
13-
use crate::host::host_controller::ViewOutcome;
1413
use crate::host::{InvalidFunctionArguments, InvalidViewArguments};
1514
use crate::identity::Identity;
1615
use crate::messages::control_db::{Database, HostType};
@@ -40,7 +39,7 @@ use spacetimedb_data_structures::error_stream::ErrorStream;
4039
use spacetimedb_data_structures::map::{HashCollectionExt as _, IntMap};
4140
use spacetimedb_datastore::error::DatastoreError;
4241
use spacetimedb_datastore::execution_context::{ExecutionContext, ReducerContext, Workload, WorkloadType};
43-
use spacetimedb_datastore::locking_tx_datastore::MutTxId;
42+
use spacetimedb_datastore::locking_tx_datastore::{MutTxId, ViewCallInfo};
4443
use spacetimedb_datastore::system_tables::{ST_CLIENT_ID, ST_CONNECTION_CREDENTIALS_ID, ST_VIEW_SUB_ID};
4544
use spacetimedb_datastore::traits::{IsolationLevel, Program, TxData};
4645
use spacetimedb_durability::DurableOffset;
@@ -717,6 +716,22 @@ pub enum ReducerCallError {
717716
LifecycleReducer(Lifecycle),
718717
}
719718

719+
pub enum ViewOutcome {
720+
Success,
721+
Failed(String),
722+
BudgetExceeded,
723+
}
724+
725+
impl From<EventStatus> for ViewOutcome {
726+
fn from(status: EventStatus) -> Self {
727+
match status {
728+
EventStatus::Committed(_) => ViewOutcome::Success,
729+
EventStatus::Failed(e) => ViewOutcome::Failed(e),
730+
EventStatus::OutOfEnergy => ViewOutcome::BudgetExceeded,
731+
}
732+
}
733+
}
734+
720735
pub struct ViewCallResult {
721736
pub outcome: ViewOutcome,
722737
pub tx: MutTxId,
@@ -751,6 +766,8 @@ pub enum ViewCallError {
751766
MissingClientConnection,
752767
#[error("DB error during view call: {0}")]
753768
DatastoreError(#[from] DatastoreError),
769+
#[error("The module instance encountered a fatal error: {0}")]
770+
InternalError(String),
754771
}
755772

756773
#[derive(thiserror::Error, Debug)]
@@ -1515,49 +1532,84 @@ impl ModuleHost {
15151532
&self,
15161533
mut tx: MutTxId,
15171534
view_collector: &impl CollectViews,
1518-
sender: Identity,
1535+
caller: Identity,
15191536
workload: Workload,
15201537
) -> Result<MutTxId, ViewCallError> {
15211538
use FunctionArgs::*;
15221539
let mut view_ids = HashSet::new();
15231540
view_collector.collect_views(&mut view_ids);
15241541
for view_id in view_ids {
1525-
let name = tx.lookup_st_view(view_id)?.view_name;
1526-
if !tx.is_view_materialized(view_id, ArgId::SENTINEL, sender)? {
1527-
tx = self.call_view(tx, &name, Nullary, sender).await?.tx;
1542+
let st_view_row = tx.lookup_st_view(view_id)?;
1543+
let view_name = st_view_row.view_name;
1544+
let view_id = st_view_row.view_id;
1545+
let table_id = st_view_row.table_id.ok_or(ViewCallError::TableDoesNotExist(view_id))?;
1546+
if !tx.is_view_materialized(view_id, ArgId::SENTINEL, caller)? {
1547+
tx = self
1548+
.call_view(tx, &view_name, view_id, table_id, Nullary, caller, Some(caller))
1549+
.await?
1550+
.tx;
15281551
}
15291552
// If this is a sql call, we only update this view's "last called" timestamp
15301553
if let Workload::Sql = workload {
1531-
tx.update_view_timestamp(view_id, ArgId::SENTINEL, sender)?;
1554+
tx.update_view_timestamp(view_id, ArgId::SENTINEL, caller)?;
15321555
}
15331556
// If this is a subscribe call, we also increment this view's subscriber count
15341557
if let Workload::Subscribe = workload {
1535-
tx.subscribe_view(view_id, ArgId::SENTINEL, sender)?;
1558+
tx.subscribe_view(view_id, ArgId::SENTINEL, caller)?;
15361559
}
15371560
}
15381561
Ok(tx)
15391562
}
15401563

1564+
pub async fn call_views_with_tx(&self, tx: MutTxId, caller: Identity) -> Result<ViewCallResult, ViewCallError> {
1565+
use FunctionArgs::*;
1566+
let mut out = ViewCallResult::default(tx);
1567+
for ViewCallInfo {
1568+
view_id,
1569+
table_id,
1570+
view_name,
1571+
sender,
1572+
} in out.tx.view_for_update().cloned().collect::<Vec<_>>()
1573+
{
1574+
let result = self
1575+
.call_view(out.tx, &view_name, view_id, table_id, Nullary, caller, sender)
1576+
.await?;
1577+
1578+
// Increment execution stats
1579+
out.tx = result.tx;
1580+
out.outcome = result.outcome;
1581+
out.energy_used += result.energy_used;
1582+
out.total_duration += result.total_duration;
1583+
out.abi_duration += result.abi_duration;
1584+
1585+
// Terminate early if execution failed
1586+
if !matches!(out.outcome, ViewOutcome::Success) {
1587+
break;
1588+
}
1589+
}
1590+
Ok(out)
1591+
}
1592+
15411593
pub async fn call_view(
15421594
&self,
15431595
tx: MutTxId,
15441596
view_name: &str,
1597+
view_id: ViewId,
1598+
table_id: TableId,
15451599
args: FunctionArgs,
1546-
sender: Identity,
1600+
caller: Identity,
1601+
sender: Option<Identity>,
15471602
) -> Result<ViewCallResult, ViewCallError> {
15481603
let module_def = &self.info.module_def;
15491604
let view_def = module_def.view(view_name).ok_or(ViewCallError::NoSuchView)?;
1550-
let st_view_row = tx.lookup_st_view_by_name(view_name)?;
1551-
let view_id = st_view_row.view_id;
1552-
let table_id = st_view_row.table_id.ok_or(ViewCallError::TableDoesNotExist(view_id))?;
15531605
let fn_ptr = view_def.fn_ptr;
15541606
let row_type = view_def.product_type_ref;
15551607
let typespace = module_def.typespace().with_type(view_def);
15561608
let view_seed = ArgsSeed(typespace);
15571609
let args = args.into_tuple(view_seed).map_err(InvalidViewArguments)?;
15581610

15591611
match self
1560-
.call_view_inner(tx, view_name, view_id, table_id, fn_ptr, sender, args, row_type)
1612+
.call_view_inner(tx, view_name, view_id, table_id, fn_ptr, caller, sender, args, row_type)
15611613
.await
15621614
{
15631615
err @ Err(ViewCallError::NoSuchView) => {
@@ -1581,7 +1633,8 @@ impl ModuleHost {
15811633
view_id: ViewId,
15821634
table_id: TableId,
15831635
fn_ptr: ViewFnPtr,
1584-
sender: Identity,
1636+
caller: Identity,
1637+
sender: Option<Identity>,
15851638
args: ArgsTuple,
15861639
row_type: AlgebraicTypeRef,
15871640
) -> Result<ViewCallResult, ViewCallError> {
@@ -1591,15 +1644,15 @@ impl ModuleHost {
15911644
inst.call_view(
15921645
tx,
15931646
CallViewParams {
1647+
timestamp: Timestamp::now(),
15941648
view_name,
15951649
view_id,
15961650
table_id,
15971651
fn_ptr,
1598-
caller: sender,
1599-
sender: Some(sender),
1652+
caller,
1653+
sender,
16001654
args,
16011655
row_type,
1602-
timestamp: Timestamp::now(),
16031656
},
16041657
)
16051658
})

crates/core/src/host/wasm_common/module_host_actor.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,11 @@ use super::instrumentation::CallTimes;
1919
use crate::client::ClientConnectionSender;
2020
use crate::database_logger;
2121
use crate::energy::{EnergyMonitor, FunctionBudget, FunctionFingerprint};
22-
use crate::host::host_controller::ViewOutcome;
2322
use crate::host::instance_env::InstanceEnv;
2423
use crate::host::instance_env::TxSlot;
2524
use crate::host::module_common::{build_common_module_from_raw, ModuleCommon};
2625
use crate::host::module_host::ViewCallResult;
26+
use crate::host::module_host::ViewOutcome;
2727
use crate::host::module_host::{
2828
CallProcedureParams, CallReducerParams, CallViewParams, DatabaseUpdate, EventStatus, ModuleEvent,
2929
ModuleFunctionCall, ModuleInfo,

crates/core/src/sql/execute.rs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,10 @@ use crate::db::relational_db::{RelationalDB, Tx};
66
use crate::energy::EnergyQuanta;
77
use crate::error::DBError;
88
use crate::estimation::estimate_rows_scanned;
9-
use crate::host::module_host::{DatabaseTableUpdate, DatabaseUpdate, EventStatus, ModuleEvent, ModuleFunctionCall};
9+
use crate::host::module_host::{
10+
DatabaseTableUpdate, DatabaseUpdate, EventStatus, ModuleEvent, ModuleFunctionCall, ViewCallError, ViewCallResult,
11+
ViewOutcome,
12+
};
1013
use crate::host::{ArgsTuple, ModuleHost};
1114
use crate::subscription::module_subscription_actor::{ModuleSubscriptions, WriteConflict};
1215
use crate::subscription::module_subscription_manager::TransactionOffset;
@@ -264,6 +267,21 @@ pub async fn run(
264267
// Update transaction metrics
265268
tx.metrics.merge(metrics);
266269

270+
// Update views
271+
let result = match module {
272+
Some(module) => module.call_views_with_tx(tx, auth.caller).await?,
273+
None => ViewCallResult::default(tx),
274+
};
275+
276+
// Rollback transaction and report metrics if view execution failed
277+
if let ViewOutcome::Failed(err) = result.outcome {
278+
let (_, metrics, reducer) = db.rollback_mut_tx(result.tx);
279+
db.report_mut_tx_metrics(reducer, metrics, None);
280+
return Err(DBError::View(ViewCallError::InternalError(err)));
281+
}
282+
283+
let tx = result.tx;
284+
267285
// Commit the tx if there are no deltas to process
268286
if subs.is_none() {
269287
let metrics = tx.metrics;

0 commit comments

Comments
 (0)