Skip to content

Commit 1abf255

Browse files
committed
invoke view
1 parent 3849a49 commit 1abf255

File tree

24 files changed

+999
-195
lines changed

24 files changed

+999
-195
lines changed

crates/client-api-messages/src/energy.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -121,21 +121,21 @@ impl fmt::Debug for EnergyBalance {
121121
}
122122
}
123123

124-
/// A measure of energy representing the energy budget for a reducer.
124+
/// A measure of energy representing the energy budget for a reducer or any callable function.
125125
///
126126
/// In contrast to [`EnergyQuanta`], this is represented by a 64-bit integer. This makes energy handling
127127
/// for reducers easier, while still providing a unlikely-to-ever-be-reached maximum value (e.g. for wasmtime:
128128
/// `(u64::MAX eV / 1000 eV/instruction) * 3 ns/instruction = 640 days`)
129129
#[derive(Copy, Clone, From, Add, Sub)]
130-
pub struct ReducerBudget(u64);
130+
pub struct FunctionBudget(u64);
131131

132-
impl ReducerBudget {
132+
impl FunctionBudget {
133133
// 1 second of wasm runtime is roughly 2 TeV, so this is
134134
// roughly 1 minute of wasm runtime
135-
pub const DEFAULT_BUDGET: Self = ReducerBudget(120_000_000_000_000);
135+
pub const DEFAULT_BUDGET: Self = FunctionBudget(120_000_000_000_000);
136136

137-
pub const ZERO: Self = ReducerBudget(0);
138-
pub const MAX: Self = ReducerBudget(u64::MAX);
137+
pub const ZERO: Self = FunctionBudget(0);
138+
pub const MAX: Self = FunctionBudget(u64::MAX);
139139

140140
pub fn new(v: u64) -> Self {
141141
Self(v)
@@ -151,13 +151,13 @@ impl ReducerBudget {
151151
}
152152
}
153153

154-
impl From<ReducerBudget> for EnergyQuanta {
155-
fn from(value: ReducerBudget) -> Self {
154+
impl From<FunctionBudget> for EnergyQuanta {
155+
fn from(value: FunctionBudget) -> Self {
156156
EnergyQuanta::new(value.0.into())
157157
}
158158
}
159159

160-
impl fmt::Debug for ReducerBudget {
160+
impl fmt::Debug for FunctionBudget {
161161
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
162162
f.debug_tuple("ReducerBudget")
163163
.field(&EnergyQuanta::from(*self))

crates/core/src/db/relational_db.rs

Lines changed: 120 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
use crate::db::MetricsRecorderQueue;
2-
use crate::error::{DBError, DatabaseError, RestoreSnapshotError};
2+
use crate::error::{DBError, DatabaseError, RestoreSnapshotError, ViewError};
3+
use crate::host::ArgsTuple;
34
use crate::messages::control_db::HostType;
45
use crate::subscription::ExecutionCounters;
56
use crate::util::{asyncify, spawn_rayon};
67
use crate::worker_metrics::WORKER_METRICS;
78
use anyhow::{anyhow, Context};
9+
use bytes::Bytes;
810
use enum_map::EnumMap;
911
use fs2::FileExt;
12+
use log::trace;
1013
use spacetimedb_commitlog as commitlog;
1114
use spacetimedb_commitlog::repo::OnNewSegmentFn;
1215
use spacetimedb_data_structures::map::IntSet;
@@ -20,6 +23,7 @@ use spacetimedb_datastore::locking_tx_datastore::state_view::{
2023
use spacetimedb_datastore::locking_tx_datastore::{MutTxId, TxId};
2124
use spacetimedb_datastore::system_tables::{system_tables, StModuleRow, StViewRow, ST_VIEW_ID};
2225
use spacetimedb_datastore::system_tables::{StFields, StVarFields, StVarName, StVarRow, ST_MODULE_ID, ST_VAR_ID};
26+
use spacetimedb_datastore::system_tables::{StViewArgRow, ST_VIEW_ARG_ID};
2327
use spacetimedb_datastore::traits::{
2428
InsertFlags, IsolationLevel, Metadata, MutTx as _, MutTxDatastore, Program, RowTypeForTable, Tx as _, TxDatastore,
2529
UpdateFlags,
@@ -32,16 +36,18 @@ use spacetimedb_datastore::{
3236
traits::TxData,
3337
};
3438
use spacetimedb_durability as durability;
39+
use spacetimedb_lib::bsatn::ToBsatn;
3540
use spacetimedb_lib::db::auth::StAccess;
3641
use spacetimedb_lib::db::raw_def::v9::{btree, RawModuleDefV9Builder, RawSql};
42+
use spacetimedb_lib::de::DeserializeSeed as _;
3743
use spacetimedb_lib::st_var::StVarValue;
38-
use spacetimedb_lib::ConnectionId;
3944
use spacetimedb_lib::Identity;
45+
use spacetimedb_lib::{bsatn, ConnectionId};
4046
use spacetimedb_paths::server::{CommitLogDir, ReplicaDir, SnapshotsPath};
4147
use spacetimedb_primitives::*;
4248
use spacetimedb_sats::algebraic_type::fmt::fmt_algebraic_type;
4349
use spacetimedb_sats::memory_usage::MemoryUsage;
44-
use spacetimedb_sats::{AlgebraicType, AlgebraicValue, ProductType, ProductValue};
50+
use spacetimedb_sats::{product, AlgebraicType, AlgebraicValue, ProductType, ProductValue, Typespace};
4551
use spacetimedb_schema::def::{ModuleDef, TableDef, ViewDef};
4652
use spacetimedb_schema::schema::{
4753
ColumnSchema, IndexSchema, RowLevelSecuritySchema, Schema, SequenceSchema, TableSchema,
@@ -1512,6 +1518,117 @@ impl RelationalDB {
15121518
.into()
15131519
})
15141520
}
1521+
1522+
/// Get or insert view argument into `ST_VIEW_ARG_ID`.
1523+
pub fn get_or_insert_st_view_arg(&self, tx: &mut MutTxId, args: &Bytes) -> Result<u64, DBError> {
1524+
let bytes_av = AlgebraicValue::Bytes(args.to_vec().into());
1525+
let mut rows = self.iter_by_col_eq_mut(tx, ST_VIEW_ARG_ID, col_list![1], &bytes_av)?;
1526+
1527+
// Extract the first matching `arg_id`, if any.
1528+
if let Some(res) = rows.next() {
1529+
let row = StViewArgRow::try_from(res).expect("valid StViewArgRow");
1530+
return Ok(row.id);
1531+
}
1532+
1533+
let view_arg_bytes = product![0u64, bytes_av]
1534+
.to_bsatn_vec()
1535+
.map_err(|_| ViewError::SerializeArgs)?;
1536+
1537+
let (_, view_arg_row, _) = self.insert(tx, ST_VIEW_ARG_ID, &view_arg_bytes)?;
1538+
let StViewArgRow { id: arg_id, .. } = view_arg_row.try_into().expect("valid StViewArgRow");
1539+
1540+
Ok(arg_id)
1541+
}
1542+
1543+
/// Evaluate and update View.
1544+
/// This involves:
1545+
/// 1. Serializing the view arguments into `ST_VIEW_ARG_ID`
1546+
/// 2. Deleting all rows in the view table matching the view arguments
1547+
/// 3. Deserializing the return value from the view execution
1548+
/// 4. Inserting all rows from the return value into the view table, with the arg_id
1549+
/// set to the inserted view argument's id.
1550+
/// The `typespace` is needed for deserializing the return value.
1551+
pub fn evaluate_view(
1552+
&self,
1553+
tx: &mut MutTxId,
1554+
// Name of the view to update
1555+
view: &str,
1556+
// Arguments passed to the view call
1557+
args: ArgsTuple,
1558+
// Return type of the view call
1559+
return_type: AlgebraicType,
1560+
// Serialized bytes of the return value from the view call
1561+
//TODO: pass arg_id; do the insertion during starting of invoking view
1562+
bytes: Bytes,
1563+
typespace: &Typespace,
1564+
// Identity of the caller (for non-anonymous views)
1565+
caller_identity: Identity,
1566+
) -> Result<(), DBError> {
1567+
let st_view_row = tx.lookup_st_view_by_name(view)?;
1568+
1569+
let (table_id, is_anonymous) = (
1570+
st_view_row
1571+
.table_id
1572+
.ok_or_else(|| ViewError::ViewNotFound(view.to_string()))?,
1573+
st_view_row.is_anonymous,
1574+
);
1575+
1576+
// Insert the view arguments into ST_VIEW_ARG_ID
1577+
let arg_id = self.get_or_insert_st_view_arg(tx, &args.get_bsatn())?;
1578+
1579+
// Delete all existing rows in the view table matching the view arguments
1580+
let av: AlgebraicValue = args.tuple.into();
1581+
let rows_to_delete: Vec<_> = self
1582+
.iter_by_col_eq_mut(tx, table_id, col_list![0], &av)?
1583+
.map(|res| res.pointer())
1584+
.collect();
1585+
let count = self.delete(tx, table_id, rows_to_delete);
1586+
trace!("Deleted {count} rows from view table {table_id} for arg_id {arg_id}");
1587+
1588+
// Deserialize the return value
1589+
let seed = spacetimedb_sats::WithTypespace::new(typespace, &return_type);
1590+
let return_val = seed
1591+
.deserialize(bsatn::Deserializer::new(&mut &bytes[..]))
1592+
.map_err(|e| ViewError::DeserializeReturn(e.to_string()))?;
1593+
let products = Self::extract_products(return_val, &return_type)?;
1594+
1595+
// Insert all rows from the return value into the view table
1596+
for product in products {
1597+
let row = {
1598+
let mut elements = Vec::with_capacity(2 + product.elements.len());
1599+
elements.push(if is_anonymous {
1600+
AlgebraicValue::OptionNone()
1601+
} else {
1602+
AlgebraicValue::OptionSome(caller_identity.into())
1603+
});
1604+
elements.push(AlgebraicValue::U64(arg_id));
1605+
elements.extend_from_slice(&product.elements);
1606+
1607+
ProductValue {
1608+
elements: elements.into_boxed_slice(),
1609+
}
1610+
};
1611+
let row_bytes = row.to_bsatn_vec().map_err(|_| ViewError::SerializeRow)?;
1612+
self.insert(tx, table_id, &row_bytes)?;
1613+
}
1614+
1615+
Ok(())
1616+
}
1617+
1618+
fn extract_products(
1619+
return_val: AlgebraicValue,
1620+
return_type: &AlgebraicType,
1621+
) -> Result<Vec<ProductValue>, ViewError> {
1622+
if return_type.is_array() {
1623+
let arr = return_val.into_array().expect("return type is array");
1624+
Ok(arr.into_iter().map(|v| v.into_product().unwrap()).collect())
1625+
} else if return_type.is_option() {
1626+
let opt = return_val.into_option().expect("return type is option");
1627+
Ok(opt.into_iter().map(|v| v.into_product().unwrap()).collect())
1628+
} else {
1629+
Err(ViewError::InvalidReturnType(return_type.clone()))
1630+
}
1631+
}
15151632
}
15161633

15171634
#[allow(unused)]

crates/core/src/energy.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,18 @@ use spacetimedb_lib::{Hash, Identity};
55
use crate::messages::control_db::Database;
66

77
pub use spacetimedb_client_api_messages::energy::*;
8-
pub struct ReducerFingerprint<'a> {
8+
pub struct FunctionFingerprint<'a> {
99
pub module_hash: Hash,
1010
pub module_identity: Identity,
1111
pub caller_identity: Identity,
12-
pub reducer_name: &'a str,
12+
pub function_name: &'a str,
1313
}
1414

1515
pub trait EnergyMonitor: Send + Sync + 'static {
16-
fn reducer_budget(&self, fingerprint: &ReducerFingerprint<'_>) -> ReducerBudget;
16+
fn reducer_budget(&self, fingerprint: &FunctionFingerprint<'_>) -> FunctionBudget;
1717
fn record_reducer(
1818
&self,
19-
fingerprint: &ReducerFingerprint<'_>,
19+
fingerprint: &FunctionFingerprint<'_>,
2020
energy_used: EnergyQuanta,
2121
execution_duration: Duration,
2222
);
@@ -29,13 +29,13 @@ pub trait EnergyMonitor: Send + Sync + 'static {
2929
pub struct NullEnergyMonitor;
3030

3131
impl EnergyMonitor for NullEnergyMonitor {
32-
fn reducer_budget(&self, _fingerprint: &ReducerFingerprint<'_>) -> ReducerBudget {
33-
ReducerBudget::DEFAULT_BUDGET
32+
fn reducer_budget(&self, _fingerprint: &FunctionFingerprint<'_>) -> FunctionBudget {
33+
FunctionBudget::DEFAULT_BUDGET
3434
}
3535

3636
fn record_reducer(
3737
&self,
38-
_fingerprint: &ReducerFingerprint<'_>,
38+
_fingerprint: &FunctionFingerprint<'_>,
3939
_energy_used: EnergyQuanta,
4040
_execution_duration: Duration,
4141
) {

crates/core/src/error.rs

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use hex::FromHexError;
88
use spacetimedb_commitlog::repo::TxOffset;
99
use spacetimedb_durability::DurabilityExited;
1010
use spacetimedb_expr::errors::TypingError;
11-
use spacetimedb_lib::Identity;
11+
use spacetimedb_lib::{AlgebraicType, Identity};
1212
use spacetimedb_schema::error::ValidationErrors;
1313
use spacetimedb_snapshot::SnapshotError;
1414
use spacetimedb_table::table::ReadViaBsatnError;
@@ -147,6 +147,8 @@ pub enum DBError {
147147
RestoreSnapshot(#[from] RestoreSnapshotError),
148148
#[error(transparent)]
149149
DurabilityGone(#[from] DurabilityExited),
150+
#[error("ViewError: {0}")]
151+
View(#[from] ViewError),
150152
}
151153

152154
impl DBError {
@@ -190,6 +192,28 @@ impl<'a, T: ?Sized + 'a> From<PoisonError<std::sync::MutexGuard<'a, T>>> for DBE
190192
}
191193
}
192194

195+
#[derive(Error, Debug)]
196+
pub enum ViewError {
197+
#[error("view '{0}' not found")]
198+
ViewNotFound(String),
199+
#[error("failed to deserialize view arguments from row")]
200+
DeserializeArgs,
201+
#[error("failed to deserialize view return value: {0}")]
202+
DeserializeReturn(String),
203+
#[error("failed to serialize row to BSATN")]
204+
SerializeRow,
205+
#[error("invalid return type: expected Array or Option, got {0:?}")]
206+
InvalidReturnType(AlgebraicType),
207+
#[error("return type is Array but deserialized value is not Array")]
208+
TypeMismatchArray,
209+
#[error("return type is Option but deserialized value is not Option")]
210+
TypeMismatchOption,
211+
#[error("expected ProductValue in view result")]
212+
ExpectedProduct,
213+
#[error("failed to serialize view arguments")]
214+
SerializeArgs,
215+
}
216+
193217
#[derive(Debug, Error)]
194218
pub enum LogReplayError {
195219
#[error(

crates/core/src/host/host_controller.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use spacetimedb_data_structures::error_stream::ErrorStream;
2727
use spacetimedb_data_structures::map::IntMap;
2828
use spacetimedb_datastore::db_metrics::data_size::DATA_SIZE_METRICS;
2929
use spacetimedb_datastore::db_metrics::DB_METRICS;
30+
use spacetimedb_datastore::locking_tx_datastore::MutTxId;
3031
use spacetimedb_datastore::traits::Program;
3132
use spacetimedb_durability::{self as durability};
3233
use spacetimedb_lib::{hash_bytes, AlgebraicValue, Identity, Timestamp};
@@ -170,6 +171,29 @@ impl From<&EventStatus> for ReducerOutcome {
170171
}
171172
}
172173

174+
pub enum ViewOutcome {
175+
Success,
176+
Failed(String),
177+
BudgetExceeded,
178+
}
179+
180+
impl From<EventStatus> for ViewOutcome {
181+
fn from(status: EventStatus) -> Self {
182+
match status {
183+
EventStatus::Committed(_) => ViewOutcome::Success,
184+
EventStatus::Failed(e) => ViewOutcome::Failed(e),
185+
EventStatus::OutOfEnergy => ViewOutcome::BudgetExceeded,
186+
}
187+
}
188+
}
189+
190+
pub struct ViewCallResult {
191+
pub outcome: ViewOutcome,
192+
pub tx: MutTxId,
193+
pub energy_used: EnergyQuanta,
194+
pub execution_duration: Duration,
195+
}
196+
173197
#[derive(Clone, Debug)]
174198
pub struct ProcedureCallResult {
175199
pub return_val: AlgebraicValue,

0 commit comments

Comments
 (0)