Skip to content

Commit f109e8b

Browse files
committed
Merge remote-tracking branch 'origin/shub/view-from-host' into noa/ts-views
2 parents 5c42b09 + 9e02e7e commit f109e8b

File tree

31 files changed

+1534
-341
lines changed

31 files changed

+1534
-341
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/client-api-messages/src/energy.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -121,21 +121,21 @@ impl fmt::Debug for EnergyBalance {
121121
}
122122
}
123123

124-
/// A measure of energy representing the energy budget for a reducer.
124+
/// A measure of energy representing the energy budget for a reducer or any callable function.
125125
///
126126
/// In contrast to [`EnergyQuanta`], this is represented by a 64-bit integer. This makes energy handling
127127
/// for reducers easier, while still providing a unlikely-to-ever-be-reached maximum value (e.g. for wasmtime:
128128
/// `(u64::MAX eV / 1000 eV/instruction) * 3 ns/instruction = 640 days`)
129129
#[derive(Copy, Clone, From, Add, Sub)]
130-
pub struct ReducerBudget(u64);
130+
pub struct FunctionBudget(u64);
131131

132-
impl ReducerBudget {
132+
impl FunctionBudget {
133133
// 1 second of wasm runtime is roughly 2 TeV, so this is
134134
// roughly 1 minute of wasm runtime
135-
pub const DEFAULT_BUDGET: Self = ReducerBudget(120_000_000_000_000);
135+
pub const DEFAULT_BUDGET: Self = FunctionBudget(120_000_000_000_000);
136136

137-
pub const ZERO: Self = ReducerBudget(0);
138-
pub const MAX: Self = ReducerBudget(u64::MAX);
137+
pub const ZERO: Self = FunctionBudget(0);
138+
pub const MAX: Self = FunctionBudget(u64::MAX);
139139

140140
pub fn new(v: u64) -> Self {
141141
Self(v)
@@ -151,13 +151,13 @@ impl ReducerBudget {
151151
}
152152
}
153153

154-
impl From<ReducerBudget> for EnergyQuanta {
155-
fn from(value: ReducerBudget) -> Self {
154+
impl From<FunctionBudget> for EnergyQuanta {
155+
fn from(value: FunctionBudget) -> Self {
156156
EnergyQuanta::new(value.0.into())
157157
}
158158
}
159159

160-
impl fmt::Debug for ReducerBudget {
160+
impl fmt::Debug for FunctionBudget {
161161
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
162162
f.debug_tuple("ReducerBudget")
163163
.field(&EnergyQuanta::from(*self))

crates/client-api/src/lib.rs

Lines changed: 46 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -79,57 +79,52 @@ impl Host {
7979

8080
let (tx_offset, durable_offset, json) = self
8181
.host_controller
82-
.using_database(
83-
database,
84-
self.replica_id,
85-
move |db| -> axum::response::Result<_, (StatusCode, String)> {
86-
tracing::info!(sql = body);
87-
88-
// We need a header for query results
89-
let mut header = vec![];
90-
91-
let sql_start = std::time::Instant::now();
92-
let sql_span =
93-
tracing::trace_span!("execute_sql", total_duration = tracing::field::Empty,).entered();
94-
95-
let result = sql::execute::run(
96-
// Returns an empty result set for mutations
97-
db,
98-
&body,
99-
auth,
100-
Some(&module_host.info().subscriptions),
101-
&mut header,
102-
)
103-
.map_err(|e| {
104-
log::warn!("{e}");
105-
if let Some(auth_err) = e.get_auth_error() {
106-
(StatusCode::UNAUTHORIZED, auth_err.to_string())
107-
} else {
108-
(StatusCode::BAD_REQUEST, e.to_string())
109-
}
110-
})?;
111-
112-
let total_duration = sql_start.elapsed();
113-
sql_span.record("total_duration", tracing::field::debug(total_duration));
114-
115-
// Turn the header into a `ProductType`
116-
let schema = header
117-
.into_iter()
118-
.map(|(col_name, col_type)| ProductTypeElement::new(col_type, Some(col_name)))
119-
.collect();
120-
121-
Ok((
122-
result.tx_offset,
123-
db.durable_tx_offset(),
124-
vec![SqlStmtResult {
125-
schema,
126-
rows: result.rows,
127-
total_duration_micros: total_duration.as_micros() as u64,
128-
stats: SqlStmtStats::from_metrics(&result.metrics),
129-
}],
130-
))
131-
},
132-
)
82+
.using_database(database, self.replica_id, move |db| async move {
83+
tracing::info!(sql = body);
84+
let mut header = vec![];
85+
let sql_start = std::time::Instant::now();
86+
let sql_span = tracing::trace_span!("execute_sql", total_duration = tracing::field::Empty,);
87+
let _guard = sql_span.enter();
88+
89+
let result = sql::execute::run(
90+
&db,
91+
&body,
92+
auth,
93+
Some(&module_host.info.subscriptions),
94+
Some(&module_host),
95+
auth.caller,
96+
&mut header,
97+
)
98+
.await
99+
.map_err(|e| {
100+
log::warn!("{e}");
101+
if let Some(auth_err) = e.get_auth_error() {
102+
(StatusCode::UNAUTHORIZED, auth_err.to_string())
103+
} else {
104+
(StatusCode::BAD_REQUEST, e.to_string())
105+
}
106+
})?;
107+
108+
let total_duration = sql_start.elapsed();
109+
drop(_guard);
110+
sql_span.record("total_duration", tracing::field::debug(total_duration));
111+
112+
let schema = header
113+
.into_iter()
114+
.map(|(col_name, col_type)| ProductTypeElement::new(col_type, Some(col_name)))
115+
.collect();
116+
117+
Ok::<_, (StatusCode, String)>((
118+
result.tx_offset,
119+
db.durable_tx_offset(),
120+
vec![SqlStmtResult {
121+
schema,
122+
rows: result.rows,
123+
total_duration_micros: total_duration.as_micros() as u64,
124+
stats: SqlStmtStats::from_metrics(&result.metrics),
125+
}],
126+
))
127+
})
133128
.await
134129
.map_err(log_and_500)??;
135130

@@ -155,7 +150,6 @@ impl Host {
155150
.await
156151
}
157152
}
158-
159153
/// Parameters for publishing a database.
160154
///
161155
/// See [`ControlStateDelegate::publish_database`].

crates/client-api/src/routes/database.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,8 @@ pub async fn call<S: ControlStateDelegate + NodeDelegate>(
126126
};
127127

128128
module
129-
.call_identity_disconnected(caller_identity, connection_id)
129+
// We don't clear views after reducer calls
130+
.call_identity_disconnected(caller_identity, connection_id, false)
130131
.await
131132
.map_err(client_disconnected_error_to_response)?;
132133

@@ -299,7 +300,8 @@ async fn procedure<S: ControlStateDelegate + NodeDelegate>(
299300
};
300301

301302
module
302-
.call_identity_disconnected(caller_identity, connection_id)
303+
// We don't clear views after procedure calls
304+
.call_identity_disconnected(caller_identity, connection_id, false)
303305
.await
304306
.map_err(client_disconnected_error_to_response)?;
305307

crates/core/src/db/relational_db.rs

Lines changed: 123 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,28 @@
11
use crate::db::MetricsRecorderQueue;
22
use crate::error::{DBError, DatabaseError, RestoreSnapshotError};
3+
use crate::host::ArgsTuple;
34
use crate::messages::control_db::HostType;
45
use crate::subscription::ExecutionCounters;
56
use crate::util::{asyncify, spawn_rayon};
67
use crate::worker_metrics::WORKER_METRICS;
78
use anyhow::{anyhow, Context};
9+
use bytes::Bytes;
810
use enum_map::EnumMap;
911
use fs2::FileExt;
12+
use log::trace;
1013
use spacetimedb_commitlog as commitlog;
1114
use spacetimedb_commitlog::repo::OnNewSegmentFn;
1215
use spacetimedb_data_structures::map::IntSet;
1316
use spacetimedb_datastore::db_metrics::DB_METRICS;
14-
use spacetimedb_datastore::error::{DatastoreError, TableError};
17+
use spacetimedb_datastore::error::{DatastoreError, TableError, ViewError};
1518
use spacetimedb_datastore::execution_context::{ReducerContext, Workload, WorkloadType};
1619
use spacetimedb_datastore::locking_tx_datastore::datastore::TxMetrics;
1720
use spacetimedb_datastore::locking_tx_datastore::state_view::{
1821
IterByColEqMutTx, IterByColRangeMutTx, IterMutTx, IterTx, StateView,
1922
};
2023
use spacetimedb_datastore::locking_tx_datastore::{MutTxId, TxId};
21-
use spacetimedb_datastore::system_tables::{system_tables, StModuleRow};
24+
use spacetimedb_datastore::system_tables::ST_VIEW_ID;
25+
use spacetimedb_datastore::system_tables::{system_tables, StModuleRow, StViewRow};
2226
use spacetimedb_datastore::system_tables::{StFields, StVarFields, StVarName, StVarRow, ST_MODULE_ID, ST_VAR_ID};
2327
use spacetimedb_datastore::traits::{
2428
InsertFlags, IsolationLevel, Metadata, MutTx as _, MutTxDatastore, Program, RowTypeForTable, Tx as _, TxDatastore,
@@ -32,16 +36,18 @@ use spacetimedb_datastore::{
3236
traits::TxData,
3337
};
3438
use spacetimedb_durability as durability;
39+
use spacetimedb_lib::bsatn::ToBsatn;
3540
use spacetimedb_lib::db::auth::StAccess;
3641
use spacetimedb_lib::db::raw_def::v9::{btree, RawModuleDefV9Builder, RawSql};
42+
use spacetimedb_lib::de::DeserializeSeed as _;
3743
use spacetimedb_lib::st_var::StVarValue;
38-
use spacetimedb_lib::ConnectionId;
3944
use spacetimedb_lib::Identity;
45+
use spacetimedb_lib::{bsatn, ConnectionId};
4046
use spacetimedb_paths::server::{CommitLogDir, ReplicaDir, SnapshotsPath};
4147
use spacetimedb_primitives::*;
4248
use spacetimedb_sats::algebraic_type::fmt::fmt_algebraic_type;
4349
use spacetimedb_sats::memory_usage::MemoryUsage;
44-
use spacetimedb_sats::{AlgebraicType, AlgebraicValue, ProductType, ProductValue};
50+
use spacetimedb_sats::{product, AlgebraicType, AlgebraicValue, ProductType, ProductValue, Typespace};
4551
use spacetimedb_schema::def::{ModuleDef, TableDef, ViewDef};
4652
use spacetimedb_schema::schema::{
4753
ColumnSchema, IndexSchema, RowLevelSecuritySchema, Schema, SequenceSchema, TableSchema,
@@ -1403,6 +1409,20 @@ impl RelationalDB {
14031409
Ok(rows_deleted)
14041410
}
14051411

1412+
/// Clear all rows from all view tables without dropping them.
1413+
pub fn clear_all_views(&self, tx: &mut MutTx) -> Result<(), DBError> {
1414+
for table_id in tx
1415+
.iter(ST_VIEW_ID)?
1416+
.map(StViewRow::try_from)
1417+
.collect::<Result<Vec<_>, _>>()?
1418+
.into_iter()
1419+
.filter_map(|row| row.table_id)
1420+
{
1421+
tx.clear_table(table_id)?;
1422+
}
1423+
Ok(())
1424+
}
1425+
14061426
pub fn create_sequence(&self, tx: &mut MutTx, sequence_schema: SequenceSchema) -> Result<SequenceId, DBError> {
14071427
Ok(self.inner.create_sequence_mut_tx(tx, sequence_schema)?)
14081428
}
@@ -1498,8 +1518,106 @@ impl RelationalDB {
14981518
.into()
14991519
})
15001520
}
1501-
}
15021521

1522+
/// Materialize View backing table.
1523+
///
1524+
/// # Process
1525+
/// 1. Serializes view arguments into `ST_VIEW_ARG_ID`
1526+
/// 2. Deletes stale rows matching the view arguments
1527+
/// 3. Deserializes the new view execution results
1528+
/// 4. Inserts fresh rows with the corresponding arg_id
1529+
///
1530+
/// # Arguments
1531+
/// * `tx` - Mutable transaction context
1532+
/// * `view` - Name of the view to update
1533+
/// * `args` - Arguments passed to the view call
1534+
/// * `return_type` - Expected return type of the view
1535+
/// * `bytes` - Serialized (bsatn encoded) return value from view execution
1536+
/// * `typespace` - Type information for deserialization
1537+
/// * `caller_identity` - Identity of the caller (for non-anonymous views)
1538+
#[allow(clippy::too_many_arguments)]
1539+
pub fn materialize_view(
1540+
&self,
1541+
tx: &mut MutTxId,
1542+
view: &str,
1543+
args: ArgsTuple,
1544+
return_type: AlgebraicType,
1545+
bytes: Bytes,
1546+
typespace: &Typespace,
1547+
caller_identity: Identity,
1548+
) -> Result<(), DBError> {
1549+
// Fetch view metadata
1550+
let st_view_row = tx.lookup_st_view_by_name(view)?;
1551+
let table_id = st_view_row.table_id.expect("View table must exist for materialization");
1552+
let is_anonymous = st_view_row.is_anonymous;
1553+
1554+
let arg_id = tx.get_or_insert_st_view_arg(args.get_bsatn())?;
1555+
1556+
// Build the filter key for identifying rows to update
1557+
let input_args = product![
1558+
if is_anonymous {
1559+
AlgebraicValue::OptionNone()
1560+
} else {
1561+
AlgebraicValue::OptionSome(caller_identity.into())
1562+
},
1563+
AlgebraicValue::U64(arg_id)
1564+
];
1565+
1566+
// Remove stale View entries
1567+
let rows_to_delete: Vec<_> = self
1568+
.iter_by_col_eq_mut(tx, table_id, [0, 1], &input_args.clone().into())?
1569+
.map(|res| res.pointer())
1570+
.collect();
1571+
1572+
let deleted_count = self.delete(tx, table_id, rows_to_delete);
1573+
trace!("Deleted {deleted_count} stale rows from view table {table_id} for arg_id {arg_id}");
1574+
1575+
// Deserialize the return value
1576+
let seed = spacetimedb_sats::WithTypespace::new(typespace, &return_type);
1577+
let return_val = seed
1578+
.deserialize(bsatn::Deserializer::new(&mut &bytes[..]))
1579+
.map_err(|e| DatastoreError::from(ViewError::DeserializeReturn(e.to_string())))?;
1580+
1581+
// Extract products from return value (must be array or option)
1582+
let products: Vec<ProductValue> = if return_type.is_array() {
1583+
let arr = return_val
1584+
.into_array()
1585+
.expect("return_type.is_array() ensures this is an array");
1586+
1587+
arr.into_iter().map(|v| v.into_product().unwrap()).collect()
1588+
} else if return_type.is_option() {
1589+
let opt = return_val
1590+
.into_option()
1591+
.expect("return_type.is_option() ensures this is an option");
1592+
opt.into_iter().map(|v| v.into_product().unwrap()).collect()
1593+
} else {
1594+
return Err(DatastoreError::from(ViewError::InvalidReturnType(return_type)).into());
1595+
};
1596+
1597+
// Insert fresh results into the view table
1598+
let mut elements: Vec<AlgebraicValue> =
1599+
Vec::with_capacity(input_args.elements.len() + products.first().map_or(0, |p| p.elements.len()));
1600+
for product in products {
1601+
elements.clear();
1602+
// Build complete row by prepending filter key to product data
1603+
let mut elements = Vec::with_capacity(input_args.elements.len() + product.elements.len());
1604+
elements.extend_from_slice(&input_args.elements);
1605+
elements.extend_from_slice(&product.elements);
1606+
1607+
let row = ProductValue {
1608+
elements: elements.into_boxed_slice(),
1609+
};
1610+
1611+
let row_bytes = row
1612+
.to_bsatn_vec()
1613+
.map_err(|_| DatastoreError::from(ViewError::SerializeRow))?;
1614+
1615+
self.insert(tx, table_id, &row_bytes)?;
1616+
}
1617+
1618+
Ok(())
1619+
}
1620+
}
15031621
#[allow(unused)]
15041622
#[derive(Clone)]
15051623
struct LockFile {

0 commit comments

Comments
 (0)