Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 70 additions & 1 deletion crates/bindings-sys/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,7 @@ pub mod raw {
pub fn get_jwt(connection_id_ptr: *const u8, bytes_source_id: *mut BytesSource) -> u16;
}

#[cfg(feature = "unstable")]
#[link(wasm_import_module = "spacetime_10.3")]
extern "C" {
/// Suspends execution of this WASM instance until approximately `wake_at_micros_since_unix_epoch`.
Expand All @@ -661,8 +662,76 @@ pub mod raw {
/// - The calling WASM instance is holding open a transaction.
/// - The calling WASM instance is not executing a procedure.
// TODO(procedure-sleep-until): remove this
#[cfg(feature = "unstable")]
pub fn procedure_sleep_until(wake_at_micros_since_unix_epoch: i64) -> i64;

/// Starts a mutable transaction,
/// suspending execution of this WASM instance until
/// a mutable transaction lock is aquired.
///
/// Upon resuming, returns `0` on success,
/// enabling further calls that require a pending transaction,
/// or an error code otherwise.
///
/// # Traps
///
/// This function does not trap.
///
/// # Errors
///
/// Returns an error:
///
/// - `WOULD_BLOCK_TRANSACTION`, if there's already an ongoing transaction.
pub fn procedure_start_mut_transaction() -> u16;

/// Commits a mutable transaction,
/// suspending execution of this WASM instance until
/// the transaction has been committed
/// and subscription queries have been run and broadcast.
///
/// Upon resuming, returns `0` on success, or an error code otherwise.
///
/// # Traps
///
/// This function does not trap.
///
/// # Errors
///
/// Returns an error:
///
/// - `TRANSACTION_NOT_ANONYMOUS`,
/// if the transaction was not started in [`procedure_start_mut_transaction`].
/// This can happen if this syscall is erroneously called by a reducer.
/// The code `NOT_IN_TRANSACTION` does not happen,
/// as it is subsumed by `TRANSACTION_NOT_ANONYMOUS`.
/// - `TRANSACTION_IS_READ_ONLY`, if the pending transaction is read-only.
/// This currently does not happen as anonymous read transactions
/// are not exposed to modules.
pub fn procedure_commit_mut_transaction() -> u16;

/// Aborts a mutable transaction,
/// suspending execution of this WASM instance until
/// the transaction has been rolled back.
///
/// Upon resuming, returns `0` on success, or an error code otherwise.
///
/// # Traps
///
/// This function does not trap.
///
/// # Errors
///
/// Returns an error:
///
/// - `TRANSACTION_NOT_ANONYMOUS`,
/// if the transaction was not started in [`procedure_start_mut_transaction`].
/// This can happen if this syscall is erroneously called by a reducer.
/// The code `NOT_IN_TRANSACTION` does not happen,
/// as it is subsumed by `TRANSACTION_NOT_ANONYMOUS`.
/// - `TRANSACTION_IS_READ_ONLY`, if the pending transaction is read-only.
/// This currently does not happen as anonymous read transactions
/// are not exposed to modules.
pub fn procedure_abort_mut_transaction() -> u16;

}

/// What strategy does the database index use?
Expand Down
2 changes: 2 additions & 0 deletions crates/core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,8 @@ pub enum NodesError {
BadColumn,
#[error("can't perform operation; not inside transaction")]
NotInTransaction,
#[error("can't perform operation; a transaction already exists")]
WouldBlockTransaction,
#[error("table with name {0:?} already exists")]
AlreadyExists(String),
#[error("table with name `{0}` start with 'st_' and that is reserved for internal system tables.")]
Expand Down
69 changes: 53 additions & 16 deletions crates/core/src/host/instance_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@ use crate::db::relational_db::{MutTx, RelationalDB};
use crate::error::{DBError, DatastoreError, IndexError, NodesError};
use crate::host::wasm_common::TimingSpan;
use crate::replica_context::ReplicaContext;
use crate::util::asyncify;
use chrono::{DateTime, Utc};
use core::mem;
use parking_lot::{Mutex, MutexGuard};
use smallvec::SmallVec;
use spacetimedb_datastore::execution_context::Workload;
use spacetimedb_datastore::locking_tx_datastore::state_view::StateView;
use spacetimedb_datastore::locking_tx_datastore::{MutTxId, ViewCall};
use spacetimedb_datastore::traits::IsolationLevel;
use spacetimedb_lib::{ConnectionId, Identity, Timestamp};
use spacetimedb_primitives::{ColId, ColList, IndexId, TableId};
use spacetimedb_sats::{
Expand Down Expand Up @@ -198,6 +201,14 @@ impl InstanceEnv {
self.tx.get()
}

pub(crate) fn take_tx(&self) -> Result<MutTxId, GetTxError> {
self.tx.take()
}

pub(crate) fn relational_db(&self) -> &Arc<RelationalDB> {
&self.replica_ctx.relational_db
}

pub(crate) fn get_jwt_payload(&self, connection_id: ConnectionId) -> Result<Option<String>, NodesError> {
let tx = &mut *self.get_tx()?;
Ok(tx.get_jwt_payload(connection_id).map_err(DBError::from)?)
Expand Down Expand Up @@ -268,7 +279,7 @@ impl InstanceEnv {
}

pub fn insert(&self, table_id: TableId, buffer: &mut [u8]) -> Result<usize, NodesError> {
let stdb = &*self.replica_ctx.relational_db;
let stdb = self.relational_db();
let tx = &mut *self.get_tx()?;

let (row_len, row_ptr, insert_flags) = stdb
Expand Down Expand Up @@ -337,7 +348,7 @@ impl InstanceEnv {
}

pub fn update(&self, table_id: TableId, index_id: IndexId, buffer: &mut [u8]) -> Result<usize, NodesError> {
let stdb = &*self.replica_ctx.relational_db;
let stdb = self.relational_db();
let tx = &mut *self.get_tx()?;

let (row_len, row_ptr, update_flags) = stdb
Expand Down Expand Up @@ -380,8 +391,8 @@ impl InstanceEnv {
rstart: &[u8],
rend: &[u8],
) -> Result<u32, NodesError> {
let stdb = &*self.replica_ctx.relational_db;
let tx = &mut *self.tx.get()?;
let stdb = self.relational_db();
let tx = &mut *self.get_tx()?;

// Find all rows in the table to delete.
let (table_id, _, _, iter) = stdb.index_scan_range(tx, index_id, prefix, prefix_elems, rstart, rend)?;
Expand Down Expand Up @@ -410,7 +421,7 @@ impl InstanceEnv {
/// - a row couldn't be decoded to the table schema type.
#[tracing::instrument(level = "trace", skip(self, relation))]
pub fn datastore_delete_all_by_eq_bsatn(&self, table_id: TableId, relation: &[u8]) -> Result<u32, NodesError> {
let stdb = &*self.replica_ctx.relational_db;
let stdb = self.relational_db();
let tx = &mut *self.get_tx()?;

// Track the number of bytes coming from the caller
Expand All @@ -437,7 +448,7 @@ impl InstanceEnv {
/// and `TableNotFound` if the table does not exist.
#[tracing::instrument(level = "trace", skip_all)]
pub fn table_id_from_name(&self, table_name: &str) -> Result<TableId, NodesError> {
let stdb = &*self.replica_ctx.relational_db;
let stdb = self.relational_db();
let tx = &mut *self.get_tx()?;

// Query the table id from the name.
Expand All @@ -451,7 +462,7 @@ impl InstanceEnv {
/// and `IndexNotFound` if the index does not exist.
#[tracing::instrument(level = "trace", skip_all)]
pub fn index_id_from_name(&self, index_name: &str) -> Result<IndexId, NodesError> {
let stdb = &*self.replica_ctx.relational_db;
let stdb = self.relational_db();
let tx = &mut *self.get_tx()?;

// Query the index id from the name.
Expand All @@ -465,7 +476,7 @@ impl InstanceEnv {
/// and `TableNotFound` if the table does not exist.
#[tracing::instrument(level = "trace", skip_all)]
pub fn datastore_table_row_count(&self, table_id: TableId) -> Result<u64, NodesError> {
let stdb = &*self.replica_ctx.relational_db;
let stdb = self.relational_db();
let tx = &mut *self.get_tx()?;

// Query the row count for id.
Expand All @@ -482,8 +493,8 @@ impl InstanceEnv {
pool: &mut ChunkPool,
table_id: TableId,
) -> Result<Vec<Vec<u8>>, NodesError> {
let stdb = &*self.replica_ctx.relational_db;
let tx = &mut *self.tx.get()?;
let stdb = self.relational_db();
let tx = &mut *self.get_tx()?;

// Track the number of rows and the number of bytes scanned by the iterator
let mut rows_scanned = 0;
Expand Down Expand Up @@ -515,8 +526,8 @@ impl InstanceEnv {
rstart: &[u8],
rend: &[u8],
) -> Result<Vec<Vec<u8>>, NodesError> {
let stdb = &*self.replica_ctx.relational_db;
let tx = &mut *self.tx.get()?;
let stdb = self.relational_db();
let tx = &mut *self.get_tx()?;

// Track rows and bytes scanned by the iterator
let mut rows_scanned = 0;
Expand Down Expand Up @@ -563,26 +574,52 @@ impl InstanceEnv {

written
}

pub async fn start_mutable_tx(&mut self) -> Result<(), NodesError> {
if self.get_tx().is_ok() {
return Err(NodesError::WouldBlockTransaction);
}

let stdb = self.replica_ctx.relational_db.clone();
// TODO(procedure-tx): should we add a new workload, e.g., `AnonTx`?
let tx = asyncify(move || stdb.begin_mut_tx(IsolationLevel::Serializable, Workload::Internal)).await;
self.tx.set_raw(tx);

Ok(())
}
}

impl TxSlot {
pub fn set<T>(&mut self, tx: MutTxId, f: impl FnOnce() -> T) -> (MutTxId, T) {
/// Sets the slot to `tx`, ensuring that there was no tx before.
pub fn set_raw(&mut self, tx: MutTxId) {
let prev = self.inner.lock().replace(tx);
assert!(prev.is_none(), "reentrant TxSlot::set");
let remove_tx = || self.inner.lock().take();
}

/// Sets the slot to `tx` runs `work`, and returns back `tx`.
pub fn set<T>(&mut self, tx: MutTxId, work: impl FnOnce() -> T) -> (MutTxId, T) {
self.set_raw(tx);

let remove_tx = || self.take().expect("tx was removed during transaction");

let res = {
scopeguard::defer_on_unwind! { remove_tx(); }
f()
work()
};

let tx = remove_tx().expect("tx was removed during transaction");
let tx = remove_tx();
(tx, res)
}

/// Returns the tx in the slot.
pub fn get(&self) -> Result<impl DerefMut<Target = MutTxId> + '_, GetTxError> {
MutexGuard::try_map(self.inner.lock(), |map| map.as_mut()).map_err(|_| GetTxError)
}

/// Steals th tx from the slot.
pub fn take(&self) -> Result<MutTxId, GetTxError> {
self.inner.lock().take().ok_or(GetTxError)
}
}

#[derive(Debug)]
Expand Down
3 changes: 3 additions & 0 deletions crates/core/src/host/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,4 +178,7 @@ pub enum AbiCall {
VolatileNonatomicScheduleImmediate,

ProcedureSleepUntil,
ProcedureStartMutTransaction,
ProcedureCommitMutTransaction,
ProcedureAbortMutTransaction,
}
4 changes: 4 additions & 0 deletions crates/core/src/host/wasm_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,7 @@ pub(super) type TimingSpanSet = ResourceSlab<TimingSpanIdx>;
pub fn err_to_errno(err: &NodesError) -> Option<NonZeroU16> {
match err {
NodesError::NotInTransaction => Some(errno::NOT_IN_TRANSACTION),
NodesError::WouldBlockTransaction => Some(errno::WOULD_BLOCK_TRANSACTION),
NodesError::DecodeRow(_) => Some(errno::BSATN_DECODE_ERROR),
NodesError::TableNotFound => Some(errno::NO_SUCH_TABLE),
NodesError::IndexNotFound => Some(errno::NO_SUCH_INDEX),
Expand Down Expand Up @@ -422,6 +423,9 @@ macro_rules! abi_funcs {

$link_async! {
"spacetime_10.3"::procedure_sleep_until,
"spacetime_10.3"::procedure_start_mut_transaction,
"spacetime_10.3"::procedure_commit_mut_transaction,
"spacetime_10.3"::procedure_abort_mut_transaction,
}
};
}
Expand Down
52 changes: 14 additions & 38 deletions crates/core/src/host/wasm_common/module_host_actor.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,13 @@
use bytes::Bytes;
use prometheus::{Histogram, IntCounter, IntGauge};
use spacetimedb_lib::db::raw_def::v9::Lifecycle;
use spacetimedb_lib::de::DeserializeSeed as _;
use spacetimedb_primitives::ProcedureId;
use spacetimedb_primitives::ViewDatabaseId;
use spacetimedb_primitives::ViewId;
use spacetimedb_schema::auto_migrate::{MigratePlan, MigrationPolicy, MigrationPolicyError};
use std::future::Future;
use std::sync::Arc;
use std::time::Duration;
use tracing::span::EnteredSpan;

use super::instrumentation::CallTimes;
use crate::client::ClientConnectionSender;
use super::*;
use crate::database_logger;
use crate::energy::{EnergyMonitor, FunctionBudget, FunctionFingerprint};
use crate::host::host_controller::ViewOutcome;
use crate::host::instance_env::InstanceEnv;
use crate::host::module_common::{build_common_module_from_raw, ModuleCommon};
use crate::host::module_host::ViewCallResult;
use crate::host::module_host::{
CallProcedureParams, CallReducerParams, CallViewParams, DatabaseUpdate, EventStatus, ModuleEvent,
ModuleFunctionCall, ModuleInfo,
ModuleFunctionCall, ModuleInfo, ViewCallResult,
};
use crate::host::{
ArgsTuple, ProcedureCallError, ProcedureCallResult, ReducerCallResult, ReducerId, ReducerOutcome, Scheduler,
Expand All @@ -31,18 +17,26 @@ use crate::identity::Identity;
use crate::messages::control_db::HostType;
use crate::module_host_context::ModuleCreationContextLimited;
use crate::replica_context::ReplicaContext;
use crate::subscription::module_subscription_actor::WriteConflict;
use crate::subscription::module_subscription_actor::commit_and_broadcast_event;
use crate::util::prometheus_handle::{HistogramExt, TimerGuard};
use crate::worker_metrics::WORKER_METRICS;
use bytes::Bytes;
use core::future::Future;
use core::time::Duration;
use prometheus::{Histogram, IntCounter, IntGauge};
use spacetimedb_datastore::db_metrics::DB_METRICS;
use spacetimedb_datastore::execution_context::{self, ReducerContext, Workload};
use spacetimedb_datastore::locking_tx_datastore::MutTxId;
use spacetimedb_datastore::traits::{IsolationLevel, Program};
use spacetimedb_lib::buffer::DecodeError;
use spacetimedb_lib::db::raw_def::v9::Lifecycle;
use spacetimedb_lib::de::DeserializeSeed;
use spacetimedb_lib::identity::AuthCtx;
use spacetimedb_lib::{bsatn, ConnectionId, RawModuleDef, Timestamp};

use super::*;
use spacetimedb_primitives::{ProcedureId, ViewDatabaseId, ViewId};
use spacetimedb_schema::auto_migrate::{MigratePlan, MigrationPolicy, MigrationPolicyError};
use std::sync::Arc;
use tracing::span::EnteredSpan;

pub trait WasmModule: Send + 'static {
type Instance: WasmInstance;
Expand Down Expand Up @@ -692,7 +686,7 @@ impl InstanceCommon {
request_id,
timer,
};
let event = commit_and_broadcast_event(&self.info, client, event, tx);
let event = commit_and_broadcast_event(&info.subscriptions, client, event, tx).event;

let res = ReducerCallResult {
outcome: ReducerOutcome::from(&event.status),
Expand Down Expand Up @@ -971,24 +965,6 @@ fn lifecyle_modifications_to_tx(
}
*/

/// Commits the transaction
/// and evaluates and broadcasts subscriptions updates.
fn commit_and_broadcast_event(
info: &ModuleInfo,
client: Option<Arc<ClientConnectionSender>>,
event: ModuleEvent,
tx: MutTxId,
) -> Arc<ModuleEvent> {
match info
.subscriptions
.commit_and_broadcast_event(client, event, tx)
.unwrap()
{
Ok(res) => res.event,
Err(WriteConflict) => todo!("Write skew, you need to implement retries my man, T-dawg."),
}
}

/// Describes a view call in a cheaply shareable way.
#[derive(Clone, Debug)]
pub struct ViewOp<'a> {
Expand Down
Loading
Loading