Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions crates/core/src/db/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ impl UpdateLogger for SystemLogger {
pub enum UpdateResult {
Success,
RequiresClientDisconnect,
EvaluateSubscribedViews,
}

/// Update the database according to the migration plan.
Expand All @@ -52,7 +53,7 @@ pub fn update_database(
let old_module_def = plan.old_def();
for table in existing_tables
.iter()
.filter(|table| table.table_type != StTableType::System)
.filter(|table| table.table_type != StTableType::System && !table.is_view())
{
let old_def = old_module_def
.table(&table.table_name[..])
Expand Down Expand Up @@ -146,7 +147,11 @@ fn auto_migrate_database(
stdb.drop_view(tx, view_id)?;
}
spacetimedb_schema::auto_migrate::AutoMigrateStep::UpdateView(_) => {
unimplemented!("Recompute view and update its backing table")
// if we already have to disconnect clients, no need to set
// `EvaluateSubscribedViews` as clients will be disconnected anyway
if !matches!(res, UpdateResult::RequiresClientDisconnect) {
res = UpdateResult::EvaluateSubscribedViews;
}
}
spacetimedb_schema::auto_migrate::AutoMigrateStep::AddIndex(index_name) => {
let table_def = plan.new.stored_in_table_def(index_name).unwrap();
Expand Down
11 changes: 11 additions & 0 deletions crates/core/src/host/module_host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,16 @@ pub struct ModuleFunctionCall {
pub args: ArgsTuple,
}

impl ModuleFunctionCall {
pub fn update() -> Self {
Self {
reducer: String::from("update"),
reducer_id: u32::MAX.into(),
args: ArgsTuple::nullary(),
}
}
}

#[derive(Debug, Clone)]
pub struct ModuleEvent {
pub timestamp: Timestamp,
Expand Down Expand Up @@ -716,6 +726,7 @@ pub enum ReducerCallError {
LifecycleReducer(Lifecycle),
}

#[derive(Debug, PartialEq, Eq)]
pub enum ViewOutcome {
Success,
Failed(String),
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/host/v8/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ fn spawn_instance_worker(
policy,
} => {
// Update the database.
let res = instance_common.update_database(replica_ctx, program, old_module_info, policy);
let res = instance_common.update_database(program, old_module_info, policy, &mut inst);

// Reply to `JsInstance::update_database`.
if let Err(e) = update_response_tx.send(res) {
Expand Down
177 changes: 138 additions & 39 deletions crates/core/src/host/wasm_common/module_host_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use spacetimedb_primitives::ViewFnPtr;
use spacetimedb_primitives::ViewId;
use spacetimedb_schema::auto_migrate::{MigratePlan, MigrationPolicy, MigrationPolicyError};
use spacetimedb_schema::def::ModuleDef;
use spacetimedb_schema::def::ViewDef;
use std::future::Future;
use std::sync::Arc;
use std::time::Duration;
Expand Down Expand Up @@ -308,9 +309,8 @@ impl<T: WasmInstance> WasmModuleInstance<T> {
old_module_info: Arc<ModuleInfo>,
policy: MigrationPolicy,
) -> anyhow::Result<UpdateDatabaseResult> {
let replica_ctx = self.instance.replica_ctx();
self.common
.update_database(replica_ctx, program, old_module_info, policy)
.update_database(program, old_module_info, policy, &mut self.instance)
}

pub fn call_reducer(&mut self, tx: Option<MutTxId>, params: CallReducerParams) -> ReducerCallResult {
Expand Down Expand Up @@ -365,13 +365,14 @@ impl InstanceCommon {
}

#[tracing::instrument(level = "trace", skip_all)]
pub(crate) fn update_database(
&self,
replica_ctx: &ReplicaContext,
pub(crate) fn update_database<I: WasmInstance>(
&mut self,
program: Program,
old_module_info: Arc<ModuleInfo>,
policy: MigrationPolicy,
inst: &mut I,
) -> Result<UpdateDatabaseResult, anyhow::Error> {
let replica_ctx = inst.replica_ctx().clone();
let system_logger = replica_ctx.logger.system_logger();
let stdb = &replica_ctx.relational_db;

Expand All @@ -398,6 +399,8 @@ impl InstanceCommon {

let auth_ctx = AuthCtx::for_current(replica_ctx.database.owner_identity);
let res = crate::db::update::update_database(stdb, &mut tx, auth_ctx, plan, system_logger);
let mut energy_quanta_used = FunctionBudget::ZERO;
let mut host_execution_duration = Duration::ZERO;

match res {
Err(e) => {
Expand All @@ -408,19 +411,103 @@ impl InstanceCommon {
Ok(UpdateDatabaseResult::ErrorExecutingMigration(e))
}
Ok(res) => {
if let Some((_tx_offset, tx_data, tx_metrics, reducer)) = stdb.commit_tx(tx)? {
stdb.report_mut_tx_metrics(reducer, tx_metrics, Some(tx_data));
}
system_logger.info("Database updated");
log::info!("Database updated, {}", stdb.database_identity());
match res {
crate::db::update::UpdateResult::Success => Ok(UpdateDatabaseResult::UpdatePerformed),
let res: UpdateDatabaseResult = match res {
crate::db::update::UpdateResult::Success => UpdateDatabaseResult::UpdatePerformed,
crate::db::update::UpdateResult::EvaluateSubscribedViews => {
let (out, trapped) = self.evaluate_subscribed_views(tx, inst)?;
tx = out.tx;
energy_quanta_used = out.energy_used;
host_execution_duration = out.total_duration;

if trapped || out.outcome != ViewOutcome::Success {
let msg = match trapped {
true => "Trapped while evaluating views during database update".to_string(),
false => format!(
"Views evaluation did not complete successfully during database update: {:?}",
out.outcome
),
};

UpdateDatabaseResult::ErrorExecutingMigration(anyhow::anyhow!(msg))
} else {
UpdateDatabaseResult::UpdatePerformed
}
}
crate::db::update::UpdateResult::RequiresClientDisconnect => {
Ok(UpdateDatabaseResult::UpdatePerformedWithClientDisconnect)
UpdateDatabaseResult::UpdatePerformedWithClientDisconnect
}
};

if res.was_successful() {
let event = ModuleEvent {
timestamp: Timestamp::now(),
caller_identity: self.info.owner_identity,
caller_connection_id: None,
function_call: ModuleFunctionCall::update(),
status: EventStatus::Committed(DatabaseUpdate::default()),
energy_quanta_used: energy_quanta_used.into(),
host_execution_duration,
request_id: None,
timer: None,
};
//TODO: Return back event in `UpdateDatabaseResult`?
let _ = commit_and_broadcast_event(&self.info, None, event, tx);
} else {
let (_, tx_metrics, reducer) = stdb.rollback_mut_tx(tx);
stdb.report_mut_tx_metrics(reducer, tx_metrics, None);
}
Ok(res)
}
}
}

/// Re-evaluates all views which have entries in `st_view_subs`.
fn evaluate_subscribed_views<I: WasmInstance>(
&mut self,
tx: MutTxId,
inst: &mut I,
) -> Result<(ViewCallResult, bool), anyhow::Error> {
let views = self.info.module_def.views().collect::<Vec<_>>();
let owner_identity = self.info.owner_identity;

let mut view_calls = Vec::new();

for view in views {
let ViewDef {
name: view_name,
is_anonymous,
fn_ptr,
product_type_ref,
..
} = view;

let st_view = tx
.view_from_name(view_name)?
.ok_or_else(|| anyhow::anyhow!("view {} not found in database", &view_name))?;

let view_id = st_view.view_id;
let table_id = st_view
.table_id
.ok_or_else(|| anyhow::anyhow!("view {} does not have a backing table in database", &view_name))?;

for sub in tx.lookup_st_view_subs(view_id)? {
view_calls.push(CallViewParams {
view_name: view_name.to_owned().into(),
view_id,
table_id,
fn_ptr: *fn_ptr,
caller: owner_identity,
sender: if *is_anonymous { None } else { Some(sub.identity.into()) },
args: ArgsTuple::nullary(),
row_type: *product_type_ref,
timestamp: Timestamp::now(),
});
}
}

Ok(self.execute_view_calls(tx, view_calls, inst))
}

async fn call_procedure<I: WasmInstance>(
Expand Down Expand Up @@ -873,47 +960,59 @@ impl InstanceCommon {
inst: &mut I,
timestamp: Timestamp,
) -> (ViewCallResult, bool) {
let mut trapped = false;
let view_calls = tx
.view_for_update()
.cloned()
.map(|info| {
let view_def = module_def
.view(&*info.view_name)
.unwrap_or_else(|| panic!("view `{}` not found", info.view_name));

CallViewParams {
view_name: info.view_name,
view_id: info.view_id,
table_id: info.table_id,
fn_ptr: view_def.fn_ptr,
caller,
sender: info.sender,
args: ArgsTuple::nullary(),
row_type: view_def.product_type_ref,
timestamp,
}
})
.collect::<Vec<_>>();

self.execute_view_calls(tx, view_calls, inst)
}

/// Executes view calls and accumulate results.
/// Returns early if any call traps or fails.
fn execute_view_calls<I: WasmInstance>(
&mut self,
tx: MutTxId,
view_calls: Vec<CallViewParams>,
inst: &mut I,
) -> (ViewCallResult, bool) {
let mut out = ViewCallResult::default(tx);
for ViewCallInfo {
view_id,
table_id,
view_name,
sender,
} in out.tx.view_for_update().cloned().collect::<Vec<_>>()
{
let view_def = module_def
.view(&*view_name)
.unwrap_or_else(|| panic!("view `{}` not found", view_name));
let fn_ptr = view_def.fn_ptr;
let args = ArgsTuple::nullary();
let row_type = view_def.product_type_ref;
let params = CallViewParams {
view_name,
view_id,
table_id,
fn_ptr,
caller,
sender,
args,
row_type,
timestamp,
};
let (result, ok) = self.call_view_with_tx(out.tx, params, inst);
let mut trapped = false;

for params in view_calls {
let (result, call_trapped) = self.call_view_with_tx(out.tx, params, inst);

// Increment execution stats
out.tx = result.tx;
out.outcome = result.outcome;
out.energy_used += result.energy_used;
out.total_duration += result.total_duration;
out.abi_duration += result.abi_duration;
trapped = trapped || ok;

trapped = trapped || call_trapped;

// Terminate early if execution failed
if trapped || !matches!(out.outcome, ViewOutcome::Success) {
break;
}
}

(out, trapped)
}
}
Expand Down
9 changes: 9 additions & 0 deletions crates/datastore/src/locking_tx_datastore/mut_tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2046,6 +2046,15 @@ impl MutTxId {
Ok(())
}

/// Get all view subscriptions for a given view.
pub fn lookup_st_view_subs(&self, view_id: ViewId) -> Result<Vec<StViewSubRow>> {
let cols = StViewSubFields::ViewId;
let value = view_id.into();
self.iter_by_col_eq(ST_VIEW_SUB_ID, cols, &value)?
.map(StViewSubRow::try_from)
.collect::<Result<Vec<_>>>()
}

/// Lookup a row in `st_view` by its primary key
fn st_view_row(&self, view_id: ViewId) -> Result<Option<StViewRow>> {
self.iter_by_col_eq(ST_VIEW_ID, col_list![StViewFields::ViewId], &view_id.into())?
Expand Down
6 changes: 6 additions & 0 deletions crates/datastore/src/system_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1260,6 +1260,12 @@ impl From<Identity> for IdentityViaU256 {
}
}

impl From<IdentityViaU256> for Identity {
fn from(id: IdentityViaU256) -> Self {
id.0
}
}

impl From<IdentityViaU256> for AlgebraicValue {
fn from(val: IdentityViaU256) -> Self {
AlgebraicValue::U256(val.0.to_u256().into())
Expand Down
Loading
Loading